yifan-c commented on code in PR #91:
URL: https://github.com/apache/cassandra-sidecar/pull/91#discussion_r1569647561
##########
src/main/java/org/apache/cassandra/sidecar/routes/StreamSSTableComponentHandler.java:
##########
@@ -61,32 +72,93 @@ public void handleInternal(RoutingContext context,
SocketAddress remoteAddress,
StreamSSTableComponentRequest request)
{
- snapshotPathBuilder.build(host, request)
- .onSuccess(path -> {
- logger.debug("StreamSSTableComponentHandler
handled {} for client {}. "
- + "Instance: {}", path,
remoteAddress, host);
-
context.put(FileStreamHandler.FILE_PATH_CONTEXT_KEY, path)
- .next();
- })
- .onFailure(cause -> {
- String errMsg =
- "StreamSSTableComponentHandler failed for
request: {} from: {}. Instance: {}";
- logger.error(errMsg, request, remoteAddress,
host, cause);
- if (cause instanceof NoSuchFileException)
- {
-
context.fail(wrapHttpException(HttpResponseStatus.NOT_FOUND,
cause.getMessage()));
- }
- else
- {
-
context.fail(wrapHttpException(HttpResponseStatus.BAD_REQUEST,
- "Invalid
request for " + request));
- }
- });
+ Future<String> componentPathFuture;
+ if (shouldUseLegacyStreamSStableComponent(request))
+ {
+ LOGGER.warn("Streaming SSTable component without a data directory
index. " +
+ "request={}, remoteAddress={}, instance={}", request,
remoteAddress, host);
+ componentPathFuture = snapshotPathBuilder.build(host, request);
+ }
+ else
+ {
+ componentPathFuture = resolveComponentPathFromRequest(host,
request);
+ }
+
+ componentPathFuture.onSuccess(path -> {
+ logger.debug("{} resolved. path={}, request={}, remoteAddress={},
instance={}",
+ this.getClass().getSimpleName(), path, request,
remoteAddress, host);
+ context.put(FileStreamHandler.FILE_PATH_CONTEXT_KEY, path).next();
+ }).onFailure(cause -> processFailure(cause, context, host,
remoteAddress, request));
+ }
+
+ private Future<String> resolveComponentPathFromRequest(String host,
StreamSSTableComponentRequest request)
+ {
+ return executorPools.internal().executeBlocking(promise -> {
+ CassandraAdapterDelegate delegate = metadataFetcher.delegate(host);
+ if (delegate == null)
+ {
+ promise.fail(cassandraServiceUnavailable());
+ return;
+ }
+
+ StorageOperations storageOperations = delegate.storageOperations();
+ if (storageOperations == null)
+ {
+ promise.fail(cassandraServiceUnavailable());
+ return;
+ }
+
+ List<String> dataDirList = storageOperations.dataFileLocations();
+ int dataDirIndex = request.dataDirectoryIndex();
+ if (dataDirIndex < 0 || dataDirIndex >= dataDirList.size())
+ {
+ promise.fail(wrapHttpException(HttpResponseStatus.BAD_REQUEST,
+ "Invalid data directory index="
+ dataDirIndex));
+ return;
+ }
+
+ String path =
snapshotPathBuilder.resolveComponentPath(dataDirList.get(dataDirIndex),
request);
+
+ promise.complete(path);
+ });
+ }
Review Comment:
Let's not introduce new use cases of the deprecated API. Execute with a
callable simplify the code a little bit too. We also need to check the return
value of `dataDirectoryIndex` is not null, as compiler cannot infer from the
callsites.
```suggestion
private Future<String> resolveComponentPathFromRequest(String host,
StreamSSTableComponentRequest request)
{
return executorPools.internal().executeBlocking(() -> {
CassandraAdapterDelegate delegate =
metadataFetcher.delegate(host);
StorageOperations storageOperations = delegate == null ? null :
delegate.storageOperations();
if (storageOperations == null)
{
throw cassandraServiceUnavailable();
}
List<String> dataDirList = storageOperations.dataFileLocations();
Integer dataDirIndex = request.dataDirectoryIndex();
if (dataDirIndex == null || dataDirIndex < 0 || dataDirIndex >=
dataDirList.size())
{
throw wrapHttpException(HttpResponseStatus.BAD_REQUEST,
"dataDirectoryIndex out of boundary.
index=" + dataDirIndex);
}
return
snapshotPathBuilder.resolveComponentPath(dataDirList.get(dataDirIndex),
request);
});
}
```
##########
common/src/main/java/org/apache/cassandra/sidecar/common/data/ListSnapshotFilesResponse.java:
##########
@@ -73,6 +74,7 @@ public FileInfo(@JsonProperty("size") long size,
@JsonProperty("snapshotName") String snapshotName,
@JsonProperty("keySpaceName") String keySpaceName,
@JsonProperty("tableName") String tableName,
+ @JsonProperty("tableUid") String tableUid,
Review Comment:
How about naming it `tableId`? just the same as Cassandra
##########
src/main/java/org/apache/cassandra/sidecar/routes/StreamSSTableComponentHandler.java:
##########
@@ -42,6 +52,7 @@
@Singleton
public class StreamSSTableComponentHandler extends
AbstractHandler<StreamSSTableComponentRequest>
{
+ private static final Logger LOGGER =
LoggerFactory.getLogger(StreamSSTableComponentHandler.class);
Review Comment:
`AbstractHandler` defines `logger` already. I would prefer to this change
though. But for this patch, I think we should use `logger`.
##########
src/main/java/org/apache/cassandra/sidecar/data/StreamSSTableComponentRequest.java:
##########
@@ -112,6 +127,36 @@ public String toString()
", snapshot='" + snapshotName + '\'' +
", secondaryIndexName='" + secondaryIndexName() + '\'' +
", componentName='" + componentName() + '\'' +
+ ", dataDirectoryIndex='" + dataDirectoryIndex + '\'' +
'}';
}
+
+ public static StreamSSTableComponentRequest from(QualifiedTableName
qualifiedTableName, RoutingContext context)
+ {
+ String snapshotName = context.pathParam("snapshot");
+ String secondaryIndexName = context.pathParam("index");
+ String componentName = context.pathParam("component");
+ String tableUuid = maybeGetTableUuid(context.pathParam("table"));
+ Integer dataDirectoryIndex =
RequestUtils.parseIntegerQueryParam(context.request(), "dataDirectory", null);
+
+ return new StreamSSTableComponentRequest(qualifiedTableName,
+ snapshotName,
+ secondaryIndexName,
+ componentName,
+ tableUuid,
+ dataDirectoryIndex);
+ }
+
+ static String maybeGetTableUuid(String table)
+ {
+ if (table != null)
+ {
+ int index = table.indexOf("-");
Review Comment:
add a comment here.
```
// Cassandra disallow having '-' as part of the table name, even if the
table name is quoted.
// If the string contains '-', it follows by the tableId.
// See
https://github.com/apache/cassandra/blob/c33c8ebab444209a9675f273448110afd0787faa/src/java/org/apache/cassandra/schema/TableId.java#L88
```
##########
src/main/java/org/apache/cassandra/sidecar/data/StreamSSTableComponentRequest.java:
##########
@@ -112,6 +127,36 @@ public String toString()
", snapshot='" + snapshotName + '\'' +
", secondaryIndexName='" + secondaryIndexName() + '\'' +
", componentName='" + componentName() + '\'' +
+ ", dataDirectoryIndex='" + dataDirectoryIndex + '\'' +
'}';
}
+
+ public static StreamSSTableComponentRequest from(QualifiedTableName
qualifiedTableName, RoutingContext context)
+ {
+ String snapshotName = context.pathParam("snapshot");
+ String secondaryIndexName = context.pathParam("index");
+ String componentName = context.pathParam("component");
+ String tableUuid = maybeGetTableUuid(context.pathParam("table"));
+ Integer dataDirectoryIndex =
RequestUtils.parseIntegerQueryParam(context.request(), "dataDirectory", null);
Review Comment:
Should the param name be "dataDirectoryIndex"?
##########
src/main/java/org/apache/cassandra/sidecar/routes/FileStreamHandler.java:
##########
@@ -77,40 +81,57 @@ protected String extractParamsOrThrow(RoutingContext
context)
return context.get(FILE_PATH_CONTEXT_KEY);
}
- /**
- * Ensures that the file exists and is a non-empty regular file
- *
- * @param fs The underlying filesystem
- * @param localFile The path the file in the filesystem
- * @param exists Whether the file exists or not
- * @return a succeeded future with the {@link FileProps}, or a failed
future if the file does not exist;
- * is not a regular file; or if the file is empty
- */
- private Future<FileProps> ensureValidFile(FileSystem fs, String localFile,
Boolean exists)
+ @Override
+ protected void processFailure(Throwable cause,
+ RoutingContext context,
+ String host,
+ SocketAddress remoteAddress,
+ String localFile)
{
- if (!exists)
+ IOException fileNotFoundException = ThrowableUtils.getCause(cause,
NoSuchFileException.class);
+
+ if (fileNotFoundException == null)
+ {
+ // FileNotFoundException comes from the stream method
+ fileNotFoundException = ThrowableUtils.getCause(cause,
FileNotFoundException.class);
+ }
+
+ if (fileNotFoundException != null)
{
logger.error("The requested file '{}' does not exist", localFile);
- return Future.failedFuture(wrapHttpException(NOT_FOUND, "The
requested file does not exist"));
+ context.fail(wrapHttpException(NOT_FOUND, "The requested file does
not exist"));
+ return;
+ }
+
+ super.processFailure(cause, context, host, remoteAddress, localFile);
+ }
+
+ protected Future<Long> fileSize(RoutingContext context, String path)
+ {
+ Long fileSize = RequestUtils.parseLongQueryParam(context.request(),
"size", null);
+ if (fileSize != null)
+ {
+ return Future.succeededFuture(fileSize);
Review Comment:
Discussed offline and agreed on removing client supplied file size.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]