>From Wail Alkowaileet <[email protected]>:

Wail Alkowaileet has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18378 )


Change subject: [NO ISSUE] Make page cloudRead retry on failure
......................................................................

[NO ISSUE] Make page cloudRead retry on failure

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
Retry page cloudRead (and other cloud requests) on failure

Change-Id: Ia8c34d4ba7a3527fea22149e5065815095c39ab2
---
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
A 
hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/stream/CloudInputStream.java
A 
hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryUtil.java
A 
hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudRetry.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
M 
hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
A 
hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudReturnableRequest.java
M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java
A 
hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudRequest.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
12 files changed, 457 insertions(+), 72 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/78/18378/1

diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index 4ca9f59..19d6f2c 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -51,6 +51,11 @@
 import org.apache.hyracks.api.io.IIOBulkOperation;
 import org.apache.hyracks.api.util.IoUtil;
 import org.apache.hyracks.cloud.io.ICloudIOManager;
+import org.apache.hyracks.cloud.io.request.ICloudRequest;
+import org.apache.hyracks.cloud.io.request.ICloudRetry;
+import org.apache.hyracks.cloud.io.request.ICloudReturnableRequest;
+import org.apache.hyracks.cloud.io.stream.CloudInputStream;
+import org.apache.hyracks.cloud.util.CloudRetryUtil;
 import org.apache.hyracks.control.nc.io.IOManager;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -172,12 +177,30 @@

     @Override
     public final void cloudRead(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException {
-        cloudClient.read(bucket, fHandle.getFileReference().getRelativePath(), offset, data);
+        ICloudRequest request =
+                () -> cloudClient.read(bucket, fHandle.getFileReference().getRelativePath(), offset, data);
+        // On retry, rewind to where the caller's data region starts rather than blindly to 0
+        int start = data.position();
+        ICloudRetry retry = () -> data.position(start);
+        CloudRetryUtil.run(request, retry);
     }

     @Override
-    public final InputStream cloudRead(IFileHandle fHandle, long offset, long length) {
-        return cloudClient.getObjectStream(bucket, fHandle.getFileReference().getRelativePath(), offset, length);
+    public final CloudInputStream cloudRead(IFileHandle fHandle, long offset, long length) throws HyracksDataException {
+        ICloudReturnableRequest<CloudInputStream> request = () -> {
+            InputStream stream =
+                    cloudClient.getObjectStream(bucket, fHandle.getFileReference().getRelativePath(), offset, length);
+            return new CloudInputStream(this, fHandle, stream, offset, length);
+        };
+
+        return CloudRetryUtil.run(request);
+    }
+
+    @Override
+    public void restoreStream(CloudInputStream cloudStream) {
+        InputStream stream = cloudClient.getObjectStream(bucket, cloudStream.getPath(), cloudStream.getOffset(),
+                cloudStream.getRemaining());
+        cloudStream.setInputStream(stream);
     }

     @Override
@@ -279,7 +302,7 @@
             // completing the upload.
             ICloudWriter cloudWriter = ((CloudFileHandle) fileHandle).getCloudWriter();
             try {
-                cloudWriter.finish();
+                CloudRetryUtil.run(cloudWriter::finish);
             } catch (HyracksDataException e) {
                 savedEx = e;
             }
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
index 885d612..3819b6e 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
@@ -25,6 +25,9 @@
 import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
 import org.apache.asterix.cloud.clients.ICloudWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.cloud.io.request.ICloudRequest;
+import org.apache.hyracks.cloud.io.request.ICloudRetry;
+import org.apache.hyracks.cloud.util.CloudRetryUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;

@@ -177,7 +180,9 @@
     private void uploadAndWait() throws HyracksDataException {
         writeBuffer.flip();
         try {
-            bufferedWriter.upload(this, writeBuffer.limit());
+            ICloudRequest request = () -> bufferedWriter.upload(this, 
writeBuffer.limit());
+            ICloudRetry retry = () -> writeBuffer.position(0);
+            CloudRetryUtil.run(request, retry);
         } catch (Exception e) {
             LOGGER.error(e);
             throw HyracksDataException.create(e);
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
index 72f3446..be92a68 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
@@ -53,6 +53,7 @@
 import org.apache.hyracks.api.io.IIOBulkOperation;
 import org.apache.hyracks.api.io.IODeviceHandle;
 import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.cloud.util.CloudRetryUtil;
 import org.apache.hyracks.control.nc.io.IOManager;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -174,33 +175,33 @@

     @Override
     public Set<FileReference> list(FileReference dir, FilenameFilter filter) 
throws HyracksDataException {
-        return accessor.doList(dir, filter);
+        return CloudRetryUtil.run(() -> accessor.doList(dir, filter));
     }

     @Override
     public boolean exists(FileReference fileRef) throws HyracksDataException {
-        return accessor.doExists(fileRef);
+        return CloudRetryUtil.run(() -> accessor.doExists(fileRef));
     }

     @Override
     public long getSize(FileReference fileReference) throws 
HyracksDataException {
-        return accessor.doGetSize(fileReference);
+        return CloudRetryUtil.run(() -> accessor.doGetSize(fileReference));
     }

     @Override
     public byte[] readAllBytes(FileReference fileRef) throws 
HyracksDataException {
-        return accessor.doReadAllBytes(fileRef);
+        return CloudRetryUtil.run(() -> accessor.doReadAllBytes(fileRef));
     }

     @Override
     public void delete(FileReference fileRef) throws HyracksDataException {
-        accessor.doDelete(fileRef);
+        CloudRetryUtil.run(() -> accessor.doDelete(fileRef));
         log("DELETE", fileRef);
     }

     @Override
     public void overwrite(FileReference fileRef, byte[] bytes) throws 
HyracksDataException {
-        accessor.doOverwrite(fileRef, bytes);
+        CloudRetryUtil.run(() -> accessor.doOverwrite(fileRef, bytes));
         log("WRITE", fileRef);
     }

diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java
index 8d28d3a..48c3b0f 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java
@@ -24,6 +24,7 @@
 import org.apache.asterix.cloud.clients.ICloudClient;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.cloud.util.CloudRetryUtil;
 import org.apache.hyracks.control.nc.io.IOManager;
 import org.apache.hyracks.control.nc.io.bulk.DeleteBulkOperation;
 import org.apache.logging.log4j.LogManager;
@@ -57,7 +58,7 @@
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("Bulk deleting: local: {}, cloud: {}", 
fileReferences, paths);
         }
-        cloudClient.deleteObjects(bucket, paths);
+        CloudRetryUtil.run(() -> cloudClient.deleteObjects(bucket, paths));
         // Bulk delete locally as well
         super.performOperation();
         callBack.call(fileReferences);
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
index 93be80c..a554951 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
@@ -21,12 +21,12 @@
 import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.TimeUnit;

 import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
 import org.apache.asterix.cloud.clients.ICloudGuardian;
 import org.apache.asterix.cloud.clients.profiler.IRequestProfiler;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.cloud.util.CloudRetryUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;

@@ -92,28 +92,7 @@
         CompletedMultipartUpload completedMultipartUpload = 
CompletedMultipartUpload.builder().parts(partQueue).build();
         CompleteMultipartUploadRequest completeMultipartUploadRequest = 
CompleteMultipartUploadRequest.builder()
                 
.bucket(bucket).key(path).uploadId(uploadId).multipartUpload(completedMultipartUpload).build();
-        int retries = 0;
-        while (true) {
-            try {
-                completeMultipartUpload(completeMultipartUploadRequest);
-                break;
-            } catch (Exception e) {
-                retries++;
-                if (retries == MAX_RETRIES) {
-                    throw HyracksDataException.create(e);
-                }
-                LOGGER.info(() -> "S3 storage write retry, encountered: " + 
e.getMessage());
-
-                // Backoff for 1 sec for the first 2 retries, and 2 seconds 
from there onward
-                try {
-                    Thread.sleep(TimeUnit.SECONDS.toMillis(retries < 2 ? 1 : 
2));
-                } catch (InterruptedException ex) {
-                    Thread.currentThread().interrupt();
-                    throw HyracksDataException.create(ex);
-                }
-            }
-        }
-
+        CloudRetryUtil.run(() -> 
completeMultipartUpload(completeMultipartUploadRequest));
         log("FINISHED");
     }

diff --git 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
index 0dca417..c5a5cd6 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
@@ -18,11 +18,13 @@
  */
 package org.apache.hyracks.cloud.io;

-import java.io.InputStream;
 import java.nio.ByteBuffer;

 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.cloud.io.request.ICloudRequest;
+import org.apache.hyracks.cloud.io.request.ICloudRetry;
+import org.apache.hyracks.cloud.io.stream.CloudInputStream;

 /**
  * Certain operations needed to be provided by {@link 
org.apache.hyracks.api.io.IIOManager} to support cloud
@@ -45,7 +47,16 @@
      * @param offset  starting offset
      * @return input stream of the required data
      */
-    InputStream cloudRead(IFileHandle fHandle, long offset, long length);
+    CloudInputStream cloudRead(IFileHandle fHandle, long offset, long length) 
throws HyracksDataException;
+
+    /**
+     * Tries to restore the stream created by {@link #cloudRead(IFileHandle, long, long)}.
+     * NOTE: the implementer should not wrap the restoration in an {@link ICloudRequest}, as it is already invoked
+     * from {@link ICloudRetry#onRetry()}. If restoring fails, the whole retryable operation should fail.
+     *
+     * @param stream to restore
+     */
+    void restoreStream(CloudInputStream stream);

     /**
      * Write to local drive only
diff --git 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudRequest.java
 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudRequest.java
new file mode 100644
index 0000000..e7e2af2
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudRequest.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.io.request;
+
+import java.io.IOException;
+
+@FunctionalInterface
+public interface ICloudRequest {
+    void call() throws IOException, InterruptedException;
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudRetry.java
 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudRetry.java
new file mode 100644
index 0000000..e1d640b
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudRetry.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.io.request;
+
+@FunctionalInterface
+public interface ICloudRetry {
+    void onRetry();
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudReturnableRequest.java
 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudReturnableRequest.java
new file mode 100644
index 0000000..8b9247c
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudReturnableRequest.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.io.request;
+
+import java.io.IOException;
+
+@FunctionalInterface
+public interface ICloudReturnableRequest<T> {
+    T call() throws IOException, InterruptedException;
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/stream/CloudInputStream.java
 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/stream/CloudInputStream.java
new file mode 100644
index 0000000..8cbdbd8
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/stream/CloudInputStream.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.io.stream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.cloud.io.ICloudIOManager;
+import org.apache.hyracks.cloud.io.request.ICloudRequest;
+import org.apache.hyracks.cloud.io.request.ICloudRetry;
+import org.apache.hyracks.cloud.util.CloudRetryUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * A sequential, retryable reader over a byte range of a cloud object.
+ * <p>
+ * The position of the next unconsumed byte is tracked by {@link #getOffset()} and {@link #getRemaining()}. When a
+ * {@link #read(ByteBuffer)} or {@link #skip(long)} fails with an {@link IOException}, the underlying stream is
+ * reopened at that position via {@link ICloudIOManager#restoreStream(CloudInputStream)} and the operation is
+ * retried through {@link CloudRetryUtil}.
+ * <p>
+ * Reads and skips do not move the tracked position by themselves: callers must call {@link #advance(int)} once the
+ * bytes are consumed, otherwise a restored stream would be reopened at a stale offset.
+ */
+public final class CloudInputStream {
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final ICloudIOManager cloudIOManager;
+    private final IFileHandle handle;
+    private InputStream in;
+    private long offset;
+    private long remaining;
+
+    /**
+     * @param cloudIOManager used to reopen the stream when a read or skip fails
+     * @param handle         handle of the file being read
+     * @param in             stream over the object starting at {@code offset}
+     * @param offset         starting offset within the object
+     * @param length         number of bytes requested from {@code offset}
+     */
+    public CloudInputStream(ICloudIOManager cloudIOManager, IFileHandle handle, InputStream in, long offset,
+            long length) {
+        this.cloudIOManager = cloudIOManager;
+        this.handle = handle;
+        this.in = in;
+        this.offset = offset;
+        this.remaining = length;
+    }
+
+    /**
+     * @return the relative path of the object being read
+     */
+    public String getPath() {
+        return handle.getFileReference().getRelativePath();
+    }
+
+    /**
+     * @return the object offset of the first byte not yet {@link #advance(int) advanced} over
+     */
+    public long getOffset() {
+        return offset;
+    }
+
+    /**
+     * @return the number of requested bytes not yet {@link #advance(int) advanced} over
+     */
+    public long getRemaining() {
+        return remaining;
+    }
+
+    /**
+     * Marks {@code amount} bytes as consumed, moving the position a restored stream is reopened at.
+     *
+     * @param amount number of consumed bytes
+     */
+    public void advance(int amount) {
+        offset += amount;
+        remaining -= amount;
+    }
+
+    /**
+     * Fills {@code buffer} from its position up to its limit. On {@link IOException} the buffer position is
+     * rewound, the stream is restored, and the read is retried.
+     *
+     * @param buffer an array-backed destination buffer
+     * @throws HyracksDataException  if the read still fails after all retries
+     * @throws IllegalStateException if the stream ends before the buffer is filled
+     */
+    public void read(ByteBuffer buffer) throws HyracksDataException {
+        // Remember where the caller's region starts so that a retry rewrites exactly that region
+        int start = buffer.position();
+        ICloudRequest read = () -> {
+            while (buffer.remaining() > 0) {
+                // Account for arrayOffset() so sliced buffers are written at the right array index
+                int length = in.read(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
+                if (length < 0) {
+                    throw new IllegalStateException("Stream should not be empty!");
+                }
+                buffer.position(buffer.position() + length);
+            }
+        };
+
+        ICloudRetry retry = () -> {
+            buffer.position(start);
+            cloudIOManager.restoreStream(this);
+        };
+
+        CloudRetryUtil.run(read, retry);
+    }
+
+    /**
+     * Skips exactly {@code n} bytes. On {@link IOException} the stream is restored and the skip is retried.
+     *
+     * @param n number of bytes to skip
+     * @throws HyracksDataException  if skipping still fails after all retries
+     * @throws IllegalStateException if the stream ends before {@code n} bytes are skipped
+     */
+    public void skip(long n) throws HyracksDataException {
+        ICloudRequest skip = () -> {
+            long toSkip = n;
+            while (toSkip > 0) {
+                // Request only what is left: InputStream#skip may skip fewer bytes than asked for
+                long skipped = in.skip(toSkip);
+                if (skipped > 0) {
+                    toSkip -= skipped;
+                } else if (in.read() < 0) {
+                    // No progress and the stream is exhausted; fail instead of looping forever
+                    throw new IllegalStateException("Stream should not be empty!");
+                } else {
+                    // skip() may return 0 before the end; consuming a single byte guarantees progress
+                    toSkip--;
+                }
+            }
+        };
+
+        ICloudRetry retry = () -> cloudIOManager.restoreStream(this);
+
+        CloudRetryUtil.run(skip, retry);
+    }
+
+    /**
+     * Closes the underlying stream, warning if not all requested bytes were advanced over.
+     *
+     * @throws HyracksDataException if closing the underlying stream fails
+     */
+    public void close() throws HyracksDataException {
+        if (remaining != 0) {
+            LOGGER.warn("Closed cloud stream with nonzero bytes = {}", remaining);
+        }
+
+        try {
+            in.close();
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    /**
+     * Replaces the underlying stream (used when restoring after a failure). The replaced stream is closed so
+     * that whatever resources it holds are released rather than leaked.
+     *
+     * @param in the new stream, expected to start at {@link #getOffset()}
+     */
+    public void setInputStream(InputStream in) {
+        InputStream replaced = this.in;
+        this.in = in;
+        if (replaced != null && replaced != in) {
+            try {
+                replaced.close();
+            } catch (IOException e) {
+                // The replaced stream is discarded because it failed, so a failing close is not fatal
+                LOGGER.debug("Failed to close replaced cloud stream of {}", getPath(), e);
+            }
+        }
+    }
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryUtil.java
 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryUtil.java
new file mode 100644
index 0000000..fa35fc0
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryUtil.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.util;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.cloud.io.request.ICloudRequest;
+import org.apache.hyracks.cloud.io.request.ICloudRetry;
+import org.apache.hyracks.cloud.io.request.ICloudReturnableRequest;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Runs cloud requests with a bounded number of retries.
+ * <p>
+ * A request is attempted once and, if it fails with an {@link IOException}, retried up to
+ * {@code NUMBER_OF_RETRIES} more times, waiting {@code RETRY_WAIT} milliseconds between attempts. Before each
+ * retry, {@link ICloudRetry#onRetry()} is invoked so the caller can reset state (e.g., buffer positions or streams)
+ * left behind by the failed attempt.
+ * <p>
+ * NOTE(review): only {@link IOException}s are retried. Unchecked exceptions thrown by a request (for instance, by a
+ * cloud SDK that reports failures with runtime exceptions) propagate immediately - confirm this is intended.
+ * <p>
+ * Interruptions do not abort a request: if the request or the wait between attempts is interrupted, the request is
+ * restarted with a fresh set of attempts, and the thread's interrupt status is restored before returning.
+ */
+public class CloudRetryUtil {
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final int NUMBER_OF_RETRIES = 3;
+    private static final long RETRY_WAIT = TimeUnit.SECONDS.toMillis(1);
+    private static final ICloudRetry NO_OP_RETRY = () -> {
+    };
+
+    private CloudRetryUtil() {
+    }
+
+    public static boolean cannotRetry(int attempt) {
+        return attempt > NUMBER_OF_RETRIES;
+    }
+
+    public static int getRemainingAttempts(int attempt) {
+        return NUMBER_OF_RETRIES - attempt;
+    }
+
+    public static void retrySleep(String name) {
+        try {
+            Thread.sleep(RETRY_WAIT);
+        } catch (InterruptedException e) {
+            LOGGER.warn("Interrupted while waiting for {}", name, e);
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Runs a value-returning request, retrying it on {@link IOException}.
+     *
+     * @param request the request to run
+     * @return the value returned by the first successful attempt
+     * @throws HyracksDataException if all attempts fail
+     */
+    public static <T> T run(ICloudReturnableRequest<T> request) throws HyracksDataException {
+        return run(request, NO_OP_RETRY);
+    }
+
+    /**
+     * Runs a value-returning request, retrying it on {@link IOException} and calling {@code retry} before each
+     * retry.
+     *
+     * @param request the request to run
+     * @param retry   callback that resets the state left behind by a failed attempt
+     * @return the value returned by the first successful attempt
+     * @throws HyracksDataException if all attempts fail
+     */
+    public static <T> T run(ICloudReturnableRequest<T> request, ICloudRetry retry) throws HyracksDataException {
+        boolean interrupted = Thread.interrupted();
+        try {
+            while (true) {
+                try {
+                    return doRun(request, retry);
+                } catch (InterruptedException e) {
+                    // Do not give up on interruption: reset state and restart; the flag is restored in finally
+                    interrupted = true;
+                    retry.onRetry();
+                }
+            }
+        } finally {
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Runs a request, retrying it on {@link IOException}.
+     *
+     * @param request the request to run
+     * @throws HyracksDataException if all attempts fail
+     */
+    public static void run(ICloudRequest request) throws HyracksDataException {
+        run(request, NO_OP_RETRY);
+    }
+
+    /**
+     * Runs a request, retrying it on {@link IOException} and calling {@code retry} before each retry.
+     *
+     * @param request the request to run
+     * @param retry   callback that resets the state left behind by a failed attempt
+     * @throws HyracksDataException if all attempts fail
+     */
+    public static void run(ICloudRequest request, ICloudRetry retry) throws HyracksDataException {
+        // Share the retry/interrupt handling of the value-returning variant
+        ICloudReturnableRequest<Void> adapter = () -> {
+            request.call();
+            return null;
+        };
+        run(adapter, retry);
+    }
+
+    private static <T> T doRun(ICloudReturnableRequest<T> request, ICloudRetry retry)
+            throws HyracksDataException, InterruptedException {
+        int attempt = 1;
+        while (true) {
+            try {
+                return request.call();
+            } catch (IOException e) {
+                if (cannotRetry(attempt)) {
+                    throw HyracksDataException.create(e);
+                }
+                LOGGER.warn("Cloud request failed on attempt {} of {}; retrying in {} ms", attempt,
+                        NUMBER_OF_RETRIES + 1, RETRY_WAIT, e);
+                attempt++;
+                // Back off before retrying instead of immediately re-issuing the failed request. An interruption
+                // here propagates to run(), which resets the state and restarts the request
+                Thread.sleep(RETRY_WAIT);
+                retry.onRetry();
+            }
+        }
+    }
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
index 2679d33..c5f538e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
@@ -22,13 +22,13 @@
 import static 
org.apache.hyracks.storage.common.buffercache.context.read.DefaultBufferCacheReadContextProvider.DEFAULT;

 import java.io.IOException;
-import java.io.InputStream;
 import java.nio.ByteBuffer;

 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import 
org.apache.hyracks.cloud.buffercache.context.BufferCacheCloudReadContextUtil;
 import org.apache.hyracks.cloud.buffercache.page.CloudCachedPage;
 import org.apache.hyracks.cloud.io.ICloudIOManager;
+import org.apache.hyracks.cloud.io.stream.CloudInputStream;
 import org.apache.hyracks.control.nc.io.IOManager;
 import 
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
 import org.apache.hyracks.storage.am.lsm.btree.column.cloud.ColumnRanges;
@@ -51,11 +51,7 @@

     private int numberOfContiguousPages;
     private int pageCounter;
-    private InputStream gapStream;
-
-    // For debugging
-    private long streamOffset;
-    private long remainingStreamBytes;
+    private CloudInputStream gapStream;

     CloudMegaPageReadContext(ColumnProjectorType operation, ColumnRanges 
columnRanges, IPhysicalDrive drive) {
         this.operation = operation;
@@ -75,7 +71,7 @@
         if (cachedPage.skipCloudStream()) {
             /*
              * This page is requested but the buffer cache has a valid copy in 
memory. Also, the page itself was
              * requested to be read from the cloud. Since this page is valid, no buffer cache read() will be performed.
              * As the buffer cache read() is also responsible for persisting 
the bytes read from the cloud, we can end
              * up writing the bytes of this page in the position of another 
page. Therefore, we should skip the bytes
              * for this particular page to avoid placing the bytes of this 
page into another page's position.
@@ -141,10 +137,6 @@

     void close() throws HyracksDataException {
         if (gapStream != null) {
-            if (remainingStreamBytes != 0) {
-                LOGGER.warn("Closed cloud stream with nonzero bytes = {}", 
remainingStreamBytes);
-            }
-
             try {
                 gapStream.close();
                 gapStream = null;
@@ -156,21 +148,12 @@
 
     private void readFromStream(IOManager ioManager, BufferedFileHandle 
fileHandle, BufferCacheHeaderHelper header,
             CachedPage cPage, boolean persist) throws HyracksDataException {
-        InputStream stream = getOrCreateStream(ioManager, fileHandle, cPage);
+        CloudInputStream stream = getOrCreateStream(ioManager, fileHandle, 
cPage);
         ByteBuffer buffer = header.getBuffer();
         buffer.position(0);

-        try {
-            while (buffer.remaining() > 0) {
-                int length = stream.read(buffer.array(), buffer.position(), 
buffer.remaining());
-                if (length < 0) {
-                    throw new IllegalStateException("Stream should not be 
empty!");
-                }
-                buffer.position(buffer.position() + length);
-            }
-        } catch (IOException e) {
-            throw HyracksDataException.create(e);
-        }
+        // Get the page's data from the cloud
+        doStreamRead(stream, buffer);

         // Flip the buffer after reading to restore the correct position
         buffer.flip();
@@ -181,11 +164,10 @@
             BufferCacheCloudReadContextUtil.persist(cloudIOManager, 
fileHandle.getFileHandle(), buffer, offset);
         }

-        streamOffset += cPage.getCompressedPageSize();
-        remainingStreamBytes -= cPage.getCompressedPageSize();
+        gapStream.advance(cPage.getCompressedPageSize());
     }

-    private InputStream getOrCreateStream(IOManager ioManager, 
BufferedFileHandle fileHandle, CachedPage cPage)
+    private CloudInputStream getOrCreateStream(IOManager ioManager, 
BufferedFileHandle fileHandle, CachedPage cPage)
             throws HyracksDataException {
         if (gapStream != null) {
             return gapStream;
@@ -195,12 +177,10 @@
         long offset = cPage.getCompressedPageOffset();
         int pageId = BufferedFileHandle.getPageId(cPage.getDiskPageId());
         long length = fileHandle.getPagesTotalSize(pageId, requiredNumOfPages);
-        remainingStreamBytes = length;
-        streamOffset = offset;
         LOGGER.info(
                 "Cloud stream read for pageId={} starting from pageCounter={} 
out of "
                         + "numberOfContiguousPages={} (streamOffset = {}, 
remainingStreamBytes = {})",
-                pageId, pageCounter, numberOfContiguousPages, streamOffset, 
remainingStreamBytes);
+                pageId, pageCounter, numberOfContiguousPages, offset, length);

         ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager;
         gapStream = cloudIOManager.cloudRead(fileHandle.getFileHandle(), 
offset, length);
@@ -208,19 +188,32 @@
         return gapStream;
     }

+    private void doStreamRead(CloudInputStream stream, ByteBuffer buffer) 
throws HyracksDataException {
+        try {
+            stream.read(buffer);
+        } catch (Throwable th) {
+            long streamOffset = gapStream.getOffset();
+            long remainingStreamBytes = gapStream.getRemaining();
+            LOGGER.error("Failed to perform stream.read (streamOffset: {}, 
remainingBytes: {}).", streamOffset,
+                    remainingStreamBytes);
+            throw HyracksDataException.create(th);
+        }
+    }
+
     private void skipStreamIfOpened(CachedPage cPage) throws 
HyracksDataException {
         if (gapStream == null) {
             return;
         }

+        long toSkip = cPage.getCompressedPageSize();
         try {
-            long remaining = cPage.getCompressedPageSize();
-            while (remaining > 0) {
-                remaining -= gapStream.skip(remaining);
-            }
-            streamOffset += cPage.getCompressedPageSize();
-            remainingStreamBytes -= cPage.getCompressedPageSize();
+            gapStream.skip(toSkip);
+
         } catch (IOException e) {
+            long streamOffset = gapStream.getOffset();
+            long remainingStreamBytes = gapStream.getRemaining();
+            LOGGER.error("Failed to skip {} bytes (streamOffset: {}, 
remainingBytes: {}).", toSkip, streamOffset,
+                    remainingStreamBytes);
             throw HyracksDataException.create(e);
         }
     }

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18378
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: Ia8c34d4ba7a3527fea22149e5065815095c39ab2
Gerrit-Change-Number: 18378
Gerrit-PatchSet: 1
Gerrit-Owner: Wail Alkowaileet <[email protected]>
Gerrit-MessageType: newchange

Reply via email to