>From Ritik Raj <[email protected]>:
Ritik Raj has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20556?usp=email )
Change subject: [ASTERIXDB-3669][CLOUD] Retry downloadDirectories in parallel
downloader
......................................................................
[ASTERIXDB-3669][CLOUD] Retry downloadDirectories in parallel downloader
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
Added retry with ExponentialRetryPolicy() for
downloadDirectories in ParallelDownloader.
Ext-ref: MB-69226
Change-Id: Ica47227218a323ededced75691d3f64070c97729
---
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
A
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/AbstractParallelDownloader.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
M
hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
10 files changed, 108 insertions(+), 37 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/56/20556/1
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index 6fc271e..23df349 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -192,7 +192,7 @@
public void downloadLibrary(Collection<FileReference> libPath) throws
HyracksDataException {
try (IParallelDownloader downloader =
cloudClient.createParallelDownloader(bucket, localIoManager)) {
LOGGER.info("Downloading all files located in {}", libPath);
- downloader.downloadDirectories(libPath);
+ downloader.downloadDirectoriesWithRetry(libPath);
LOGGER.info("Finished downloading {}", libPath);
}
}
@@ -202,7 +202,7 @@
FileReference appDir = resolveAbsolutePath(
localIoManager.getWorkspacePath(0).getPath() +
File.separator + APPLICATION_ROOT_DIR_NAME);
LOGGER.info("Downloading all libraries in + {}", appDir);
- downloader.downloadDirectories(Collections.singletonList(appDir));
+
downloader.downloadDirectoriesWithRetry(Collections.singletonList(appDir));
LOGGER.info("Finished downloading all libraries");
}
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
index 1cb6077..59e21ac 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
@@ -62,7 +62,7 @@
protected void downloadPartitions(boolean metadataNode, int
metadataPartition) throws HyracksDataException {
IParallelDownloader downloader =
cloudClient.createParallelDownloader(bucket, localIoManager);
LOGGER.info("Downloading all files located in {}", partitionPaths);
- downloader.downloadDirectories(partitionPaths);
+ downloader.downloadDirectoriesWithRetry(partitionPaths);
LOGGER.info("Finished downloading {}", partitionPaths);
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
index 1c5efd9..999a6b6 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
@@ -156,7 +156,7 @@
LOGGER.info("Downloading metadata partition {}, Current uncached
files: {}", metadataPartition,
uncachedFiles);
FileReference metadataDir = resolve(STORAGE_ROOT_DIR_NAME +
File.separator + partitionDir);
- downloader.downloadDirectories(Collections.singleton(metadataDir));
+
downloader.downloadDirectoriesWithRetry(Collections.singleton(metadataDir));
uncachedFiles.removeIf(f ->
f.getRelativePath().contains(partitionDir));
LOGGER.info("Finished downloading metadata partition. Current
uncached files: {}", uncachedFiles);
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/AbstractParallelDownloader.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/AbstractParallelDownloader.java
new file mode 100644
index 0000000..29a50a6
--- /dev/null
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/AbstractParallelDownloader.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil;
+import org.apache.hyracks.util.ExponentialRetryPolicy;
+import org.apache.hyracks.util.IRetryPolicy;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public abstract class AbstractParallelDownloader implements
IParallelDownloader {
+ private static final Logger LOGGER = LogManager.getLogger();
+
+ public void downloadDirectoriesWithRetry(Collection<FileReference>
toDownload) throws HyracksDataException {
+ Set<FileReference> failedFiles = new HashSet<>(toDownload);
+ IRetryPolicy retryPolicy = new
ExponentialRetryPolicy(CloudRetryableRequestUtil.NUMBER_OF_RETRIES,
+ CloudRetryableRequestUtil.MAX_DELAY_BETWEEN_RETRIES);
+ int attempt = 1;
+ while (true) {
+ try {
+ failedFiles = downloadDirectories(toDownload);
+
+ if (failedFiles.isEmpty()) {
+ return;
+ }
+
+ if (!retryPolicy.retry(null)) {
+ LOGGER.error("Exhausted retries ({}) — failed to download
{} directories: {}",
+ CloudRetryableRequestUtil.NUMBER_OF_RETRIES,
failedFiles.size(), failedFiles);
+ throw
HyracksDataException.create(ErrorCode.FAILED_IO_OPERATION);
+ }
+
+ LOGGER.warn("Failed to download directories (attempt {}/{}),
retrying. Remaining: {}", attempt,
+ CloudRetryableRequestUtil.NUMBER_OF_RETRIES,
failedFiles.size());
+ } catch (IOException | ExecutionException | InterruptedException
e) {
+ if (ExceptionUtils.causedByInterrupt(e) &&
!Thread.currentThread().isInterrupted()) {
+ LOGGER.warn("Lost suppressed interrupt during
downloadDirectory retry", e);
+ throw HyracksDataException.create(e);
+ }
+ try {
+ if (!retryPolicy.retry(e)) {
+ LOGGER.error("Exhausted retries ({}) — failed to
download {} directories: {}",
+ CloudRetryableRequestUtil.NUMBER_OF_RETRIES,
failedFiles.size(), failedFiles);
+ throw HyracksDataException.create(e);
+ }
+ } catch (InterruptedException e1) {
+ throw HyracksDataException.create(e1);
+ }
+ LOGGER.warn("Failed to downloadDirectories, performing {}/{}",
attempt,
+ CloudRetryableRequestUtil.NUMBER_OF_RETRIES, e);
+ }
+ attempt++;
+ }
+ }
+
+ protected abstract Set<FileReference>
downloadDirectories(Collection<FileReference> toDownload)
+ throws ExecutionException, InterruptedException, IOException;
+}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java
index 6f1c453..5f7ff0b 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java
@@ -39,7 +39,7 @@
* @param toDownload all files to be downloaded
* @return file that failed to download
*/
- Collection<FileReference> downloadDirectories(Collection<FileReference>
toDownload) throws HyracksDataException;
+ void downloadDirectoriesWithRetry(Collection<FileReference> toDownload)
throws HyracksDataException;
/**
* Close the downloader and release all of its resources
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
index 0321a35..bfb52c9 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
@@ -30,7 +30,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import org.apache.asterix.cloud.clients.IParallelDownloader;
+import org.apache.asterix.cloud.clients.AbstractParallelDownloader;
import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter;
import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -57,7 +57,7 @@
import software.amazon.awssdk.utils.AttributeMap;
@ThreadSafe
-class S3ParallelDownloader implements IParallelDownloader {
+class S3ParallelDownloader extends AbstractParallelDownloader {
private final String bucket;
private final IOManager ioManager;
private final S3AsyncClient s3AsyncClient;
@@ -84,17 +84,10 @@
}
@Override
- public Collection<FileReference>
downloadDirectories(Collection<FileReference> toDownload)
- throws HyracksDataException {
- Set<FileReference> failedFiles;
+ public Set<FileReference> downloadDirectories(Collection<FileReference>
toDownload)
+ throws HyracksDataException, ExecutionException,
InterruptedException {
List<CompletableFuture<CompletedDirectoryDownload>> downloads =
startDownloadingDirectories(toDownload);
- try {
- failedFiles = waitForDirectoryDownloads(downloads);
- } catch (ExecutionException | InterruptedException e) {
- throw HyracksDataException.create(e);
- }
-
- return failedFiles;
+ return waitForDirectoryDownloads(downloads);
}
@Override
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java
index 53e3dad..498c34b 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java
@@ -33,7 +33,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import org.apache.asterix.cloud.clients.IParallelDownloader;
+import org.apache.asterix.cloud.clients.AbstractParallelDownloader;
import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter;
import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -50,7 +50,7 @@
import software.amazon.awssdk.services.s3.model.S3Object;
@ThreadSafe
-public class S3SyncDownloader implements IParallelDownloader {
+public class S3SyncDownloader extends AbstractParallelDownloader {
private static final Logger LOGGER = LogManager.getLogger();
private final String bucket;
@@ -126,14 +126,10 @@
}
@Override
- public Collection<FileReference>
downloadDirectories(Collection<FileReference> toDownload)
- throws HyracksDataException {
+ public Set<FileReference> downloadDirectories(Collection<FileReference>
toDownload)
+ throws IOException, ExecutionException, InterruptedException {
Set<FileReference> failedFiles;
- try {
- failedFiles = downloadDirectoriesAndWait(toDownload);
- } catch (IOException | InterruptedException | ExecutionException e) {
- throw HyracksDataException.create(e);
- }
+ failedFiles = downloadDirectoriesAndWait(toDownload);
return failedFiles;
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
index 364fb2a..12d8a19 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
@@ -29,7 +29,7 @@
import java.util.List;
import java.util.Set;
-import org.apache.asterix.cloud.clients.IParallelDownloader;
+import org.apache.asterix.cloud.clients.AbstractParallelDownloader;
import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
@@ -44,7 +44,7 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-public class AzureParallelDownloader implements IParallelDownloader {
+public class AzureParallelDownloader extends AbstractParallelDownloader {
private final IOManager ioManager;
private final BlobContainerAsyncClient blobContainerAsyncClient;
private final IRequestProfilerLimiter profiler;
@@ -101,8 +101,7 @@
}
@Override
- public Collection<FileReference>
downloadDirectories(Collection<FileReference> directories)
- throws HyracksDataException {
+ public Set<FileReference> downloadDirectories(Collection<FileReference>
directories) throws HyracksDataException {
Set<FileReference> failedFiles = new HashSet<>();
List<Mono<Void>> directoryDownloads = new ArrayList<>();
@@ -196,4 +195,4 @@
throw HyracksDataException.create(e);
}
}
-}
\ No newline at end of file
+}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
index b9e7eee..574a9b6 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
@@ -30,7 +30,7 @@
import java.util.Map;
import java.util.Set;
-import org.apache.asterix.cloud.clients.IParallelDownloader;
+import org.apache.asterix.cloud.clients.AbstractParallelDownloader;
import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter;
import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -51,7 +51,7 @@
import com.google.cloud.storage.transfermanager.TransferManagerConfig;
import com.google.cloud.storage.transfermanager.TransferStatus;
-public class GCSParallelDownloader implements IParallelDownloader {
+public class GCSParallelDownloader extends AbstractParallelDownloader {
private final String bucket;
private final IOManager ioManager;
@@ -102,8 +102,7 @@
}
@Override
- public Collection<FileReference>
downloadDirectories(Collection<FileReference> toDownload)
- throws HyracksDataException {
+ public Set<FileReference> downloadDirectories(Collection<FileReference>
toDownload) {
Set<FileReference> failedFiles = new HashSet<>();
ParallelDownloadConfig.Builder config =
ParallelDownloadConfig.newBuilder().setBucketName(bucket).setStripPrefix(this.config.getPrefix());
diff --git
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
index e011900..5461ed5 100644
---
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
+++
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
@@ -53,8 +53,8 @@
private static final int UNSTABLE_NUMBER_OF_RETRIES = 100;
private static final int UNSTABLE_MAX_DELAY_BETWEEN_RETRIES_IN_MILLIS = 0;
private static final Logger LOGGER = LogManager.getLogger();
- private static final int NUMBER_OF_RETRIES = getNumberOfRetries();
- private static final long MAX_DELAY_BETWEEN_RETRIES =
getMaxDelayBetweenRetries();
+ public static final int NUMBER_OF_RETRIES = getNumberOfRetries();
+ public static final long MAX_DELAY_BETWEEN_RETRIES =
getMaxDelayBetweenRetries();
private static final ICloudRetryPredicate RETRY_ALWAYS_PREDICATE = e ->
true;
private static final ICloudBeforeRetryRequest NO_OP_BEFORE_RETRY = () -> {
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20556?usp=email
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings?usp=email
Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: phoenix
Gerrit-Change-Id: Ica47227218a323ededced75691d3f64070c97729
Gerrit-Change-Number: 20556
Gerrit-PatchSet: 1
Gerrit-Owner: Ritik Raj <[email protected]>