>From Wail Alkowaileet <[email protected]>:
Wail Alkowaileet has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18378 )
Change subject: [NO ISSUE] Make page cloudRead retry on failure
......................................................................
[NO ISSUE] Make page cloudRead retry on failure
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
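Retry page cloudRead on failure by reopening the cloud stream from the last consumed offset. Other cloud operations (e.g., list, exists, delete, bulk delete, multipart upload completion) are also retried via the new CloudRetryUtil.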
Change-Id: Ia8c34d4ba7a3527fea22149e5065815095c39ab2
---
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
A
hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/stream/CloudInputStream.java
A
hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryUtil.java
A
hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudRetry.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
M
hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
A
hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudReturnableRequest.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java
A
hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudRequest.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
12 files changed, 457 insertions(+), 72 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/78/18378/1
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index 4ca9f59..19d6f2c 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -51,6 +51,11 @@
import org.apache.hyracks.api.io.IIOBulkOperation;
import org.apache.hyracks.api.util.IoUtil;
import org.apache.hyracks.cloud.io.ICloudIOManager;
+import org.apache.hyracks.cloud.io.request.ICloudRequest;
+import org.apache.hyracks.cloud.io.request.ICloudRetry;
+import org.apache.hyracks.cloud.io.request.ICloudReturnableRequest;
+import org.apache.hyracks.cloud.io.stream.CloudInputStream;
+import org.apache.hyracks.cloud.util.CloudRetryUtil;
import org.apache.hyracks.control.nc.io.IOManager;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -172,12 +177,28 @@
@Override
public final void cloudRead(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException {
- cloudClient.read(bucket, fHandle.getFileReference().getRelativePath(), offset, data);
+ // Remember where the caller wants the bytes to land: a retry must rewind to this position (not to 0) so the
+ // re-read overwrites exactly the bytes written by the failed attempt
+ int startPosition = data.position();
+ ICloudRequest request =
+ () -> cloudClient.read(bucket, fHandle.getFileReference().getRelativePath(), offset, data);
+ ICloudRetry retry = () -> data.position(startPosition);
+ CloudRetryUtil.run(request, retry);
}
@Override
- public final InputStream cloudRead(IFileHandle fHandle, long offset, long length) {
- return cloudClient.getObjectStream(bucket, fHandle.getFileReference().getRelativePath(), offset, length);
+ public final CloudInputStream cloudRead(IFileHandle fHandle, long offset, long length) throws HyracksDataException {
+ // Only opening the object stream is retried here; failed reads/skips on the returned stream are retried by
+ // CloudInputStream itself, which calls restoreStream(...) to reopen it
+ ICloudReturnableRequest<CloudInputStream> request = () -> {
+ InputStream stream =
+ cloudClient.getObjectStream(bucket, fHandle.getFileReference().getRelativePath(), offset, length);
+ return new CloudInputStream(this, fHandle, stream, offset, length);
+ };
+
+ return CloudRetryUtil.run(request);
+ }
+
+ @Override
+ public void restoreStream(CloudInputStream cloudStream) {
+ // Reopen the object from the first unconsumed byte for the rest of the originally requested range.
+ // NOTE: the stream being replaced is not closed here
+ InputStream stream = cloudClient.getObjectStream(bucket, cloudStream.getPath(), cloudStream.getOffset(),
+ cloudStream.getRemaining());
+ cloudStream.setInputStream(stream);
}
@Override
@@ -279,7 +300,7 @@
// completing the upload.
ICloudWriter cloudWriter = ((CloudFileHandle)
fileHandle).getCloudWriter();
try {
- cloudWriter.finish();
+ CloudRetryUtil.run(cloudWriter::finish);
} catch (HyracksDataException e) {
savedEx = e;
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
index 885d612..3819b6e 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
@@ -25,6 +25,9 @@
import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
import org.apache.asterix.cloud.clients.ICloudWriter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.cloud.io.request.ICloudRequest;
+import org.apache.hyracks.cloud.io.request.ICloudRetry;
+import org.apache.hyracks.cloud.util.CloudRetryUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -177,7 +180,9 @@
private void uploadAndWait() throws HyracksDataException {
writeBuffer.flip();
try {
- bufferedWriter.upload(this, writeBuffer.limit());
+ ICloudRequest request = () -> bufferedWriter.upload(this,
writeBuffer.limit());
+ ICloudRetry retry = () -> writeBuffer.position(0);
+ CloudRetryUtil.run(request, retry);
} catch (Exception e) {
LOGGER.error(e);
throw HyracksDataException.create(e);
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
index 72f3446..be92a68 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
@@ -53,6 +53,7 @@
import org.apache.hyracks.api.io.IIOBulkOperation;
import org.apache.hyracks.api.io.IODeviceHandle;
import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.cloud.util.CloudRetryUtil;
import org.apache.hyracks.control.nc.io.IOManager;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -174,33 +175,33 @@
@Override
public Set<FileReference> list(FileReference dir, FilenameFilter filter) throws HyracksDataException {
- return accessor.doList(dir, filter);
+ return CloudRetryUtil.run(() -> accessor.doList(dir, filter));
}
@Override
public boolean exists(FileReference fileRef) throws HyracksDataException {
- return accessor.doExists(fileRef);
+ return CloudRetryUtil.run(() -> accessor.doExists(fileRef));
}
@Override
public long getSize(FileReference fileReference) throws HyracksDataException {
- return accessor.doGetSize(fileReference);
+ return CloudRetryUtil.run(() -> accessor.doGetSize(fileReference));
}
@Override
public byte[] readAllBytes(FileReference fileRef) throws HyracksDataException {
- return accessor.doReadAllBytes(fileRef);
+ return CloudRetryUtil.run(() -> accessor.doReadAllBytes(fileRef));
}
@Override
public void delete(FileReference fileRef) throws HyracksDataException {
- accessor.doDelete(fileRef);
+ // Wrap once: nesting CloudRetryUtil.run() would retry the already-retried call and multiply the attempts
+ CloudRetryUtil.run(() -> accessor.doDelete(fileRef));
log("DELETE", fileRef);
}
@Override
public void overwrite(FileReference fileRef, byte[] bytes) throws HyracksDataException {
- accessor.doOverwrite(fileRef, bytes);
+ CloudRetryUtil.run(() -> accessor.doOverwrite(fileRef, bytes));
log("WRITE", fileRef);
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java
index 8d28d3a..48c3b0f 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java
@@ -24,6 +24,7 @@
import org.apache.asterix.cloud.clients.ICloudClient;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.cloud.util.CloudRetryUtil;
import org.apache.hyracks.control.nc.io.IOManager;
import org.apache.hyracks.control.nc.io.bulk.DeleteBulkOperation;
import org.apache.logging.log4j.LogManager;
@@ -57,7 +58,7 @@
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Bulk deleting: local: {}, cloud: {}",
fileReferences, paths);
}
- cloudClient.deleteObjects(bucket, paths);
+ CloudRetryUtil.run(() -> cloudClient.deleteObjects(bucket, paths));
// Bulk delete locally as well
super.performOperation();
callBack.call(fileReferences);
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
index 93be80c..a554951 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
@@ -21,12 +21,12 @@
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
import org.apache.asterix.cloud.clients.ICloudGuardian;
import org.apache.asterix.cloud.clients.profiler.IRequestProfiler;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.cloud.util.CloudRetryUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -92,28 +92,7 @@
CompletedMultipartUpload completedMultipartUpload =
CompletedMultipartUpload.builder().parts(partQueue).build();
CompleteMultipartUploadRequest completeMultipartUploadRequest =
CompleteMultipartUploadRequest.builder()
.bucket(bucket).key(path).uploadId(uploadId).multipartUpload(completedMultipartUpload).build();
- int retries = 0;
- while (true) {
- try {
- completeMultipartUpload(completeMultipartUploadRequest);
- break;
- } catch (Exception e) {
- retries++;
- if (retries == MAX_RETRIES) {
- throw HyracksDataException.create(e);
- }
- LOGGER.info(() -> "S3 storage write retry, encountered: " +
e.getMessage());
-
- // Backoff for 1 sec for the first 2 retries, and 2 seconds
from there onward
- try {
- Thread.sleep(TimeUnit.SECONDS.toMillis(retries < 2 ? 1 :
2));
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- throw HyracksDataException.create(ex);
- }
- }
- }
-
+ CloudRetryUtil.run(() ->
completeMultipartUpload(completeMultipartUploadRequest));
log("FINISHED");
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
index 0dca417..c5a5cd6 100644
---
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
@@ -18,11 +18,13 @@
*/
package org.apache.hyracks.cloud.io;
-import java.io.InputStream;
import java.nio.ByteBuffer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.cloud.io.request.ICloudRequest;
+import org.apache.hyracks.cloud.io.request.ICloudRetry;
+import org.apache.hyracks.cloud.io.stream.CloudInputStream;
/**
* Certain operations needed to be provided by {@link
org.apache.hyracks.api.io.IIOManager} to support cloud
@@ -45,7 +47,16 @@
* @param offset starting offset
* @return input stream of the required data
*/
- InputStream cloudRead(IFileHandle fHandle, long offset, long length);
+ CloudInputStream cloudRead(IFileHandle fHandle, long offset, long length)
throws HyracksDataException;
+
+ /**
+ * Restores (reopens) the underlying stream of a {@link CloudInputStream} created by
+ * {@link #cloudRead(IFileHandle, long, long)}, so that reading can continue from the stream's current offset
+ * ({@link CloudInputStream#getOffset()}) for its remaining bytes ({@link CloudInputStream#getRemaining()}).
+ * <p>
+ * NOTE: implementers should not wrap the restoration in an {@link ICloudRequest} (i.e., retry it themselves), as
+ * this method is already being called from {@link ICloudRetry#onRetry()}. If restoring fails, the whole
+ * retryable operation should fail.
+ *
+ * @param stream the stream to restore
+ */
+ void restoreStream(CloudInputStream stream);
/**
* Write to local drive only
diff --git
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudRequest.java
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudRequest.java
new file mode 100644
index 0000000..e7e2af2
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudRequest.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.io.request;
+
+import java.io.IOException;
+
+/**
+ * A cloud operation without a result that {@link org.apache.hyracks.cloud.util.CloudRetryUtil} can re-attempt.
+ * A failure signaled by an {@link IOException} may be retried.
+ */
+@FunctionalInterface
+public interface ICloudRequest {
+ /**
+ * Performs one attempt of the operation.
+ *
+ * @throws IOException if this attempt fails
+ * @throws InterruptedException if the calling thread is interrupted during this attempt
+ */
+ void call() throws IOException, InterruptedException;
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudRetry.java
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudRetry.java
new file mode 100644
index 0000000..e1d640b
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudRetry.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.io.request;
+
+/**
+ * Callback invoked by {@link org.apache.hyracks.cloud.util.CloudRetryUtil} before a failed cloud request is
+ * attempted again. It restores any state consumed by the failed attempt (e.g., rewinding a buffer's position or
+ * reopening a stream).
+ */
+@FunctionalInterface
+public interface ICloudRetry {
+ /**
+ * Prepares for the next attempt of the request.
+ */
+ void onRetry();
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudReturnableRequest.java
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudReturnableRequest.java
new file mode 100644
index 0000000..8b9247c
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudReturnableRequest.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.io.request;
+
+import java.io.IOException;
+
+/**
+ * A cloud operation producing a result that {@link org.apache.hyracks.cloud.util.CloudRetryUtil} can re-attempt.
+ * A failure signaled by an {@link IOException} may be retried.
+ *
+ * @param <T> type of the operation's result
+ */
+@FunctionalInterface
+public interface ICloudReturnableRequest<T> {
+ /**
+ * Performs one attempt of the operation.
+ *
+ * @return the operation's result
+ * @throws IOException if this attempt fails
+ * @throws InterruptedException if the calling thread is interrupted during this attempt
+ */
+ T call() throws IOException, InterruptedException;
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/stream/CloudInputStream.java
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/stream/CloudInputStream.java
new file mode 100644
index 0000000..8cbdbd8
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/stream/CloudInputStream.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.io.stream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.cloud.io.ICloudIOManager;
+import org.apache.hyracks.cloud.io.request.ICloudRequest;
+import org.apache.hyracks.cloud.io.request.ICloudRetry;
+import org.apache.hyracks.cloud.util.CloudRetryUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * A stream over the byte range {@code [offset, offset + length)} of a cloud object. Failed reads and skips are
+ * retried: the underlying stream is closed and {@link ICloudIOManager#restoreStream(CloudInputStream)} is asked to
+ * reopen it at the current offset.
+ * <p>
+ * This class does not count consumed bytes by itself: callers must call {@link #advance(long)} after every
+ * successful {@link #read(ByteBuffer)} or {@link #skip(long)}. Otherwise, a later retry reopens the stream at a
+ * stale offset.
+ */
+public final class CloudInputStream {
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final ICloudIOManager cloudIOManager;
+    private final IFileHandle handle;
+    private InputStream in;
+    // File offset of the first byte that has not been consumed (advanced over) yet
+    private long offset;
+    // Number of bytes of the requested range that have not been consumed yet
+    private long remaining;
+
+    public CloudInputStream(ICloudIOManager cloudIOManager, IFileHandle handle, InputStream in, long offset,
+            long length) {
+        this.cloudIOManager = cloudIOManager;
+        this.handle = handle;
+        this.in = in;
+        this.offset = offset;
+        this.remaining = length;
+    }
+
+    /**
+     * @return the relative path of the file this stream reads
+     */
+    public String getPath() {
+        return handle.getFileReference().getRelativePath();
+    }
+
+    /**
+     * @return the file offset of the first byte that has not been consumed yet
+     */
+    public long getOffset() {
+        return offset;
+    }
+
+    /**
+     * @return the number of bytes that have not been consumed yet
+     */
+    public long getRemaining() {
+        return remaining;
+    }
+
+    /**
+     * Marks bytes as consumed. Must be called after each successful {@link #read(ByteBuffer)} or
+     * {@link #skip(long)}.
+     *
+     * @param amount number of consumed bytes
+     */
+    public void advance(long amount) {
+        offset += amount;
+        remaining -= amount;
+    }
+
+    /**
+     * Fills {@code buffer} from its position up to its limit with the next bytes of the stream. If an attempt
+     * fails, the buffer's position is rewound and the stream is reopened before retrying. This method does not
+     * call {@link #advance(long)}.
+     *
+     * @param buffer array-backed buffer to fill; on success its position equals its limit
+     * @throws HyracksDataException if reading still fails after all retries
+     * @throws IllegalStateException if the stream ends before the buffer is filled
+     */
+    public void read(ByteBuffer buffer) throws HyracksDataException {
+        // Rewind to where this read started (not to 0) so a retry overwrites only what the failed attempt wrote
+        int startPosition = buffer.position();
+        ICloudRequest request = () -> {
+            while (buffer.hasRemaining()) {
+                int length = in.read(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
+                if (length < 0) {
+                    throw new IllegalStateException("Stream should not be empty!");
+                }
+                buffer.position(buffer.position() + length);
+            }
+        };
+
+        ICloudRetry retry = () -> {
+            buffer.position(startPosition);
+            reopen();
+        };
+
+        CloudRetryUtil.run(request, retry);
+    }
+
+    /**
+     * Skips exactly {@code n} bytes of the stream. If an attempt fails, the stream is reopened before retrying.
+     * This method does not call {@link #advance(long)}.
+     *
+     * @param n number of bytes to skip
+     * @throws HyracksDataException if skipping still fails after all retries
+     * @throws IllegalStateException if the stream ends before {@code n} bytes were skipped
+     */
+    public void skip(long n) throws HyracksDataException {
+        ICloudRequest request = () -> {
+            long toSkip = n;
+            while (toSkip > 0) {
+                long skipped = in.skip(toSkip);
+                if (skipped <= 0) {
+                    // InputStream#skip may skip nothing without being at the end of the stream. Read a single byte
+                    // to either make progress or detect that the stream ended prematurely (avoids spinning forever)
+                    if (in.read() < 0) {
+                        throw new IllegalStateException("Stream should not be empty!");
+                    }
+                    skipped = 1;
+                }
+                toSkip -= skipped;
+            }
+        };
+
+        CloudRetryUtil.run(request, this::reopen);
+    }
+
+    /**
+     * Closes the underlying stream, warning if not all requested bytes were consumed.
+     *
+     * @throws HyracksDataException if closing the underlying stream fails
+     */
+    public void close() throws HyracksDataException {
+        if (remaining != 0) {
+            LOGGER.warn("Closed cloud stream with nonzero bytes = {}", remaining);
+        }
+
+        try {
+            in.close();
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    /**
+     * Replaces the underlying stream (called when the stream is restored).
+     *
+     * @param in the new underlying stream
+     */
+    public void setInputStream(InputStream in) {
+        this.in = in;
+    }
+
+    /**
+     * Closes the current (failed) underlying stream so its resources are released, then asks the IO manager to
+     * restore it. Close errors are only logged so they do not mask the failure that triggered the retry.
+     */
+    private void reopen() {
+        try {
+            in.close();
+        } catch (IOException e) {
+            LOGGER.debug("Failed to close cloud stream before restoring it", e);
+        }
+        cloudIOManager.restoreStream(this);
+    }
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryUtil.java
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryUtil.java
new file mode 100644
index 0000000..fa35fc0
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryUtil.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.util;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.cloud.io.request.ICloudRequest;
+import org.apache.hyracks.cloud.io.request.ICloudRetry;
+import org.apache.hyracks.cloud.io.request.ICloudReturnableRequest;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Runs cloud requests and re-attempts them when they fail with an {@link IOException}.
+ * <p>
+ * A request is attempted at most {@code NUMBER_OF_RETRIES + 1} times. Before each re-attempt, the thread waits
+ * {@code RETRY_WAIT} milliseconds and the supplied {@link ICloudRetry} is invoked so the caller can restore any
+ * state consumed by the failed attempt. Interruptions do not abort a request: the interrupt is recorded, the
+ * request is restarted, and the thread's interrupt flag is re-set once the request completes or fails.
+ */
+public final class CloudRetryUtil {
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final int NUMBER_OF_RETRIES = 3;
+    private static final long RETRY_WAIT = TimeUnit.SECONDS.toMillis(1);
+    private static final ICloudRetry NO_OP_RETRY = () -> {
+    };
+
+    private CloudRetryUtil() {
+    }
+
+    /**
+     * @param attempt 1-based number of the attempt that just failed
+     * @return {@code true} if {@code attempt} exceeds the number of allowed retries
+     */
+    public static boolean cannotRetry(int attempt) {
+        return attempt > NUMBER_OF_RETRIES;
+    }
+
+    /**
+     * @param attempt attempt number
+     * @return {@code NUMBER_OF_RETRIES - attempt}
+     */
+    public static int getRemainingAttempts(int attempt) {
+        return NUMBER_OF_RETRIES - attempt;
+    }
+
+    /**
+     * Sleeps for {@code RETRY_WAIT} milliseconds. If interrupted, the interrupt flag is re-set and the method
+     * returns early.
+     *
+     * @param name description of what is being waited for (used for logging)
+     */
+    public static void retrySleep(String name) {
+        try {
+            Thread.sleep(RETRY_WAIT);
+        } catch (InterruptedException e) {
+            LOGGER.warn("Interrupted while waiting for {}", name, e);
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Runs {@code request}, retrying it on failure.
+     *
+     * @return the request's result
+     * @throws HyracksDataException if the request still fails after all retries
+     */
+    public static <T> T run(ICloudReturnableRequest<T> request) throws HyracksDataException {
+        return run(request, NO_OP_RETRY);
+    }
+
+    /**
+     * Runs {@code request}, retrying it on failure and invoking {@code retry} before every re-attempt.
+     *
+     * @return the request's result
+     * @throws HyracksDataException if the request still fails after all retries
+     */
+    public static <T> T run(ICloudReturnableRequest<T> request, ICloudRetry retry) throws HyracksDataException {
+        // Clear (and remember) any pending interrupt so it does not make the request fail spuriously
+        boolean interrupted = Thread.interrupted();
+        try {
+            while (true) {
+                try {
+                    return doRun(request, retry);
+                } catch (InterruptedException e) {
+                    // Record the interrupt, restore state consumed by the interrupted attempt, and start over
+                    interrupted = true;
+                    retry.onRetry();
+                }
+            }
+        } finally {
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Runs {@code request}, retrying it on failure.
+     *
+     * @throws HyracksDataException if the request still fails after all retries
+     */
+    public static void run(ICloudRequest request) throws HyracksDataException {
+        run(request, NO_OP_RETRY);
+    }
+
+    /**
+     * Runs {@code request}, retrying it on failure and invoking {@code retry} before every re-attempt.
+     *
+     * @throws HyracksDataException if the request still fails after all retries
+     */
+    public static void run(ICloudRequest request, ICloudRetry retry) throws HyracksDataException {
+        // Adapt to a returnable request so both kinds share a single retry implementation
+        run(() -> {
+            request.call();
+            return null;
+        }, retry);
+    }
+
+    private static <T> T doRun(ICloudReturnableRequest<T> request, ICloudRetry retry)
+            throws HyracksDataException, InterruptedException {
+        int attempt = 1;
+        while (true) {
+            try {
+                return request.call();
+            } catch (IOException e) {
+                if (cannotRetry(attempt)) {
+                    throw HyracksDataException.create(e);
+                }
+                LOGGER.warn("Cloud request failed (attempt {} of {}), retrying", attempt, NUMBER_OF_RETRIES + 1, e);
+                attempt++;
+                // Back off before re-attempting, as cloud failures are often transient (e.g., throttling)
+                Thread.sleep(RETRY_WAIT);
+                retry.onRetry();
+            }
+        }
+    }
+
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
index 2679d33..c5f538e 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
@@ -22,13 +22,13 @@
import static
org.apache.hyracks.storage.common.buffercache.context.read.DefaultBufferCacheReadContextProvider.DEFAULT;
import java.io.IOException;
-import java.io.InputStream;
import java.nio.ByteBuffer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import
org.apache.hyracks.cloud.buffercache.context.BufferCacheCloudReadContextUtil;
import org.apache.hyracks.cloud.buffercache.page.CloudCachedPage;
import org.apache.hyracks.cloud.io.ICloudIOManager;
+import org.apache.hyracks.cloud.io.stream.CloudInputStream;
import org.apache.hyracks.control.nc.io.IOManager;
import
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
import org.apache.hyracks.storage.am.lsm.btree.column.cloud.ColumnRanges;
@@ -51,11 +51,7 @@
private int numberOfContiguousPages;
private int pageCounter;
- private InputStream gapStream;
-
- // For debugging
- private long streamOffset;
- private long remainingStreamBytes;
+ private CloudInputStream gapStream;
CloudMegaPageReadContext(ColumnProjectorType operation, ColumnRanges
columnRanges, IPhysicalDrive drive) {
this.operation = operation;
@@ -75,7 +71,7 @@
if (cachedPage.skipCloudStream()) {
/*
* This page is requested but the buffer cache has a valid copy in
memory. Also, the page itself was
* requested to be read from the cloud. Since this page is valid, no buffer cache read() will be performed.
* As the buffer cache read() is also responsible for persisting
the bytes read from the cloud, we can end
* up writing the bytes of this page in the position of another
page. Therefore, we should skip the bytes
* for this particular page to avoid placing the bytes of this
page into another page's position.
@@ -141,10 +137,6 @@
void close() throws HyracksDataException {
if (gapStream != null) {
- if (remainingStreamBytes != 0) {
- LOGGER.warn("Closed cloud stream with nonzero bytes = {}",
remainingStreamBytes);
- }
-
try {
gapStream.close();
gapStream = null;
@@ -156,21 +148,12 @@
private void readFromStream(IOManager ioManager, BufferedFileHandle
fileHandle, BufferCacheHeaderHelper header,
CachedPage cPage, boolean persist) throws HyracksDataException {
- InputStream stream = getOrCreateStream(ioManager, fileHandle, cPage);
+ CloudInputStream stream = getOrCreateStream(ioManager, fileHandle,
cPage);
ByteBuffer buffer = header.getBuffer();
buffer.position(0);
- try {
- while (buffer.remaining() > 0) {
- int length = stream.read(buffer.array(), buffer.position(),
buffer.remaining());
- if (length < 0) {
- throw new IllegalStateException("Stream should not be
empty!");
- }
- buffer.position(buffer.position() + length);
- }
- } catch (IOException e) {
- throw HyracksDataException.create(e);
- }
+ // Get the page's data from the cloud
+ doStreamRead(stream, buffer);
// Flip the buffer after reading to restore the correct position
buffer.flip();
@@ -181,11 +164,10 @@
BufferCacheCloudReadContextUtil.persist(cloudIOManager,
fileHandle.getFileHandle(), buffer, offset);
}
- streamOffset += cPage.getCompressedPageSize();
- remainingStreamBytes -= cPage.getCompressedPageSize();
+ gapStream.advance(cPage.getCompressedPageSize());
}
- private InputStream getOrCreateStream(IOManager ioManager,
BufferedFileHandle fileHandle, CachedPage cPage)
+ private CloudInputStream getOrCreateStream(IOManager ioManager,
BufferedFileHandle fileHandle, CachedPage cPage)
throws HyracksDataException {
if (gapStream != null) {
return gapStream;
@@ -195,12 +177,10 @@
long offset = cPage.getCompressedPageOffset();
int pageId = BufferedFileHandle.getPageId(cPage.getDiskPageId());
long length = fileHandle.getPagesTotalSize(pageId, requiredNumOfPages);
- remainingStreamBytes = length;
- streamOffset = offset;
LOGGER.info(
"Cloud stream read for pageId={} starting from pageCounter={}
out of "
+ "numberOfContiguousPages={} (streamOffset = {},
remainingStreamBytes = {})",
- pageId, pageCounter, numberOfContiguousPages, streamOffset,
remainingStreamBytes);
+ pageId, pageCounter, numberOfContiguousPages, offset, length);
ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager;
gapStream = cloudIOManager.cloudRead(fileHandle.getFileHandle(),
offset, length);
@@ -208,19 +188,32 @@
return gapStream;
}
+ // Reads the next page's bytes from the given cloud stream into buffer, logging the stream's position on failure
+ private void doStreamRead(CloudInputStream stream, ByteBuffer buffer) throws HyracksDataException {
+ try {
+ stream.read(buffer);
+ } catch (Throwable th) {
+ // Report the position of the stream that actually failed (the parameter), not whatever gapStream holds
+ LOGGER.error("Failed to perform stream.read (streamOffset: {}, remainingBytes: {}).", stream.getOffset(),
+ stream.getRemaining());
+ throw HyracksDataException.create(th);
+ }
+ }
+
private void skipStreamIfOpened(CachedPage cPage) throws HyracksDataException {
if (gapStream == null) {
return;
}
+ long toSkip = cPage.getCompressedPageSize();
try {
- long remaining = cPage.getCompressedPageSize();
- while (remaining > 0) {
- remaining -= gapStream.skip(remaining);
- }
- streamOffset += cPage.getCompressedPageSize();
- remainingStreamBytes -= cPage.getCompressedPageSize();
+ gapStream.skip(toSkip);
+ // Account for the skipped bytes (as readFromStream does after a read) so that a later restore of the
+ // stream reopens it at the right offset and close() does not report leftover bytes
+ gapStream.advance(cPage.getCompressedPageSize());
} catch (IOException e) {
+ long streamOffset = gapStream.getOffset();
+ long remainingStreamBytes = gapStream.getRemaining();
+ LOGGER.error("Failed to skip {} bytes (streamOffset: {}, remainingBytes: {}).", toSkip, streamOffset,
+ remainingStreamBytes);
throw HyracksDataException.create(e);
}
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18378
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: Ia8c34d4ba7a3527fea22149e5065815095c39ab2
Gerrit-Change-Number: 18378
Gerrit-PatchSet: 1
Gerrit-Owner: Wail Alkowaileet <[email protected]>
Gerrit-MessageType: newchange