>From Michael Blow <[email protected]>:
Michael Blow has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19464 )
Change subject: [NO ISSUE][*DB][CLOUD] Ensure interrupts are not lost when
performning GCS I/O
......................................................................
[NO ISSUE][*DB][CLOUD] Ensure interrupts are not lost when performning GCS I/O
Ext-ref: MB-65432
Change-Id: Iab4b86fe3966a2d0b509c9bd871f792c3e35ff23
---
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
M
asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
15 files changed, 166 insertions(+), 68 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/64/19464/1
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 6928b64..5eafe05 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -191,7 +191,7 @@
if (cloudDeployment) {
cloudProperties = new
CloudProperties(PropertiesAccessor.getInstance(ccServiceCtx.getAppConfig()));
ioManager = CloudConfigurator.createIOManager(ioManager,
cloudProperties, namespacePathResolver,
- getCloudGuardian(cloudProperties));
+ getCloudGuardian(cloudProperties),
controllerService.getExecutor());
}
IGlobalTxManager globalTxManager = createGlobalTxManager(ioManager);
appCtx = createApplicationContext(null, globalRecoveryManager,
lifecycleCoordinator, Receptionist::new,
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index 912ca47..ed63e63 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -31,6 +31,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
import java.util.function.Predicate;
import org.apache.asterix.cloud.bulk.DeleteBulkCloudOperation;
@@ -84,12 +85,13 @@
private final List<FileStore> drivePaths;
public AbstractCloudIOManager(IOManager ioManager, CloudProperties
cloudProperties,
- INamespacePathResolver nsPathResolver, ICloudGuardian guardian)
throws HyracksDataException {
+ INamespacePathResolver nsPathResolver, ICloudGuardian guardian,
ExecutorService executor)
+ throws HyracksDataException {
super(ioManager.getIODevices(), ioManager.getDeviceComputer(),
ioManager.getIOParallelism(),
ioManager.getQueueSize());
this.nsPathResolver = nsPathResolver;
this.bucket = cloudProperties.getStorageBucket();
- cloudClient = CloudClientProvider.getClient(cloudProperties, guardian);
+ cloudClient = CloudClientProvider.getClient(cloudProperties, guardian,
executor);
this.guardian = guardian;
int numOfThreads = getIODevices().size() * getIOParallelism();
writeBufferProvider = new WriteBufferProvider(numOfThreads,
cloudClient.getWriteBufferSize());
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
index dced5b0..cb69ddf 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
@@ -74,7 +74,7 @@
this.cloudProperties = cloudProperties;
localIoManager = (IOManager) ioManager;
diskCacheManagerRequired = cloudProperties.getCloudCachePolicy() ==
CloudCachePolicy.SELECTIVE;
- cloudIOManager = createIOManager(ioManager, cloudProperties,
nsPathResolver, guardian);
+ cloudIOManager = createIOManager(ioManager, cloudProperties,
nsPathResolver, guardian, executor);
physicalDrive = createPhysicalDrive(diskCacheManagerRequired,
cloudProperties, ioManager);
lockNotifier = createLockNotifier(diskCacheManagerRequired);
pageAllocator = createPageAllocator(diskCacheManagerRequired);
@@ -136,15 +136,16 @@
}
public static AbstractCloudIOManager createIOManager(IIOManager ioManager,
CloudProperties cloudProperties,
- INamespacePathResolver nsPathResolver, ICloudGuardian guardian)
throws HyracksDataException {
+ INamespacePathResolver nsPathResolver, ICloudGuardian guardian,
ExecutorService executor)
+ throws HyracksDataException {
IOManager localIoManager = (IOManager) ioManager;
CloudCachePolicy policy = cloudProperties.getCloudCachePolicy();
if (policy == CloudCachePolicy.EAGER) {
- return new EagerCloudIOManager(localIoManager, cloudProperties,
nsPathResolver, guardian);
+ return new EagerCloudIOManager(localIoManager, cloudProperties,
nsPathResolver, guardian, executor);
}
boolean selective = policy == CloudCachePolicy.SELECTIVE;
- return new LazyCloudIOManager(localIoManager, cloudProperties,
nsPathResolver, selective, guardian);
+ return new LazyCloudIOManager(localIoManager, cloudProperties,
nsPathResolver, selective, guardian, executor);
}
private static IPhysicalDrive createPhysicalDrive(boolean
diskCacheManagerRequired, CloudProperties cloudProperties,
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java
index b49f3ce..b944161 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.cloud;
+import java.util.concurrent.ExecutorService;
+
import org.apache.asterix.cloud.clients.ICloudGuardian;
import org.apache.asterix.common.api.INamespacePathResolver;
import org.apache.asterix.common.cloud.CloudCachePolicy;
@@ -32,13 +34,14 @@
}
public static IIOManager createIOManager(CloudProperties cloudProperties,
IIOManager ioManager,
- INamespacePathResolver nsPathResolver, ICloudGuardian guardian)
throws HyracksDataException {
+ INamespacePathResolver nsPathResolver, ICloudGuardian guardian,
ExecutorService executor)
+ throws HyracksDataException {
IOManager localIoManager = (IOManager) ioManager;
if (cloudProperties.getCloudCachePolicy() == CloudCachePolicy.LAZY) {
- return new LazyCloudIOManager(localIoManager, cloudProperties,
nsPathResolver, false, guardian);
+ return new LazyCloudIOManager(localIoManager, cloudProperties,
nsPathResolver, false, guardian, executor);
}
- return new EagerCloudIOManager(localIoManager, cloudProperties,
nsPathResolver, guardian);
+ return new EagerCloudIOManager(localIoManager, cloudProperties,
nsPathResolver, guardian, executor);
}
public static IPartitionBootstrapper
getCloudPartitionBootstrapper(IIOManager ioManager) {
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
index 1cb6077..29e5da0 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
@@ -23,6 +23,7 @@
import java.io.File;
import java.util.Collections;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import org.apache.asterix.cloud.clients.ICloudGuardian;
@@ -48,8 +49,9 @@
private static final Logger LOGGER = LogManager.getLogger();
public EagerCloudIOManager(IOManager ioManager, CloudProperties
cloudProperties,
- INamespacePathResolver nsPathResolver, ICloudGuardian guardian)
throws HyracksDataException {
- super(ioManager, cloudProperties, nsPathResolver, guardian);
+ INamespacePathResolver nsPathResolver, ICloudGuardian guardian,
ExecutorService executor)
+ throws HyracksDataException {
+ super(ioManager, cloudProperties, nsPathResolver, guardian, executor);
}
/*
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
index 1c5efd9..7a7ea06 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
@@ -29,6 +29,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import org.apache.asterix.cloud.bulk.DeleteBulkCloudOperation;
@@ -70,9 +71,9 @@
private ILazyAccessor accessor;
public LazyCloudIOManager(IOManager ioManager, CloudProperties
cloudProperties,
- INamespacePathResolver nsPathResolver, boolean selective,
ICloudGuardian guardian)
+ INamespacePathResolver nsPathResolver, boolean selective,
ICloudGuardian guardian, ExecutorService executor)
throws HyracksDataException {
- super(ioManager, cloudProperties, nsPathResolver, guardian);
+ super(ioManager, cloudProperties, nsPathResolver, guardian, executor);
accessor = new InitialCloudAccessor(cloudClient, bucket,
localIoManager);
puncher = HolePuncherProvider.get(this, cloudProperties,
writeBufferProvider);
if (selective) {
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
index c98c6b4..a0c8bd0 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.cloud.clients;
+import java.util.concurrent.ExecutorService;
+
import org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig;
import org.apache.asterix.cloud.clients.aws.s3.S3CloudClient;
import
org.apache.asterix.cloud.clients.azure.blobstorage.AzBlobStorageClientConfig;
@@ -38,8 +40,8 @@
throw new AssertionError("do not instantiate");
}
- public static ICloudClient getClient(CloudProperties cloudProperties,
ICloudGuardian guardian)
- throws HyracksDataException {
+ public static ICloudClient getClient(CloudProperties cloudProperties,
ICloudGuardian guardian,
+ ExecutorService executor) throws HyracksDataException {
String storageScheme = cloudProperties.getStorageScheme();
ICloudClient cloudClient;
if (S3.equalsIgnoreCase(storageScheme)) {
@@ -47,7 +49,7 @@
cloudClient = new S3CloudClient(config, guardian);
} else if (GCS.equalsIgnoreCase(storageScheme)) {
GCSClientConfig config = GCSClientConfig.of(cloudProperties);
- cloudClient = new GCSCloudClient(config, guardian);
+ cloudClient = new GCSCloudClient(config, guardian, executor);
} else if (AZ_BLOB.equalsIgnoreCase(storageScheme)) {
AzBlobStorageClientConfig config =
AzBlobStorageClientConfig.of(cloudProperties);
cloudClient = new AzBlobStorageCloudClient(config, guardian);
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
index fd82944..d332944 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
@@ -65,7 +65,7 @@
* @param filter filter to apply
* @return file names returned after applying the file name filter
*/
- Set<CloudFile> listObjects(String bucket, String path, FilenameFilter
filter);
+ Set<CloudFile> listObjects(String bucket, String path, FilenameFilter
filter) throws HyracksDataException;
/**
* Performs a range-read from the specified bucket and path starting at
the offset. The amount read is equal to the
@@ -107,7 +107,7 @@
* @param path path
* @param data data
*/
- void write(String bucket, String path, byte[] data);
+ void write(String bucket, String path, byte[] data) throws
HyracksDataException;
/**
* Copies an object from the source path to the destination path
@@ -116,7 +116,7 @@
* @param srcPath source path
* @param destPath destination path
*/
- void copy(String bucket, String srcPath, FileReference destPath);
+ void copy(String bucket, String srcPath, FileReference destPath) throws
HyracksDataException;
/**
* Deletes all objects at the specified bucket and paths
@@ -162,7 +162,7 @@
* @param bucket bucket name
* @return {@link JsonNode} with stored objects' information
*/
- JsonNode listAsJson(ObjectMapper objectMapper, String bucket);
+ JsonNode listAsJson(ObjectMapper objectMapper, String bucket) throws
HyracksDataException;
/**
* Performs any necessary closing and cleaning up
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
index 924f34a..11728dc 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
@@ -31,6 +31,9 @@
import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
import org.apache.asterix.cloud.IWriteBufferProvider;
import org.apache.asterix.cloud.clients.CloudFile;
@@ -76,12 +79,15 @@
private final ICloudGuardian guardian;
private final IRequestProfilerLimiter profilerLimiter;
private final int writeBufferSize;
+ private final ExecutorService executor;
- public GCSCloudClient(GCSClientConfig config, Storage gcsClient,
ICloudGuardian guardian) {
+ public GCSCloudClient(GCSClientConfig config, Storage gcsClient,
ICloudGuardian guardian,
+ ExecutorService executor) {
this.gcsClient = gcsClient;
this.config = config;
this.guardian = guardian;
this.writeBufferSize = config.getWriteBufferSize();
+ this.executor = executor;
long profilerInterval = config.getProfilerLogInterval();
GCSRequestRateLimiter limiter = new GCSRequestRateLimiter(config);
if (profilerInterval > 0) {
@@ -92,8 +98,9 @@
guardian.setCloudClient(this);
}
- public GCSCloudClient(GCSClientConfig config, ICloudGuardian guardian)
throws HyracksDataException {
- this(config, buildClient(config), guardian);
+ public GCSCloudClient(GCSClientConfig config, ICloudGuardian guardian,
ExecutorService executor)
+ throws HyracksDataException {
+ this(config, buildClient(config), guardian, executor);
}
@Override
@@ -112,12 +119,12 @@
}
@Override
- public Set<CloudFile> listObjects(String bucket, String path,
FilenameFilter filter) {
+ public Set<CloudFile> listObjects(String bucket, String path,
FilenameFilter filter) throws HyracksDataException {
guardian.checkReadAccess(bucket, path);
profilerLimiter.objectsList();
- Page<Blob> blobs = gcsClient.list(bucket,
BlobListOption.prefix(config.getPrefix() + path),
- BlobListOption.fields(Storage.BlobField.SIZE));
-
+ // MB-65432: Storage.list is not interrupt-safe; we need to offload
onto another thread
+ Page<Blob> blobs = runOpInterruptibly(() -> gcsClient.list(bucket,
+ BlobListOption.prefix(config.getPrefix() + path),
BlobListOption.fields(Storage.BlobField.SIZE)));
Set<CloudFile> files = new HashSet<>();
for (Blob blob : blobs.iterateAll()) {
if (filter.accept(null,
IoUtil.getFileNameFromPath(blob.getName()))) {
@@ -150,14 +157,18 @@
}
@Override
- public byte[] readAllBytes(String bucket, String path) {
+ public byte[] readAllBytes(String bucket, String path) throws
HyracksDataException {
guardian.checkReadAccess(bucket, path);
profilerLimiter.objectGet();
BlobId blobId = BlobId.of(bucket, config.getPrefix() + path);
try {
- return gcsClient.readAllBytes(blobId);
+ // MB-65432: Storage.readAllBytes is not interrupt-safe; we need
to offload onto another thread
+ return runOpInterruptibly(() -> gcsClient.readAllBytes(blobId));
} catch (StorageException e) {
- return null;
+ if (e.getCode() == 404) {
+ return null;
+ }
+ throw HyracksDataException.create(e);
}
}
@@ -176,27 +187,32 @@
}
@Override
- public void write(String bucket, String path, byte[] data) {
+ public void write(String bucket, String path, byte[] data) throws
HyracksDataException {
guardian.checkWriteAccess(bucket, path);
profilerLimiter.objectWrite();
BlobInfo blobInfo = BlobInfo.newBuilder(bucket, config.getPrefix() +
path).build();
- gcsClient.create(blobInfo, data);
+ // MB-65432: Storage.create is not interrupt-safe; we need to offload
onto another thread
+ runOpInterruptibly(() -> gcsClient.create(blobInfo, data));
}
@Override
- public void copy(String bucket, String srcPath, FileReference destPath) {
+ public void copy(String bucket, String srcPath, FileReference destPath)
throws HyracksDataException {
guardian.checkReadAccess(bucket, srcPath);
profilerLimiter.objectsList();
- Page<Blob> blobs = gcsClient.list(bucket,
BlobListOption.prefix(config.getPrefix() + srcPath));
- for (Blob blob : blobs.iterateAll()) {
- profilerLimiter.objectCopy();
- BlobId source = blob.getBlobId();
- String targetName =
destPath.getChildPath(IoUtil.getFileNameFromPath(source.getName()));
- BlobId target = BlobId.of(bucket, targetName);
- guardian.checkWriteAccess(bucket, targetName);
- CopyRequest copyReq =
CopyRequest.newBuilder().setSource(source).setTarget(target).build();
- gcsClient.copy(copyReq);
- }
+ // MB-65432: Storage.list & copy are not interrupt-safe; we need to
offload onto another thread
+ runOpInterruptibly(() -> {
+ Page<Blob> blobs = gcsClient.list(bucket,
BlobListOption.prefix(config.getPrefix() + srcPath));
+ for (Blob blob : blobs.iterateAll()) {
+ profilerLimiter.objectCopy();
+ BlobId source = blob.getBlobId();
+ String targetName =
destPath.getChildPath(IoUtil.getFileNameFromPath(source.getName()));
+ BlobId target = BlobId.of(bucket, targetName);
+ guardian.checkWriteAccess(bucket, targetName);
+ CopyRequest copyReq =
CopyRequest.newBuilder().setSource(source).setTarget(target).build();
+ gcsClient.copy(copyReq);
+ }
+ return null;
+ });
}
@Override
@@ -206,17 +222,16 @@
}
List<StorageBatchResult<Boolean>> deleteResponses = new ArrayList<>();
- StorageBatch batchRequest;
Iterator<String> pathIter = paths.iterator();
while (pathIter.hasNext()) {
- batchRequest = gcsClient.batch();
+ StorageBatch batchRequest = gcsClient.batch();
for (int i = 0; pathIter.hasNext() && i < DELETE_BATCH_SIZE; i++) {
BlobId blobId = BlobId.of(bucket, config.getPrefix() +
pathIter.next());
guardian.checkWriteAccess(bucket, blobId.getName());
deleteResponses.add(batchRequest.delete(blobId));
}
-
- batchRequest.submit();
+ // MB-65432: StorageBatch.submit may not be interrupt-safe; we
need to offload onto another thread
+ runOpInterruptibly(batchRequest::submit);
Iterator<String> deletePathIter = paths.iterator();
for (StorageBatchResult<Boolean> deleteResponse : deleteResponses)
{
String deletedPath = deletePathIter.next();
@@ -240,11 +255,12 @@
}
@Override
- public long getObjectSize(String bucket, String path) {
+ public long getObjectSize(String bucket, String path) throws
HyracksDataException {
guardian.checkReadAccess(bucket, path);
profilerLimiter.objectGet();
- Blob blob =
- gcsClient.get(bucket, config.getPrefix() + path,
Storage.BlobGetOption.fields(Storage.BlobField.SIZE));
+ // MB-65432: Storage.get is not interrupt-safe; we need to offload
onto another thread
+ Blob blob = runOpInterruptibly(() -> gcsClient.get(bucket,
config.getPrefix() + path,
+ Storage.BlobGetOption.fields(Storage.BlobField.SIZE)));
if (blob == null) {
return 0;
}
@@ -252,19 +268,22 @@
}
@Override
- public boolean exists(String bucket, String path) {
+ public boolean exists(String bucket, String path) throws
HyracksDataException {
guardian.checkReadAccess(bucket, path);
profilerLimiter.objectGet();
- Blob blob = gcsClient.get(bucket, config.getPrefix() + path,
- Storage.BlobGetOption.fields(Storage.BlobField.values()));
+ // MB-65432: Storage.get is not interrupt-safe; we need to offload
onto another thread
+ Blob blob = runOpInterruptibly(() -> gcsClient.get(bucket,
config.getPrefix() + path,
+ Storage.BlobGetOption.fields(Storage.BlobField.values())));
return blob != null && blob.exists();
}
@Override
- public boolean isEmptyPrefix(String bucket, String path) {
+ public boolean isEmptyPrefix(String bucket, String path) throws
HyracksDataException {
guardian.checkReadAccess(bucket, path);
profilerLimiter.objectsList();
- Page<Blob> blobs = gcsClient.list(bucket,
BlobListOption.prefix(config.getPrefix() + path));
+ // MB-65432: Storage.list is not interrupt-safe; we need to offload
onto another thread
+ Page<Blob> blobs =
+ runOpInterruptibly(() -> gcsClient.list(bucket,
BlobListOption.prefix(config.getPrefix() + path)));
return !blobs.iterateAll().iterator().hasNext();
}
@@ -275,10 +294,11 @@
}
@Override
- public JsonNode listAsJson(ObjectMapper objectMapper, String bucket) {
+ public JsonNode listAsJson(ObjectMapper objectMapper, String bucket)
throws HyracksDataException {
guardian.checkReadAccess(bucket, "/");
profilerLimiter.objectsList();
- Page<Blob> blobs = gcsClient.list(bucket,
BlobListOption.fields(Storage.BlobField.SIZE));
+ Page<Blob> blobs =
+ runOpInterruptibly(() -> gcsClient.list(bucket,
BlobListOption.fields(Storage.BlobField.SIZE)));
ArrayNode objectsInfo = objectMapper.createArrayNode();
List<Blob> objects = new ArrayList<>();
@@ -313,4 +333,24 @@
private String stripCloudPrefix(String objectName) {
return objectName.substring(config.getPrefix().length());
}
+
+ private void runOpInterruptibly(Runnable operation) throws
HyracksDataException {
+ try {
+ executor.submit(operation).get();
+ } catch (InterruptedException e) {
+ throw HyracksDataException.create(e);
+ } catch (ExecutionException e) {
+ throw HyracksDataException.create(e.getCause());
+ }
+ }
+
+ private <T> T runOpInterruptibly(Supplier<T> operation) throws
HyracksDataException {
+ try {
+ return executor.submit(operation::get).get();
+ } catch (InterruptedException e) {
+ throw HyracksDataException.create(e);
+ } catch (ExecutionException e) {
+ throw HyracksDataException.create(e.getCause());
+ }
+ }
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
index 0d30120..cbbaf31 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
@@ -33,6 +33,7 @@
import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.util.InvokeUtil;
import org.apache.hyracks.control.nc.io.IOManager;
import com.google.api.gax.paging.Page;
@@ -50,7 +51,6 @@
public class GCSParallelDownloader implements IParallelDownloader {
- // private static final Logger LOGGER = LogManager.getLogger();
private final String bucket;
private final IOManager ioManager;
private final Storage gcsClient;
@@ -95,6 +95,7 @@
downloadJobs.add(transferManager.downloadBlobs(entry.getValue(),
downConfig.setDownloadDirectory(entry.getKey()).build()));
}
+ // MB-65432: DownloadJob.getDownloadResults is interrupt-safe; no need
to offload
downloadJobs.forEach(DownloadJob::getDownloadResults);
}
@@ -121,6 +122,7 @@
}
List<DownloadResult> results;
for (DownloadJob job : downloadJobs) {
+ // MB-65432: DownloadJob.getDownloadResults is interrupt-safe; no
need to offload
results = job.getDownloadResults();
for (DownloadResult result : results) {
if (result.getStatus() != TransferStatus.SUCCESS) {
@@ -134,12 +136,7 @@
@Override
public void close() throws HyracksDataException {
- try {
- transferManager.close();
- gcsClient.close();
- } catch (Exception e) {
- throw HyracksDataException.create(e);
- }
+ InvokeUtil.tryWithCleanupsAsHyracks(transferManager::close,
gcsClient::close);
}
private <K, V> void addToMap(Map<K, List<V>> map, K key, V value) {
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
index 89d8fd5..f240ad4 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
@@ -108,9 +108,10 @@
profiler.objectMultipartUpload();
try {
writer.close();
- writer = null;
} catch (IOException | RuntimeException e) {
throw HyracksDataException.create(e);
+ } finally {
+ writer = null;
}
log("FINISHED");
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
index 63a8366..0aa1f87 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
@@ -65,7 +65,8 @@
ICloudClient createCloudClient(IApplicationContext appCtx) throws
CompilationException {
GCSClientConfig config = GCSClientConfig.of(configuration,
writeBufferSize);
return new GCSCloudClient(config, GCSUtils.buildClient(configuration),
- ICloudGuardian.NoOpCloudGuardian.INSTANCE);
+ ICloudGuardian.NoOpCloudGuardian.INSTANCE,
+
appCtx.getServiceContext().getControllerService().getExecutor());
}
@Override
diff --git
a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
index d89c872..2f64c37 100644
---
a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
+++
b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.cloud.gcs;
+import java.util.concurrent.Executors;
+
import org.apache.asterix.cloud.AbstractLSMTest;
import org.apache.asterix.cloud.clients.ICloudGuardian;
import org.apache.asterix.cloud.clients.google.gcs.GCSClientConfig;
@@ -52,7 +54,8 @@
int writeBufferSize = StorageUtil.getIntSizeInBytes(5,
StorageUtil.StorageUnit.MEGABYTE);
GCSClientConfig config =
new GCSClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME,
true, 0, writeBufferSize, "");
- CLOUD_CLIENT = new GCSCloudClient(config,
ICloudGuardian.NoOpCloudGuardian.INSTANCE);
+ CLOUD_CLIENT =
+ new GCSCloudClient(config,
ICloudGuardian.NoOpCloudGuardian.INSTANCE, Executors.newCachedThreadPool());
}
private static void cleanup() {
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
index fdda793..46c8ec2 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
@@ -45,6 +45,8 @@
return (HyracksDataException) cause;
} else if (cause instanceof InterruptedException) {
Thread.currentThread().interrupt();
+ } else if (cause instanceof Error) {
+ throw (Error) cause;
}
return new HyracksDataException(cause);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
index e7970d3..ff0a3c5 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
@@ -216,6 +216,39 @@
@SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions",
"UnreachableCode" })
// catching Throwable, instanceofs, false-positive unreachable code
+ public static void tryWithCleanupsAsHyracks(ThrowingAction action,
ThrowingAction... cleanups)
+ throws HyracksDataException {
+ Throwable savedT = null;
+ boolean suppressedInterrupted = false;
+ try {
+ action.run();
+ } catch (Throwable t) {
+ savedT = t;
+ } finally {
+ for (ThrowingAction cleanup : cleanups) {
+ try {
+ cleanup.run();
+ } catch (Throwable t) {
+ if (savedT != null) {
+ savedT.addSuppressed(t);
+ suppressedInterrupted |= t instanceof
InterruptedException;
+ } else {
+ savedT = t;
+ }
+ }
+ }
+ }
+ if (savedT == null) {
+ return;
+ }
+ if (suppressedInterrupted) {
+ Thread.currentThread().interrupt();
+ }
+ throw HyracksDataException.create(savedT);
+ }
+
+ @SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions",
"UnreachableCode" })
+ // catching Throwable, instanceofs, false-positive unreachable code
public static void tryWithCleanups(ThrowingAction action,
ThrowingAction... cleanups) throws Exception {
Throwable savedT = null;
boolean suppressedInterrupted = false;
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19464
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: ionic
Gerrit-Change-Id: Iab4b86fe3966a2d0b509c9bd871f792c3e35ff23
Gerrit-Change-Number: 19464
Gerrit-PatchSet: 1
Gerrit-Owner: Michael Blow <[email protected]>
Gerrit-MessageType: newchange