yifan-c commented on code in PR #147:
URL: https://github.com/apache/cassandra-sidecar/pull/147#discussion_r1841169239


##########
client-common/src/main/java/org/apache/cassandra/sidecar/common/response/ListCDCSegmentResponse.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.response;
+
+import java.util.List;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * A class representing a response for a list CDC Segment request
+ */
+public class ListCDCSegmentResponse

Review Comment:
   nit: add `@JsonIgnoreProperties(ignoreUnknown = true)` for better compatibility. The same applies to the other JSON POJOs.
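
A minimal sketch of the effect, assuming Jackson's `ObjectMapper` is used for deserialization. `ExampleSegmentResponse` and its `parse` helper are hypothetical stand-ins, not classes from this PR. With the annotation in place, a payload carrying a field the client does not know about still deserializes instead of failing on the unknown property.

```java
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

// Hypothetical stand-in for a response POJO; not the class under review.
@JsonIgnoreProperties(ignoreUnknown = true)
class ExampleSegmentResponse
{
    final String host;
    final int port;

    @JsonCreator
    public ExampleSegmentResponse(@JsonProperty("host") String host,
                                  @JsonProperty("port") int port)
    {
        this.host = host;
        this.port = port;
    }

    // Parses JSON and rethrows Jackson's checked exception as unchecked for brevity.
    static ExampleSegmentResponse parse(String json)
    {
        try
        {
            return new ObjectMapper().readValue(json, ExampleSegmentResponse.class);
        }
        catch (JsonProcessingException e)
        {
            throw new IllegalArgumentException(e);
        }
    }
}
```

Without the annotation, Jackson's default `FAIL_ON_UNKNOWN_PROPERTIES` setting would reject a payload that contains a field added by a newer server.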



##########
server/src/main/java/org/apache/cassandra/sidecar/routes/cdc/ListCDCDirHandler.java:
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes.cdc;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.json.Json;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.common.response.ListCDCSegmentResponse;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.config.ServiceConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.routes.AbstractHandler;
+import org.apache.cassandra.sidecar.utils.CDCUtil;
+import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+
+import static org.apache.cassandra.sidecar.utils.CDCUtil.getIdxFileName;
+import static org.apache.cassandra.sidecar.utils.CDCUtil.getLogFilePrefix;
+import static org.apache.cassandra.sidecar.utils.CDCUtil.isIndexFile;
+import static org.apache.cassandra.sidecar.utils.CDCUtil.parseIndexFile;
+
+/**
+ * Provides REST endpoint for listing commit logs in CDC directory.
+ */
+@Singleton
+public class ListCDCDirHandler extends AbstractHandler<Void>
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(ListCDCDirHandler.class);
+    private final ServiceConfiguration config;
+
+    @Inject
+    public ListCDCDirHandler(InstanceMetadataFetcher metadataFetcher,
+                             SidecarConfiguration config,
+                             ExecutorPools executorPools,
+                             CassandraInputValidator validator)
+    {
+        super(metadataFetcher, executorPools, validator);
+        this.config = config.serviceConfiguration();
+    }
+
+    @Override
+    protected void handleInternal(RoutingContext context,
+                                  HttpServerRequest httpRequest,
+                                  String host,
+                                  SocketAddress remoteAddress,
+                                  Void request)
+    {
+        String cdcDir = metadataFetcher.instance(host).cdcDir();
+        ListCDCSegmentResponse listCDCSegmentResponse = new ListCDCSegmentResponse(config.host(),
+                                                                                   config.port(),
+                                                                                   new LinkedList<>());
+        try
+        {
+            addResources(cdcDir, listCDCSegmentResponse);

Review Comment:
   This is probably suitable to dispatch to the internal executor pool so that it runs asynchronously.
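
A rough sketch of the idea using only the JDK, since the Sidecar `ExecutorPools` API is not shown in this thread. `AsyncDirListing`, `listFileNames`, and `listFileNamesAsync` are hypothetical names, and `CompletableFuture` with a plain `Executor` stands in for whatever the internal pool exposes. The point is only that the blocking directory walk moves off the calling thread.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper; a JDK-only stand-in for dispatching to the internal pool.
class AsyncDirListing
{
    // Blocking: lists regular files in the directory on the calling thread.
    static List<String> listFileNames(Path dir)
    {
        try (Stream<Path> entries = Files.list(dir))
        {
            return entries.filter(Files::isRegularFile)
                          .map(p -> p.getFileName().toString())
                          .sorted()
                          .collect(Collectors.toList());
        }
        catch (IOException e)
        {
            throw new UncheckedIOException(e);
        }
    }

    // Non-blocking for the caller: the file I/O runs on the supplied worker executor.
    static CompletableFuture<List<String>> listFileNamesAsync(Path dir, Executor worker)
    {
        return CompletableFuture.supplyAsync(() -> listFileNames(dir), worker);
    }
}
```

The handler would then write the response from a completion callback and map a failed future to an error status, rather than doing the listing inline.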



##########
server/src/main/java/org/apache/cassandra/sidecar/cdc/CDCLogCache.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.cdc;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.util.concurrent.TimeUnit;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.cassandra.sidecar.cluster.InstancesConfig;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.utils.CDCUtil;
+
+/**
+ * CDCLogCache caches the recently downloaded files to avoid being deleted by accident.
+ * <br>
+ * Downloads tracking is via {@linkplain #touch}.
+ * <br>
+ * In the event of deleting the _consumed_ files, 1 supersedes 2, meaning the _consumed_ files and their links
+ * are deleted, even though within the cache duration.
+ */
+@Singleton
+public class CDCLogCache
+{
+    public static final String TEMP_DIR_SUFFIX = "_tmp";
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(CDCLogCache.class);
+    private static final RemovalListener<File, File> hardlinkRemover = notification -> deleteFileIfExist(notification.getValue());
+
+    // todo: move to yaml when OSS
+    private static final int CACHE_EXPIRY_IN_SECS = Integer.getInteger("SegmentHardlinkCacheExpiryInSecs", 300);

Review Comment:
   Please address this TODO.
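
One possible shape, sketched with hypothetical names (`CdcCacheConfiguration`, `ExpiringLinkCache`, `segmentHardlinkCacheExpiryInSecs`); the real Sidecar configuration classes and YAML keys are not shown here, so treat this as an illustration only. The expiry is read from an injected configuration object instead of a JVM system property.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical configuration type; it would be backed by the YAML file in practice.
interface CdcCacheConfiguration
{
    // Same default as the system property in the diff.
    int DEFAULT_EXPIRY_SECS = 300;

    int segmentHardlinkCacheExpiryInSecs();
}

// Hypothetical consumer; shows only how the value is obtained, not the cache itself.
class ExpiringLinkCache
{
    private final long expiryNanos;

    ExpiringLinkCache(CdcCacheConfiguration config)
    {
        this.expiryNanos = TimeUnit.SECONDS.toNanos(config.segmentHardlinkCacheExpiryInSecs());
    }

    long expiryNanos()
    {
        return expiryNanos;
    }
}
```

Because the interface has a single abstract method, a test can pass `() -> 1` to get a one-second expiry without touching global JVM state.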



##########
client-common/src/main/java/org/apache/cassandra/sidecar/common/response/ListCDCSegmentResponse.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.response;
+
+import java.util.List;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * A class representing a response for a list CDC Segment request
+ */
+public class ListCDCSegmentResponse
+{
+    public final String host;
+    public final int port;
+    private List<SegmentInfo> segmentsInfo;
+
+    public ListCDCSegmentResponse(@JsonProperty("host") String host, @JsonProperty("port") int port,
+                                  @JsonProperty("segmentInfo") List<SegmentInfo> segmentsInfo)
+    {
+        this.host = host;
+        this.port = port;
+        this.segmentsInfo = segmentsInfo;
+    }
+
+    public String host()
+    {
+        return host;
+    }
+
+    public int port()
+    {
+        return port;
+    }
+
+    public List<SegmentInfo> getSegmentsInfo()
+    {
+        return segmentsInfo;
+    }
+
+    public void addSegmentInfo(SegmentInfo segmentInfo)
+    {
+        this.segmentsInfo.add(segmentInfo);
+    }
+
+    public void addSegmentsInfo(List<SegmentInfo> segmentInfos)
+    {
+        this.segmentsInfo.addAll(segmentInfos);
+    }

Review Comment:
   Unused. Please remove.



##########
server/src/main/java/org/apache/cassandra/sidecar/routes/cdc/StreamCDCSegment.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes.cdc;
+
+import java.io.File;
+import java.io.IOException;
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.vertx.core.Promise;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.cdc.CDCLogCache;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.exceptions.RangeException;
+import org.apache.cassandra.sidecar.common.utils.HttpRange;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.models.HttpResponse;
+import org.apache.cassandra.sidecar.routes.AbstractHandler;
+import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.utils.FileStreamer;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import static org.apache.cassandra.sidecar.utils.CDCUtil.getIdxFileName;
+import static org.apache.cassandra.sidecar.utils.CDCUtil.isLogFile;
+import static org.apache.cassandra.sidecar.utils.CDCUtil.isValid;
+import static org.apache.cassandra.sidecar.utils.CDCUtil.parseIndexFile;
+
+/**
+ * Provides REST endpoint for streaming cdc commit logs.
+ */
+@Singleton
+public class StreamCDCSegment extends AbstractHandler<String>
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(StreamCDCSegment.class);
+
+    private final FileStreamer fileStreamer;
+    private final CDCLogCache cdcLogCache;
+
+    @Inject
+    public StreamCDCSegment(final InstanceMetadataFetcher metadataFetcher,
+                            final FileStreamer fileStreamer,
+                            final CDCLogCache cdcLogCache,
+                            final ExecutorPools executorPools,
+                            final CassandraInputValidator validator)
+    {
+        super(metadataFetcher, executorPools, validator);
+        this.fileStreamer = fileStreamer;
+        this.cdcLogCache = cdcLogCache;
+    }
+
+    @Override
+    protected void handleInternal(RoutingContext context,
+                                  HttpServerRequest httpRequest,
+                                  String host,
+                                  SocketAddress remoteAddress,
+                                  String segment)
+    {
+        final InstanceMetadata instance = metadataFetcher.instance(host);
+        final String cdcDir = instance.cdcDir();
+        final String idxFileName = getIdxFileName(segment);
+        final File segmentFile = new File(cdcDir, segment);
+        final File idxFile = new File(cdcDir, idxFileName);
+        Promise<Void> promise = Promise.promise();
+        streamSegment(context, segmentFile, idxFile, promise);
+        // Touch the files at the end of the request
+        // If the file exists in cache, its expiry is extended; otherwise, the cache is not changed.
+        promise.future().onSuccess(res -> cdcLogCache.touch(segmentFile, idxFile));
+    }
+
+    /**
+     * Validate file existence and hardlink the cdc commit log file and the index file.
+     */
+    private void streamSegment(RoutingContext ctx,
+                               File segmentFile,
+                               File indexFile,
+                               Promise<Void> promise)
+    {
+        final HttpResponse response = new HttpResponse(ctx.request(), ctx.response());

Review Comment:
   Remove the unnecessary `final` modifiers. Generally only declare `final` for 
instance variables. Please double check the usage in the other files too. 



##########
server/src/test/java/org/apache/cassandra/sidecar/cdc/CDCLogCacheTest.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.cdc;
+
+import com.google.common.base.Preconditions;
+
+import org.junit.jupiter.api.Test;
+import io.vertx.core.Vertx;
+import org.apache.cassandra.sidecar.ExecutorPoolsHelper;
+import org.apache.cassandra.sidecar.cluster.InstancesConfig;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+
+import static org.mockito.Mockito.mock;
+
+class CDCLogCacheTest
+{
+    Vertx vertx = Vertx.vertx();
+
+    /**
+     * Failing to clean up shouldn't fail to initialize the class
+     */
+    @Test
+    void testCleanupErrorDoesntPreventInitialization()
+    {
+        new FailingCDCLogCache(ExecutorPoolsHelper.createdSharedTestPool(vertx),
+                mock(InstancesConfig.class));
+    }

Review Comment:
   Can you add more test cases? For example, hard link creation and cleanup, 
cache expiry, etc.
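
As a starting point for the hard link case, here is a JDK-only sketch of the filesystem behaviour such a test would assert on. It does not use the `CDCLogCache` API (not shown in this hunk); `HardLinkCheck` and the file names are hypothetical, and a real test would use the project's test framework and call the cache instead.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper; exercises hard link creation and cleanup with java.nio only.
class HardLinkCheck
{
    // Returns true when a hard link can be created, resolves to the same file,
    // and can be deleted without removing the original.
    static boolean linkAndCleanupWorks()
    {
        try
        {
            Path dir = Files.createTempDirectory("cdc-link-check");
            Path original = Files.write(dir.resolve("CommitLog-example.log"), new byte[]{ 1, 2, 3 });
            Path link = Files.createLink(dir.resolve("CommitLog-example.log.link"), original);

            boolean sameFile = Files.isSameFile(original, link);
            Files.delete(link); // the cleanup step under test
            boolean originalSurvives = Files.exists(original) && !Files.exists(link);

            // Remove the scratch files so the check leaves nothing behind.
            Files.delete(original);
            Files.delete(dir);
            return sameFile && originalSurvives;
        }
        catch (IOException e)
        {
            throw new UncheckedIOException(e);
        }
    }
}
```

A cache expiry test could follow the same pattern: create the link through the cache, wait past a short configured expiry, and assert that the link is gone while the original segment remains.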



##########
client-common/src/main/java/org/apache/cassandra/sidecar/common/response/ListCDCSegmentResponse.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.response;
+
+import java.util.List;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * A class representing a response for a list CDC Segment request
+ */
+public class ListCDCSegmentResponse
+{
+    public final String host;
+    public final int port;
+    private List<SegmentInfo> segmentsInfo;
+
+    public ListCDCSegmentResponse(@JsonProperty("host") String host, @JsonProperty("port") int port,
+                                  @JsonProperty("segmentInfo") List<SegmentInfo> segmentsInfo)

Review Comment:
   nit: new line per parameter.



##########
server/src/main/java/org/apache/cassandra/sidecar/routes/cdc/StreamCDCSegment.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes.cdc;
+
+import java.io.File;
+import java.io.IOException;
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.vertx.core.Promise;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.cdc.CDCLogCache;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.exceptions.RangeException;
+import org.apache.cassandra.sidecar.common.utils.HttpRange;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.models.HttpResponse;
+import org.apache.cassandra.sidecar.routes.AbstractHandler;
+import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.utils.FileStreamer;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import static org.apache.cassandra.sidecar.utils.CDCUtil.getIdxFileName;
+import static org.apache.cassandra.sidecar.utils.CDCUtil.isLogFile;
+import static org.apache.cassandra.sidecar.utils.CDCUtil.isValid;
+import static org.apache.cassandra.sidecar.utils.CDCUtil.parseIndexFile;
+
+/**
+ * Provides REST endpoint for streaming cdc commit logs.
+ */
+@Singleton
+public class StreamCDCSegment extends AbstractHandler<String>
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(StreamCDCSegment.class);
+
+    private final FileStreamer fileStreamer;
+    private final CDCLogCache cdcLogCache;
+
+    @Inject
+    public StreamCDCSegment(final InstanceMetadataFetcher metadataFetcher,
+                            final FileStreamer fileStreamer,
+                            final CDCLogCache cdcLogCache,
+                            final ExecutorPools executorPools,
+                            final CassandraInputValidator validator)
+    {
+        super(metadataFetcher, executorPools, validator);
+        this.fileStreamer = fileStreamer;
+        this.cdcLogCache = cdcLogCache;
+    }
+
+    @Override
+    protected void handleInternal(RoutingContext context,
+                                  HttpServerRequest httpRequest,
+                                  String host,
+                                  SocketAddress remoteAddress,
+                                  String segment)
+    {
+        final InstanceMetadata instance = metadataFetcher.instance(host);
+        final String cdcDir = instance.cdcDir();
+        final String idxFileName = getIdxFileName(segment);
+        final File segmentFile = new File(cdcDir, segment);
+        final File idxFile = new File(cdcDir, idxFileName);
+        Promise<Void> promise = Promise.promise();
+        streamSegment(context, segmentFile, idxFile, promise);
+        // Touch the files at the end of the request
+        // If the file exists in cache, its expiry is extended; otherwise, the cache is not changed.
+        promise.future().onSuccess(res -> cdcLogCache.touch(segmentFile, idxFile));
+    }
+
+    /**
+     * Validate file existence and hardlink the cdc commit log file and the index file.
+     */
+    private void streamSegment(RoutingContext ctx,

Review Comment:
   Refactoring suggestion
   
   1. Break the method into two: one for validation and the other for streaming (line#177 to line#180).
   2. Return `Future<Void>` from the streaming method.
   3. Remove the parameter `Promise<Void> promise`.
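
The three steps above could look roughly like this. It is a JDK-only sketch: `CompletableFuture<Void>` stands in for the Vert.x `Future<Void>`, the existence checks stand in for the real validation, and `SegmentStreamingSketch`, `validate`, `stream`, and `handle` are hypothetical names.

```java
import java.io.File;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the split; not the handler from the PR.
class SegmentStreamingSketch
{
    // Step 1: validation only. Throws on bad input and starts no streaming.
    static void validate(File segmentFile, File indexFile)
    {
        if (!segmentFile.exists())
        {
            throw new IllegalArgumentException("Segment not found: " + segmentFile.getName());
        }
        if (!indexFile.exists())
        {
            throw new IllegalArgumentException("Index file not found: " + indexFile.getName());
        }
    }

    // Step 2: streaming only. Returns its own future instead of completing a
    // caller-supplied promise.
    static CompletableFuture<Void> stream(File segmentFile)
    {
        // Placeholder: the real transfer would complete the future when the response ends.
        return CompletableFuture.completedFuture(null);
    }

    // The handler composes both steps; callers attach follow-up work to the result.
    static CompletableFuture<Void> handle(File segmentFile, File indexFile)
    {
        try
        {
            validate(segmentFile, indexFile);
        }
        catch (IllegalArgumentException e)
        {
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
        return stream(segmentFile);
    }
}
```

With this shape the caller no longer creates a promise; it chains the cache touch directly on the returned future, for example `handle(segmentFile, idxFile).thenRun(...)`.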



##########
server/build.gradle:
##########
@@ -189,6 +189,7 @@ publishing {
 test {
     systemProperty "vertxweb.environment", "dev"
     systemProperty "vertx.logger-delegate-factory-class-name", "io.vertx.core.logging.SLF4JLogDelegateFactory"
+    systemProperty "SegmentHardlinkCacheExpiryInSecs", "1"

Review Comment:
   This should be a YAML config.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org