yifan-c commented on code in PR #45:
URL: https://github.com/apache/cassandra-sidecar/pull/45#discussion_r1194376151


##########
src/main/java/org/apache/cassandra/sidecar/concurrent/ConcurrencyLimiter.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.concurrent;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.IntSupplier;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * A class that provides functionality for a concurrency limiter where 
implementing consumers can try to acquire a
+ * permit before executing an operation, and later releasing the permit when 
the operation is completed. This
+ * implementation relies on clients playing fairly and not acquiring more 
permits or releasing more permits than they
+ * are allowed. This class is intended to be used on constrained resources, 
for example uploads to the server, where
+ * we want to limit the amount of concurrent uploads to the server, and if we 
are too busy, we want to provide a
+ * mechanism to deny new uploads until the server has been freed up.
+ *
+ * <p>The intended usage of this class is as follows:
+ *
+ * <pre>
+ *     if (limiter.tryAcquire()) {
+ *         try {
+ *             // .. some expensive operation
+ *         } finally {
+ *             limiter.releasePermit();
+ *         }
+ *     } else {
+ *         // .. handle the case where the limit has been reached
+ *     }
+ * </pre>
+ *
+ * <p>In a vertx {@link io.vertx.core.Handler}, this can be implemented as 
follows:
+ *
+ * <pre>
+ *     public void handle(RoutingContext context) {
+ *         if (!limiter.tryAcquire()) {
+ *             // handle the case where the limit has been reached
+ *             return;
+ *         }
+ *         // to make sure that permit is always released
+ *         context.addEndHandler(v -&gt; limiter.releasePermit());
+ *         // .. some expensive operation
+ *     }
+ * </pre>
+ */
+public class ConcurrencyLimiter
+{
+    @VisibleForTesting
+    final AtomicInteger permits = new AtomicInteger(0);

Review Comment:
   instead of exposing the mutable `AtomicInteger`, can you add a method to 
return the current `permits: int` and make the filed private instead? 
   
   ```java
       public int acquiredPermits()
       {
           return permits.get();
       }
   ```



##########
src/main/java/org/apache/cassandra/sidecar/data/SchemaRequest.java:
##########
@@ -15,24 +15,34 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.cassandra.sidecar.common.data;
+package org.apache.cassandra.sidecar.data;
 
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Holder class for the {@link 
org.apache.cassandra.sidecar.routes.SchemaHandler}
  * request parameters
  */
-public class SchemaRequest extends QualifiedTableName
+public class SchemaRequest

Review Comment:
   Should we rename this group of classes to Payload in this patch? For this 
one, `SchemaRequestPayload`



##########
src/test/integration/org/apache/cassandra/sidecar/IntegrationTestBase.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.Session;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.util.Modules;
+import io.vertx.core.Vertx;
+import io.vertx.core.http.HttpServer;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.sidecar.common.data.QualifiedTableName;
+import org.apache.cassandra.sidecar.common.testing.CassandraTestContext;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Base class for integration test.
+ * Start a docker container of cassandra at the begining of each test, and

Review Comment:
   ```suggestion
    * Start a docker container of cassandra at the beginning of each test, and
   ```



##########
src/main/java/org/apache/cassandra/sidecar/utils/MD5ChecksumVerifier.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.utils;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Base64;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import io.vertx.core.file.AsyncFile;
+import io.vertx.core.file.FileSystem;
+import io.vertx.core.file.OpenOptions;
+import io.vertx.ext.web.handler.HttpException;
+
+/**
+ * Implementation of {@link ChecksumVerifier}. Here we use MD5 implementation 
of {@link java.security.MessageDigest}
+ * for calculating checksum. And match the calculated checksum with expected 
checksum obtained from request.
+ */
+public class MD5ChecksumVerifier implements ChecksumVerifier
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(MD5ChecksumVerifier.class);
+    private final FileSystem fs;
+
+    public MD5ChecksumVerifier(FileSystem fs)
+    {
+        this.fs = fs;
+    }
+
+    public Future<String> verify(String expectedChecksum, String filePath)
+    {
+        if (expectedChecksum == null)
+        {
+            return Future.succeededFuture(filePath);
+        }
+
+        LOGGER.debug("Validating MD5. Expected to match: {}", 
expectedChecksum);
+
+        return fs.open(filePath, new OpenOptions())
+                 .compose(this::calculateMD5)
+                 .compose(computedChecksum -> {
+                     if (!expectedChecksum.equals(computedChecksum))
+                         return Future.failedFuture(new 
HttpException(HttpResponseStatus.BAD_REQUEST.code(),
+                                                                      
String.format("Checksum mismatch. "
+                                                                               
     + "computed_checksum=%s, "
+                                                                               
     + "expected_checksum=%s, "
+                                                                               
     + "algorithm=MD5",
+                                                                               
     computedChecksum,
+                                                                               
     expectedChecksum)));
+                     LOGGER.debug("Checksum mismatch. computed_checksum={}, 
expected_checksum={}, algorithm=MD5",
+                                  computedChecksum, expectedChecksum);
+                     return Future.succeededFuture(filePath);
+                 });
+    }
+
+    private Future<String> calculateMD5(AsyncFile file)
+    {
+        MessageDigest digest;
+        try
+        {
+            digest = MessageDigest.getInstance("MD5");
+        }
+        catch (NoSuchAlgorithmException e)
+        {
+            return Future.failedFuture(e);
+        }
+
+        Promise<String> result = Promise.promise();
+        file.pause()
+            .setReadBufferSize(64 * 1024)

Review Comment:
   Declare a constant for the buffer size? 



##########
src/main/java/org/apache/cassandra/sidecar/routes/FileStreamHandler.java:
##########
@@ -24,54 +24,60 @@
 import com.google.inject.Inject;
 import io.netty.handler.codec.http.HttpHeaderNames;
 import io.vertx.core.Future;
-import io.vertx.core.Handler;
 import io.vertx.core.file.FileProps;
 import io.vertx.core.file.FileSystem;
 import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.net.SocketAddress;
 import io.vertx.ext.web.RoutingContext;
-import io.vertx.ext.web.handler.HttpException;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
 import org.apache.cassandra.sidecar.models.HttpResponse;
 import org.apache.cassandra.sidecar.utils.FileStreamer;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
 
 import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
 import static 
io.netty.handler.codec.http.HttpResponseStatus.REQUESTED_RANGE_NOT_SATISFIABLE;
-import static 
org.apache.cassandra.sidecar.utils.RequestUtils.extractHostAddressWithoutPort;
+import static 
org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException;
 
 /**
  * Handler for sending out files.
  */
-public class FileStreamHandler implements Handler<RoutingContext>
+public class FileStreamHandler extends AbstractHandler<String>
 {
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(FileStreamHandler.class);
     public static final String FILE_PATH_CONTEXT_KEY = "fileToTransfer";
-
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(FileStreamHandler.class);

Review Comment:
   logger defined in base class already



##########
common/src/test/java/org/apache/cassandra/sidecar/common/SimpleCassandraVersionProviderTest.java:
##########
@@ -78,7 +78,7 @@ void ensureOutOfOrderInsertionWorks()
                           .add(new V41())
                           .add(new V30()).build();
 
-        ICassandraFactory cassandra = 
provider.getCassandra(SimpleCassandraVersion.create("4.0.0"));
+        ICassandraFactory cassandra = 
provider.cassandra(SimpleCassandraVersion.create("4.0.0"));
         assertThat(cassandra).hasSameClassAs(new V40());

Review Comment:
   To test `ensureOutOfOrderInsertionWorks`, should it get V30 from version 
`SimpleCassandraVersion.create("3.0.0")`?
   
   After I made the change, it actually fails, meaning the insertion order has 
to be in the ascending order of the versions. Either updating the test case or 
the implementation of builder is needed.  



##########
src/main/java/org/apache/cassandra/sidecar/utils/CacheFactory.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.utils;
+
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+
+import com.google.common.util.concurrent.MoreExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalListener;
+import com.github.benmanes.caffeine.cache.Ticker;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.Future;
+import org.apache.cassandra.sidecar.Configuration;
+import org.apache.cassandra.sidecar.config.CacheConfiguration;
+import org.jetbrains.annotations.VisibleForTesting;
+
+/**
+ * A factory for caches used in Sidecar
+ */
+@Singleton
+public class CacheFactory
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CacheFactory.class);
+
+    private final Cache<SSTableImporter.ImportOptions, Future<Void>> 
ssTableImportCache;
+
+    @Inject
+    public CacheFactory(Configuration configuration, SSTableImporter 
ssTableImporter)
+    {
+        this(configuration, ssTableImporter, Ticker.systemTicker());
+    }
+
+    @VisibleForTesting
+    CacheFactory(Configuration configuration, SSTableImporter ssTableImporter, 
Ticker ticker)
+    {
+        this.ssTableImportCache = 
initSSTableImportCache(configuration.ssTableImportCacheConfiguration(),
+                                                         ssTableImporter, 
ticker);
+    }
+
+    /**
+     * @return the cache used for the SSTableImport requests
+     */
+    public Cache<SSTableImporter.ImportOptions, Future<Void>> 
ssTableImportCache()
+    {
+        return ssTableImportCache;
+    }
+
+    /**
+     * Initializes the SSTable Import Cache using the provided {@code 
configuration} and {@code ticker}
+     * for the cache
+     *
+     * @param configuration   the Cache configuration parameters
+     * @param ssTableImporter the reference to the SSTable importer singleton
+     * @param ticker          the ticker for the cache
+     * @return the initialized cache
+     */
+    protected Cache<SSTableImporter.ImportOptions, Future<Void>>
+    initSSTableImportCache(CacheConfiguration configuration, SSTableImporter 
ssTableImporter, Ticker ticker)
+    {
+        Duration expireAfterAccessDuration = 
Duration.of(configuration.expireAfterAccessMillis(), ChronoUnit.MILLIS);
+        long maximumSize = configuration.maximumSize();
+        LOGGER.info("Building SSTable Import Cache with expireAfterAccess={}, 
maxSize={}",
+                    expireAfterAccessDuration, maximumSize);
+        return Caffeine.newBuilder()
+                       .ticker(ticker)
+                       .executor(MoreExecutors.directExecutor())
+                       .expireAfterAccess(expireAfterAccessDuration)
+                       .maximumSize(maximumSize)
+                       .recordStats()
+                       
.removalListener((RemovalListener<SSTableImporter.ImportOptions, Future<Void>>)
+                                        (options, result, cause) -> {
+                                            LOGGER.debug("Removed entry '{}' 
with options '{}' from SSTable Import " +
+                                                         "Cache  and cause 
{}", result, options, cause);

Review Comment:
   ```suggestion
                                                            "Cache and cause 
{}", result, options, cause);
   ```



##########
src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadata.java:
##########
@@ -50,14 +48,9 @@
     List<String> dataDirs();
 
     /**
-     * @return {@link CQLSession} for connecting with instance via CQL
+     * @return staging directory for the uploads of the cassandra instance
      */
-    CQLSession session();
-
-    /**
-     * @return {@link JmxClient} for connecting with the instance via JMX
-     */
-    JmxClient jmxClient();
+    String uploadsStagingDir();

Review Comment:
   Can you call it `stagingDir`? It is just a directory for the staging data, 
not necessary to distinguish between scenarios like upload or others. 



##########
src/main/java/org/apache/cassandra/sidecar/routes/SnapshotsHandler.java:
##########
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.nio.file.NoSuchFileException;
+import java.util.List;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import org.apache.commons.lang3.StringUtils;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpMethod;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.json.JsonObject;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.web.RoutingContext;
+import io.vertx.ext.web.handler.HttpException;
+import org.apache.cassandra.sidecar.Configuration;
+import org.apache.cassandra.sidecar.common.CassandraAdapterDelegate;
+import org.apache.cassandra.sidecar.common.StorageOperations;
+import org.apache.cassandra.sidecar.common.data.ListSnapshotFilesResponse;
+import org.apache.cassandra.sidecar.common.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.data.SnapshotRequest;
+import org.apache.cassandra.sidecar.snapshots.SnapshotDirectory;
+import org.apache.cassandra.sidecar.snapshots.SnapshotPathBuilder;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+
+import static 
org.apache.cassandra.sidecar.utils.HttpExceptions.cassandraServiceUnavailable;
+import static 
org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException;
+
+/**
+ * The SnapshotsHandler class handles snapshot operations.
+ *
+ * <ul>
+ *   <li>
+ *      The <b>GET</b> verb will produce a list of paths of all the snapshot 
files of a given
+ *      snapshot name.
+ * <p>
+ *      The query param {@code includeSecondaryIndexFiles} is used to request 
secondary index
+ *      files along with other files. For example:
+ * <p>
+ *      {@code /api/v1/keyspaces/ks/tables/tbl/snapshots/testSnapshot}
+ *      lists all SSTable component files for the <i>"testSnapshot"</i> 
snapshot for the
+ *      <i>"ks"</i> keyspace and the <i>"tbl"</i> table
+ * <p>
+ *      {@code 
/api/v1/keyspaces/ks/tables/tbl/snapshots/testSnapshot?includeSecondaryIndexFiles=true}
+ *      lists all SSTable component files, including secondary index files, 
for the
+ *      <i>"testSnapshot"</i> snapshot for the <i>"ks"</i> keyspace and the 
<i>"tbl"</i> table
+ *   </li>
+ *   <li>
+ *       The <b>PUT</b> verb creates a new snapshot for the given keyspace and 
table
+ *   </li>
+ *   <li>
+ *       The <b>DELETE</b> verb deletes an existing snapshot for the given 
keyspace and table
+ *   </li>
+ * </ul>
+ */
+@Singleton
+public class SnapshotsHandler extends AbstractHandler<SnapshotRequest>
+{
+    private static final String INCLUDE_SECONDARY_INDEX_FILES = 
"includeSecondaryIndexFiles";
+    private final SnapshotPathBuilder builder;
+    private final Configuration configuration;
+
+    @Inject
+    public SnapshotsHandler(SnapshotPathBuilder builder,
+                            Configuration configuration,
+                            InstanceMetadataFetcher metadataFetcher,
+                            CassandraInputValidator validator,
+                            ExecutorPools executorPools)
+    {
+        super(metadataFetcher, executorPools, validator);
+        this.builder = builder;
+        this.configuration = configuration;
+    }
+
+    @Override
+    public void handleInternal(RoutingContext context,
+                               HttpServerRequest httpRequest,
+                               String host,
+                               SocketAddress remoteAddress,
+                               SnapshotRequest request)
+    {
+        HttpMethod method = context.request().method();
+        if (method.equals(HttpMethod.GET))
+        {
+            listSnapshot(context, host, remoteAddress, request);
+        }
+        else if (method.equals(HttpMethod.PUT))
+        {
+            createSnapshot(context, host, remoteAddress, request);
+        }
+        else if (method.equals(HttpMethod.DELETE))
+        {
+            clearSnapshot(context, host, remoteAddress, request);
+        }
+        else
+        {
+            throw new UnsupportedOperationException("Method " + 
context.request().method() + " is not supported");
+        }
+    }
+
+    /**
+     * Lists paths of all the snapshot files of a given snapshot name.
+     * <p>
+     * The query param {@code includeSecondaryIndexFiles} is used to request 
secondary index
+     * files along with other files. For example:
+     * <p>
+     * {@code /api/v1/keyspaces/ks/tables/tbl/snapshots/testSnapshot}
+     * lists all SSTable component files for the <i>"testSnapshot"</i> 
snapshot for the
+     * <i>"ks"</i> keyspace and the <i>"tbl"</i> table
+     * <p>
+     * {@code 
/api/v1/keyspaces/ks/tables/tbl/snapshots/testSnapshot?includeSecondaryIndexFiles=true}
+     * lists all SSTable component files, including secondary index files, for 
the
+     * <i>"testSnapshot"</i> snapshot for the <i>"ks"</i> keyspace and the 
<i>"tbl"</i> table
+     *
+     * @param context       the event to handle
+     * @param host          the name of the host
+     * @param remoteAddress the remote address that originated the request
+     * @param requestParams parameters obtained from the request
+     */
+    private void listSnapshot(RoutingContext context,
+                              String host,
+                              SocketAddress remoteAddress,
+                              SnapshotRequest requestParams)
+    {
+        builder.build(host, requestParams)
+               .onSuccess(snapshotDirectory ->
+                          builder.listSnapshotDirectory(snapshotDirectory, 
requestParams.includeSecondaryIndexFiles())
+                                 .onSuccess(fileList -> {
+                                     if (fileList.isEmpty())
+                                     {
+                                         String payload = "Snapshot '" + 
requestParams.snapshotName() + "' not found";
+                                         
context.fail(wrapHttpException(HttpResponseStatus.NOT_FOUND, payload));
+                                     }
+                                     else
+                                     {
+                                         logger.debug("SnapshotsHandler 
handled request={}, remoteAddress={}, " +
+                                                      "instance={}", 
requestParams, remoteAddress, host);
+                                         context.json(buildResponse(host, 
snapshotDirectory, fileList));
+                                     }
+                                 })
+                                 .onFailure(cause -> processFailure(cause, 
context, host, remoteAddress, requestParams))
+               )
+               .onFailure(cause -> processFailure(cause, context, host, 
remoteAddress, requestParams));
+    }
+
+    @Override
+    protected void processFailure(Throwable cause,
+                                  RoutingContext context,
+                                  String host,
+                                  SocketAddress remoteAddress,
+                                  SnapshotRequest requestParams)
+    {
+        logger.error("SnapshotsHandler failed for request={}, 
remoteAddress={}, instance={}, method={}",
+                     requestParams, remoteAddress, host, 
context.request().method(), cause);
+        if (cause instanceof FileNotFoundException || cause instanceof 
NoSuchFileException)
+        {
+            context.fail(wrapHttpException(HttpResponseStatus.NOT_FOUND, 
cause.getMessage()));
+        }
+        else if (cause instanceof HttpException)
+        {
+            context.fail(cause);
+        }
+        else
+        {
+            context.fail(wrapHttpException(HttpResponseStatus.BAD_REQUEST, 
"Invalid request for " + requestParams));
+        }
+    }
+
+    private ListSnapshotFilesResponse buildResponse(String host,
+                                                    String snapshotDirectory,
+                                                    
List<SnapshotPathBuilder.SnapshotFile> fileList)
+    {
+        ListSnapshotFilesResponse response = new ListSnapshotFilesResponse();
+        int sidecarPort = configuration.getPort();
+        SnapshotDirectory directory = SnapshotDirectory.of(snapshotDirectory);
+        int dataDirectoryIndex = dataDirectoryIndex(host, 
directory.dataDirectory);
+        int offset = snapshotDirectory.length() + 1;
+
+        for (SnapshotPathBuilder.SnapshotFile snapshotFile : fileList)
+        {
+            int fileNameIndex = snapshotFile.path.indexOf(snapshotDirectory) + 
offset;
+            Preconditions.checkArgument(fileNameIndex < 
snapshotFile.path.length(),
+                                        "Invalid snapshot file '" + 
snapshotFile.path + "'");
+            response.addSnapshotFile(
+            new ListSnapshotFilesResponse.FileInfo(snapshotFile.size,
+                                                   host,
+                                                   sidecarPort,
+                                                   dataDirectoryIndex,
+                                                   directory.snapshotName,
+                                                   directory.keyspace,
+                                                   directory.tableName,
+                                                   
snapshotFile.path.substring(fileNameIndex)));
+        }
+        return response;
+    }
+
+    /**
+     * Creates a new snapshot for the given keyspace and table.
+     *
+     * @param context       the event to handle
+     * @param host          the name of the host
+     * @param remoteAddress the remote address that originated the request
+     * @param requestParams parameters obtained from the request
+     */
+    private void createSnapshot(RoutingContext context,
+                                String host,
+                                SocketAddress remoteAddress,
+                                SnapshotRequest requestParams)
+    {
+        ExecutorPools.TaskExecutorPool pool = executorPools.service();
+        pool.executeBlocking(promise -> {
+                CassandraAdapterDelegate delegate = 
metadataFetcher.delegate(host(context));
+                StorageOperations storageOperations = 
delegate.storageOperations();
+                if (storageOperations == null)
+                    throw cassandraServiceUnavailable();
+                logger.debug("Creating snapshot request={}, remoteAddress={}, 
instance={}",
+                             requestParams, remoteAddress, host);
+                storageOperations.takeSnapshot(requestParams.snapshotName(), 
requestParams.keyspace(),
+                                               requestParams.tableName(), 
ImmutableMap.of());
+                JsonObject jsonObject = new JsonObject()
+                                        .put("result", "Success");
+                context.json(jsonObject);
+            })
+            .onFailure(cause -> processCreateSnapshotFailure(cause, context, 
requestParams, remoteAddress, host));
+    }
+
+    private void processCreateSnapshotFailure(Throwable cause, RoutingContext 
context, SnapshotRequest requestParams,
+                                              SocketAddress remoteAddress, 
String host)
+    {
+        logger.error("SnapshotsHandler failed for request={}, 
remoteAddress={}, instance={}, method={}",
+                     requestParams, remoteAddress, host, 
context.request().method(), cause);
+
+        Throwable rootCause = cause instanceof UndeclaredThrowableException
+                              ? ((UndeclaredThrowableException) 
cause).getUndeclaredThrowable()
+                              : cause;
+
+        if (rootCause instanceof IOException)
+        {
+            if (StringUtils.contains(rootCause.getMessage(),
+                                     "Snapshot " + 
requestParams.snapshotName() + " already exists"))
+            {
+                context.fail(wrapHttpException(HttpResponseStatus.CONFLICT, 
rootCause.getMessage()));
+                return;
+            }
+            else if (StringUtils.contains(rootCause.getMessage(),
+                                          "Cannot snapshot until bootstrap 
completes"))
+            {
+                // Cassandra does not allow taking snapshots while the node is 
JOINING the ring
+                
context.fail(wrapHttpException(HttpResponseStatus.SERVICE_UNAVAILABLE,
+                                               "The Cassandra instance " + 
host + " is not available"));
+            }
+        }
+        else if (rootCause instanceof IllegalArgumentException)
+        {
+            if (StringUtils.contains(rootCause.getMessage(),
+                                     "Keyspace " + requestParams.keyspace() + 
" does not exist") ||
+                StringUtils.contains(rootCause.getMessage(),
+                                     "Unknown keyspace/cf pair"))
+            {
+                context.fail(wrapHttpException(HttpResponseStatus.NOT_FOUND, 
rootCause.getMessage()));
+            }
+            else
+            {
+                context.fail(wrapHttpException(HttpResponseStatus.BAD_REQUEST, 
rootCause.getMessage()));
+            }
+            return;
+        }
+        context.fail(wrapHttpException(HttpResponseStatus.BAD_REQUEST, 
"Invalid request for " + requestParams));
+    }
+
+    /**
+     * Clears a snapshot for the given keyspace and table. **Note**: 
Currently, Cassandra does not support
+     * the table parameter. We can add support in Cassandra for the additional 
parameter.
+     *
+     * @param context       the event to handle
+     * @param host          the name of the host
+     * @param remoteAddress the remote address that originated the request
+     * @param requestParams parameters obtained from the request
+     */
+    private void clearSnapshot(RoutingContext context,
+                               String host,
+                               SocketAddress remoteAddress,
+                               SnapshotRequest requestParams)
+    {
+        // Leverage the SnapshotBuilder for validation purposes. Currently, 
JMX does not validate for
+        // non-existent snapshot name or keyspace. Additionally, the current 
JMX implementation to clear snapshots
+        // does not support passing a table as a parameter.
+        builder.build(host, requestParams)
+               .compose(snapshotDirectory ->
+                        executorPools.service().executeBlocking(promise -> {
+                            CassandraAdapterDelegate delegate =
+                            metadataFetcher.delegate(host(context));
+                            StorageOperations storageOperations = 
delegate.storageOperations();
+                            if (storageOperations == null)
+                                throw cassandraServiceUnavailable();
+                            logger.debug("Clearing snapshot request={}, 
remoteAddress={}, instance={}",
+                                         requestParams, remoteAddress, host);
+                            
storageOperations.clearSnapshot(requestParams.snapshotName(), 
requestParams.keyspace(),
+                                                            
requestParams.tableName());
+                            context.response().end();
+                        }))
+               .onFailure(cause -> processFailure(cause, context, host, 
remoteAddress, requestParams));
+    }
+
+    private int dataDirectoryIndex(String host, String dataDirectory)
+    {
+        List<String> dataDirs = metadataFetcher.instance(host).dataDirs();
+        for (int index = 0; index < dataDirs.size(); index++)
+        {
+            if (dataDirectory.startsWith(dataDirs.get(index)))
+            {
+                return index;
+            }
+        }
+        return -1;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    protected SnapshotRequest extractParamsOrThrow(final RoutingContext 
context)
+    {
+        boolean includeSecondaryIndexFiles =
+        
"true".equalsIgnoreCase(context.request().getParam(INCLUDE_SECONDARY_INDEX_FILES,
 "false"));

Review Comment:
   nit
   ```suggestion
           boolean includeSecondaryIndexFiles = 
Boolean.parseBoolean(context.request().getParam(INCLUDE_SECONDARY_INDEX_FILES, 
"false"));
   ```



##########
src/main/java/org/apache/cassandra/sidecar/utils/RequestUtils.java:
##########
@@ -18,30 +18,29 @@
 
 package org.apache.cassandra.sidecar.utils;
 
+import io.vertx.core.http.HttpServerRequest;
+
 /**
  * Utility class for Http request related operations.
  */
 public class RequestUtils
 {
     /**
-     * Given a combined host address like 127.0.0.1:9042 or 
[2001:db8:0:0:0:ff00:42:8329]:9042, this method
-     * removes port information and returns 127.0.0.1 or 
2001:db8:0:0:0:ff00:42:8329.
-     * @param address
-     * @return host address without port information
+     * Parses a boolean parameter from the {@code request}, for the given 
{@code headerName}. If the request param
+     * is not {@code true} or {@code false}, it returns the {@code 
defaultValue}.
+     *
+     * @param request      the request
+     * @param headerName   the name of the header
+     * @param defaultValue the default value when the request parameter does 
not match {@code true} or {@code false}
+     * @return the parsed value for the {@code headerName} from the {@code 
request}
      */
-    public static String extractHostAddressWithoutPort(String address)
+    public static boolean parseBooleanHeader(HttpServerRequest request, String 
headerName, boolean defaultValue)
     {
-        if (address.contains(":"))
-        {
-            // just ipv6 host name present without port information
-            if (address.split(":").length > 2 && !address.startsWith("["))
-            {
-                return address;
-            }
-            String host = address.substring(0, address.lastIndexOf(':'));
-            // remove brackets from ipv6 addresses
-            return host.startsWith("[") ? host.substring(1, host.length() - 1) 
: host;
-        }
-        return address;
+        String value = request.getParam(headerName);
+        if ("true".equalsIgnoreCase(value))
+            return true;
+        if ("false".equalsIgnoreCase(value))
+            return false;
+        return defaultValue;

Review Comment:
   Use `Boolean.parseBoolean()` instead?



##########
src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableCleanupHandler.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes.sstableuploads;
+
+import java.nio.file.NoSuchFileException;
+
+import com.google.inject.Inject;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpMethod;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.routes.AbstractHandler;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.cassandra.sidecar.utils.SSTableUploadsPathBuilder;
+
+import static 
org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException;
+
+/**
+ * Manages cleaning up uploaded SSTables
+ */
+public class SSTableCleanupHandler extends AbstractHandler<String>
+{
+    private static final String UPLOAD_ID_PARAM = "uploadId";
+    private final SSTableUploadsPathBuilder uploadPathBuilder;
+
+    /**
+     * Constructs a handler with the provided {@code metadataFetcher}
+     *
+     * @param metadataFetcher   the instance metadata fetcher
+     * @param uploadPathBuilder a class that provides SSTableUploads 
directories
+     * @param executorPools     executor pools for blocking executions
+     */
+    @Inject
+    protected SSTableCleanupHandler(InstanceMetadataFetcher metadataFetcher,
+                                    SSTableUploadsPathBuilder 
uploadPathBuilder,
+                                    ExecutorPools executorPools)
+    {
+        super(metadataFetcher, executorPools, null);
+        this.uploadPathBuilder = uploadPathBuilder;
+    }
+
+    /**
+     * Handles cleaning up the SSTable upload staging directory
+     *
+     * @param context the context for the handler
+     */
+    @Override
+    public void handleInternal(RoutingContext context,
+                               HttpServerRequest httpRequest,
+                               String host,
+                               SocketAddress remoteAddress,
+                               String uploadId)
+    {
+        if (context.request().method() != HttpMethod.DELETE)
+        {
+            context.fail(wrapHttpException(HttpResponseStatus.BAD_REQUEST, 
"Unknown API requested"));
+            return;
+        }

Review Comment:
   Why would the request router route such request? It is only reregistered for 
`delete`



##########
src/main/java/org/apache/cassandra/sidecar/utils/BaseFileSystem.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.utils;
+
+import java.nio.file.NoSuchFileException;
+import java.util.List;
+import java.util.function.Predicate;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.vertx.core.Future;
+import io.vertx.core.file.FileProps;
+import io.vertx.core.file.FileSystem;
+import org.apache.cassandra.sidecar.cluster.InstancesConfig;
+import org.apache.cassandra.sidecar.common.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+
+/**
+ * Provides functionality for filesytem operations

Review Comment:
   ```suggestion
    * Provides functionality for filesystem operations
   ```



##########
vertx-client-shaded/build.gradle:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * The sole purpose of this sub-project is to produce a shaded package for the 
vertx-client. Currently,
+ * the shadow plugin has some limitations when producing a non-shaded and 
shaded versions of the jar.
+ */
+
+import 
com.github.jengelman.gradle.plugins.shadow.transformers.CacheableTransformer
+import com.github.jengelman.gradle.plugins.shadow.transformers.Transformer
+import 
com.github.jengelman.gradle.plugins.shadow.transformers.TransformerContext
+import shadow.org.apache.tools.zip.ZipEntry
+import shadow.org.apache.tools.zip.ZipOutputStream
+
+plugins {
+    id('java-library')
+    id('maven-publish')
+    id('com.github.johnrengelman.shadow') version '6.1.0'
+}
+
+group 'org.apache.cassandra.sidecar'
+version project.version
+
+sourceCompatibility = 1.8
+
+configurations {
+    all*.exclude(group: 'ch.qos.logback')
+}
+
+dependencies {
+    shadow(group: 'org.slf4j', name: 'slf4j-api', version: 
"${project.slf4jVersion}")
+    shadow(group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', 
version: '2.14.2')
+    api(project(':vertx-client'))
+}
+
+// Relocating a Package
+shadowJar {
+    archiveClassifier.set('')
+    relocate 'com.fasterxml.jackson.core', 
'o.a.c.sidecar.client.shaded.com.fasterxml.jackson.core'
+    relocate 'com.fasterxml.jackson.databind', 
'o.a.c.sidecar.client.shaded.com.fasterxml.jackson.databind'
+    relocate 'io.netty', 'o.a.c.sidecar.client.shaded.io.netty'
+    relocate 'io.vertx', 'o.a.c.sidecar.client.shaded.io.vertx'
+    relocate 'META-INF/native/libnetty', 
'META-INF/native/sidecar_client_netty_shaded_netty'
+    relocate 'META-INF/native/netty', 
'META-INF/native/io_sidecar_client_netty_shaded_netty'
+    relocate 'META-INF/versions/11/io/vertx', 
'META-INF/versions/11/o/a/c/sidecar/client/shaded/io/vertx'
+    transform(NettyResourceTransformer.class)
+    mergeServiceFiles()
+
+    dependencies {
+        exclude(dependency('org.slf4j:.*:.*'))
+        // Exclude the jackson-annotations dependency so that extension 
clients can keep annotating POJOs
+        // without the need to use shadow annotations. For example, they can 
continue using
+        // com.fasterxml.jackson.annotation.JsonProperty.
+        
exclude(dependency('com.fasterxml.jackson.core:jackson-annotations:.*'))
+    }
+}
+
+publishing {
+    publications {
+        shadow(MavenPublication) { publication ->
+            project.shadow.component(publication)
+
+            groupId project.group
+            artifactId "vertx-client-all"
+            version System.getenv("CODE_VERSION") ?: "${version}"
+        }
+    }
+}
+
+/**
+ * A Transformer which updates the Netty JAR META-INF/ resources to accurately
+ * reference shaded class names.
+ */
+@CacheableTransformer
+class NettyResourceTransformer implements Transformer {

Review Comment:
   MAGIC!



##########
src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java:
##########
@@ -0,0 +1,587 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.utils;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Predicate;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import io.vertx.ext.web.handler.HttpException;
+import org.apache.cassandra.sidecar.Configuration;
+import org.apache.cassandra.sidecar.common.CassandraAdapterDelegate;
+import org.apache.cassandra.sidecar.common.TableOperations;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.VisibleForTesting;
+
+/**
+ * This class is in charge of performing SSTable imports into the desired 
Cassandra instance.
+ * Since imports are synchronized in the Cassandra side on a per table-basis, 
we only perform one import per
+ * Cassandra instance's keyspace/table, and we queue the rest of the import 
requests.
+ */
+@Singleton
+public class SSTableImporter
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SSTableImporter.class);
+    public static final boolean DEFAULT_RESET_LEVEL = true;
+    public static final boolean DEFAULT_CLEAR_REPAIRED = true;
+    public static final boolean DEFAULT_VERIFY_SSTABLES = true;
+    public static final boolean DEFAULT_VERIFY_TOKENS = true;
+    public static final boolean DEFAULT_INVALIDATE_CACHES = true;
+    public static final boolean DEFAULT_EXTENDED_VERIFY = true;
+    public static final boolean DEFAULT_COPY_DATA = false;
+    private final Vertx vertx;
+    private final ExecutorPools executorPools;
+    private final InstanceMetadataFetcher metadataFetcher;
+    private final SSTableUploadsPathBuilder uploadPathBuilder;
+    @VisibleForTesting
+    final Map<String, ImportQueue> importQueuePerHost;
+
+    /**
+     * Constructs a new instance of the SSTableImporter class
+     *
+     * @param vertx             the vertx instance
+     * @param metadataFetcher   a class for fetching InstanceMetadata
+     * @param configuration     the configuration for Sidecar
+     * @param executorPools     the executor pool
+     * @param uploadPathBuilder a class that provides SSTableUploads 
directories
+     */
+    @Inject
+    SSTableImporter(Vertx vertx,
+                    InstanceMetadataFetcher metadataFetcher,
+                    Configuration configuration,
+                    ExecutorPools executorPools,
+                    SSTableUploadsPathBuilder uploadPathBuilder)
+    {
+        this.vertx = vertx;
+        this.executorPools = executorPools;
+        this.metadataFetcher = metadataFetcher;
+        this.uploadPathBuilder = uploadPathBuilder;
+        this.importQueuePerHost = new HashMap<>();
+        executorPools.internal()
+                     
.setPeriodic(configuration.getSSTableImportPollIntervalMillis(), 
this::processPendingImports);
+    }
+
+    /**
+     * Queues an import with the provided import {@code options} to be 
processed asynchronously. The imports
+     * are queued in a FIFO queue.
+     *
+     * @param options import options
+     * @return a future for the result of the import
+     */
+    public Future<Void> scheduleImport(ImportOptions options)
+    {
+        Promise<Void> promise = Promise.promise();
+        importQueuePerHost.computeIfAbsent(key(options), this::initializeQueue)
+                          .offer(Pair.of(promise, options));
+        return promise.future();
+    }
+
+    /**
+     * Attempts to cancel an import for the provided {@code options}. This is 
a best-effort attempt, and
+     * if the import has been started, it will not be cancelled.
+     *
+     * @param options import options
+     * @return true if the options were removed from the queue, false otherwise
+     */
+    public boolean cancelImport(ImportOptions options)
+    {
+        ImportQueue queue = importQueuePerHost.get(key(options));
+        boolean removed = false;
+        if (queue != null)
+        {
+            removed = queue.removeIf(tuple -> 
options.equals(tuple.getRight()));
+        }
+
+        LOGGER.debug("Cancel import for options={} was {}removed", options, 
removed ? "" : "not ");
+        return removed;
+    }
+
+    /**
+     * Returns a key for the queues for the given {@code options}. Classes 
extending from {@link SSTableImporter}
+     * can override the {@link #key(ImportOptions)} method and provide a 
different key for the queue.
+     *
+     * @param options the import options
+     * @return a key for the queues for the given {@code options}
+     */
+    protected String key(ImportOptions options)
+    {
+        return options.host + "$" + options.keyspace + "$" + options.tableName;
+    }
+
+    /**
+     * Returns a new queue for the given {@code key}. Classes extending from 
{@link SSTableImporter} can override
+     * this method and provide a different implementation for the queue.
+     *
+     * @param key the key for the map
+     * @return a new queue for the given {@code key}
+     */
+    protected ImportQueue initializeQueue(String key)
+    {
+        return new ImportQueue();
+    }
+
+    /**
+     * Processes pending imports for every host in the import queue.
+     *
+     * @param timerId a unique identifier for the periodic timer
+     */
+    private void processPendingImports(Long timerId)
+    {
+        for (ImportQueue queue : importQueuePerHost.values())
+        {
+            if (!queue.isEmpty())
+            {
+                executorPools.internal()
+                             .executeBlocking(p -> 
maybeDrainImportQueue(queue));
+            }
+        }
+    }
+
+    /**
+     * Tries to lock the queue to perform the draining. If the queue is 
already being drained, then it will
+     * not perform any operation.
+     *
+     * @param queue a queue of import tasks
+     */
+    private void maybeDrainImportQueue(ImportQueue queue)
+    {
+        if (queue.tryLock())
+        {
+            try
+            {
+                drainImportQueue(queue);
+            }
+            finally
+            {
+                queue.unlock();
+            }
+        }
+    }
+
+    /**
+     * This blocking operation will drain the {@code queue}. It will utilize a 
single thread
+     * to import the pending import requests on that host.
+     *
+     * @param queue a queue of import tasks
+     */
+    private void drainImportQueue(ImportQueue queue)
+    {
+        while (!queue.isEmpty())
+        {
+            Pair<Promise<Void>, ImportOptions> pair = queue.poll();
+            Promise<Void> promise = pair.getLeft();
+            ImportOptions options = pair.getRight();
+
+            CassandraAdapterDelegate cassandra = 
metadataFetcher.delegate(options.host);
+            TableOperations tableOperations = cassandra.tableOperations();
+
+            if (tableOperations == null)
+            {
+                promise.fail(new 
HttpException(HttpResponseStatus.SERVICE_UNAVAILABLE.code(),
+                                               "Cassandra service is 
unavailable"));
+            }
+            else
+            {
+                try
+                {
+                    List<String> failedDirectories =
+                    tableOperations.importNewSSTables(options.keyspace,
+                                                      options.tableName,
+                                                      options.directory,
+                                                      options.resetLevel,
+                                                      options.clearRepaired,
+                                                      options.verifySSTables,
+                                                      options.verifyTokens,
+                                                      options.invalidateCaches,
+                                                      options.extendedVerify,
+                                                      options.copyData);
+                    if (!failedDirectories.isEmpty())
+                    {
+                        promise.fail(new 
HttpException(HttpResponseStatus.INTERNAL_SERVER_ERROR.code(),
+                                                       "Failed to import from 
directories: " + failedDirectories));
+                    }
+                    else
+                    {
+                        promise.complete();
+                        cleanup(options);
+                    }
+                }
+                catch (Exception exception)
+                {
+                    LOGGER.error("Failed to import SSTables with options={}", 
options, exception);
+                    promise.fail(exception);
+                }
+            }
+        }
+    }
+
+    /**
+     * Removes the staging directory recursively after a successful import
+     *
+     * @param options import options
+     */
+    private void cleanup(ImportOptions options)
+    {
+        uploadPathBuilder.resolveStagingDirectory(options.host, 
options.uploadId)
+                         .compose(uploadPathBuilder::isValidDirectory)
+                         .compose(stagingDirectory -> vertx.fileSystem()
+                                                           
.deleteRecursive(stagingDirectory, true))
+                         .onSuccess(v ->
+                                    LOGGER.debug("Successfully removed staging 
directory for uploadId={}, " +
+                                                 "instance={}, options={}", 
options.uploadId, options.host, options))
+                         .onFailure(cause ->
+                                    LOGGER.error("Failed to remove staging 
directory for uploadId={}, " +
+                                                 "instance={}, options={}", 
options.uploadId, options.host, options,
+                                                 cause));
+    }
+
+    /**
+     * A {@link ConcurrentLinkedQueue} that allows for locking the queue while 
operating on it. The queue
+     * must be unlocked once the operations are complete.
+     */
+    static class ImportQueue extends ConcurrentLinkedQueue<Pair<Promise<Void>, 
ImportOptions>>
+    {
+        private final AtomicBoolean isQueueInUse = new AtomicBoolean(false);
+
+        /**
+         * @return true if the queue was successfully locked, false otherwise
+         */
+        public boolean tryLock()
+        {
+            return isQueueInUse.compareAndSet(false, true);
+        }
+
+        /**
+         * Unlocks the queue
+         */
+        public void unlock()
+        {
+            isQueueInUse.set(false);
+        }
+
+        public boolean removeIf(Predicate<? super Pair<Promise<Void>, 
ImportOptions>> filter)
+        {
+            return super.removeIf(filter);
+        }

Review Comment:
   Remove it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to