nvharikrishna commented on code in PR #231:
URL: https://github.com/apache/cassandra-sidecar/pull/231#discussion_r2217275108


##########
server/src/main/java/org/apache/cassandra/sidecar/config/RepairConfiguration.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.config;
+
+/**
+ * Configuration for Repair jobs
+ */
+public interface RepairConfiguration

Review Comment:
   ```suggestion
   public interface RepairJobsConfiguration
   ```
   This isn't really a configuration for repairs but for repair jobs, right? Would it make sense to call it `RepairJobsConfiguration`?



##########
server/src/main/java/org/apache/cassandra/sidecar/job/RepairJob.java:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.job;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import org.apache.cassandra.sidecar.adapters.base.RepairOptions;
+import org.apache.cassandra.sidecar.common.request.data.RepairPayload;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.config.RepairConfiguration;
+import org.apache.cassandra.sidecar.handlers.data.RepairRequestParam;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Implementation of {@link OperationalJob} to perform repair operation.
+ */
+public class RepairJob extends OperationalJob
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(RepairJob.class);
+    private static final String OPERATION = "repair";
+    private static final String PREVIEW_KIND_REPAIRED = "REPAIRED";
+
+    private final RepairRequestParam repairParams;
+    private final Vertx vertx;
+    private final RepairConfiguration repairConfiguration;
+    protected StorageOperations storageOperations;
+
+    /**
+     * Enum representing the status of a parent repair session
+     */
+    public enum ParentRepairStatus
+    {
+        IN_PROGRESS, COMPLETED, FAILED
+    }
+
+    /**
+     * Constructs a job with a unique UUID, in Pending state
+     *
+     * @param vertx
+     * @param repairConfiguration
+     * @param jobId               UUID representing the Job to be created
+     * @param storageOps
+     * @param repairParams
+     */
+    public RepairJob(Vertx vertx, RepairConfiguration repairConfiguration, UUID jobId, StorageOperations storageOps, RepairRequestParam repairParams)
+    {
+        super(jobId);
+        this.vertx = vertx;
+        this.repairConfiguration = repairConfiguration;
+        this.storageOperations = storageOps;
+        this.repairParams = repairParams;
+    }
+
+    @Override
+    public boolean isRunningOnCassandra()
+    {
+        // TODO: Leverage repair vtables to fail-fast on conflicting repairs (overlapping token-ranges or replica-sets)
+        // Currently does not check for concurrent repairs
+        return false;
+    }
+
+    @Override
+    protected void executeInternal()
+    {
+        Map<String, String> options = generateRepairOptions(repairParams.requestpayload());
+        String keyspace = repairParams.keyspace().name();
+
+        LOGGER.info("Executing repair operation for keyspace {} jobId={} maxRuntime={}",
+                    keyspace, this.jobId(), repairConfiguration.maxRepairJobRuntimeMillis());
+
+        int cmd = storageOperations.repair(keyspace, options);
+        if (cmd <= 0)
+        {
+            // repairAsync can only return 0 for replication factor 1.
+            LOGGER.info("Replication factor is 1. No repair is needed for keyspace '{}'", keyspace);
+        }
+        else
+        {
+            // complete the max wait time promise either when exceeding the wait time, or the result is available
+            Promise<Boolean> maxWaitTimePromise = Promise.promise();
+            vertx.setTimer(repairConfiguration.maxRepairJobRuntimeMillis(), d -> {
+                LOGGER.info("Timer Poll");
+                maxWaitTimePromise.tryComplete(true);
+            });
+
+            // Promise for completion of repair operation
+            Promise<Void> promise = Promise.promise();
+            Future<Void> resultFut = promise.future();
+
+            // main event loop checks periodically (10s) for completion
+            vertx.setPeriodic(repairConfiguration.repairPollIntervalMillis(), id -> queryForCompletedRepair(promise, cmd));
+            resultFut.onComplete(res -> maxWaitTimePromise.tryComplete(false));
+            Future<Boolean> maxWaitTimeFut = maxWaitTimePromise.future();
+
+            Future<Void> compositeFut = Future.any(maxWaitTimeFut, resultFut)
+                                              // If this lambda below is evaluated, either one of the futures have completed;
+                                              // In either case, the future corresponding to the job execution is returned
+                                              .compose(f -> {
+                                                  LOGGER.info("One of the futures ended waitStatus={} resultStatus={}",
+                                                              maxWaitTimeFut.isComplete(), resultFut.isComplete());
+                                                  boolean isTimeout = (maxWaitTimeFut.succeeded()) ? maxWaitTimeFut.result() : false;
+                                                  if (isTimeout)
+                                                  {
+                                                      LOGGER.error("Timer ran out before the repair job completed. Repair took too long");
+                                                      // TODO: Cancel repair? (Nice to have)
+                                                      // We free up the thread (job fails) and stop polling for completion
+                                                      return Future.failedFuture("Repair job taking too long");
+                                                  }
+                                                  // otherwise, the result of the job is available
+                                                  return resultFut;
+                                              });
+            try
+            {
+                compositeFut.toCompletionStage().toCompletableFuture().get(); // wait
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public String name()
+    {
+        return OPERATION;
+    }
+
+    private Map<String, String> generateRepairOptions(RepairPayload repairPayload)
+    {
+        Map<String, String> options = new HashMap<>();
+
+        List<String> tables = repairPayload.tables();
+        if (tables != null && !tables.isEmpty())
+        {
+            options.put(RepairOptions.COLUMNFAMILIES.getValue(), String.join(",", tables));
+        }
+
+        Boolean isPrimaryRange = repairPayload.isPrimaryRange();
+        if (isPrimaryRange != null)
+        {
+            options.put(RepairOptions.PRIMARY_RANGE.getValue(), String.valueOf(isPrimaryRange));
+        }
+        // TODO: Verify use-cases involving multiple DCs
+
+        String dc = repairPayload.datacenter();
+        if (dc != null)
+        {
+            options.put(RepairOptions.DATACENTERS.getValue(), dc);
+        }
+
+        List<String> hosts = repairPayload.hosts();
+        if (hosts != null && !hosts.isEmpty())
+        {
+            options.put(RepairOptions.HOSTS.getValue(), String.join(",", requireNonNull(hosts)));

Review Comment:
   `hosts` is already validated in the `if` condition, so `requireNonNull` seems redundant.
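
   For illustration, a minimal sketch of the `hosts` branch without the extra check. It assumes the option key is the literal `"hosts"` (the PR reads it from `RepairOptions.HOSTS.getValue()`), and `HostsOptionSketch` is a hypothetical name:

   ```java
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   // Hypothetical stand-in for the hosts branch of generateRepairOptions().
   // The "hosts" key is an assumption; the PR uses RepairOptions.HOSTS.getValue().
   final class HostsOptionSketch
   {
       static final String HOSTS_KEY = "hosts";

       private HostsOptionSketch()
       {
       }

       static Map<String, String> withHosts(List<String> hosts)
       {
           Map<String, String> options = new HashMap<>();
           // The guard already rules out null, so requireNonNull(hosts) adds nothing.
           if (hosts != null && !hosts.isEmpty())
           {
               options.put(HOSTS_KEY, String.join(",", hosts));
           }
           return options;
       }
   }
   ```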



##########
server/src/main/java/org/apache/cassandra/sidecar/handlers/RepairHandler.java:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.handlers;
+
+import java.util.Collections;
+import java.util.Set;
+
+import com.datastax.driver.core.utils.UUIDs;
+import com.google.inject.Inject;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.Vertx;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.json.DecodeException;
+import io.vertx.core.json.Json;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.auth.authorization.Authorization;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.request.data.RepairPayload;
+import org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.common.server.data.Name;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.config.ServiceConfiguration;
+import org.apache.cassandra.sidecar.exceptions.OperationalJobConflictException;
+import org.apache.cassandra.sidecar.handlers.data.RepairRequestParam;
+import org.apache.cassandra.sidecar.job.OperationalJobManager;
+import org.apache.cassandra.sidecar.job.RepairJob;
+import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.cassandra.sidecar.utils.OperationalJobUtils;
+import org.jetbrains.annotations.NotNull;
+
+import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException;
+
+/**
+ * Handler for triggering repair
+ */
+public class RepairHandler extends AbstractHandler<RepairRequestParam> implements AccessProtected
+{
+    private final ServiceConfiguration config;
+    private final OperationalJobManager jobManager;
+    private final Vertx vertx;
+
+    /**
+     * Constructs a handler with the provided {@code metadataFetcher}
+     *
+     * @param vertx                the vertx instance
+     * @param metadataFetcher      the metadata fetcher
+     * @param executorPools        executor pools for blocking executions
+     * @param serviceConfiguration configuration object holding config details of Sidecar
+     * @param validator            a validator instance to validate Cassandra-specific input
+     * @param jobManager           manager for long-running operational jobs
+     */
+    @Inject
+    protected RepairHandler(Vertx vertx,
+                            InstanceMetadataFetcher metadataFetcher,
+                            ExecutorPools executorPools,
+                            ServiceConfiguration serviceConfiguration,
+                            CassandraInputValidator validator,
+                            OperationalJobManager jobManager)
+    {
+        super(metadataFetcher, executorPools, validator);
+        this.vertx = vertx;
+        this.jobManager = jobManager;
+        this.config = serviceConfiguration;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    protected RepairRequestParam extractParamsOrThrow(RoutingContext context)
+    {
+        Name keyspace = keyspace(context, true);
+        if (keyspace == null)
+        {
+            throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, "'keyspace' is required but not supplied");
+        }
+
+        String bodyString = context.body().asString();
+        if (bodyString == null || bodyString.equalsIgnoreCase("null")) // json encoder writes null as "null"
+        {
+            logger.warn("Bad request to create repair job. Received null payload.");
+            throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, "Unexpected null payload for request");
+        }
+
+        RepairPayload payload;
+        try
+        {
+            payload = Json.decodeValue(bodyString, RepairPayload.class);
+        }
+        catch (DecodeException decodeException)
+        {
+            logger.warn("Bad request to create repair job. Received invalid JSON payload.");
+            throw wrapHttpException(HttpResponseStatus.BAD_REQUEST,
+                                    "Invalid request payload",
+                                    decodeException);
+        }
+
+        return RepairRequestParam.from(keyspace, payload);
+    }
+
+    @Override
+    protected void handleInternal(RoutingContext context,
+                                  HttpServerRequest httpRequest,
+                                  @NotNull String host,
+                                  SocketAddress remoteAddress,
+                                  RepairRequestParam repairRequestParam)
+    {
+        StorageOperations operations = metadataFetcher.delegate(host).storageOperations();
+        RepairJob job = new RepairJob(vertx, config.repairConfiguration(), UUIDs.timeBased(), operations, repairRequestParam);
+        try
+        {
+            jobManager.trySubmitJob(job);
+        }
+        catch (OperationalJobConflictException oje)

Review Comment:
   It looks like the [checkConflict](https://github.com/apache/cassandra-sidecar/blob/e3e549d7aba9465c23bd7925f6d703e9c1e6c448/server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJobManager.java#L106) in `trySubmitJob` checks whether any job with the same name is already running. That would prevent repairs from running in parallel, right? Shouldn't parallel repair operations be allowed?
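
   A hypothetical sketch of a narrower conflict check, keyed on operation name plus keyspace instead of the name alone, so repairs on different keyspaces would not block each other. `RepairConflictTracker` and its methods are illustrative names, not the sidecar's `OperationalJobManager` API:

   ```java
   import java.util.List;
   import java.util.Objects;
   import java.util.Set;
   import java.util.concurrent.ConcurrentHashMap;

   // Hypothetical sketch, not OperationalJobManager: in-flight jobs are tracked by
   // (operation name, keyspace) rather than by operation name alone.
   final class RepairConflictTracker
   {
       // List.of(...) gives value-based equals/hashCode for the composite key.
       private final Set<List<String>> inFlight = ConcurrentHashMap.newKeySet();

       /** Registers the job; returns false if the same operation already runs on this keyspace. */
       boolean tryStart(String operation, String keyspace)
       {
           // Set.add on a concurrent key set is atomic, so two callers cannot both win.
           return inFlight.add(List.of(Objects.requireNonNull(operation), Objects.requireNonNull(keyspace)));
       }

       /** Releases the slot once the job completes or fails. */
       void finish(String operation, String keyspace)
       {
           inFlight.remove(List.of(operation, keyspace));
       }
   }
   ```

   Keyspace is only one possible granularity; the TODO in `isRunningOnCassandra()` points at overlapping token ranges or replica sets as the eventual criterion.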



##########
client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java:
##########
@@ -803,6 +805,23 @@ public CompletableFuture<OperationalJobResponse> nodeDecommission(SidecarInstanc
                                             .build());
     }
 
+    /**
+     * Executes a repair operation on the provided instance, keyspace and options
+     * @param instance the instance where the request will be executed
+     * @param keyspace keyspace for which the repair is being performed
+     * @param payload the repair request options as payload
+     * @return a completable future of the jobs list

Review Comment:
   ```suggestion
        * @return a completable future of the repair operation job response
   ```
   Does this sound better?



##########
server/src/main/java/org/apache/cassandra/sidecar/config/yaml/RepairConfigurationImpl.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.config.yaml;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.cassandra.sidecar.config.RepairConfiguration;
+
+/**
+ * Configuration for Repair jobs
+ */
+public class RepairConfigurationImpl implements RepairConfiguration
+{
+    public static final long DEFAULT_MAX_REPAIR_RUNTIME_MILLIS = 100_000L;
+    public static final long DEFAULT_REPAIR_POLLING_INTERVAL_MILLIS = 2_000L;
+
+    @JsonProperty(value = "max_repair_runtime", defaultValue = DEFAULT_MAX_REPAIR_RUNTIME_MILLIS + "")
+    protected long maxRepairRuntimeMillis;
+
+    @JsonProperty(value = "repair_polling_interval", defaultValue = DEFAULT_REPAIR_POLLING_INTERVAL_MILLIS + "")
+    protected long repairPollIntervalMillis;
+
+    public RepairConfigurationImpl()
+    {
+        this.maxRepairRuntimeMillis = DEFAULT_MAX_REPAIR_RUNTIME_MILLIS;
+        this.repairPollIntervalMillis = DEFAULT_REPAIR_POLLING_INTERVAL_MILLIS;
+    }
+
+    public RepairConfigurationImpl(long maxRepairRuntimeMillis, long repairPollIntervalMillis)
+    {
+        this.maxRepairRuntimeMillis = maxRepairRuntimeMillis;
+        this.repairPollIntervalMillis = repairPollIntervalMillis;
+    }
+
+    @Override
+    public long maxRepairJobRuntimeMillis()
+    {
+        return maxRepairRuntimeMillis;
+    }
+
+    @Override
+    public long repairPollIntervalMillis()
+    {
+        return repairPollIntervalMillis;
+    }
+
+    @JsonProperty(value = "repair_polling_interval")
+    public void setRepairPollIntervalMillis(long repairPollIntervalMillis)

Review Comment:
   Is it possible to avoid these setters and make this class immutable?
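
   One possible shape, sketched in plain Java: final fields set once in a constructor that falls back to the existing defaults. With Jackson, the constructor could carry `@JsonCreator` and its parameters `@JsonProperty("max_repair_runtime")` / `@JsonProperty("repair_polling_interval")`; the annotations are left out so the sketch compiles without Jackson, and the boxed parameters assume Jackson passes `null` for a missing creator property. `ImmutableRepairConfigSketch` is a hypothetical name:

   ```java
   // Hypothetical immutable variant of RepairConfigurationImpl. In the real class the
   // two-arg constructor would be the @JsonCreator (annotations omitted here).
   final class ImmutableRepairConfigSketch
   {
       static final long DEFAULT_MAX_REPAIR_RUNTIME_MILLIS = 100_000L;
       static final long DEFAULT_REPAIR_POLLING_INTERVAL_MILLIS = 2_000L;

       private final long maxRepairRuntimeMillis;
       private final long repairPollIntervalMillis;

       ImmutableRepairConfigSketch()
       {
           this(null, null);
       }

       // Boxed parameters: a property missing from the YAML arrives as null and maps to the default.
       ImmutableRepairConfigSketch(Long maxRepairRuntimeMillis, Long repairPollIntervalMillis)
       {
           this.maxRepairRuntimeMillis = maxRepairRuntimeMillis != null
                                         ? maxRepairRuntimeMillis
                                         : DEFAULT_MAX_REPAIR_RUNTIME_MILLIS;
           this.repairPollIntervalMillis = repairPollIntervalMillis != null
                                           ? repairPollIntervalMillis
                                           : DEFAULT_REPAIR_POLLING_INTERVAL_MILLIS;
       }

       long maxRepairJobRuntimeMillis()
       {
           return maxRepairRuntimeMillis;
       }

       long repairPollIntervalMillis()
       {
           return repairPollIntervalMillis;
       }
   }
   ```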



##########
server/src/main/java/org/apache/cassandra/sidecar/handlers/data/RepairRequestParam.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.handlers.data;
+
+
+import org.apache.cassandra.sidecar.common.request.data.RepairPayload;
+import org.apache.cassandra.sidecar.common.server.data.Name;
+
+/**
+ * Holder class for the {@link org.apache.cassandra.sidecar.handlers.RepairHandler}
+ * request parameters
+ */
+public class RepairRequestParam
+{
+    private final Name keyspace;
+    private final RepairPayload repairRequestPayload;
+
+    /**
+     * Constructor for holder class
+     */
+    public RepairRequestParam(Name keyspace, RepairPayload requestpayload)

Review Comment:
   A public static `from` method is already available; does this constructor need to be public?
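
   For example, a sketch with the constructor made private so `from` is the only entry point. `String` and `Object` stand in for the PR's `Name` and `RepairPayload` types so the example compiles on its own; `RepairRequestParamSketch` is a hypothetical name:

   ```java
   // Hypothetical sketch: the private constructor forces callers through from(...).
   final class RepairRequestParamSketch
   {
       private final String keyspace;        // stands in for Name
       private final Object requestPayload;  // stands in for RepairPayload

       private RepairRequestParamSketch(String keyspace, Object requestPayload)
       {
           this.keyspace = keyspace;
           this.requestPayload = requestPayload;
       }

       static RepairRequestParamSketch from(String keyspace, Object requestPayload)
       {
           return new RepairRequestParamSketch(keyspace, requestPayload);
       }

       String keyspace()
       {
           return keyspace;
       }

       Object requestPayload()
       {
           return requestPayload;
       }
   }
   ```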



##########
server/src/main/java/org/apache/cassandra/sidecar/job/RepairJob.java:
##########
@@ -0,0 +1,249 @@
+        int cmd = storageOperations.repair(keyspace, options);
+        if (cmd <= 0)
+        {
+            // repairAsync can only return 0 for replication factor 1.
+            LOGGER.info("Replication factor is 1. No repair is needed for keyspace '{}'", keyspace);
+        }
+        else
+        {
+            // complete the max wait time promise either when exceeding the wait time, or the result is available
+            Promise<Boolean> maxWaitTimePromise = Promise.promise();
+            vertx.setTimer(repairConfiguration.maxRepairJobRuntimeMillis(), d -> {
+                LOGGER.info("Timer Poll");

Review Comment:
   Can you add the job ID and keyspace to this log message so the job can be traced in the logs?
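
   With SLF4J, as used in this class, that could be as simple as `LOGGER.info("Repair timer fired jobId={} keyspace={}", jobId(), keyspace);`, since `keyspace` is an effectively final local in `executeInternal()`. If the same identifiers should appear on every line the job logs, a small helper keeps them consistent; `RepairLogContext` below is a hypothetical sketch, not existing sidecar code:

   ```java
   import java.util.UUID;

   // Hypothetical helper: one place that renders the identifiers every log line
   // of a repair job should carry (timer, polling, timeout).
   final class RepairLogContext
   {
       private RepairLogContext()
       {
       }

       static String of(UUID jobId, String keyspace)
       {
           return "jobId=" + jobId + " keyspace=" + keyspace;
       }

       static String timerFired(UUID jobId, String keyspace, long maxRuntimeMillis)
       {
           return "Repair timer fired " + of(jobId, keyspace) + " maxRuntimeMillis=" + maxRuntimeMillis;
       }
   }
   ```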



##########
server/src/main/java/org/apache/cassandra/sidecar/handlers/RepairHandler.java:
##########
@@ -0,0 +1,156 @@
+    @Override
+    protected void handleInternal(RoutingContext context,
+                                  HttpServerRequest httpRequest,
+                                  @NotNull String host,
+                                  SocketAddress remoteAddress,
+                                  RepairRequestParam repairRequestParam)
+    {
+        StorageOperations operations = metadataFetcher.delegate(host).storageOperations();
+        RepairJob job = new RepairJob(vertx, config.repairConfiguration(), UUIDs.timeBased(), operations, repairRequestParam);
+        try
+        {
+            jobManager.trySubmitJob(job);
+        }
+        catch (OperationalJobConflictException oje)
+        {
+            String reason = oje.getMessage();
+            logger.error("Conflicting job encountered. reason={}", reason);

Review Comment:
   Can you log a bit more detail here, such as the keyspace name, table names, tokens, DC, etc.?
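
   As a sketch of what the extra context could look like, a hypothetical formatter over the `RepairPayload` accessors visible in this PR (`tables()`, `datacenter()`, `hosts()`, `isPrimaryRange()`); token ranges are omitted because the payload's token fields are not shown in this diff:

   ```java
   import java.util.List;

   // Hypothetical formatter for the conflict log line; parameters mirror the
   // RepairPayload accessors visible in this PR.
   final class RepairRequestDescriber
   {
       private RepairRequestDescriber()
       {
       }

       static String describe(String keyspace, List<String> tables, String datacenter,
                              List<String> hosts, Boolean primaryRange)
       {
           // Absent filters mean "no restriction", rendered explicitly for readability.
           return "keyspace=" + keyspace
                  + " tables=" + (tables == null || tables.isEmpty() ? "<all>" : String.join(",", tables))
                  + " dc=" + (datacenter == null ? "<any>" : datacenter)
                  + " hosts=" + (hosts == null || hosts.isEmpty() ? "<all>" : String.join(",", hosts))
                  + " primaryRange=" + primaryRange;
       }
   }
   ```

   The handler could then log `logger.error("Conflicting job encountered. reason={} request={}", reason, ...)` with this string as the second argument.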



##########
server/src/main/java/org/apache/cassandra/sidecar/job/RepairJob.java:
##########
@@ -0,0 +1,249 @@
+public class RepairJob extends OperationalJob
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(RepairJob.class);
+    private static final String OPERATION = "repair";
+    private static final String PREVIEW_KIND_REPAIRED = "REPAIRED";
+
+    private final RepairRequestParam repairParams;
+    private final Vertx vertx;
+    private final RepairConfiguration repairConfiguration;
+    protected StorageOperations storageOperations;
+
+    /**
+     * Enum representing the status of a parent repair session
+     */
+    public enum ParentRepairStatus
+    {
+        IN_PROGRESS, COMPLETED, FAILED
+    }
+
+    /**
+     * Constructs a job with a unique UUID, in Pending state
+     *
+     * @param vertx
+     * @param repairConfiguration
+     * @param jobId               UUID representing the Job to be created
+     * @param storageOps
+     * @param repairParams
+     */
+    public RepairJob(Vertx vertx, RepairConfiguration repairConfiguration, UUID jobId, StorageOperations storageOps, RepairRequestParam repairParams)
+    {
+        super(jobId);
+        this.vertx = vertx;
+        this.repairConfiguration = repairConfiguration;
+        this.storageOperations = storageOps;
+        this.repairParams = repairParams;
+    }
+
+    @Override
+    public boolean isRunningOnCassandra()
+    {
+        // TODO: Leverage repair vtables to fail-fast on conflicting repairs (overlapping token-ranges or replica-sets)
+        // Currently does not check for concurrent repairs
+        return false;
+    }
+
+    @Override
+    protected void executeInternal()
+    {
+        Map<String, String> options = generateRepairOptions(repairParams.requestpayload());
+        String keyspace = repairParams.keyspace().name();
+
+        LOGGER.info("Executing repair operation for keyspace {} jobId={} maxRuntime={}",
+                    keyspace, this.jobId(), repairConfiguration.maxRepairJobRuntimeMillis());
+
+        int cmd = storageOperations.repair(keyspace, options);
+        if (cmd <= 0)
+        {
+            // repairAsync can only return 0 for replication factor 1.
+            LOGGER.info("Replication factor is 1. No repair is needed for keyspace '{}'", keyspace);
+        }
+        else
+        {
+            // complete the max wait time promise either when exceeding the wait time, or the result is available
+            Promise<Boolean> maxWaitTimePromise = Promise.promise();
+            vertx.setTimer(repairConfiguration.maxRepairJobRuntimeMillis(), d -> {
+                LOGGER.info("Timer Poll");
+                maxWaitTimePromise.tryComplete(true);
+            });
+
+            // Promise for completion of repair operation
+            Promise<Void> promise = Promise.promise();
+            Future<Void> resultFut = promise.future();
+
+            // main event loop checks periodically (10s) for completion
+            vertx.setPeriodic(repairConfiguration.repairPollIntervalMillis(), id -> queryForCompletedRepair(promise, cmd));
+            resultFut.onComplete(res -> maxWaitTimePromise.tryComplete(false));
+            Future<Boolean> maxWaitTimeFut = maxWaitTimePromise.future();
+
+            Future<Void> compositeFut = Future.any(maxWaitTimeFut, resultFut)
+                                              // If this lambda below is evaluated, either one of the futures have completed;
+                                              // In either case, the future corresponding to the job execution is returned
+                                              .compose(f -> {
+                                                  LOGGER.info("One of the futures ended waitStatus={} resultStatus={}",
+                                                              maxWaitTimeFut.isComplete(), resultFut.isComplete());
+                                                  boolean isTimeout = (maxWaitTimeFut.succeeded()) ? maxWaitTimeFut.result() : false;
+                                                  if (isTimeout)
+                                                  {
+                                                      LOGGER.error("Timer ran out before the repair job completed. Repair took too long");
+                                                      // TODO: Cancel repair? (Nice to have)
+                                                      // We free up the thread (job fails) and stop polling for completion
+                                                      return Future.failedFuture("Repair job taking too long");
+                                                  }
+                                                  // otherwise, the result of the job is available
+                                                  return resultFut;
+                                              });
+            try
+            {
+                compositeFut.toCompletionStage().toCompletableFuture().get(); // wait

Review Comment:
   The periodic task will keep running if the job has timed out. Does it need to be cancelled?
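A minimal sketch of the "poll until done or time out, then stop polling" pattern. It swaps in the JDK's `ScheduledExecutorService` for Vert.x timers so it runs standalone; the scheduler is passed in and shared (as a Vert.x instance would be), so the poller has to be cancelled explicitly. `PollWithTimeoutSketch` and `pollUntilDone` are hypothetical names, not part of the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/**
 * Sketch: poll for completion with an upper bound on the wait, and cancel the
 * periodic poller whichever way the race ends. JDK scheduler stands in for Vert.x timers.
 */
final class PollWithTimeoutSketch
{
    /**
     * Returns true if isDone reported completion before maxMillis, false on timeout.
     * Because the scheduler is shared, the poller would keep firing after this
     * method returns unless it is cancelled.
     */
    static boolean pollUntilDone(ScheduledExecutorService scheduler, BooleanSupplier isDone,
                                 long pollMillis, long maxMillis)
    {
        CompletableFuture<Boolean> outcome = new CompletableFuture<>();

        // Periodic completion check, analogous to vertx.setPeriodic(...)
        ScheduledFuture<?> poller = scheduler.scheduleAtFixedRate(() -> {
            try
            {
                if (isDone.getAsBoolean())
                {
                    outcome.complete(true);
                }
            }
            catch (RuntimeException e)
            {
                outcome.completeExceptionally(e);
            }
        }, pollMillis, pollMillis, TimeUnit.MILLISECONDS);

        // Upper bound on the wait, analogous to vertx.setTimer(...); the first completion wins
        ScheduledFuture<?> deadline = scheduler.schedule(() -> { outcome.complete(false); },
                                                         maxMillis, TimeUnit.MILLISECONDS);
        try
        {
            return outcome.join(); // blocks, like the PR's toCompletableFuture().get()
        }
        finally
        {
            // Stop both timers on success, timeout, or failure
            poller.cancel(false);
            deadline.cancel(false);
        }
    }
}
```

In Vert.x terms, `vertx.setPeriodic(...)` and `vertx.setTimer(...)` both return a `long` timer ID, so one option is to keep those IDs and call `vertx.cancelTimer(id)` for each from a `compositeFut.onComplete(...)` handler, which covers both the timeout and the normal-completion paths.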



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

