yifan-c commented on code in PR #139: URL: https://github.com/apache/cassandra-sidecar/pull/139#discussion_r1849095200
########## server/src/main/java/org/apache/cassandra/sidecar/job/OperationsJob.java: ########## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.job; + +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.sidecar.common.utils.OperationsJobResult; +import org.apache.cassandra.sidecar.common.utils.OperationsJobResult.OperationsJobStatus; + +/** + * An abstract class representing a Operations job managed by the sidecar. 
+ * + */ +public abstract class OperationsJob +{ + private static final Logger LOGGER = LoggerFactory.getLogger(OperationsJob.class); + + protected UUID jobId; + protected OperationsJobStatus status; + protected String failureReason; + final CountDownLatch latch = new CountDownLatch(1); + + protected OperationsJob() + { + + } + /** + * Constructs a job with a unique UUID, in Pending state + * @param jobId UUID representing the Job to be created + */ + protected OperationsJob(UUID jobId) + { + this.jobId = jobId; + this.status = OperationsJobStatus.Pending; + this.failureReason = ""; + } + + @VisibleForTesting + protected OperationsJob(UUID jobId, OperationsJobStatus status) + { + this.jobId = jobId; + this.status = status; + this.failureReason = ""; + } + + public void setStatus(OperationsJobStatus status) + { + this.status = status; + } + public OperationsJobStatus status() + { + return status; + } + public String failureReason() + { + return failureReason; + } + + public UUID jobId() + { + return jobId; + } + + /** + * Supplier specifying the functionality of the job to be triggered when the job is executed. Subclasses to + * provide operation-specific implementations + * @return a function with the operation implementation that returns a {@code JobResult} + * @throws Exception + */ + public abstract Supplier<OperationsJobResult> jobOperationSupplier() throws Exception; + + /** + * Provide a meaningful name of the operation executed by the concrete subclass. + * @return the name of the operation. eg. nodetool command name + */ + public abstract String operation(); + + /** + * Specifies the downstream operation to be performed to check if the job is running on the Cassandra node/cluster. + * This functionality is provided by the Job when it is asynchronously triggering the job via the {@code JobManager} + * For synchronous jobs this should always return false. 
+ * @return true if the job is running downstream + */ + public abstract boolean checkInflightJob(); Review Comment: nit: ```suggestion public abstract boolean isInflight(); ``` ########## server/src/main/java/org/apache/cassandra/sidecar/job/OperationsJob.java: ########## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.job; + +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.sidecar.common.utils.OperationsJobResult; +import org.apache.cassandra.sidecar.common.utils.OperationsJobResult.OperationsJobStatus; + +/** + * An abstract class representing a Operations job managed by the sidecar. 
+ * + */ +public abstract class OperationsJob +{ + private static final Logger LOGGER = LoggerFactory.getLogger(OperationsJob.class); + + protected UUID jobId; + protected OperationsJobStatus status; + protected String failureReason; + final CountDownLatch latch = new CountDownLatch(1); + + protected OperationsJob() + { + + } + /** + * Constructs a job with a unique UUID, in Pending state + * @param jobId UUID representing the Job to be created + */ + protected OperationsJob(UUID jobId) + { + this.jobId = jobId; + this.status = OperationsJobStatus.Pending; + this.failureReason = ""; + } + + @VisibleForTesting + protected OperationsJob(UUID jobId, OperationsJobStatus status) + { + this.jobId = jobId; + this.status = status; + this.failureReason = ""; + } + + public void setStatus(OperationsJobStatus status) + { + this.status = status; + } + public OperationsJobStatus status() + { + return status; + } + public String failureReason() + { + return failureReason; + } + + public UUID jobId() + { + return jobId; + } + + /** + * Supplier specifying the functionality of the job to be triggered when the job is executed. Subclasses to + * provide operation-specific implementations + * @return a function with the operation implementation that returns a {@code JobResult} + * @throws Exception + */ + public abstract Supplier<OperationsJobResult> jobOperationSupplier() throws Exception; + + /** + * Provide a meaningful name of the operation executed by the concrete subclass. + * @return the name of the operation. eg. nodetool command name + */ + public abstract String operation(); + + /** + * Specifies the downstream operation to be performed to check if the job is running on the Cassandra node/cluster. + * This functionality is provided by the Job when it is asynchronously triggering the job via the {@code JobManager} + * For synchronous jobs this should always return false. 
+ * @return true if the job is running downstream + */ + public abstract boolean checkInflightJob(); + + /** + * Execute the job behavior as specified in the operation supplier {@link #jobOperationSupplier()}, + * while tracking the status of the job's lifecycle. + */ + public void execute() + { + try + { + LOGGER.info("Executing job with ID: {}", jobId); + OperationsJobResult result = jobOperationSupplier().get(); + status = result.status(); + failureReason = result.reason(); + LOGGER.debug("Job with ID: {} returned with status: {}", jobId, status); + } + catch (Exception e) + { + String reason = (e.getCause() != null) ? e.getCause().getMessage() : e.getMessage(); + LOGGER.error("Failed to execute job {} with reason: {}", jobId, reason); + status = OperationsJobStatus.Failed; + failureReason = reason; + } + finally + { + latch.countDown(); + } + } + + /** + * Returns if the job execution completes within the provided wait time (in seconds) + * @param waitSeconds no. of seconds to wait for the job execution to complete + * @return true if the job completed within the specified time + */ + public boolean isResultAvailable(long waitSeconds) + { + try + { + return (!latch.await(waitSeconds, TimeUnit.SECONDS)) ? false : true; + } + catch (final InterruptedException e) + { + return false; Review Comment: preserve the interrupted flag. ```java Thread.currentThread().interrupt(); ``` ########## server/src/main/java/org/apache/cassandra/sidecar/job/OperationsJob.java: ########## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.job; + +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.sidecar.common.utils.OperationsJobResult; +import org.apache.cassandra.sidecar.common.utils.OperationsJobResult.OperationsJobStatus; + +/** + * An abstract class representing a Operations job managed by the sidecar. + * + */ +public abstract class OperationsJob +{ + private static final Logger LOGGER = LoggerFactory.getLogger(OperationsJob.class); + + protected UUID jobId; + protected OperationsJobStatus status; + protected String failureReason; + final CountDownLatch latch = new CountDownLatch(1); + + protected OperationsJob() + { + + } + /** + * Constructs a job with a unique UUID, in Pending state + * @param jobId UUID representing the Job to be created + */ + protected OperationsJob(UUID jobId) + { + this.jobId = jobId; + this.status = OperationsJobStatus.Pending; + this.failureReason = ""; + } + + @VisibleForTesting + protected OperationsJob(UUID jobId, OperationsJobStatus status) + { + this.jobId = jobId; + this.status = status; + this.failureReason = ""; + } + + public void setStatus(OperationsJobStatus status) + { + this.status = status; + } + public OperationsJobStatus status() + { + return status; + } + public String failureReason() + { + return failureReason; + } + + public UUID jobId() + { + return 
jobId; + } + + /** + * Supplier specifying the functionality of the job to be triggered when the job is executed. Subclasses to + * provide operation-specific implementations + * @return a function with the operation implementation that returns a {@code JobResult} + * @throws Exception + */ + public abstract Supplier<OperationsJobResult> jobOperationSupplier() throws Exception; + + /** + * Provide a meaningful name of the operation executed by the concrete subclass. + * @return the name of the operation. eg. nodetool command name + */ + public abstract String operation(); + + /** + * Specifies the downstream operation to be performed to check if the job is running on the Cassandra node/cluster. + * This functionality is provided by the Job when it is asynchronously triggering the job via the {@code JobManager} + * For synchronous jobs this should always return false. + * @return true if the job is running downstream + */ + public abstract boolean checkInflightJob(); + + /** + * Execute the job behavior as specified in the operation supplier {@link #jobOperationSupplier()}, + * while tracking the status of the job's lifecycle. + */ + public void execute() Review Comment: nit: consider adding `final` to forbid overriding this method, since the concrete logic is implemented in `jobOperationSupplier` ########## server/src/main/java/org/apache/cassandra/sidecar/job/OperationsJobManager.java: ########## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.job; + +import java.util.List; +import java.util.UUID; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import javax.inject.Inject; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.Singleton; +import org.apache.cassandra.sidecar.common.utils.OperationsJobResult.OperationsJobStatus; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; + +/** + * An abstraction of the management and tracking of long-running jobs running on the sidecar. + */ +@Singleton +public class OperationsJobManager +{ + private static final Logger LOGGER = LoggerFactory.getLogger(OperationsJobManager.class); + + private final OperationsJobTracker jobTracker; + private final ExecutorPools executorPools; + + /** + * Creates a manager instance with a default sized job-tracker. 
+ * @param executorPools + */ + @Inject + public OperationsJobManager(ExecutorPools executorPools, OperationsJobTracker jobTracker) + { + this.executorPools = executorPools; + this.jobTracker = jobTracker; + } + + /** + * Fetches the inflight jobs being tracked on the sidecar + * @return instances of the jobs that are in pending or running states + */ + public List<OperationsJob> allInflightJobs() + { + return jobTracker.getJobsView().values() + .stream() + .filter(j -> j.status() == OperationsJobStatus.Pending || j.status() == OperationsJobStatus.Running) + .collect(Collectors.toList()); + } + + /** + * Fetch the job using its UUID + * @param jobId identifier of the job + * @return instance of the job or null + */ + public OperationsJob getJobIfExists(UUID jobId) + { + return jobTracker.get(jobId); + } + + /** + * Asynchronously submit (and lazily create, via the supplier) the job, if it is not currently being + * tracked and is not running downstream. The job is triggered on a separate internal thread-pool. + * The job execution failure behavior is tracked within the {@link OperationsJob}. + * @param jobId job identifier + * @param jobSupplier supplier used to create an instance of the job + * @return the instance of the job that is either being tracked or was just submitted + */ + public OperationsJob trySubmitJob(UUID jobId, Supplier<OperationsJob> jobSupplier) Review Comment: Passing a `Supplier` would have a correctness issue. (Explained at the end) My suggestion is to have this signature instead. ```suggestion public boolean trySubmitJob(OperationsJob job); ``` 1. Use boolean to indicate the submission result. 2. `OperationsJob` already contains jobId. The parameter `UUID jobId` can be omitted. Now, why there is a potential correctness issue. Say there are 3 threads submitting the same jobId with different suppliers at the same time. Let's say thread A wins, threads B and C lose. 
Since the signature returns an `OperationsJob`, there is no way to tell which thread has succeeded. All of A, B and C will believe they have created the job successfully. The ambiguity can break the subsequent execution. ########## server/src/main/java/org/apache/cassandra/sidecar/job/OperationsJobManager.java: ########## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.job; + +import java.util.List; +import java.util.UUID; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import javax.inject.Inject; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.Singleton; +import org.apache.cassandra.sidecar.common.utils.OperationsJobResult.OperationsJobStatus; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; + +/** + * An abstraction of the management and tracking of long-running jobs running on the sidecar. 
+ */ +@Singleton +public class OperationsJobManager +{ + private static final Logger LOGGER = LoggerFactory.getLogger(OperationsJobManager.class); + + private final OperationsJobTracker jobTracker; + private final ExecutorPools executorPools; + + /** + * Creates a manager instance with a default sized job-tracker. + * @param executorPools + */ + @Inject + public OperationsJobManager(ExecutorPools executorPools, OperationsJobTracker jobTracker) + { + this.executorPools = executorPools; + this.jobTracker = jobTracker; + } + + /** + * Fetches the inflight jobs being tracked on the sidecar + * @return instances of the jobs that are in pending or running states + */ + public List<OperationsJob> allInflightJobs() + { + return jobTracker.getJobsView().values() + .stream() + .filter(j -> j.status() == OperationsJobStatus.Pending || j.status() == OperationsJobStatus.Running) + .collect(Collectors.toList()); + } + + /** + * Fetch the job using its UUID + * @param jobId identifier of the job + * @return instance of the job or null + */ + public OperationsJob getJobIfExists(UUID jobId) + { + return jobTracker.get(jobId); Review Comment: It is odd that `allInflightJobs` is backed by jobView, but `getJobIfExists` reads from jobTracker directly. It makes me wonder whether `getJobsView` is necessary at all. ########## server/src/main/java/org/apache/cassandra/sidecar/job/OperationsJob.java: ########## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.job; + +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.sidecar.common.utils.OperationsJobResult; +import org.apache.cassandra.sidecar.common.utils.OperationsJobResult.OperationsJobStatus; + +/** + * An abstract class representing a Operations job managed by the sidecar. + * + */ +public abstract class OperationsJob +{ + private static final Logger LOGGER = LoggerFactory.getLogger(OperationsJob.class); + + protected UUID jobId; + protected OperationsJobStatus status; + protected String failureReason; + final CountDownLatch latch = new CountDownLatch(1); + + protected OperationsJob() + { + + } + /** + * Constructs a job with a unique UUID, in Pending state + * @param jobId UUID representing the Job to be created + */ + protected OperationsJob(UUID jobId) + { + this.jobId = jobId; + this.status = OperationsJobStatus.Pending; + this.failureReason = ""; + } + + @VisibleForTesting + protected OperationsJob(UUID jobId, OperationsJobStatus status) + { + this.jobId = jobId; + this.status = status; + this.failureReason = ""; + } + + public void setStatus(OperationsJobStatus status) + { + this.status = status; + } + public OperationsJobStatus status() + { + return status; + } + public String failureReason() + { + return failureReason; + } + + public UUID jobId() + { + return 
jobId; + } + + /** + * Supplier specifying the functionality of the job to be triggered when the job is executed. Subclasses to + * provide operation-specific implementations + * @return a function with the operation implementation that returns a {@code JobResult} + * @throws Exception + */ + public abstract Supplier<OperationsJobResult> jobOperationSupplier() throws Exception; + + /** + * Provide a meaningful name of the operation executed by the concrete subclass. + * @return the name of the operation. eg. nodetool command name + */ + public abstract String operation(); + + /** + * Specifies the downstream operation to be performed to check if the job is running on the Cassandra node/cluster. + * This functionality is provided by the Job when it is asynchronously triggering the job via the {@code JobManager} + * For synchronous jobs this should always return false. + * @return true if the job is running downstream + */ + public abstract boolean checkInflightJob(); + + /** + * Execute the job behavior as specified in the operation supplier {@link #jobOperationSupplier()}, + * while tracking the status of the job's lifecycle. + */ + public void execute() + { + try + { + LOGGER.info("Executing job with ID: {}", jobId); + OperationsJobResult result = jobOperationSupplier().get(); + status = result.status(); + failureReason = result.reason(); + LOGGER.debug("Job with ID: {} returned with status: {}", jobId, status); + } + catch (Exception e) + { + String reason = (e.getCause() != null) ? e.getCause().getMessage() : e.getMessage(); + LOGGER.error("Failed to execute job {} with reason: {}", jobId, reason); + status = OperationsJobStatus.Failed; + failureReason = reason; + } + finally + { + latch.countDown(); + } + } + + /** + * Returns if the job execution completes within the provided wait time (in seconds) + * @param waitSeconds no. 
of seconds to wait for the job execution to complete + * @return true if the job completed within the specified time + */ + public boolean isResultAvailable(long waitSeconds) + { + try + { + return (!latch.await(waitSeconds, TimeUnit.SECONDS)) ? false : true; Review Comment: ```suggestion return latch.await(waitSeconds, TimeUnit.SECONDS); ``` ########## server/src/main/java/org/apache/cassandra/sidecar/job/OperationsJob.java: ########## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.job; + +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.sidecar.common.utils.OperationsJobResult; +import org.apache.cassandra.sidecar.common.utils.OperationsJobResult.OperationsJobStatus; + +/** + * An abstract class representing a Operations job managed by the sidecar. 
+ * + */ +public abstract class OperationsJob +{ + private static final Logger LOGGER = LoggerFactory.getLogger(OperationsJob.class); + + protected UUID jobId; + protected OperationsJobStatus status; + protected String failureReason; + final CountDownLatch latch = new CountDownLatch(1); + + protected OperationsJob() + { + + } + /** + * Constructs a job with a unique UUID, in Pending state + * @param jobId UUID representing the Job to be created + */ + protected OperationsJob(UUID jobId) + { + this.jobId = jobId; + this.status = OperationsJobStatus.Pending; + this.failureReason = ""; + } + + @VisibleForTesting + protected OperationsJob(UUID jobId, OperationsJobStatus status) + { + this.jobId = jobId; + this.status = status; + this.failureReason = ""; + } + + public void setStatus(OperationsJobStatus status) + { + this.status = status; + } + public OperationsJobStatus status() + { + return status; + } + public String failureReason() + { + return failureReason; + } + + public UUID jobId() + { + return jobId; + } + + /** + * Supplier specifying the functionality of the job to be triggered when the job is executed. Subclasses to + * provide operation-specific implementations + * @return a function with the operation implementation that returns a {@code JobResult} + * @throws Exception + */ + public abstract Supplier<OperationsJobResult> jobOperationSupplier() throws Exception; + + /** + * Provide a meaningful name of the operation executed by the concrete subclass. + * @return the name of the operation. eg. nodetool command name + */ + public abstract String operation(); + + /** + * Specifies the downstream operation to be performed to check if the job is running on the Cassandra node/cluster. + * This functionality is provided by the Job when it is asynchronously triggering the job via the {@code JobManager} + * For synchronous jobs this should always return false. 
+ * @return true if the job is running downstream + */ + public abstract boolean checkInflightJob(); + + /** + * Execute the job behavior as specified in the operation supplier {@link #jobOperationSupplier()}, + * while tracking the status of the job's lifecycle. + */ + public void execute() + { + try + { + LOGGER.info("Executing job with ID: {}", jobId); + OperationsJobResult result = jobOperationSupplier().get(); + status = result.status(); + failureReason = result.reason(); + LOGGER.debug("Job with ID: {} returned with status: {}", jobId, status); + } + catch (Exception e) + { + String reason = (e.getCause() != null) ? e.getCause().getMessage() : e.getMessage(); + LOGGER.error("Failed to execute job {} with reason: {}", jobId, reason); + status = OperationsJobStatus.Failed; + failureReason = reason; + } + finally + { + latch.countDown(); + } + } + + /** + * Returns if the job execution completes within the provided wait time (in seconds) + * @param waitSeconds no. of seconds to wait for the job execution to complete + * @return true if the job completed within the specified time + */ + public boolean isResultAvailable(long waitSeconds) + { + try + { + return (!latch.await(waitSeconds, TimeUnit.SECONDS)) ? false : true; + } + catch (final InterruptedException e) + { + return false; + } + } Review Comment: Not a fan of this blocking API. Consider this option. Schedule a future task / timer, and check the result after the timer fires. ``` public boolean isComplete() { // of course, not necessary to use CountDownLatch in this case return latch.getCount() == 0; } // at call-site Vertx vertx; OperationsJob operationsJob; vertx.timer(waitSeconds, TimeUnit.SECONDS).map(v -> operationsJob.isComplete())... ``` ########## client-common/src/main/java/org/apache/cassandra/sidecar/common/response/ListOperationsJobsResponse.java: ########## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.common.response; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * Response structure of the list operations jobs API + */ +public class ListOperationsJobsResponse +{ + private final List<OperationsJobsResponse> jobs; Review Comment: It is confusing to me that the elements of the array in the JSON response body are also named "Response". Can you place the current `OperationsJobsResponse` class in package `org.apache.cassandra.sidecar.common.response.data` and rename the class to `OperationalJobEntry`? In other words, the `ListOperationalJobsResponse` contains an array of `OperationalJobEntry` ########## server/src/main/java/org/apache/cassandra/sidecar/job/OperationsJobTracker.java: ########## @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.job; + +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import com.google.common.collect.ImmutableMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.jetbrains.annotations.NotNull; + +/** + * Tracks and stores the results of long-running jobs running on the sidecar + */ +public class OperationsJobTracker extends LinkedHashMap<UUID, OperationsJob> Review Comment: The class can use some simplifications. ########## server/src/main/java/org/apache/cassandra/sidecar/job/OperationsJob.java: ########## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.job; + +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.sidecar.common.utils.OperationsJobResult; +import org.apache.cassandra.sidecar.common.utils.OperationsJobResult.OperationsJobStatus; + +/** + * An abstract class representing a Operations job managed by the sidecar. + * + */ +public abstract class OperationsJob +{ + private static final Logger LOGGER = LoggerFactory.getLogger(OperationsJob.class); + + protected UUID jobId; + protected OperationsJobStatus status; + protected String failureReason; + final CountDownLatch latch = new CountDownLatch(1); + + protected OperationsJob() + { + + } + /** + * Constructs a job with a unique UUID, in Pending state + * @param jobId UUID representing the Job to be created + */ + protected OperationsJob(UUID jobId) + { + this.jobId = jobId; + this.status = OperationsJobStatus.Pending; + this.failureReason = ""; + } + + @VisibleForTesting + protected OperationsJob(UUID jobId, OperationsJobStatus status) + { + this.jobId = jobId; + this.status = status; + this.failureReason = ""; + } + + public void setStatus(OperationsJobStatus status) + { + this.status = status; + } + public OperationsJobStatus status() + { + return status; + } + public String failureReason() + { + return failureReason; + } + + public UUID jobId() + { + return jobId; + } + + /** + * Supplier specifying the functionality of the job to be triggered when the job is executed. 
Subclasses to + * provide operation-specific implementations + * @return a function with the operation implementation that returns a {@code JobResult} + * @throws Exception + */ + public abstract Supplier<OperationsJobResult> jobOperationSupplier() throws Exception; Review Comment: Returning a function seems too complex. The equivalent is probably easier to understand. ```java protected abstract OperationsJobResult executeInternal() throws OperationalJobException; ``` Note that the access modifier can be `protected` and prefer a domain-specific exception type. ########## server/src/main/java/org/apache/cassandra/sidecar/routes/OperationsJobsHandler.java: ########## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.routes; + +import java.util.UUID; +import javax.inject.Inject; + +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.common.response.OperationsJobsResponse; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.job.OperationsJob; +import org.apache.cassandra.sidecar.job.OperationsJobManager; +import org.apache.cassandra.sidecar.utils.CassandraInputValidator; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; + +import static org.apache.cassandra.sidecar.common.ApiEndpointsV1.OPERATIONS_JOB_ID_PATH_PARAM; +import static org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.OPERATIONS_JOB_HEADER_NAME; +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * Handler for retrieving the status of async operations jobs running on the sidecar + */ +public class OperationsJobsHandler extends AbstractHandler<Void> +{ + private final OperationsJobManager jobManager; + @Inject + public OperationsJobsHandler(InstanceMetadataFetcher metadataFetcher, + ExecutorPools executorPools, + CassandraInputValidator validator, + OperationsJobManager jobManager) + { + super(metadataFetcher, executorPools, validator); + this.jobManager = jobManager; + } + + protected Void extractParamsOrThrow(RoutingContext context) + { + return null; + } + + @Override + public void handleInternal(RoutingContext context, HttpServerRequest httpRequest, String host, SocketAddress remoteAddress, Void request) + { + UUID jobUUID = validateJobIdParam(context); + + executorPools.service().executeBlocking(() -> { + OperationsJob job = jobManager.getJobIfExists(jobUUID); + if (job == null) + { + String response = String.format("Unknown job with ID:%s. 
Please retry the operation.", jobUUID); + logger.info(response); + context.fail(wrapHttpException(HttpResponseStatus.NOT_FOUND, response)); Review Comment: ```suggestion throw wrapHttpException(HttpResponseStatus.NOT_FOUND, response); ``` Otherwise, it is going to return job and hit the `onSuccess` handler (unnecessarily). ########## server/src/main/java/org/apache/cassandra/sidecar/routes/OperationsJobsHandler.java: ########## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.routes; + +import java.util.UUID; +import javax.inject.Inject; + +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.common.response.OperationsJobsResponse; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.job.OperationsJob; +import org.apache.cassandra.sidecar.job.OperationsJobManager; +import org.apache.cassandra.sidecar.utils.CassandraInputValidator; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; + +import static org.apache.cassandra.sidecar.common.ApiEndpointsV1.OPERATIONS_JOB_ID_PATH_PARAM; +import static org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.OPERATIONS_JOB_HEADER_NAME; +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * Handler for retrieving the status of async operations jobs running on the sidecar + */ +public class OperationsJobsHandler extends AbstractHandler<Void> +{ + private final OperationsJobManager jobManager; + @Inject + public OperationsJobsHandler(InstanceMetadataFetcher metadataFetcher, + ExecutorPools executorPools, + CassandraInputValidator validator, + OperationsJobManager jobManager) + { + super(metadataFetcher, executorPools, validator); + this.jobManager = jobManager; + } + + protected Void extractParamsOrThrow(RoutingContext context) Review Comment: `@Override` ########## server/src/main/java/org/apache/cassandra/sidecar/routes/OperationsJobsHandler.java: ########## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.routes; + +import java.util.UUID; +import javax.inject.Inject; + +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.common.response.OperationsJobsResponse; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.job.OperationsJob; +import org.apache.cassandra.sidecar.job.OperationsJobManager; +import org.apache.cassandra.sidecar.utils.CassandraInputValidator; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; + +import static org.apache.cassandra.sidecar.common.ApiEndpointsV1.OPERATIONS_JOB_ID_PATH_PARAM; +import static org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.OPERATIONS_JOB_HEADER_NAME; +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * Handler for retrieving the status of async operations jobs running on the sidecar + */ +public class OperationsJobsHandler extends AbstractHandler<Void> +{ + private final OperationsJobManager jobManager; + @Inject + public OperationsJobsHandler(InstanceMetadataFetcher metadataFetcher, + ExecutorPools executorPools, + CassandraInputValidator validator, + OperationsJobManager jobManager) + { + super(metadataFetcher, 
executorPools, validator); + this.jobManager = jobManager; + } + + protected Void extractParamsOrThrow(RoutingContext context) + { + return null; + } + + @Override + public void handleInternal(RoutingContext context, HttpServerRequest httpRequest, String host, SocketAddress remoteAddress, Void request) + { + UUID jobUUID = validateJobIdParam(context); Review Comment: nit: rename to `jobId` ########## client-common/src/main/java/org/apache/cassandra/sidecar/common/utils/OperationsJobResult.java: ########## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.common.utils; + +/** + * Holder class for the results of a job execution. Captures the job status and reason + * as a result of performing the downstream operation. + */ +public class OperationsJobResult +{ + /** + * Encapsulates the states of the job lifecycle. All new jobs are in Pending state. + */ + public enum OperationsJobStatus + { Pending, Running, Completed, Failed } Review Comment: Enum variant names should be in upper case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

