From Ritik Raj <[email protected]>:

Ritik Raj has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20565?usp=email )

Change subject: [NO ISSUE][CLOUD] Fix premature buffer release caused by flatMap cancel
......................................................................

[NO ISSUE][CLOUD] Fix premature buffer release caused by flatMap cancel

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
The Azure SDK uses Reactor/Netty for async downloads, where each
download is a Mono and the Monos are merged with flatMap. When one Mono
fails, flatMap cancels the others, causing Azure to release buffers
while Netty may still be writing into them, which leads to
IllegalReferenceCountException. Switching to flatMapDelayError defers
error propagation until the remaining downloads have finished, which
prevents the premature cancellation. Each download is also retried
with an exponential backoff policy (ReactiveExponentialRetryPolicy).
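
The "defer the error until everything has finished" idea can be
sketched with the JDK alone. This is only an analogy: it uses
CompletableFuture.allOf as a stand-in and is not the Reactor
flatMapDelayError code from this change. The class name, method names
and the 50 ms pause are hypothetical, chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in, not the Reactor code from this change.
public class DelayErrorSketch {

    // Starts `downloads` simulated downloads, one of which fails at once.
    // Returns how many had finished when the failure was finally surfaced.
    static int finishedWhenErrorSurfaced(int downloads, int failingIndex) {
        AtomicInteger finished = new AtomicInteger();
        List<CompletableFuture<Void>> tasks = new ArrayList<>();
        for (int i = 0; i < downloads; i++) {
            boolean fails = i == failingIndex;
            tasks.add(CompletableFuture.runAsync(() -> {
                if (fails) {
                    throw new IllegalStateException("simulated download failure");
                }
                pause(50); // simulated transfer still writing into its buffer
                finished.incrementAndGet();
            }));
        }
        try {
            // allOf completes only after every task has completed, even when
            // one of them failed early, so nothing is abandoned mid-write.
            CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])).join();
        } catch (CompletionException e) {
            // The failure is reported here, after the other tasks are done.
        }
        return finished.get();
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // One of four downloads fails immediately; the other three still finish.
        System.out.println(finishedWhenErrorSurfaced(4, 0)); // prints 3
    }
}
```

A fail-fast merge would instead surface the error while the three
remaining transfers were still in flight, which is the situation the
commit message describes.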

Ext-ref: MB-69283
Change-Id: If0c3577225a0dffffadfab13321a9c8702c3551e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20565
Tested-by: Ritik Raj <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
---
M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
A asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/ReactiveExponentialRetryPolicy.java
M hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java
3 files changed, 112 insertions(+), 3 deletions(-)

Approvals:
  Jenkins: Verified
  Murtadha Hubail: Looks good to me, approved
  Ritik Raj: Verified




diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
index 12d8a19..57f044d 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
@@ -35,6 +35,7 @@
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.util.ExceptionUtils;
 import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.util.ExponentialRetryPolicy;

 import com.azure.storage.blob.BlobAsyncClient;
 import com.azure.storage.blob.BlobContainerAsyncClient;
@@ -43,12 +44,14 @@

 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;

 public class AzureParallelDownloader extends AbstractParallelDownloader {
     private final IOManager ioManager;
     private final BlobContainerAsyncClient blobContainerAsyncClient;
     private final IRequestProfilerLimiter profiler;
     private final AzBlobStorageClientConfig config;
+    private final Retry retryPolicy;

     public AzureParallelDownloader(IOManager ioManager, BlobContainerAsyncClient blobContainerAsyncClient,
             IRequestProfilerLimiter profiler, AzBlobStorageClientConfig config) {
@@ -56,6 +59,7 @@
         this.blobContainerAsyncClient = blobContainerAsyncClient;
         this.profiler = profiler;
         this.config = config;
+        this.retryPolicy = ReactiveExponentialRetryPolicy.retryPolicy(new ExponentialRetryPolicy());
     }

     @Override
@@ -96,8 +100,9 @@
     }

     private void waitForFileDownloads(List<Mono<Void>> downloads) throws HyracksDataException {
-        runBlockingWithExceptionHandling(
-                () -> Flux.fromIterable(downloads).flatMap(mono -> mono, downloads.size()).then().block());
+        runBlockingWithExceptionHandling(() -> Flux.fromIterable(downloads)
+                .flatMapDelayError(mono -> mono.retryWhen(retryPolicy), downloads.size(), downloads.size()).then()
+                .block());
     }

     @Override
@@ -111,8 +116,9 @@
             directoryDownloads.add(directoryTask);
         }

+        int concurrency = config.getRequestsMaxPendingHttpConnections();
         runBlockingWithExceptionHandling(() -> Flux.fromIterable(directoryDownloads)
-                .flatMap(mono -> mono, config.getRequestsMaxPendingHttpConnections()).then().block());
+                .flatMapDelayError(mono -> mono.retryWhen(retryPolicy), concurrency, concurrency).then().block());

         return failedFiles;
     }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/ReactiveExponentialRetryPolicy.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/ReactiveExponentialRetryPolicy.java
new file mode 100644
index 0000000..ba2e3d4
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/ReactiveExponentialRetryPolicy.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients.azure.blobstorage;
+
+import java.time.Duration;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil;
+import org.apache.hyracks.util.ExponentialRetryPolicy;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import reactor.util.retry.Retry;
+
+/**
+ * Utility methods for building Reactor {@link Retry} policies that behave similarly to the
+ * blocking {@link ExponentialRetryPolicy}.
+ */
+public class ReactiveExponentialRetryPolicy {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    private ReactiveExponentialRetryPolicy() {
+    }
+
+    /**
+     * Creates a {@link Retry} instance based on an {@link ExponentialRetryPolicy}.
+     *
+     * @param policy the blocking policy to mirror. If {@code null}, defaults are used.
+     * @return a Reactor {@link Retry} behaving similarly to the provided policy.
+     */
+    public static Retry retryPolicy(ExponentialRetryPolicy policy) {
+        ExponentialRetryPolicy effectivePolicy = policy != null ? policy
+                : new ExponentialRetryPolicy(CloudRetryableRequestUtil.NUMBER_OF_RETRIES,
+                        CloudRetryableRequestUtil.MAX_DELAY_BETWEEN_RETRIES);
+        long initialDelay = Math.max(0L, effectivePolicy.getInitialDelay());
+        long maxDelay = Math.max(0L, effectivePolicy.getMaxDelay());
+        int maxRetries = Math.max(0, effectivePolicy.getMaxRetries());
+        long maxAttempts = Math.max(1L, (long) maxRetries + 1L);
+        return Retry.backoff(maxAttempts, Duration.ofMillis(initialDelay)).maxBackoff(Duration.ofMillis(maxDelay))
+                .filter(ReactiveExponentialRetryPolicy::isRetryable).doBeforeRetry(signal -> {
+                    long retriesSoFar = signal.totalRetries();
+                    long delayMillis = computeDelayMillis(initialDelay, maxDelay, retriesSoFar);
+                    long attempt = retriesSoFar + 1;
+                    LOGGER.info("Retrying after {}ms, attempt {}/{}", delayMillis, attempt, maxRetries);
+                }).transientErrors(true);
+    }
+
+    private static long computeDelayMillis(long initialDelay, long maxDelay, long retriesSoFar) {
+        if (initialDelay <= 0 || maxDelay <= 0) {
+            return 0L;
+        }
+
+        long delay = initialDelay;
+        for (long i = 0; i < retriesSoFar; i++) {
+            delay = delay > maxDelay / 2 ? maxDelay : delay * 2;
+        }
+
+        long jitteredDelay = ThreadLocalRandom.current().nextLong(1, delay + 1);
+
+        return Math.min(jitteredDelay, maxDelay);
+    }
+
+    private static boolean isRetryable(Throwable error) {
+        if (error instanceof IllegalArgumentException) {
+            return false;
+        }
+        if (ExceptionUtils.causedByInterrupt(error)) {
+            Thread.currentThread().interrupt();
+            return false;
+        }
+        return true;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java
index 8967e3e..385525c 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java
@@ -101,6 +101,18 @@
         return false;
     }

+    public int getMaxRetries() {
+        return maxRetries;
+    }
+
+    public long getInitialDelay() {
+        return delay;
+    }
+
+    public long getMaxDelay() {
+        return maxDelay;
+    }
+
     private static boolean isUnstable() {
         return Boolean.getBoolean(CLOUD_UNSTABLE_MODE);
     }
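
The doubling-and-cap rule used by computeDelayMillis in the new file
can be tried outside Reactor. The sketch below is a stand-alone
approximation under stated assumptions: the class name BackoffSketch
and its method names are hypothetical, and the 100 ms / 1000 ms inputs
are made-up values, not the project's defaults. It reproduces only the
value that is logged before each retry; the delay Reactor actually
waits is computed by Retry.backoff itself and may differ.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical stand-alone sketch of the logged backoff schedule.
public class BackoffSketch {

    // Upper bound of the delay before the next retry: start at initialDelay,
    // double once per retry already made, and jump to maxDelay as soon as
    // another doubling would exceed it.
    static long delayCeiling(long initialDelay, long maxDelay, long retriesSoFar) {
        if (initialDelay <= 0 || maxDelay <= 0) {
            return 0L;
        }
        long delay = initialDelay;
        for (long i = 0; i < retriesSoFar; i++) {
            delay = delay > maxDelay / 2 ? maxDelay : delay * 2;
        }
        return delay;
    }

    // Picks a random delay in [1, ceiling] and never returns more than maxDelay.
    static long jitteredDelay(long initialDelay, long maxDelay, long retriesSoFar) {
        long ceiling = delayCeiling(initialDelay, maxDelay, retriesSoFar);
        if (ceiling == 0L) {
            return 0L;
        }
        long jittered = ThreadLocalRandom.current().nextLong(1, ceiling + 1);
        return Math.min(jittered, maxDelay);
    }

    public static void main(String[] args) {
        // With the assumed inputs the ceilings are 100, 200, 400, 800, 1000, 1000.
        for (long retries = 0; retries < 6; retries++) {
            System.out.println("retries so far " + retries + ": ceiling "
                    + delayCeiling(100, 1000, retries) + " ms, jittered "
                    + jitteredDelay(100, 1000, retries) + " ms");
        }
    }
}
```

The jittered value changes from run to run, so only the ceilings are
predictable.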

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20565?usp=email
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings?usp=email

Gerrit-MessageType: merged
Gerrit-Project: asterixdb
Gerrit-Branch: phoenix
Gerrit-Change-Id: If0c3577225a0dffffadfab13321a9c8702c3551e
Gerrit-Change-Number: 20565
Gerrit-PatchSet: 4
Gerrit-Owner: Ritik Raj <[email protected]>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Murtadha Hubail <[email protected]>
Gerrit-Reviewer: Ritik Raj <[email protected]>
