yifan-c commented on code in PR #45: URL: https://github.com/apache/cassandra-sidecar/pull/45#discussion_r1194376151
########## src/main/java/org/apache/cassandra/sidecar/concurrent/ConcurrencyLimiter.java: ########## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.concurrent; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.IntSupplier; + +import com.google.common.annotations.VisibleForTesting; + +/** + * A class that provides functionality for a concurrency limiter where implementing consumers can try to acquire a + * permit before executing an operation, and later releasing the permit when the operation is completed. This + * implementation relies on clients playing fairly and not acquiring more permits or releasing more permits than they + * are allowed. This class is intended to be used on constrained resources, for example uploads to the server, where + * we want to limit the amount of concurrent uploads to the server, and if we are too busy, we want to provide a + * mechanism to deny new uploads until the server has been freed up. + * + * <p>The intended usage of this class is as follows: + * + * <pre> + * if (limiter.tryAcquire()) { + * try { + * // .. 
some expensive operation + * } finally { + * limiter.releasePermit(); + * } + * } else { + * // .. handle the case where the limit has been reached + * } + * </pre> + * + * <p>In a vertx {@link io.vertx.core.Handler}, this can be implemented as follows: + * + * <pre> + * public void handle(RoutingContext context) { + * if (!limiter.tryAcquire()) { + * // handle the case where the limit has been reached + * return; + * } + * // to make sure that permit is always released + * context.addEndHandler(v -> limiter.releasePermit()); + * // .. some expensive operation + * } + * </pre> + */ +public class ConcurrencyLimiter +{ + @VisibleForTesting + final AtomicInteger permits = new AtomicInteger(0); Review Comment: instead of exposing the mutable `AtomicInteger`, can you add a method to return the current `permits: int` and make the field private instead? ```java public int acquiredPermits() { return permits.get(); } ``` ########## src/main/java/org/apache/cassandra/sidecar/data/SchemaRequest.java: ########## @@ -15,24 +15,34 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.cassandra.sidecar.common.data; +package org.apache.cassandra.sidecar.data; import org.jetbrains.annotations.Nullable; /** * Holder class for the {@link org.apache.cassandra.sidecar.routes.SchemaHandler} * request parameters */ -public class SchemaRequest extends QualifiedTableName +public class SchemaRequest Review Comment: Should we rename this group of classes to Payload in this patch? For this one, `SchemaRequestPayload` ########## src/test/integration/org/apache/cassandra/sidecar/IntegrationTestBase.java: ########## @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datastax.driver.core.Session; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.util.Modules; +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpServer; +import io.vertx.ext.web.client.WebClient; +import io.vertx.junit5.VertxTestContext; +import org.apache.cassandra.sidecar.common.data.QualifiedTableName; +import org.apache.cassandra.sidecar.common.testing.CassandraTestContext; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Base class for integration test. + * Start a docker container of cassandra at the begining of each test, and Review Comment: ```suggestion * Start a docker container of cassandra at the beginning of each test, and ``` ########## src/main/java/org/apache/cassandra/sidecar/utils/MD5ChecksumVerifier.java: ########## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.utils; + +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Base64; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.file.AsyncFile; +import io.vertx.core.file.FileSystem; +import io.vertx.core.file.OpenOptions; +import io.vertx.ext.web.handler.HttpException; + +/** + * Implementation of {@link ChecksumVerifier}. Here we use MD5 implementation of {@link java.security.MessageDigest} + * for calculating checksum. And match the calculated checksum with expected checksum obtained from request. + */ +public class MD5ChecksumVerifier implements ChecksumVerifier +{ + private static final Logger LOGGER = LoggerFactory.getLogger(MD5ChecksumVerifier.class); + private final FileSystem fs; + + public MD5ChecksumVerifier(FileSystem fs) + { + this.fs = fs; + } + + public Future<String> verify(String expectedChecksum, String filePath) + { + if (expectedChecksum == null) + { + return Future.succeededFuture(filePath); + } + + LOGGER.debug("Validating MD5. 
Expected to match: {}", expectedChecksum); + + return fs.open(filePath, new OpenOptions()) + .compose(this::calculateMD5) + .compose(computedChecksum -> { + if (!expectedChecksum.equals(computedChecksum)) + return Future.failedFuture(new HttpException(HttpResponseStatus.BAD_REQUEST.code(), + String.format("Checksum mismatch. " + + "computed_checksum=%s, " + + "expected_checksum=%s, " + + "algorithm=MD5", + computedChecksum, + expectedChecksum))); + LOGGER.debug("Checksum mismatch. computed_checksum={}, expected_checksum={}, algorithm=MD5", + computedChecksum, expectedChecksum); + return Future.succeededFuture(filePath); + }); + } + + private Future<String> calculateMD5(AsyncFile file) + { + MessageDigest digest; + try + { + digest = MessageDigest.getInstance("MD5"); + } + catch (NoSuchAlgorithmException e) + { + return Future.failedFuture(e); + } + + Promise<String> result = Promise.promise(); + file.pause() + .setReadBufferSize(64 * 1024) Review Comment: Declare a constant for the buffer size? 
########## src/main/java/org/apache/cassandra/sidecar/routes/FileStreamHandler.java: ########## @@ -24,54 +24,60 @@ import com.google.inject.Inject; import io.netty.handler.codec.http.HttpHeaderNames; import io.vertx.core.Future; -import io.vertx.core.Handler; import io.vertx.core.file.FileProps; import io.vertx.core.file.FileSystem; import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.net.SocketAddress; import io.vertx.ext.web.RoutingContext; -import io.vertx.ext.web.handler.HttpException; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; import org.apache.cassandra.sidecar.models.HttpResponse; import org.apache.cassandra.sidecar.utils.FileStreamer; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; import static io.netty.handler.codec.http.HttpResponseStatus.REQUESTED_RANGE_NOT_SATISFIABLE; -import static org.apache.cassandra.sidecar.utils.RequestUtils.extractHostAddressWithoutPort; +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; /** * Handler for sending out files. 
*/ -public class FileStreamHandler implements Handler<RoutingContext> +public class FileStreamHandler extends AbstractHandler<String> { - private static final Logger LOGGER = LoggerFactory.getLogger(FileStreamHandler.class); public static final String FILE_PATH_CONTEXT_KEY = "fileToTransfer"; - + private static final Logger LOGGER = LoggerFactory.getLogger(FileStreamHandler.class); Review Comment: logger defined in base class already ########## common/src/test/java/org/apache/cassandra/sidecar/common/SimpleCassandraVersionProviderTest.java: ########## @@ -78,7 +78,7 @@ void ensureOutOfOrderInsertionWorks() .add(new V41()) .add(new V30()).build(); - ICassandraFactory cassandra = provider.getCassandra(SimpleCassandraVersion.create("4.0.0")); + ICassandraFactory cassandra = provider.cassandra(SimpleCassandraVersion.create("4.0.0")); assertThat(cassandra).hasSameClassAs(new V40()); Review Comment: To test `ensureOutOfOrderInsertionWorks`, should it get V30 from version `SimpleCassandraVersion.create("3.0.0")`? After I made the change, it actually fails, meaning the insertion order has to be in the ascending order of the versions. Either updating the test case or the implementation of builder is needed. ########## src/main/java/org/apache/cassandra/sidecar/utils/CacheFactory.java: ########## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.utils; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; + +import com.google.common.util.concurrent.MoreExecutors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalListener; +import com.github.benmanes.caffeine.cache.Ticker; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import io.vertx.core.Future; +import org.apache.cassandra.sidecar.Configuration; +import org.apache.cassandra.sidecar.config.CacheConfiguration; +import org.jetbrains.annotations.VisibleForTesting; + +/** + * A factory for caches used in Sidecar + */ +@Singleton +public class CacheFactory +{ + private static final Logger LOGGER = LoggerFactory.getLogger(CacheFactory.class); + + private final Cache<SSTableImporter.ImportOptions, Future<Void>> ssTableImportCache; + + @Inject + public CacheFactory(Configuration configuration, SSTableImporter ssTableImporter) + { + this(configuration, ssTableImporter, Ticker.systemTicker()); + } + + @VisibleForTesting + CacheFactory(Configuration configuration, SSTableImporter ssTableImporter, Ticker ticker) + { + this.ssTableImportCache = initSSTableImportCache(configuration.ssTableImportCacheConfiguration(), + ssTableImporter, ticker); + } + + /** + * @return the cache used for the SSTableImport requests + */ + public Cache<SSTableImporter.ImportOptions, Future<Void>> ssTableImportCache() + { + return 
ssTableImportCache; + } + + /** + * Initializes the SSTable Import Cache using the provided {@code configuration} and {@code ticker} + * for the cache + * + * @param configuration the Cache configuration parameters + * @param ssTableImporter the reference to the SSTable importer singleton + * @param ticker the ticker for the cache + * @return the initialized cache + */ + protected Cache<SSTableImporter.ImportOptions, Future<Void>> + initSSTableImportCache(CacheConfiguration configuration, SSTableImporter ssTableImporter, Ticker ticker) + { + Duration expireAfterAccessDuration = Duration.of(configuration.expireAfterAccessMillis(), ChronoUnit.MILLIS); + long maximumSize = configuration.maximumSize(); + LOGGER.info("Building SSTable Import Cache with expireAfterAccess={}, maxSize={}", + expireAfterAccessDuration, maximumSize); + return Caffeine.newBuilder() + .ticker(ticker) + .executor(MoreExecutors.directExecutor()) + .expireAfterAccess(expireAfterAccessDuration) + .maximumSize(maximumSize) + .recordStats() + .removalListener((RemovalListener<SSTableImporter.ImportOptions, Future<Void>>) + (options, result, cause) -> { + LOGGER.debug("Removed entry '{}' with options '{}' from SSTable Import " + + "Cache and cause {}", result, options, cause); Review Comment: ```suggestion "Cache and cause {}", result, options, cause); ``` ########## src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadata.java: ########## @@ -50,14 +48,9 @@ List<String> dataDirs(); /** - * @return {@link CQLSession} for connecting with instance via CQL + * @return staging directory for the uploads of the cassandra instance */ - CQLSession session(); - - /** - * @return {@link JmxClient} for connecting with the instance via JMX - */ - JmxClient jmxClient(); + String uploadsStagingDir(); Review Comment: Can you call it `stagingDir`? It is just a directory for the staging data, not necessary to distinguish between scenarios like upload or others. 
########## src/main/java/org/apache/cassandra/sidecar/routes/SnapshotsHandler.java: ########## @@ -0,0 +1,364 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.routes; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.nio.file.NoSuchFileException; +import java.util.List; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import org.apache.commons.lang3.StringUtils; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.http.HttpMethod; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.json.JsonObject; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.web.RoutingContext; +import io.vertx.ext.web.handler.HttpException; +import org.apache.cassandra.sidecar.Configuration; +import org.apache.cassandra.sidecar.common.CassandraAdapterDelegate; +import org.apache.cassandra.sidecar.common.StorageOperations; +import org.apache.cassandra.sidecar.common.data.ListSnapshotFilesResponse; +import 
org.apache.cassandra.sidecar.common.utils.CassandraInputValidator; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.data.SnapshotRequest; +import org.apache.cassandra.sidecar.snapshots.SnapshotDirectory; +import org.apache.cassandra.sidecar.snapshots.SnapshotPathBuilder; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; + +import static org.apache.cassandra.sidecar.utils.HttpExceptions.cassandraServiceUnavailable; +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * The SnapshotsHandler class handles snapshot operations. + * + * <ul> + * <li> + * The <b>GET</b> verb will produce a list of paths of all the snapshot files of a given + * snapshot name. + * <p> + * The query param {@code includeSecondaryIndexFiles} is used to request secondary index + * files along with other files. For example: + * <p> + * {@code /api/v1/keyspaces/ks/tables/tbl/snapshots/testSnapshot} + * lists all SSTable component files for the <i>"testSnapshot"</i> snapshot for the + * <i>"ks"</i> keyspace and the <i>"tbl"</i> table + * <p> + * {@code /api/v1/keyspaces/ks/tables/tbl/snapshots/testSnapshot?includeSecondaryIndexFiles=true} + * lists all SSTable component files, including secondary index files, for the + * <i>"testSnapshot"</i> snapshot for the <i>"ks"</i> keyspace and the <i>"tbl"</i> table + * </li> + * <li> + * The <b>PUT</b> verb creates a new snapshot for the given keyspace and table + * </li> + * <li> + * The <b>DELETE</b> verb deletes an existing snapshot for the given keyspace and table + * </li> + * </ul> + */ +@Singleton +public class SnapshotsHandler extends AbstractHandler<SnapshotRequest> +{ + private static final String INCLUDE_SECONDARY_INDEX_FILES = "includeSecondaryIndexFiles"; + private final SnapshotPathBuilder builder; + private final Configuration configuration; + + @Inject + public SnapshotsHandler(SnapshotPathBuilder builder, + Configuration 
configuration, + InstanceMetadataFetcher metadataFetcher, + CassandraInputValidator validator, + ExecutorPools executorPools) + { + super(metadataFetcher, executorPools, validator); + this.builder = builder; + this.configuration = configuration; + } + + @Override + public void handleInternal(RoutingContext context, + HttpServerRequest httpRequest, + String host, + SocketAddress remoteAddress, + SnapshotRequest request) + { + HttpMethod method = context.request().method(); + if (method.equals(HttpMethod.GET)) + { + listSnapshot(context, host, remoteAddress, request); + } + else if (method.equals(HttpMethod.PUT)) + { + createSnapshot(context, host, remoteAddress, request); + } + else if (method.equals(HttpMethod.DELETE)) + { + clearSnapshot(context, host, remoteAddress, request); + } + else + { + throw new UnsupportedOperationException("Method " + context.request().method() + " is not supported"); + } + } + + /** + * Lists paths of all the snapshot files of a given snapshot name. + * <p> + * The query param {@code includeSecondaryIndexFiles} is used to request secondary index + * files along with other files. 
For example: + * <p> + * {@code /api/v1/keyspaces/ks/tables/tbl/snapshots/testSnapshot} + * lists all SSTable component files for the <i>"testSnapshot"</i> snapshot for the + * <i>"ks"</i> keyspace and the <i>"tbl"</i> table + * <p> + * {@code /api/v1/keyspaces/ks/tables/tbl/snapshots/testSnapshot?includeSecondaryIndexFiles=true} + * lists all SSTable component files, including secondary index files, for the + * <i>"testSnapshot"</i> snapshot for the <i>"ks"</i> keyspace and the <i>"tbl"</i> table + * + * @param context the event to handle + * @param host the name of the host + * @param remoteAddress the remote address that originated the request + * @param requestParams parameters obtained from the request + */ + private void listSnapshot(RoutingContext context, + String host, + SocketAddress remoteAddress, + SnapshotRequest requestParams) + { + builder.build(host, requestParams) + .onSuccess(snapshotDirectory -> + builder.listSnapshotDirectory(snapshotDirectory, requestParams.includeSecondaryIndexFiles()) + .onSuccess(fileList -> { + if (fileList.isEmpty()) + { + String payload = "Snapshot '" + requestParams.snapshotName() + "' not found"; + context.fail(wrapHttpException(HttpResponseStatus.NOT_FOUND, payload)); + } + else + { + logger.debug("SnapshotsHandler handled request={}, remoteAddress={}, " + + "instance={}", requestParams, remoteAddress, host); + context.json(buildResponse(host, snapshotDirectory, fileList)); + } + }) + .onFailure(cause -> processFailure(cause, context, host, remoteAddress, requestParams)) + ) + .onFailure(cause -> processFailure(cause, context, host, remoteAddress, requestParams)); + } + + @Override + protected void processFailure(Throwable cause, + RoutingContext context, + String host, + SocketAddress remoteAddress, + SnapshotRequest requestParams) + { + logger.error("SnapshotsHandler failed for request={}, remoteAddress={}, instance={}, method={}", + requestParams, remoteAddress, host, context.request().method(), cause); + if (cause 
instanceof FileNotFoundException || cause instanceof NoSuchFileException) + { + context.fail(wrapHttpException(HttpResponseStatus.NOT_FOUND, cause.getMessage())); + } + else if (cause instanceof HttpException) + { + context.fail(cause); + } + else + { + context.fail(wrapHttpException(HttpResponseStatus.BAD_REQUEST, "Invalid request for " + requestParams)); + } + } + + private ListSnapshotFilesResponse buildResponse(String host, + String snapshotDirectory, + List<SnapshotPathBuilder.SnapshotFile> fileList) + { + ListSnapshotFilesResponse response = new ListSnapshotFilesResponse(); + int sidecarPort = configuration.getPort(); + SnapshotDirectory directory = SnapshotDirectory.of(snapshotDirectory); + int dataDirectoryIndex = dataDirectoryIndex(host, directory.dataDirectory); + int offset = snapshotDirectory.length() + 1; + + for (SnapshotPathBuilder.SnapshotFile snapshotFile : fileList) + { + int fileNameIndex = snapshotFile.path.indexOf(snapshotDirectory) + offset; + Preconditions.checkArgument(fileNameIndex < snapshotFile.path.length(), + "Invalid snapshot file '" + snapshotFile.path + "'"); + response.addSnapshotFile( + new ListSnapshotFilesResponse.FileInfo(snapshotFile.size, + host, + sidecarPort, + dataDirectoryIndex, + directory.snapshotName, + directory.keyspace, + directory.tableName, + snapshotFile.path.substring(fileNameIndex))); + } + return response; + } + + /** + * Creates a new snapshot for the given keyspace and table. 
+ * + * @param context the event to handle + * @param host the name of the host + * @param remoteAddress the remote address that originated the request + * @param requestParams parameters obtained from the request + */ + private void createSnapshot(RoutingContext context, + String host, + SocketAddress remoteAddress, + SnapshotRequest requestParams) + { + ExecutorPools.TaskExecutorPool pool = executorPools.service(); + pool.executeBlocking(promise -> { + CassandraAdapterDelegate delegate = metadataFetcher.delegate(host(context)); + StorageOperations storageOperations = delegate.storageOperations(); + if (storageOperations == null) + throw cassandraServiceUnavailable(); + logger.debug("Creating snapshot request={}, remoteAddress={}, instance={}", + requestParams, remoteAddress, host); + storageOperations.takeSnapshot(requestParams.snapshotName(), requestParams.keyspace(), + requestParams.tableName(), ImmutableMap.of()); + JsonObject jsonObject = new JsonObject() + .put("result", "Success"); + context.json(jsonObject); + }) + .onFailure(cause -> processCreateSnapshotFailure(cause, context, requestParams, remoteAddress, host)); + } + + private void processCreateSnapshotFailure(Throwable cause, RoutingContext context, SnapshotRequest requestParams, + SocketAddress remoteAddress, String host) + { + logger.error("SnapshotsHandler failed for request={}, remoteAddress={}, instance={}, method={}", + requestParams, remoteAddress, host, context.request().method(), cause); + + Throwable rootCause = cause instanceof UndeclaredThrowableException + ? 
((UndeclaredThrowableException) cause).getUndeclaredThrowable() + : cause; + + if (rootCause instanceof IOException) + { + if (StringUtils.contains(rootCause.getMessage(), + "Snapshot " + requestParams.snapshotName() + " already exists")) + { + context.fail(wrapHttpException(HttpResponseStatus.CONFLICT, rootCause.getMessage())); + return; + } + else if (StringUtils.contains(rootCause.getMessage(), + "Cannot snapshot until bootstrap completes")) + { + // Cassandra does not allow taking snapshots while the node is JOINING the ring + context.fail(wrapHttpException(HttpResponseStatus.SERVICE_UNAVAILABLE, + "The Cassandra instance " + host + " is not available")); + } + } + else if (rootCause instanceof IllegalArgumentException) + { + if (StringUtils.contains(rootCause.getMessage(), + "Keyspace " + requestParams.keyspace() + " does not exist") || + StringUtils.contains(rootCause.getMessage(), + "Unknown keyspace/cf pair")) + { + context.fail(wrapHttpException(HttpResponseStatus.NOT_FOUND, rootCause.getMessage())); + } + else + { + context.fail(wrapHttpException(HttpResponseStatus.BAD_REQUEST, rootCause.getMessage())); + } + return; + } + context.fail(wrapHttpException(HttpResponseStatus.BAD_REQUEST, "Invalid request for " + requestParams)); + } + + /** + * Clears a snapshot for the given keyspace and table. **Note**: Currently, Cassandra does not support + * the table parameter. We can add support in Cassandra for the additional parameter. + * + * @param context the event to handle + * @param host the name of the host + * @param remoteAddress the remote address that originated the request + * @param requestParams parameters obtained from the request + */ + private void clearSnapshot(RoutingContext context, + String host, + SocketAddress remoteAddress, + SnapshotRequest requestParams) + { + // Leverage the SnapshotBuilder for validation purposes. Currently, JMX does not validate for + // non-existent snapshot name or keyspace. 
Additionally, the current JMX implementation to clear snapshots + // does not support passing a table as a parameter. + builder.build(host, requestParams) + .compose(snapshotDirectory -> + executorPools.service().executeBlocking(promise -> { + CassandraAdapterDelegate delegate = + metadataFetcher.delegate(host(context)); + StorageOperations storageOperations = delegate.storageOperations(); + if (storageOperations == null) + throw cassandraServiceUnavailable(); + logger.debug("Clearing snapshot request={}, remoteAddress={}, instance={}", + requestParams, remoteAddress, host); + storageOperations.clearSnapshot(requestParams.snapshotName(), requestParams.keyspace(), + requestParams.tableName()); + context.response().end(); + })) + .onFailure(cause -> processFailure(cause, context, host, remoteAddress, requestParams)); + } + + private int dataDirectoryIndex(String host, String dataDirectory) + { + List<String> dataDirs = metadataFetcher.instance(host).dataDirs(); + for (int index = 0; index < dataDirs.size(); index++) + { + if (dataDirectory.startsWith(dataDirs.get(index))) + { + return index; + } + } + return -1; + } + + /** + * {@inheritDoc} + */ + @Override + protected SnapshotRequest extractParamsOrThrow(final RoutingContext context) + { + boolean includeSecondaryIndexFiles = + "true".equalsIgnoreCase(context.request().getParam(INCLUDE_SECONDARY_INDEX_FILES, "false")); Review Comment: nit ```suggestion boolean includeSecondaryIndexFiles = Boolean.parseBoolean(context.request().getParam(INCLUDE_SECONDARY_INDEX_FILES, "false")); ``` ########## src/main/java/org/apache/cassandra/sidecar/utils/RequestUtils.java: ########## @@ -18,30 +18,29 @@ package org.apache.cassandra.sidecar.utils; +import io.vertx.core.http.HttpServerRequest; + /** * Utility class for Http request related operations. 
*/ public class RequestUtils { /** - * Given a combined host address like 127.0.0.1:9042 or [2001:db8:0:0:0:ff00:42:8329]:9042, this method - * removes port information and returns 127.0.0.1 or 2001:db8:0:0:0:ff00:42:8329. - * @param address - * @return host address without port information + * Parses a boolean parameter from the {@code request}, for the given {@code headerName}. If the request param + * is not {@code true} or {@code false}, it returns the {@code defaultValue}. + * + * @param request the request + * @param headerName the name of the header + * @param defaultValue the default value when the request parameter does not match {@code true} or {@code false} + * @return the parsed value for the {@code headerName} from the {@code request} */ - public static String extractHostAddressWithoutPort(String address) + public static boolean parseBooleanHeader(HttpServerRequest request, String headerName, boolean defaultValue) { - if (address.contains(":")) - { - // just ipv6 host name present without port information - if (address.split(":").length > 2 && !address.startsWith("[")) - { - return address; - } - String host = address.substring(0, address.lastIndexOf(':')); - // remove brackets from ipv6 addresses - return host.startsWith("[") ? host.substring(1, host.length() - 1) : host; - } - return address; + String value = request.getParam(headerName); + if ("true".equalsIgnoreCase(value)) + return true; + if ("false".equalsIgnoreCase(value)) + return false; + return defaultValue; Review Comment: Use `Boolean.parseBoolean()` instead? ########## src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableCleanupHandler.java: ########## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.routes.sstableuploads; + +import java.nio.file.NoSuchFileException; + +import com.google.inject.Inject; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.http.HttpMethod; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.routes.AbstractHandler; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.apache.cassandra.sidecar.utils.SSTableUploadsPathBuilder; + +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * Manages cleaning up uploaded SSTables + */ +public class SSTableCleanupHandler extends AbstractHandler<String> +{ + private static final String UPLOAD_ID_PARAM = "uploadId"; + private final SSTableUploadsPathBuilder uploadPathBuilder; + + /** + * Constructs a handler with the provided {@code metadataFetcher} + * + * @param metadataFetcher the instance metadata fetcher + * @param uploadPathBuilder a class that provides SSTableUploads directories + * @param executorPools executor pools for blocking executions + */ + @Inject + protected SSTableCleanupHandler(InstanceMetadataFetcher metadataFetcher, + SSTableUploadsPathBuilder uploadPathBuilder, + ExecutorPools executorPools) 
+ { + super(metadataFetcher, executorPools, null); + this.uploadPathBuilder = uploadPathBuilder; + } + + /** + * Handles cleaning up the SSTable upload staging directory + * + * @param context the context for the handler + */ + @Override + public void handleInternal(RoutingContext context, + HttpServerRequest httpRequest, + String host, + SocketAddress remoteAddress, + String uploadId) + { + if (context.request().method() != HttpMethod.DELETE) + { + context.fail(wrapHttpException(HttpResponseStatus.BAD_REQUEST, "Unknown API requested")); + return; + } Review Comment: Why would the request router route such a request? It is only registered for `delete`. ########## src/main/java/org/apache/cassandra/sidecar/utils/BaseFileSystem.java: ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.utils; + +import java.nio.file.NoSuchFileException; +import java.util.List; +import java.util.function.Predicate; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.vertx.core.Future; +import io.vertx.core.file.FileProps; +import io.vertx.core.file.FileSystem; +import org.apache.cassandra.sidecar.cluster.InstancesConfig; +import org.apache.cassandra.sidecar.common.utils.CassandraInputValidator; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; + +/** + * Provides functionality for filesytem operations Review Comment: ```suggestion * Provides functionality for filesystem operations ``` ########## vertx-client-shaded/build.gradle: ########## @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * The sole purpose of this sub-project is to produce a shaded package for the vertx-client. Currently, + * the shadow plugin has some limitations when producing a non-shaded and shaded versions of the jar. 
+ */ + +import com.github.jengelman.gradle.plugins.shadow.transformers.CacheableTransformer +import com.github.jengelman.gradle.plugins.shadow.transformers.Transformer +import com.github.jengelman.gradle.plugins.shadow.transformers.TransformerContext +import shadow.org.apache.tools.zip.ZipEntry +import shadow.org.apache.tools.zip.ZipOutputStream + +plugins { + id('java-library') + id('maven-publish') + id('com.github.johnrengelman.shadow') version '6.1.0' +} + +group 'org.apache.cassandra.sidecar' +version project.version + +sourceCompatibility = 1.8 + +configurations { + all*.exclude(group: 'ch.qos.logback') +} + +dependencies { + shadow(group: 'org.slf4j', name: 'slf4j-api', version: "${project.slf4jVersion}") + shadow(group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.14.2') + api(project(':vertx-client')) +} + +// Relocating a Package +shadowJar { + archiveClassifier.set('') + relocate 'com.fasterxml.jackson.core', 'o.a.c.sidecar.client.shaded.com.fasterxml.jackson.core' + relocate 'com.fasterxml.jackson.databind', 'o.a.c.sidecar.client.shaded.com.fasterxml.jackson.databind' + relocate 'io.netty', 'o.a.c.sidecar.client.shaded.io.netty' + relocate 'io.vertx', 'o.a.c.sidecar.client.shaded.io.vertx' + relocate 'META-INF/native/libnetty', 'META-INF/native/sidecar_client_netty_shaded_netty' + relocate 'META-INF/native/netty', 'META-INF/native/io_sidecar_client_netty_shaded_netty' + relocate 'META-INF/versions/11/io/vertx', 'META-INF/versions/11/o/a/c/sidecar/client/shaded/io/vertx' + transform(NettyResourceTransformer.class) + mergeServiceFiles() + + dependencies { + exclude(dependency('org.slf4j:.*:.*')) + // Exclude the jackson-annotations dependency so that extension clients can keep annotating POJOs + // without the need to use shadow annotations. For example, they can continue using + // com.fasterxml.jackson.annotation.JsonProperty. 
+ exclude(dependency('com.fasterxml.jackson.core:jackson-annotations:.*')) + } +} + +publishing { + publications { + shadow(MavenPublication) { publication -> + project.shadow.component(publication) + + groupId project.group + artifactId "vertx-client-all" + version System.getenv("CODE_VERSION") ?: "${version}" + } + } +} + +/** + * A Transformer which updates the Netty JAR META-INF/ resources to accurately + * reference shaded class names. + */ +@CacheableTransformer +class NettyResourceTransformer implements Transformer { Review Comment: MAGIC! ########## src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java: ########## @@ -0,0 +1,587 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.utils; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Predicate; + +import org.apache.commons.lang3.tuple.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.ext.web.handler.HttpException; +import org.apache.cassandra.sidecar.Configuration; +import org.apache.cassandra.sidecar.common.CassandraAdapterDelegate; +import org.apache.cassandra.sidecar.common.TableOperations; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.VisibleForTesting; + +/** + * This class is in charge of performing SSTable imports into the desired Cassandra instance. + * Since imports are synchronized in the Cassandra side on a per table-basis, we only perform one import per + * Cassandra instance's keyspace/table, and we queue the rest of the import requests. 
+ */ +@Singleton +public class SSTableImporter +{ + private static final Logger LOGGER = LoggerFactory.getLogger(SSTableImporter.class); + public static final boolean DEFAULT_RESET_LEVEL = true; + public static final boolean DEFAULT_CLEAR_REPAIRED = true; + public static final boolean DEFAULT_VERIFY_SSTABLES = true; + public static final boolean DEFAULT_VERIFY_TOKENS = true; + public static final boolean DEFAULT_INVALIDATE_CACHES = true; + public static final boolean DEFAULT_EXTENDED_VERIFY = true; + public static final boolean DEFAULT_COPY_DATA = false; + private final Vertx vertx; + private final ExecutorPools executorPools; + private final InstanceMetadataFetcher metadataFetcher; + private final SSTableUploadsPathBuilder uploadPathBuilder; + @VisibleForTesting + final Map<String, ImportQueue> importQueuePerHost; + + /** + * Constructs a new instance of the SSTableImporter class + * + * @param vertx the vertx instance + * @param metadataFetcher a class for fetching InstanceMetadata + * @param configuration the configuration for Sidecar + * @param executorPools the executor pool + * @param uploadPathBuilder a class that provides SSTableUploads directories + */ + @Inject + SSTableImporter(Vertx vertx, + InstanceMetadataFetcher metadataFetcher, + Configuration configuration, + ExecutorPools executorPools, + SSTableUploadsPathBuilder uploadPathBuilder) + { + this.vertx = vertx; + this.executorPools = executorPools; + this.metadataFetcher = metadataFetcher; + this.uploadPathBuilder = uploadPathBuilder; + this.importQueuePerHost = new HashMap<>(); + executorPools.internal() + .setPeriodic(configuration.getSSTableImportPollIntervalMillis(), this::processPendingImports); + } + + /** + * Queues an import with the provided import {@code options} to be processed asynchronously. The imports + * are queued in a FIFO queue. 
+ * + * @param options import options + * @return a future for the result of the import + */ + public Future<Void> scheduleImport(ImportOptions options) + { + Promise<Void> promise = Promise.promise(); + importQueuePerHost.computeIfAbsent(key(options), this::initializeQueue) + .offer(Pair.of(promise, options)); + return promise.future(); + } + + /** + * Attempts to cancel an import for the provided {@code options}. This is a best-effort attempt, and + * if the import has been started, it will not be cancelled. + * + * @param options import options + * @return true if the options were removed from the queue, false otherwise + */ + public boolean cancelImport(ImportOptions options) + { + ImportQueue queue = importQueuePerHost.get(key(options)); + boolean removed = false; + if (queue != null) + { + removed = queue.removeIf(tuple -> options.equals(tuple.getRight())); + } + + LOGGER.debug("Cancel import for options={} was {}removed", options, removed ? "" : "not "); + return removed; + } + + /** + * Returns a key for the queues for the given {@code options}. Classes extending from {@link SSTableImporter} + * can override the {@link #key(ImportOptions)} method and provide a different key for the queue. + * + * @param options the import options + * @return a key for the queues for the given {@code options} + */ + protected String key(ImportOptions options) + { + return options.host + "$" + options.keyspace + "$" + options.tableName; + } + + /** + * Returns a new queue for the given {@code key}. Classes extending from {@link SSTableImporter} can override + * this method and provide a different implementation for the queue. + * + * @param key the key for the map + * @return a new queue for the given {@code key} + */ + protected ImportQueue initializeQueue(String key) + { + return new ImportQueue(); + } + + /** + * Processes pending imports for every host in the import queue. 
+ * + * @param timerId a unique identifier for the periodic timer + */ + private void processPendingImports(Long timerId) + { + for (ImportQueue queue : importQueuePerHost.values()) + { + if (!queue.isEmpty()) + { + executorPools.internal() + .executeBlocking(p -> maybeDrainImportQueue(queue)); + } + } + } + + /** + * Tries to lock the queue to perform the draining. If the queue is already being drained, then it will + * not perform any operation. + * + * @param queue a queue of import tasks + */ + private void maybeDrainImportQueue(ImportQueue queue) + { + if (queue.tryLock()) + { + try + { + drainImportQueue(queue); + } + finally + { + queue.unlock(); + } + } + } + + /** + * This blocking operation will drain the {@code queue}. It will utilize a single thread + * to import the pending import requests on that host. + * + * @param queue a queue of import tasks + */ + private void drainImportQueue(ImportQueue queue) + { + while (!queue.isEmpty()) + { + Pair<Promise<Void>, ImportOptions> pair = queue.poll(); + Promise<Void> promise = pair.getLeft(); + ImportOptions options = pair.getRight(); + + CassandraAdapterDelegate cassandra = metadataFetcher.delegate(options.host); + TableOperations tableOperations = cassandra.tableOperations(); + + if (tableOperations == null) + { + promise.fail(new HttpException(HttpResponseStatus.SERVICE_UNAVAILABLE.code(), + "Cassandra service is unavailable")); + } + else + { + try + { + List<String> failedDirectories = + tableOperations.importNewSSTables(options.keyspace, + options.tableName, + options.directory, + options.resetLevel, + options.clearRepaired, + options.verifySSTables, + options.verifyTokens, + options.invalidateCaches, + options.extendedVerify, + options.copyData); + if (!failedDirectories.isEmpty()) + { + promise.fail(new HttpException(HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), + "Failed to import from directories: " + failedDirectories)); + } + else + { + promise.complete(); + cleanup(options); + } + } + catch 
(Exception exception) + { + LOGGER.error("Failed to import SSTables with options={}", options, exception); + promise.fail(exception); + } + } + } + } + + /** + * Removes the staging directory recursively after a successful import + * + * @param options import options + */ + private void cleanup(ImportOptions options) + { + uploadPathBuilder.resolveStagingDirectory(options.host, options.uploadId) + .compose(uploadPathBuilder::isValidDirectory) + .compose(stagingDirectory -> vertx.fileSystem() + .deleteRecursive(stagingDirectory, true)) + .onSuccess(v -> + LOGGER.debug("Successfully removed staging directory for uploadId={}, " + + "instance={}, options={}", options.uploadId, options.host, options)) + .onFailure(cause -> + LOGGER.error("Failed to remove staging directory for uploadId={}, " + + "instance={}, options={}", options.uploadId, options.host, options, + cause)); + } + + /** + * A {@link ConcurrentLinkedQueue} that allows for locking the queue while operating on it. The queue + * must be unlocked once the operations are complete. + */ + static class ImportQueue extends ConcurrentLinkedQueue<Pair<Promise<Void>, ImportOptions>> + { + private final AtomicBoolean isQueueInUse = new AtomicBoolean(false); + + /** + * @return true if the queue was successfully locked, false otherwise + */ + public boolean tryLock() + { + return isQueueInUse.compareAndSet(false, true); + } + + /** + * Unlocks the queue + */ + public void unlock() + { + isQueueInUse.set(false); + } + + public boolean removeIf(Predicate<? super Pair<Promise<Void>, ImportOptions>> filter) + { + return super.removeIf(filter); + } Review Comment: Remove it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

