>From Ritik Raj <[email protected]>:

Ritik Raj has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20556?usp=email )


Change subject: [ASTERIXDB-3669][CLOUD] Retry downloadDirectories in parallel 
downloader
......................................................................

[ASTERIXDB-3669][CLOUD] Retry downloadDirectories in parallel downloader

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
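Added retries, driven by an ExponentialRetryPolicy, around
downloadDirectories in all parallel downloaders (S3, S3 sync, Azure,
GCS) through the new AbstractParallelDownloader.downloadDirectoriesWithRetry().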

Ext-ref: MB-69226
Change-Id: Ica47227218a323ededced75691d3f64070c97729
---
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
A 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/AbstractParallelDownloader.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
M 
hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
10 files changed, 108 insertions(+), 37 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/56/20556/1

diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index 6fc271e..23df349 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -192,7 +192,7 @@
     public void downloadLibrary(Collection<FileReference> libPath) throws 
HyracksDataException {
         try (IParallelDownloader downloader = 
cloudClient.createParallelDownloader(bucket, localIoManager)) {
             LOGGER.info("Downloading all files located in {}", libPath);
-            downloader.downloadDirectories(libPath);
+            downloader.downloadDirectoriesWithRetry(libPath);
             LOGGER.info("Finished downloading {}", libPath);
         }
     }
@@ -202,7 +202,7 @@
             FileReference appDir = resolveAbsolutePath(
                     localIoManager.getWorkspacePath(0).getPath() + 
File.separator + APPLICATION_ROOT_DIR_NAME);
             LOGGER.info("Downloading all libraries in + {}", appDir);
-            downloader.downloadDirectories(Collections.singletonList(appDir));
+            
downloader.downloadDirectoriesWithRetry(Collections.singletonList(appDir));
             LOGGER.info("Finished downloading all libraries");
         }
     }
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
index 1cb6077..59e21ac 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
@@ -62,7 +62,9 @@
     protected void downloadPartitions(boolean metadataNode, int metadataPartition) throws HyracksDataException {
-        IParallelDownloader downloader = cloudClient.createParallelDownloader(bucket, localIoManager);
-        LOGGER.info("Downloading all files located in {}", partitionPaths);
-        downloader.downloadDirectories(partitionPaths);
-        LOGGER.info("Finished downloading {}", partitionPaths);
+        // try-with-resources closes the downloader, releasing its resources once the download finishes or fails
+        try (IParallelDownloader downloader = cloudClient.createParallelDownloader(bucket, localIoManager)) {
+            LOGGER.info("Downloading all files located in {}", partitionPaths);
+            downloader.downloadDirectoriesWithRetry(partitionPaths);
+            LOGGER.info("Finished downloading {}", partitionPaths);
+        }
     }

diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
index 1c5efd9..999a6b6 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
@@ -156,7 +156,7 @@
             LOGGER.info("Downloading metadata partition {}, Current uncached 
files: {}", metadataPartition,
                     uncachedFiles);
             FileReference metadataDir = resolve(STORAGE_ROOT_DIR_NAME + 
File.separator + partitionDir);
-            downloader.downloadDirectories(Collections.singleton(metadataDir));
+            
downloader.downloadDirectoriesWithRetry(Collections.singleton(metadataDir));
             uncachedFiles.removeIf(f -> 
f.getRelativePath().contains(partitionDir));
             LOGGER.info("Finished downloading metadata partition. Current 
uncached files: {}", uncachedFiles);
         }
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/AbstractParallelDownloader.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/AbstractParallelDownloader.java
new file mode 100644
index 0000000..29a50a6
--- /dev/null
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/AbstractParallelDownloader.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil;
+import org.apache.hyracks.util.ExponentialRetryPolicy;
+import org.apache.hyracks.util.IRetryPolicy;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Base class for {@link IParallelDownloader} implementations: subclasses provide a single download attempt through
+ * {@link #downloadDirectories(Collection)} and this class adds the retry loop around it.
+ */
+public abstract class AbstractParallelDownloader implements IParallelDownloader {
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    /**
+     * Downloads {@code toDownload}, calling {@link #downloadDirectories(Collection)} until an attempt reports no
+     * failed files. Attempts are paced by an {@link ExponentialRetryPolicy} configured with
+     * {@link CloudRetryableRequestUtil#NUMBER_OF_RETRIES} and
+     * {@link CloudRetryableRequestUtil#MAX_DELAY_BETWEEN_RETRIES}.
+     * <p>
+     * Every attempt re-submits the full {@code toDownload} collection, not only the files that failed. A failure
+     * caused by an interrupt is never retried: the thread's interrupt flag is restored and the failure is rethrown.
+     *
+     * @param toDownload directories to download
+     * @throws HyracksDataException if files still fail once the retries are exhausted, or if interrupted
+     */
+    @Override
+    public void downloadDirectoriesWithRetry(Collection<FileReference> toDownload) throws HyracksDataException {
+        IRetryPolicy retryPolicy = new ExponentialRetryPolicy(CloudRetryableRequestUtil.NUMBER_OF_RETRIES,
+                CloudRetryableRequestUtil.MAX_DELAY_BETWEEN_RETRIES);
+        // Reported if retries run out; until an attempt returns, everything requested counts as not downloaded
+        Set<FileReference> failedFiles = new HashSet<>(toDownload);
+        for (int attempt = 1;; attempt++) {
+            Exception failure = null;
+            try {
+                failedFiles = downloadDirectories(toDownload);
+                if (failedFiles.isEmpty()) {
+                    return;
+                }
+            } catch (IOException | ExecutionException | InterruptedException e) {
+                if (e instanceof InterruptedException || ExceptionUtils.causedByInterrupt(e)) {
+                    throw onInterrupt(e);
+                }
+                failure = e;
+            }
+            // Kept outside the try above: HyracksDataException is an IOException, so an exhaustion error thrown
+            // in there would be caught and treated as one more failed attempt.
+            awaitRetryOrFail(retryPolicy, failure, failedFiles);
+            if (failure == null) {
+                LOGGER.warn("Failed to download directories (attempt {}/{}), retrying. Remaining: {}", attempt,
+                        CloudRetryableRequestUtil.NUMBER_OF_RETRIES, failedFiles.size());
+            } else {
+                LOGGER.warn("Failed to downloadDirectories, performing {}/{}", attempt,
+                        CloudRetryableRequestUtil.NUMBER_OF_RETRIES, failure);
+            }
+        }
+    }
+
+    /**
+     * Asks {@code retryPolicy} whether to make another attempt; an exponential policy may wait before answering.
+     *
+     * @throws HyracksDataException if the retries are exhausted or the wait is interrupted
+     */
+    private static void awaitRetryOrFail(IRetryPolicy retryPolicy, Exception failure, Set<FileReference> failedFiles)
+            throws HyracksDataException {
+        boolean retry;
+        try {
+            retry = retryPolicy.retry(failure);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw HyracksDataException.create(e);
+        }
+        if (!retry) {
+            LOGGER.error("Exhausted retries ({}) — failed to download {} directories: {}",
+                    CloudRetryableRequestUtil.NUMBER_OF_RETRIES, failedFiles.size(), failedFiles);
+            throw failure == null ? HyracksDataException.create(ErrorCode.FAILED_IO_OPERATION)
+                    : HyracksDataException.create(failure);
+        }
+    }
+
+    /**
+     * Restores the thread's interrupt flag if it was cleared, and wraps {@code e} for rethrowing.
+     */
+    private static HyracksDataException onInterrupt(Exception e) {
+        if (!Thread.currentThread().isInterrupted()) {
+            LOGGER.warn("Lost suppressed interrupt during downloadDirectory retry", e);
+            Thread.currentThread().interrupt();
+        }
+        return HyracksDataException.create(e);
+    }
+
+    /**
+     * Performs a single download attempt of the given directories.
+     *
+     * @param toDownload directories to download
+     * @return files that failed to download in this attempt; an empty set means the attempt succeeded
+     */
+    protected abstract Set<FileReference> downloadDirectories(Collection<FileReference> toDownload)
+            throws ExecutionException, InterruptedException, IOException;
+}
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java
index 6f1c453..5f7ff0b 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java
@@ -39,7 +39,7 @@
      * @param toDownload all files to be downloaded
-     * @return file that failed to download
+     * @throws HyracksDataException if the files cannot be downloaded, including when all retries are exhausted
      */
-    Collection<FileReference> downloadDirectories(Collection<FileReference> toDownload) throws HyracksDataException;
+    void downloadDirectoriesWithRetry(Collection<FileReference> toDownload) throws HyracksDataException;

     /**
      * Close the downloader and release all of its resources
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
index 0321a35..bfb52c9 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
@@ -30,7 +30,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;

-import org.apache.asterix.cloud.clients.IParallelDownloader;
+import org.apache.asterix.cloud.clients.AbstractParallelDownloader;
 import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter;
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -57,7 +57,7 @@
 import software.amazon.awssdk.utils.AttributeMap;

 @ThreadSafe
-class S3ParallelDownloader implements IParallelDownloader {
+class S3ParallelDownloader extends AbstractParallelDownloader {
     private final String bucket;
     private final IOManager ioManager;
     private final S3AsyncClient s3AsyncClient;
@@ -84,17 +84,10 @@
     }

     @Override
-    public Collection<FileReference> 
downloadDirectories(Collection<FileReference> toDownload)
-            throws HyracksDataException {
-        Set<FileReference> failedFiles;
+    public Set<FileReference> downloadDirectories(Collection<FileReference> 
toDownload)
+            throws HyracksDataException, ExecutionException, 
InterruptedException {
         List<CompletableFuture<CompletedDirectoryDownload>> downloads = 
startDownloadingDirectories(toDownload);
-        try {
-            failedFiles = waitForDirectoryDownloads(downloads);
-        } catch (ExecutionException | InterruptedException e) {
-            throw HyracksDataException.create(e);
-        }
-
-        return failedFiles;
+        return waitForDirectoryDownloads(downloads);
     }

     @Override
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java
index 53e3dad..498c34b 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java
@@ -33,7 +33,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;

-import org.apache.asterix.cloud.clients.IParallelDownloader;
+import org.apache.asterix.cloud.clients.AbstractParallelDownloader;
 import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter;
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -50,7 +50,7 @@
 import software.amazon.awssdk.services.s3.model.S3Object;

 @ThreadSafe
-public class S3SyncDownloader implements IParallelDownloader {
+public class S3SyncDownloader extends AbstractParallelDownloader {
     private static final Logger LOGGER = LogManager.getLogger();

     private final String bucket;
@@ -126,14 +126,8 @@
     }
 
     @Override
-    public Collection<FileReference> downloadDirectories(Collection<FileReference> toDownload)
-            throws HyracksDataException {
-        Set<FileReference> failedFiles;
-        try {
-            failedFiles = downloadDirectoriesAndWait(toDownload);
-        } catch (IOException | InterruptedException | ExecutionException e) {
-            throw HyracksDataException.create(e);
-        }
-        return failedFiles;
+    public Set<FileReference> downloadDirectories(Collection<FileReference> toDownload)
+            throws IOException, ExecutionException, InterruptedException {
+        return downloadDirectoriesAndWait(toDownload);
     }

diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
index 364fb2a..12d8a19 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
@@ -29,7 +29,7 @@
 import java.util.List;
 import java.util.Set;

-import org.apache.asterix.cloud.clients.IParallelDownloader;
+import org.apache.asterix.cloud.clients.AbstractParallelDownloader;
 import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
@@ -44,7 +44,7 @@
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;

-public class AzureParallelDownloader implements IParallelDownloader {
+public class AzureParallelDownloader extends AbstractParallelDownloader {
     private final IOManager ioManager;
     private final BlobContainerAsyncClient blobContainerAsyncClient;
     private final IRequestProfilerLimiter profiler;
@@ -101,8 +101,7 @@
     }

     @Override
-    public Collection<FileReference> 
downloadDirectories(Collection<FileReference> directories)
-            throws HyracksDataException {
+    public Set<FileReference> downloadDirectories(Collection<FileReference> 
directories) throws HyracksDataException {

         Set<FileReference> failedFiles = new HashSet<>();
         List<Mono<Void>> directoryDownloads = new ArrayList<>();
@@ -196,4 +195,4 @@
             throw HyracksDataException.create(e);
         }
     }
-}
\ No newline at end of file
+}
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
index b9e7eee..574a9b6 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
@@ -30,7 +30,7 @@
 import java.util.Map;
 import java.util.Set;

-import org.apache.asterix.cloud.clients.IParallelDownloader;
+import org.apache.asterix.cloud.clients.AbstractParallelDownloader;
 import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter;
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -51,7 +51,7 @@
 import com.google.cloud.storage.transfermanager.TransferManagerConfig;
 import com.google.cloud.storage.transfermanager.TransferStatus;

-public class GCSParallelDownloader implements IParallelDownloader {
+public class GCSParallelDownloader extends AbstractParallelDownloader {

     private final String bucket;
     private final IOManager ioManager;
@@ -102,8 +102,7 @@
     }

     @Override
-    public Collection<FileReference> 
downloadDirectories(Collection<FileReference> toDownload)
-            throws HyracksDataException {
+    public Set<FileReference> downloadDirectories(Collection<FileReference> 
toDownload) {
         Set<FileReference> failedFiles = new HashSet<>();
         ParallelDownloadConfig.Builder config =
                 
ParallelDownloadConfig.newBuilder().setBucketName(bucket).setStripPrefix(this.config.getPrefix());
diff --git 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
index e011900..5461ed5 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
@@ -53,8 +53,8 @@
     private static final int UNSTABLE_NUMBER_OF_RETRIES = 100;
     private static final int UNSTABLE_MAX_DELAY_BETWEEN_RETRIES_IN_MILLIS = 0;
     private static final Logger LOGGER = LogManager.getLogger();
-    private static final int NUMBER_OF_RETRIES = getNumberOfRetries();
-    private static final long MAX_DELAY_BETWEEN_RETRIES = 
getMaxDelayBetweenRetries();
+    public static final int NUMBER_OF_RETRIES = getNumberOfRetries();
+    public static final long MAX_DELAY_BETWEEN_RETRIES = 
getMaxDelayBetweenRetries();

     private static final ICloudRetryPredicate RETRY_ALWAYS_PREDICATE = e -> 
true;
     private static final ICloudBeforeRetryRequest NO_OP_BEFORE_RETRY = () -> {

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20556?usp=email
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings?usp=email

Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: phoenix
Gerrit-Change-Id: Ica47227218a323ededced75691d3f64070c97729
Gerrit-Change-Number: 20556
Gerrit-PatchSet: 1
Gerrit-Owner: Ritik Raj <[email protected]>

Reply via email to