yifan-c commented on code in PR #158:
URL: https://github.com/apache/cassandra-sidecar/pull/158#discussion_r1881484259
##########
client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java:
##########
@@ -489,6 +492,45 @@ public CompletableFuture<Void>
cleanUploadSession(SidecarInstance instance, Stri
.build());
}
+ /**
+ * Lists CDC commit logs in CDC directory for an instance
+ * @param sidecarInstance instance on which the CDC commit logs are to be
listed
+ * @return a completable future with List of cdc commitLogs on the
requested instance
+ */
+ public CompletableFuture<ListCdcSegmentsResponse>
listCdcSegments(SidecarInstance sidecarInstance)
+ {
+ return executor.executeRequestAsync(requestBuilder()
+ .singleInstanceSelectionPolicy(sidecarInstance)
+ .request(new ListCommitLogsRequest())
+ .build());
+ }
+
+ /**
+ * Streams CDC commit log segments from the requested instance.
Review Comment:
Remove the extra space.
```suggestion
* Streams CDC commit log segments from the requested instance.
```
##########
client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java:
##########
@@ -489,6 +492,45 @@ public CompletableFuture<Void>
cleanUploadSession(SidecarInstance instance, Stri
.build());
}
+ /**
+ * Lists CDC commit logs in CDC directory for an instance
+ * @param sidecarInstance instance on which the CDC commit logs are to be
listed
+ * @return a completable future with List of cdc commitLogs on the
requested instance
+ */
+ public CompletableFuture<ListCdcSegmentsResponse>
listCdcSegments(SidecarInstance sidecarInstance)
+ {
+ return executor.executeRequestAsync(requestBuilder()
+ .singleInstanceSelectionPolicy(sidecarInstance)
+ .request(new ListCommitLogsRequest())
+ .build());
+ }
+
+ /**
+ * Streams CDC commit log segments from the requested instance.
+ *
+ * Streams the specified {@code range} of a CDC CommitLog from the given
instance and the
+ * stream is consumed by the {@link StreamConsumer consumer}.
+ *
+ * @param sidecarInstance instance on which the CDC commit logs are to be
streamed
+ * @param segment segment file name
+ * @param range range of the file to be streamed
+ * @param partitionId partition ID
+ * @param batchId batchId
+ * @param streamConsumer object that consumes the stream
+ */
+ public void streamCommitLogs(SidecarInstance sidecarInstance,
+ String segment,
+ HttpRange range,
+ String partitionId,
+ String batchId,
+ StreamConsumer streamConsumer)
+ {
+ executor.streamRequest(requestBuilder()
+ .singleInstanceSelectionPolicy(sidecarInstance)
+ .request(new StreamCdcSegmentRequest(segment, range,
partitionId, batchId))
+ .build(), streamConsumer);
+ }
Review Comment:
```suggestion
public void streamCommitLogs(SidecarInstance sidecarInstance,
String segment,
HttpRange range,
StreamConsumer streamConsumer)
{
executor.streamRequest(requestBuilder()
.singleInstanceSelectionPolicy(sidecarInstance)
.request(new StreamCdcSegmentRequest(segment, range))
.build(), streamConsumer);
}
```
##########
client-common/src/main/java/org/apache/cassandra/sidecar/common/request/StreamCdcSegmentRequest.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.sidecar.common.request;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import io.netty.handler.codec.http.HttpMethod;
+import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
+import org.apache.cassandra.sidecar.common.utils.HttpRange;
+
+/**
+ * Represents a request to stream CDC segments(commit logs) on an instance.
+ */
+public class StreamCdcSegmentRequest extends Request
+{
+ private final String segment;
Review Comment:
This field is unused and unnecessary, since `segment` is only needed to build
the request URI.
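A minimal sketch of the class without the field, assuming the superclass keeps
the URI it is given. `BaseRequest`, `buildURI`, and the route string are
hypothetical stand-ins so the example compiles on its own; the real `Request`
class and the `ApiEndpointsV1` route are not shown in this diff.

```java
final class StreamSegmentRequestSketch extends BaseRequest
{
    // Illustrative route only; the real one would come from ApiEndpointsV1.
    private static final String ROUTE = "/hypothetical/cdc/segments/";

    StreamSegmentRequestSketch(String segment)
    {
        // segment is consumed here to build the URI and is never stored on this class
        super(buildURI(segment));
    }

    private static String buildURI(String segment)
    {
        return ROUTE + segment;
    }

    public static void main(String[] args)
    {
        System.out.println(new StreamSegmentRequestSketch("CommitLog-7-1.log").requestURI());
    }
}

// Hypothetical stand-in for the sidecar Request base class.
abstract class BaseRequest
{
    private final String requestURI;

    protected BaseRequest(String requestURI)
    {
        this.requestURI = requestURI;
    }

    public String requestURI()
    {
        return requestURI;
    }
}
```

The URI already carries the segment name, so a caller that needs it can read it
back from the request URI rather than from a second copy on the subclass.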
##########
client-common/src/main/java/org/apache/cassandra/sidecar/common/request/ListCommitLogsRequest.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.sidecar.common.request;
+
+import io.netty.handler.codec.http.HttpMethod;
+import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
+import org.apache.cassandra.sidecar.common.response.ListCdcSegmentsResponse;
+
+/**
+ * Represents a request for listing commit log files on an instance
+ */
+public class ListCommitLogsRequest extends JsonRequest<ListCdcSegmentsResponse>
Review Comment:
Please rename the class so that it is consistent with Cassandra's terminology.
Each individual `.log` file is a log segment. The collection of all `.log`
files, or segments, is the commit log.
Given that the segments in question are for CDC, the class could be named
`ListCdcSegmentsRequest`.
The response class is already named `ListCdcSegmentsResponse`.
##########
client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java:
##########
@@ -489,6 +492,45 @@ public CompletableFuture<Void>
cleanUploadSession(SidecarInstance instance, Stri
.build());
}
+ /**
+ * Lists CDC commit logs in CDC directory for an instance
+ * @param sidecarInstance instance on which the CDC commit logs are to be
listed
+ * @return a completable future with List of cdc commitLogs on the
requested instance
+ */
+ public CompletableFuture<ListCdcSegmentsResponse>
listCdcSegments(SidecarInstance sidecarInstance)
+ {
+ return executor.executeRequestAsync(requestBuilder()
+ .singleInstanceSelectionPolicy(sidecarInstance)
+ .request(new ListCommitLogsRequest())
+ .build());
+ }
+
+ /**
+ * Streams CDC commit log segments from the requested instance.
+ *
+ * Streams the specified {@code range} of a CDC CommitLog from the given
instance and the
+ * stream is consumed by the {@link StreamConsumer consumer}.
+ *
+ * @param sidecarInstance instance on which the CDC commit logs are to be
streamed
+ * @param segment segment file name
+ * @param range range of the file to be streamed
+ * @param partitionId partition ID
+ * @param batchId batchId
+ * @param streamConsumer object that consumes the stream
+ */
+ public void streamCommitLogs(SidecarInstance sidecarInstance,
+ String segment,
+ HttpRange range,
+ String partitionId,
+ String batchId,
Review Comment:
Neither `partitionId` nor `batchId` is needed. Let's remove them.
##########
client-common/src/main/java/org/apache/cassandra/sidecar/common/response/ListCdcSegmentsResponse.java:
##########
@@ -65,4 +66,19 @@ public List<CdcSegmentInfo> segmentInfos()
{
return segmentInfos;
}
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
Review Comment:
Unfortunately, the Checkstyle ruleset does not yet require braces around
single-line if-else bodies. I believe most if-else statements in the code base
do use braces for single-line bodies, so please stay consistent. We can add a
Checkstyle rule to enforce it.
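For illustration, a braced `equals` might look like the sketch below.
`SegmentList` and its `String` elements are hypothetical stand-ins for
`ListCdcSegmentsResponse` and `CdcSegmentInfo`, whose full definitions are not
shown in this diff; only the brace style is the point. If a rule is added later,
Checkstyle's `NeedBraces` check is probably the relevant one.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for ListCdcSegmentsResponse.
final class SegmentList
{
    private final List<String> segmentInfos;

    SegmentList(List<String> segmentInfos)
    {
        this.segmentInfos = segmentInfos;
    }

    @Override
    public boolean equals(Object o)
    {
        // Braces are used even though each body is a single statement.
        if (this == o)
        {
            return true;
        }
        if (o == null || getClass() != o.getClass())
        {
            return false;
        }
        SegmentList that = (SegmentList) o;
        return Objects.equals(segmentInfos, that.segmentInfos);
    }

    @Override
    public int hashCode()
    {
        return Objects.hash(segmentInfos);
    }

    public static void main(String[] args)
    {
        SegmentList a = new SegmentList(Arrays.asList("CommitLog-7-1.log"));
        SegmentList b = new SegmentList(Arrays.asList("CommitLog-7-1.log"));
        System.out.println(a.equals(b));
    }
}
```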
##########
client-common/src/main/java/org/apache/cassandra/sidecar/common/request/StreamCdcSegmentRequest.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.sidecar.common.request;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import io.netty.handler.codec.http.HttpMethod;
+import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
+import org.apache.cassandra.sidecar.common.utils.HttpRange;
+
+/**
+ * Represents a request to stream CDC segments(commit logs) on an instance.
+ */
+public class StreamCdcSegmentRequest extends Request
+{
+ private final String segment;
+ private final HttpRange range;
+ private final String batchId;
+ private final String partitionId;
+
+ public StreamCdcSegmentRequest(String segment, HttpRange range, String
batchId, String partitionId)
+ {
+ super(requestURI(segment));
+ this.segment = segment;
+ this.range = range;
+ this.batchId = batchId;
+ this.partitionId = partitionId;
+ }
+
+ @Override
+ public HttpMethod method()
+ {
+ return HttpMethod.GET;
+ }
+
+ @Override
+ public Map<String, String> headers()
+ {
+ Map<String, String> headers = new HashMap<>(super.headers());
+ headers.put("X-Stream-Batch", batchId);
Review Comment:
Yeah, neither `X-Stream-Batch` nor `X-Stream-Partition` is used.
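A rough sketch of what `headers()` could reduce to once the two custom headers
are gone. It assumes the byte range is the only request-specific header left and
that it is sent as a standard HTTP `Range` header; `StreamHeadersSketch`,
`baseHeaders`, and the `Accept` value are hypothetical, since the rest of the
method is not shown in this diff.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class StreamHeadersSketch
{
    // Hypothetical stand-in for the headers inherited from the base request.
    static Map<String, String> baseHeaders()
    {
        return Collections.singletonMap("Accept", "application/octet-stream");
    }

    // Assumption: only the byte range remains once X-Stream-Batch and
    // X-Stream-Partition are dropped.
    static Map<String, String> headers(long start, long end)
    {
        Map<String, String> headers = new HashMap<>(baseHeaders());
        headers.put("Range", "bytes=" + start + "-" + end);
        return Collections.unmodifiableMap(headers);
    }

    public static void main(String[] args)
    {
        System.out.println(headers(0, 1023));
    }
}
```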
##########
client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java:
##########
@@ -489,6 +492,45 @@ public CompletableFuture<Void>
cleanUploadSession(SidecarInstance instance, Stri
.build());
}
+ /**
+ * Lists CDC commit logs in CDC directory for an instance
+ * @param sidecarInstance instance on which the CDC commit logs are to be
listed
+ * @return a completable future with List of cdc commitLogs on the
requested instance
+ */
+ public CompletableFuture<ListCdcSegmentsResponse>
listCdcSegments(SidecarInstance sidecarInstance)
+ {
+ return executor.executeRequestAsync(requestBuilder()
+ .singleInstanceSelectionPolicy(sidecarInstance)
+ .request(new ListCommitLogsRequest())
+ .build());
Review Comment:
Align the chained invocations.
```suggestion
return executor.executeRequestAsync(requestBuilder()
.singleInstanceSelectionPolicy(sidecarInstance)
.request(new ListCommitLogsRequest())
.build());
```