pauloricardomg commented on code in PR #339: URL: https://github.com/apache/cassandra-sidecar/pull/339#discussion_r3237449717
########## server/src/main/java/org/apache/cassandra/sidecar/job/storage/StorageProvider.java: ########## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.job.storage; + +import java.io.Closeable; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import org.apache.cassandra.sidecar.common.data.OperationalJobStatus; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * A provider-agnostic storage abstraction for durable operational job state. + * Each {@code StorageProvider} instance is scoped to a single cluster. The cluster + * identity is configuration, not a per-call parameter. + * <p> + * This interface defines a data access pattern for persisting, modifying, and querying OperationalJobs. + * Higher-level coordination logic, such as clearing the active operation lock when a job reaches + * a terminal state, belongs in the layers that depend on this interface. + */ +public interface StorageProvider extends Closeable +{ + // --- Operational Job Storage --- + + /** + * Persist a new operational job record. + * <p> + * Implementations must provide upsert semantics as a retry safety net: if called again with the + * same job ID (e.g., due to a network timeout where the caller does not know if the first write + * succeeded), the existing record should be overwritten rather than throwing. + * <p> + * Implementations should throw {@link StorageProviderException} on write failure. + * + * @param job the job record to store + */ + void persistJob(OperationalJobRecord job); + + /** + * Find a job by its ID. + * + * @param jobId the job identifier + * @return the job record, or {@code null} if not found + */ + @Nullable + OperationalJobRecord findJob(UUID jobId); + + /** + * Update the status of an existing job. + * <p> + * Implementations should throw {@link StorageProviderException} on write failure. + * + * @param jobId the job identifier + * @param operationType the operation type (e.g. "restart") + * @param status the new status + */ + void updateJobStatus(UUID jobId, String operationType, OperationalJobStatus status); + + /** + * Retrieve stored job records, up to the specified limit. Review Comment: nit: Retrieve stored job records by descending time order ########## server/src/main/java/org/apache/cassandra/sidecar/db/ActiveClusterOpsDatabaseAccessor.java: ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.db; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import org.apache.cassandra.sidecar.common.server.CQLSessionProvider; +import org.apache.cassandra.sidecar.db.schema.ActiveClusterOpsSchema; +import org.apache.cassandra.sidecar.db.schema.SidecarSchema; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.jetbrains.annotations.VisibleForTesting; + +/** + * Database accessor for the {@code active_cluster_ops} table. + * Provides mutual exclusion for active operations via lightweight transactions (LWT). + */ +@Singleton +public class ActiveClusterOpsDatabaseAccessor extends DatabaseAccessor<ActiveClusterOpsSchema> +{ + @Inject + public ActiveClusterOpsDatabaseAccessor(SidecarSchema sidecarSchema, CQLSessionProvider sessionProvider) + { + this(sidecarSchema.tableSchema(ActiveClusterOpsSchema.class), sessionProvider); + } + + @VisibleForTesting + public ActiveClusterOpsDatabaseAccessor(ActiveClusterOpsSchema schema, CQLSessionProvider sessionProvider) + { + super(schema, sessionProvider); + } + + public boolean trySetActiveOperation(String clusterName, String operationType, UUID operationId) + { + BoundStatement statement = tableSchema.trySetActive().bind(clusterName, operationType, operationId); + statement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM); + statement.setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL); + ResultSet resultSet = execute(statement); + return resultSet.wasApplied(); + } + + @Nullable + public UUID getActiveOperation(String clusterName, String operationType) + { + BoundStatement statement = tableSchema.getActiveByType().bind(clusterName, operationType); + statement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM); + ResultSet resultSet = execute(statement); + Row row = resultSet.one(); + return row == null ? null : row.getUUID("operation_id"); + } + + @NotNull + public Map<String, UUID> getActiveOperations(String clusterName) + { + BoundStatement statement = tableSchema.getActive().bind(clusterName); + statement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM); + ResultSet resultSet = execute(statement); + Map<String, UUID> activeOps = new HashMap<>(); + for (Row row : resultSet) + { + activeOps.put(row.getString("operation_type"), row.getUUID("operation_id")); + } + return activeOps; + } + + public boolean clearActiveOperation(String clusterName, String operationType, UUID operationId) + { + BoundStatement statement = tableSchema.clearActive().bind(clusterName, operationType, operationId); + statement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM); + statement.setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL); Review Comment: Shouldn't we be using global SERIAL to ensure the active operation will be consistent across all DCs? ########## server/src/main/java/org/apache/cassandra/sidecar/db/ActiveClusterOpsDatabaseAccessor.java: ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.db; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import org.apache.cassandra.sidecar.common.server.CQLSessionProvider; +import org.apache.cassandra.sidecar.db.schema.ActiveClusterOpsSchema; +import org.apache.cassandra.sidecar.db.schema.SidecarSchema; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.jetbrains.annotations.VisibleForTesting; + +/** + * Database accessor for the {@code active_cluster_ops} table. + * Provides mutual exclusion for active operations via lightweight transactions (LWT). + */ +@Singleton +public class ActiveClusterOpsDatabaseAccessor extends DatabaseAccessor<ActiveClusterOpsSchema> +{ + @Inject + public ActiveClusterOpsDatabaseAccessor(SidecarSchema sidecarSchema, CQLSessionProvider sessionProvider) + { + this(sidecarSchema.tableSchema(ActiveClusterOpsSchema.class), sessionProvider); + } + + @VisibleForTesting + public ActiveClusterOpsDatabaseAccessor(ActiveClusterOpsSchema schema, CQLSessionProvider sessionProvider) + { + super(schema, sessionProvider); + } + + public boolean trySetActiveOperation(String clusterName, String operationType, UUID operationId) + { + BoundStatement statement = tableSchema.trySetActive().bind(clusterName, operationType, operationId); + statement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM); + statement.setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL); Review Comment: Shouldn't we be using global `SERIAL` to ensure the active operation will be consistent across all DCs? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

