bbotella commented on code in PR #147:
URL: https://github.com/apache/cassandra-sidecar/pull/147#discussion_r1849315995


##########
server/src/main/java/org/apache/cassandra/sidecar/cdc/CDCLogCache.java:
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.cdc;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.util.concurrent.TimeUnit;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.cassandra.sidecar.cluster.InstancesConfig;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.utils.CDCUtil;
+
+/**
+ * CDCLogCache caches the recently downloaded files to avoid being deleted by 
accident.
+ * <br>
+ * Downloads tracking is via {@linkplain #touch}.
+ * <br>
+ * In the event of deleting the _consumed_ files, 1 supersedes 2, meaning the 
_consumed_ files and their links
+ * are deleted, even though within the cache duration.
+ */
+@Singleton
+public class CDCLogCache
+{
+    public static final String TEMP_DIR_SUFFIX = "_tmp";
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CDCLogCache.class);
+    private static final RemovalListener<File, File> hardlinkRemover = 
notification -> deleteFileIfExist(notification.getValue());
+
+    // Cache for the hardlinks. Key: origin file; Value: link file
+    // The entries expire after 5 minutes
+    private final Cache<File, File> hardlinkCache;
+
+    @Inject
+    public CDCLogCache(ExecutorPools executorPools,
+                       InstancesConfig instancesConfig,
+                       SidecarConfiguration sidecarConfig)
+    {
+        long cacheExpiryInSecs = 
sidecarConfig.cdConfiguration().segmentHardlinkCacheExpiryInSecs();
+        hardlinkCache = CacheBuilder.newBuilder()
+                .expireAfterAccess(cacheExpiryInSecs, TimeUnit.SECONDS)
+                .removalListener(hardlinkRemover)
+                .build();
+        // Run cleanup in the internal pool, this prevents
+        // issues when initializing the cache
+        executorPools.internal().runBlocking(() -> 
cleanupLinkedFilesOnStartup(instancesConfig));
+
+        // setup periodic and serial cleanup
+        executorPools.internal()
+                     .setPeriodic(TimeUnit.SECONDS.toMillis(cacheExpiryInSecs),
+                                  id -> hardlinkCache.cleanUp(),
+                                  true);
+    }
+
+    public void touch(File segmentFile, File indexFile)
+    {
+        // renew the hardlinks
+        hardlinkCache.getIfPresent(segmentFile);
+        hardlinkCache.getIfPresent(indexFile);
+    }
+
+    /**
+     * Create a hardlink from origin in the cache if the hardlink does not 
exist yet.
+     *
+     * @param origin the source file
+     * @return the link file
+     * @throws IOException when an IO exception occurs during link
+     */
+    public File createLinkedFileInCache(File origin) throws IOException
+    {
+        File link = hardlinkCache.getIfPresent(origin);
+        if (link == null)
+        {
+            link = new File(getTempCdcDir(origin.getParent()), 
origin.getName());
+            try
+            {
+                // create link and cache it
+                Files.createLink(link.toPath(), origin.toPath());
+                hardlinkCache.put(origin, link);
+            }
+            catch (FileAlreadyExistsException e)
+            {
+                LOGGER.debug("The target of hardlink {} already exists. It 
could be created by a concurrent request.", link);
+            }
+        }
+        return link;
+    }
+
+    /**
+     * Clean up the linked file when the application is starting.
+     * There could be files left over if the application crashes during 
streaming the CDC segments.
+     * On a new start, the tmp directory for the linked CDC segments should be 
empty.
+     * It is only called in the constructor of the handler singleton.
+     *
+     * @param config instances config
+     */
+    @VisibleForTesting
+    public void cleanupLinkedFilesOnStartup(InstancesConfig config)
+    {
+        for (InstanceMetadata instance : config.instances())
+        {
+            try
+            {
+                cleanupLinkedFiles(instance);
+            }
+            catch (Exception e)
+            {
+                LOGGER.warn("Failed to clean up linked files for instance {}", 
instance.id(), e);
+            }
+        }
+    }
+
+    private void cleanupLinkedFiles(InstanceMetadata instance) throws 
IOException
+    {
+        File[] files = getTempCdcDir((instance.cdcDir()))
+                       .listFiles(f -> CDCUtil.isLogFile(f.getName()) || 
CDCUtil.isIndexFile(f.getName()));
+        if (files == null)
+            return;
+        for (File f : files)
+        {
+            deleteFileIfExist(f);
+        }
+    }
+
+    private static void deleteFileIfExist(File file)
+    {
+        if (file.exists() && file.isFile())
+        {
+            LOGGER.debug("Remove the link file={}", file);

Review Comment:
   Do we need this debug line?



##########
server/src/main/java/org/apache/cassandra/sidecar/routes/cdc/ListCDCDirHandler.java:
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes.cdc;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.json.Json;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.common.response.ListCDCSegmentResponse;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.config.ServiceConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.routes.AbstractHandler;
+import org.apache.cassandra.sidecar.utils.CDCUtil;
+import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+
+import static org.apache.cassandra.sidecar.utils.CDCUtil.getIdxFileName;
+import static org.apache.cassandra.sidecar.utils.CDCUtil.getLogFilePrefix;
+import static org.apache.cassandra.sidecar.utils.CDCUtil.isIndexFile;
+import static org.apache.cassandra.sidecar.utils.CDCUtil.parseIndexFile;
+
+/**
+ * Provides REST endpoint for listing commit logs in CDC directory.
+ */
+@Singleton
+public class ListCDCDirHandler extends AbstractHandler<Void>
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ListCDCDirHandler.class);
+    private final ServiceConfiguration config;
+
+    @Inject
+    public ListCDCDirHandler(InstanceMetadataFetcher metadataFetcher,
+                             SidecarConfiguration config,
+                             ExecutorPools executorPools,
+                             CassandraInputValidator validator)
+    {
+        super(metadataFetcher, executorPools, validator);
+        this.config = config.serviceConfiguration();
+    }
+
+    @Override
+    protected void handleInternal(RoutingContext context,
+                                  HttpServerRequest httpRequest,
+                                  String host,
+                                  SocketAddress remoteAddress,
+                                  Void request)
+    {
+        String cdcDir = metadataFetcher.instance(host).cdcDir();
+        ListCDCSegmentResponse listCDCSegmentResponse = new 
ListCDCSegmentResponse(config.host(),
+                                                                               
    config.port(),
+                                                                               
    new LinkedList<>());
+        try
+        {
+            addResources(cdcDir, listCDCSegmentResponse);

Review Comment:
   Curious here. Aren't these API calls supposed to be synchronous? How would 
it work asynchronously?



##########
server/src/main/java/org/apache/cassandra/sidecar/cdc/CDCLogCache.java:
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.cdc;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.util.concurrent.TimeUnit;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.cassandra.sidecar.cluster.InstancesConfig;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.utils.CDCUtil;
+
+/**
+ * CDCLogCache caches the recently downloaded files to avoid being deleted by 
accident.
+ * <br>
+ * Downloads tracking is via {@linkplain #touch}.
+ * <br>
+ * In the event of deleting the _consumed_ files, 1 supersedes 2, meaning the 
_consumed_ files and their links
+ * are deleted, even though within the cache duration.
+ */
+@Singleton
+public class CDCLogCache
+{
+    public static final String TEMP_DIR_SUFFIX = "_tmp";
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CDCLogCache.class);
+    private static final RemovalListener<File, File> hardlinkRemover = 
notification -> deleteFileIfExist(notification.getValue());
+
+    // Cache for the hardlinks. Key: origin file; Value: link file
+    // The entries expire after 5 minutes
+    private final Cache<File, File> hardlinkCache;
+
+    @Inject
+    public CDCLogCache(ExecutorPools executorPools,
+                       InstancesConfig instancesConfig,
+                       SidecarConfiguration sidecarConfig)
+    {
+        long cacheExpiryInSecs = 
sidecarConfig.cdConfiguration().segmentHardlinkCacheExpiryInSecs();
+        hardlinkCache = CacheBuilder.newBuilder()
+                .expireAfterAccess(cacheExpiryInSecs, TimeUnit.SECONDS)
+                .removalListener(hardlinkRemover)
+                .build();
+        // Run cleanup in the internal pool, this prevents
+        // issues when initializing the cache
+        executorPools.internal().runBlocking(() -> 
cleanupLinkedFilesOnStartup(instancesConfig));
+
+        // setup periodic and serial cleanup
+        executorPools.internal()
+                     .setPeriodic(TimeUnit.SECONDS.toMillis(cacheExpiryInSecs),
+                                  id -> hardlinkCache.cleanUp(),
+                                  true);
+    }
+
+    public void touch(File segmentFile, File indexFile)
+    {
+        // renew the hardlinks
+        hardlinkCache.getIfPresent(segmentFile);
+        hardlinkCache.getIfPresent(indexFile);
+    }
+
+    /**
+     * Create a hardlink from origin in the cache if the hardlink does not 
exist yet.
+     *
+     * @param origin the source file
+     * @return the link file
+     * @throws IOException when an IO exception occurs during link
+     */
+    public File createLinkedFileInCache(File origin) throws IOException
+    {
+        File link = hardlinkCache.getIfPresent(origin);
+        if (link == null)
+        {
+            link = new File(getTempCdcDir(origin.getParent()), 
origin.getName());
+            try
+            {
+                // create link and cache it
+                Files.createLink(link.toPath(), origin.toPath());
+                hardlinkCache.put(origin, link);
+            }
+            catch (FileAlreadyExistsException e)
+            {
+                LOGGER.debug("The target of hardlink {} already exists. It 
could be created by a concurrent request.", link);
+            }
+        }
+        return link;
+    }
+
+    /**
+     * Clean up the linked file when the application is starting.
+     * There could be files left over if the application crashes during 
streaming the CDC segments.
+     * On a new start, the tmp directory for the linked CDC segments should be 
empty.
+     * It is only called in the constructor of the handler singleton.
+     *
+     * @param config instances config
+     */
+    @VisibleForTesting
+    public void cleanupLinkedFilesOnStartup(InstancesConfig config)
+    {
+        for (InstanceMetadata instance : config.instances())
+        {
+            try
+            {
+                cleanupLinkedFiles(instance);
+            }
+            catch (Exception e)
+            {
+                LOGGER.warn("Failed to clean up linked files for instance {}", 
instance.id(), e);
+            }
+        }
+    }
+
+    private void cleanupLinkedFiles(InstanceMetadata instance) throws 
IOException
+    {
+        File[] files = getTempCdcDir((instance.cdcDir()))
+                       .listFiles(f -> CDCUtil.isLogFile(f.getName()) || 
CDCUtil.isIndexFile(f.getName()));
+        if (files == null)
+            return;
+        for (File f : files)
+        {
+            deleteFileIfExist(f);
+        }
+    }
+
+    private static void deleteFileIfExist(File file)
+    {
+        if (file.exists() && file.isFile())
+        {
+            LOGGER.debug("Remove the link file={}", file);
+            if (file.delete())
+            {
+                LOGGER.debug("Removed the link file={}", file);
+            }
+        }
+    }
+
+    private static File getTempCdcDir(String cdcDir) throws IOException
+    {
+        File dir = new File(cdcDir + TEMP_DIR_SUFFIX);
+        try
+        {
+            Files.createDirectories(dir.toPath());
+        }
+        catch (IOException e)
+        {
+            LOGGER.warn("Unable to create temporary CDC directory {}", dir, e);

Review Comment:
   Should this be `LOGGER.error` rather than `LOGGER.warn`?



##########
server/src/main/java/org/apache/cassandra/sidecar/routes/cdc/ListCDCDirHandler.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes.cdc;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.json.Json;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.common.response.ListCDCSegmentResponse;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.config.ServiceConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.routes.AbstractHandler;
+import org.apache.cassandra.sidecar.utils.CDCUtil;
+import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+
+import static org.apache.cassandra.sidecar.utils.CDCUtil.getIdxFileName;
+import static org.apache.cassandra.sidecar.utils.CDCUtil.getLogFilePrefix;
+import static org.apache.cassandra.sidecar.utils.CDCUtil.isIndexFile;
+import static org.apache.cassandra.sidecar.utils.CDCUtil.parseIndexFile;
+
+/**
+ * Provides REST endpoint for listing commit logs in CDC directory.
+ */
+@Singleton
+public class ListCDCDirHandler extends AbstractHandler<Void>
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ListCDCDirHandler.class);
+    private final ServiceConfiguration config;
+
+    @Inject
+    public ListCDCDirHandler(InstanceMetadataFetcher metadataFetcher,
+                             SidecarConfiguration config,
+                             ExecutorPools executorPools,
+                             CassandraInputValidator validator)
+    {
+        super(metadataFetcher, executorPools, validator);
+        this.config = config.serviceConfiguration();
+    }
+
+    @Override
+    protected void handleInternal(RoutingContext context,
+                                  HttpServerRequest httpRequest,
+                                  String host,
+                                  SocketAddress remoteAddress,
+                                  Void request)
+    {
+        String cdcDir = metadataFetcher.instance(host).cdcDir();
+        ListCDCSegmentResponse listCDCSegmentResponse = new 
ListCDCSegmentResponse(config.host(),
+                                                                               
    config.port(),
+                                                                               
    new ArrayList<>());
+        try
+        {
+            addResources(cdcDir, listCDCSegmentResponse);
+        }
+        catch (IOException e)
+        {
+            LOGGER.warn("Error listing the CDC commit log segments", e);
+            context.response()
+                   
.setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code())
+                   .setStatusMessage(e.getMessage())
+                   .end();
+            return;
+        }
+
+        context.response().end(Json.encodePrettily(listCDCSegmentResponse));
+    }
+
+    private void addResources(String cdcDirPath, ListCDCSegmentResponse 
listResponse) throws IOException
+    {
+        File cdcDir = Paths.get(cdcDirPath).toAbsolutePath().toFile();
+        if (!cdcDir.isDirectory())
+        {
+            throw new IOException("CDC directory does not exist");
+        }
+
+        File[] cdcFiles = cdcDir.listFiles();
+        if (cdcFiles == null || cdcFiles.length == 0)
+        {
+            return;
+        }
+
+        Set<String> idxFileNamePrefixes = Arrays.stream(cdcFiles)

Review Comment:
   Maybe a nitpick, but in the main Cassandra project we decided to avoid 
streams on hot paths. 
   https://lists.apache.org/thread/65glsjzkmpktzmns6j9wvr4nczvskx36
   
   Maybe the performance issue is not as critical in this component, but it'd 
be nice to stay aligned with the convention used in the main Cassandra project.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org

Reply via email to