>From Michael Blow <[email protected]>:

Michael Blow has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19464 )


Change subject: [NO ISSUE][*DB][CLOUD] Ensure interrupts are not lost when 
performning GCS I/O
......................................................................

[NO ISSUE][*DB][CLOUD] Ensure interrupts are not lost when performning GCS I/O

Ext-ref: MB-65432
Change-Id: Iab4b86fe3966a2d0b509c9bd871f792c3e35ff23
---
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
M 
asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
15 files changed, 166 insertions(+), 68 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/64/19464/1

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 6928b64..5eafe05 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -191,7 +191,7 @@
         if (cloudDeployment) {
             cloudProperties = new 
CloudProperties(PropertiesAccessor.getInstance(ccServiceCtx.getAppConfig()));
             ioManager = CloudConfigurator.createIOManager(ioManager, 
cloudProperties, namespacePathResolver,
-                    getCloudGuardian(cloudProperties));
+                    getCloudGuardian(cloudProperties), 
controllerService.getExecutor());
         }
         IGlobalTxManager globalTxManager = createGlobalTxManager(ioManager);
         appCtx = createApplicationContext(null, globalRecoveryManager, 
lifecycleCoordinator, Receptionist::new,
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index 912ca47..ed63e63 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -31,6 +31,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.function.Predicate;

 import org.apache.asterix.cloud.bulk.DeleteBulkCloudOperation;
@@ -84,12 +85,13 @@
     private final List<FileStore> drivePaths;

     public AbstractCloudIOManager(IOManager ioManager, CloudProperties 
cloudProperties,
-            INamespacePathResolver nsPathResolver, ICloudGuardian guardian) 
throws HyracksDataException {
+            INamespacePathResolver nsPathResolver, ICloudGuardian guardian, 
ExecutorService executor)
+            throws HyracksDataException {
         super(ioManager.getIODevices(), ioManager.getDeviceComputer(), 
ioManager.getIOParallelism(),
                 ioManager.getQueueSize());
         this.nsPathResolver = nsPathResolver;
         this.bucket = cloudProperties.getStorageBucket();
-        cloudClient = CloudClientProvider.getClient(cloudProperties, guardian);
+        cloudClient = CloudClientProvider.getClient(cloudProperties, guardian, 
executor);
         this.guardian = guardian;
         int numOfThreads = getIODevices().size() * getIOParallelism();
         writeBufferProvider = new WriteBufferProvider(numOfThreads, 
cloudClient.getWriteBufferSize());
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
index dced5b0..cb69ddf 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
@@ -74,7 +74,7 @@
         this.cloudProperties = cloudProperties;
         localIoManager = (IOManager) ioManager;
         diskCacheManagerRequired = cloudProperties.getCloudCachePolicy() == 
CloudCachePolicy.SELECTIVE;
-        cloudIOManager = createIOManager(ioManager, cloudProperties, 
nsPathResolver, guardian);
+        cloudIOManager = createIOManager(ioManager, cloudProperties, 
nsPathResolver, guardian, executor);
         physicalDrive = createPhysicalDrive(diskCacheManagerRequired, 
cloudProperties, ioManager);
         lockNotifier = createLockNotifier(diskCacheManagerRequired);
         pageAllocator = createPageAllocator(diskCacheManagerRequired);
@@ -136,15 +136,16 @@
     }

     public static AbstractCloudIOManager createIOManager(IIOManager ioManager, 
CloudProperties cloudProperties,
-            INamespacePathResolver nsPathResolver, ICloudGuardian guardian) 
throws HyracksDataException {
+            INamespacePathResolver nsPathResolver, ICloudGuardian guardian, 
ExecutorService executor)
+            throws HyracksDataException {
         IOManager localIoManager = (IOManager) ioManager;
         CloudCachePolicy policy = cloudProperties.getCloudCachePolicy();
         if (policy == CloudCachePolicy.EAGER) {
-            return new EagerCloudIOManager(localIoManager, cloudProperties, 
nsPathResolver, guardian);
+            return new EagerCloudIOManager(localIoManager, cloudProperties, 
nsPathResolver, guardian, executor);
         }

         boolean selective = policy == CloudCachePolicy.SELECTIVE;
-        return new LazyCloudIOManager(localIoManager, cloudProperties, 
nsPathResolver, selective, guardian);
+        return new LazyCloudIOManager(localIoManager, cloudProperties, 
nsPathResolver, selective, guardian, executor);
     }

     private static IPhysicalDrive createPhysicalDrive(boolean 
diskCacheManagerRequired, CloudProperties cloudProperties,
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java
index b49f3ce..b944161 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.cloud;

+import java.util.concurrent.ExecutorService;
+
 import org.apache.asterix.cloud.clients.ICloudGuardian;
 import org.apache.asterix.common.api.INamespacePathResolver;
 import org.apache.asterix.common.cloud.CloudCachePolicy;
@@ -32,13 +34,14 @@
     }

     public static IIOManager createIOManager(CloudProperties cloudProperties, 
IIOManager ioManager,
-            INamespacePathResolver nsPathResolver, ICloudGuardian guardian) 
throws HyracksDataException {
+            INamespacePathResolver nsPathResolver, ICloudGuardian guardian, 
ExecutorService executor)
+            throws HyracksDataException {
         IOManager localIoManager = (IOManager) ioManager;
         if (cloudProperties.getCloudCachePolicy() == CloudCachePolicy.LAZY) {
-            return new LazyCloudIOManager(localIoManager, cloudProperties, 
nsPathResolver, false, guardian);
+            return new LazyCloudIOManager(localIoManager, cloudProperties, 
nsPathResolver, false, guardian, executor);
         }

-        return new EagerCloudIOManager(localIoManager, cloudProperties, 
nsPathResolver, guardian);
+        return new EagerCloudIOManager(localIoManager, cloudProperties, 
nsPathResolver, guardian, executor);
     }

     public static IPartitionBootstrapper 
getCloudPartitionBootstrapper(IIOManager ioManager) {
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
index 1cb6077..29e5da0 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
@@ -23,6 +23,7 @@
 import java.io.File;
 import java.util.Collections;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;

 import org.apache.asterix.cloud.clients.ICloudGuardian;
@@ -48,8 +49,9 @@
     private static final Logger LOGGER = LogManager.getLogger();

     public EagerCloudIOManager(IOManager ioManager, CloudProperties 
cloudProperties,
-            INamespacePathResolver nsPathResolver, ICloudGuardian guardian) 
throws HyracksDataException {
-        super(ioManager, cloudProperties, nsPathResolver, guardian);
+            INamespacePathResolver nsPathResolver, ICloudGuardian guardian, 
ExecutorService executor)
+            throws HyracksDataException {
+        super(ioManager, cloudProperties, nsPathResolver, guardian, executor);
     }

     /*
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
index 1c5efd9..7a7ea06 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
@@ -29,6 +29,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;

 import org.apache.asterix.cloud.bulk.DeleteBulkCloudOperation;
@@ -70,9 +71,9 @@
     private ILazyAccessor accessor;

     public LazyCloudIOManager(IOManager ioManager, CloudProperties 
cloudProperties,
-            INamespacePathResolver nsPathResolver, boolean selective, 
ICloudGuardian guardian)
+            INamespacePathResolver nsPathResolver, boolean selective, 
ICloudGuardian guardian, ExecutorService executor)
             throws HyracksDataException {
-        super(ioManager, cloudProperties, nsPathResolver, guardian);
+        super(ioManager, cloudProperties, nsPathResolver, guardian, executor);
         accessor = new InitialCloudAccessor(cloudClient, bucket, 
localIoManager);
         puncher = HolePuncherProvider.get(this, cloudProperties, 
writeBufferProvider);
         if (selective) {
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
index c98c6b4..a0c8bd0 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.cloud.clients;

+import java.util.concurrent.ExecutorService;
+
 import org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig;
 import org.apache.asterix.cloud.clients.aws.s3.S3CloudClient;
 import 
org.apache.asterix.cloud.clients.azure.blobstorage.AzBlobStorageClientConfig;
@@ -38,8 +40,8 @@
         throw new AssertionError("do not instantiate");
     }

-    public static ICloudClient getClient(CloudProperties cloudProperties, 
ICloudGuardian guardian)
-            throws HyracksDataException {
+    public static ICloudClient getClient(CloudProperties cloudProperties, 
ICloudGuardian guardian,
+            ExecutorService executor) throws HyracksDataException {
         String storageScheme = cloudProperties.getStorageScheme();
         ICloudClient cloudClient;
         if (S3.equalsIgnoreCase(storageScheme)) {
@@ -47,7 +49,7 @@
             cloudClient = new S3CloudClient(config, guardian);
         } else if (GCS.equalsIgnoreCase(storageScheme)) {
             GCSClientConfig config = GCSClientConfig.of(cloudProperties);
-            cloudClient = new GCSCloudClient(config, guardian);
+            cloudClient = new GCSCloudClient(config, guardian, executor);
         } else if (AZ_BLOB.equalsIgnoreCase(storageScheme)) {
             AzBlobStorageClientConfig config = 
AzBlobStorageClientConfig.of(cloudProperties);
             cloudClient = new AzBlobStorageCloudClient(config, guardian);
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
index fd82944..d332944 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
@@ -65,7 +65,7 @@
      * @param filter filter to apply
      * @return file names returned after applying the file name filter
      */
-    Set<CloudFile> listObjects(String bucket, String path, FilenameFilter 
filter);
+    Set<CloudFile> listObjects(String bucket, String path, FilenameFilter 
filter) throws HyracksDataException;

     /**
      * Performs a range-read from the specified bucket and path starting at 
the offset. The amount read is equal to the
@@ -107,7 +107,7 @@
      * @param path   path
      * @param data   data
      */
-    void write(String bucket, String path, byte[] data);
+    void write(String bucket, String path, byte[] data) throws 
HyracksDataException;

     /**
      * Copies an object from the source path to the destination path
@@ -116,7 +116,7 @@
      * @param srcPath  source path
      * @param destPath destination path
      */
-    void copy(String bucket, String srcPath, FileReference destPath);
+    void copy(String bucket, String srcPath, FileReference destPath) throws 
HyracksDataException;

     /**
      * Deletes all objects at the specified bucket and paths
@@ -162,7 +162,7 @@
      * @param bucket       bucket name
      * @return {@link JsonNode} with stored objects' information
      */
-    JsonNode listAsJson(ObjectMapper objectMapper, String bucket);
+    JsonNode listAsJson(ObjectMapper objectMapper, String bucket) throws 
HyracksDataException;

     /**
      * Performs any necessary closing and cleaning up
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
index 924f34a..11728dc 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
@@ -31,6 +31,9 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;

 import org.apache.asterix.cloud.IWriteBufferProvider;
 import org.apache.asterix.cloud.clients.CloudFile;
@@ -76,12 +79,15 @@
     private final ICloudGuardian guardian;
     private final IRequestProfilerLimiter profilerLimiter;
     private final int writeBufferSize;
+    private final ExecutorService executor;

-    public GCSCloudClient(GCSClientConfig config, Storage gcsClient, 
ICloudGuardian guardian) {
+    public GCSCloudClient(GCSClientConfig config, Storage gcsClient, 
ICloudGuardian guardian,
+            ExecutorService executor) {
         this.gcsClient = gcsClient;
         this.config = config;
         this.guardian = guardian;
         this.writeBufferSize = config.getWriteBufferSize();
+        this.executor = executor;
         long profilerInterval = config.getProfilerLogInterval();
         GCSRequestRateLimiter limiter = new GCSRequestRateLimiter(config);
         if (profilerInterval > 0) {
@@ -92,8 +98,9 @@
         guardian.setCloudClient(this);
     }

-    public GCSCloudClient(GCSClientConfig config, ICloudGuardian guardian) 
throws HyracksDataException {
-        this(config, buildClient(config), guardian);
+    public GCSCloudClient(GCSClientConfig config, ICloudGuardian guardian, 
ExecutorService executor)
+            throws HyracksDataException {
+        this(config, buildClient(config), guardian, executor);
     }

     @Override
@@ -112,12 +119,12 @@
     }

     @Override
-    public Set<CloudFile> listObjects(String bucket, String path, 
FilenameFilter filter) {
+    public Set<CloudFile> listObjects(String bucket, String path, 
FilenameFilter filter) throws HyracksDataException {
         guardian.checkReadAccess(bucket, path);
         profilerLimiter.objectsList();
-        Page<Blob> blobs = gcsClient.list(bucket, 
BlobListOption.prefix(config.getPrefix() + path),
-                BlobListOption.fields(Storage.BlobField.SIZE));
-
+        // MB-65432: Storage.list is not interrupt-safe; we need to offload 
onto another thread
+        Page<Blob> blobs = runOpInterruptibly(() -> gcsClient.list(bucket,
+                BlobListOption.prefix(config.getPrefix() + path), 
BlobListOption.fields(Storage.BlobField.SIZE)));
         Set<CloudFile> files = new HashSet<>();
         for (Blob blob : blobs.iterateAll()) {
             if (filter.accept(null, 
IoUtil.getFileNameFromPath(blob.getName()))) {
@@ -150,14 +157,18 @@
     }

     @Override
-    public byte[] readAllBytes(String bucket, String path) {
+    public byte[] readAllBytes(String bucket, String path) throws 
HyracksDataException {
         guardian.checkReadAccess(bucket, path);
         profilerLimiter.objectGet();
         BlobId blobId = BlobId.of(bucket, config.getPrefix() + path);
         try {
-            return gcsClient.readAllBytes(blobId);
+            // MB-65432: Storage.readAllBytes is not interrupt-safe; we need 
to offload onto another thread
+            return runOpInterruptibly(() -> gcsClient.readAllBytes(blobId));
         } catch (StorageException e) {
-            return null;
+            if (e.getCode() == 404) {
+                return null;
+            }
+            throw HyracksDataException.create(e);
         }
     }

@@ -176,27 +187,32 @@
     }

     @Override
-    public void write(String bucket, String path, byte[] data) {
+    public void write(String bucket, String path, byte[] data) throws 
HyracksDataException {
         guardian.checkWriteAccess(bucket, path);
         profilerLimiter.objectWrite();
         BlobInfo blobInfo = BlobInfo.newBuilder(bucket, config.getPrefix() + 
path).build();
-        gcsClient.create(blobInfo, data);
+        // MB-65432: Storage.create is not interrupt-safe; we need to offload 
onto another thread
+        runOpInterruptibly(() -> gcsClient.create(blobInfo, data));
     }

     @Override
-    public void copy(String bucket, String srcPath, FileReference destPath) {
+    public void copy(String bucket, String srcPath, FileReference destPath) 
throws HyracksDataException {
         guardian.checkReadAccess(bucket, srcPath);
         profilerLimiter.objectsList();
-        Page<Blob> blobs = gcsClient.list(bucket, 
BlobListOption.prefix(config.getPrefix() + srcPath));
-        for (Blob blob : blobs.iterateAll()) {
-            profilerLimiter.objectCopy();
-            BlobId source = blob.getBlobId();
-            String targetName = 
destPath.getChildPath(IoUtil.getFileNameFromPath(source.getName()));
-            BlobId target = BlobId.of(bucket, targetName);
-            guardian.checkWriteAccess(bucket, targetName);
-            CopyRequest copyReq = 
CopyRequest.newBuilder().setSource(source).setTarget(target).build();
-            gcsClient.copy(copyReq);
-        }
+        // MB-65432: Storage.list & copy are not interrupt-safe; we need to 
offload onto another thread
+        runOpInterruptibly(() -> {
+            Page<Blob> blobs = gcsClient.list(bucket, 
BlobListOption.prefix(config.getPrefix() + srcPath));
+            for (Blob blob : blobs.iterateAll()) {
+                profilerLimiter.objectCopy();
+                BlobId source = blob.getBlobId();
+                String targetName = 
destPath.getChildPath(IoUtil.getFileNameFromPath(source.getName()));
+                BlobId target = BlobId.of(bucket, targetName);
+                guardian.checkWriteAccess(bucket, targetName);
+                CopyRequest copyReq = 
CopyRequest.newBuilder().setSource(source).setTarget(target).build();
+                gcsClient.copy(copyReq);
+            }
+            return null;
+        });
     }

     @Override
@@ -206,17 +222,16 @@
         }

         List<StorageBatchResult<Boolean>> deleteResponses = new ArrayList<>();
-        StorageBatch batchRequest;
         Iterator<String> pathIter = paths.iterator();
         while (pathIter.hasNext()) {
-            batchRequest = gcsClient.batch();
+            StorageBatch batchRequest = gcsClient.batch();
             for (int i = 0; pathIter.hasNext() && i < DELETE_BATCH_SIZE; i++) {
                 BlobId blobId = BlobId.of(bucket, config.getPrefix() + 
pathIter.next());
                 guardian.checkWriteAccess(bucket, blobId.getName());
                 deleteResponses.add(batchRequest.delete(blobId));
             }
-
-            batchRequest.submit();
+            // MB-65432: StorageBatch.submit may not be interrupt-safe; we 
need to offload onto another thread
+            runOpInterruptibly(batchRequest::submit);
             Iterator<String> deletePathIter = paths.iterator();
             for (StorageBatchResult<Boolean> deleteResponse : deleteResponses) 
{
                 String deletedPath = deletePathIter.next();
@@ -240,11 +255,12 @@
     }

     @Override
-    public long getObjectSize(String bucket, String path) {
+    public long getObjectSize(String bucket, String path) throws 
HyracksDataException {
         guardian.checkReadAccess(bucket, path);
         profilerLimiter.objectGet();
-        Blob blob =
-                gcsClient.get(bucket, config.getPrefix() + path, 
Storage.BlobGetOption.fields(Storage.BlobField.SIZE));
+        // MB-65432: Storage.get is not interrupt-safe; we need to offload 
onto another thread
+        Blob blob = runOpInterruptibly(() -> gcsClient.get(bucket, 
config.getPrefix() + path,
+                Storage.BlobGetOption.fields(Storage.BlobField.SIZE)));
         if (blob == null) {
             return 0;
         }
@@ -252,19 +268,22 @@
     }

     @Override
-    public boolean exists(String bucket, String path) {
+    public boolean exists(String bucket, String path) throws 
HyracksDataException {
         guardian.checkReadAccess(bucket, path);
         profilerLimiter.objectGet();
-        Blob blob = gcsClient.get(bucket, config.getPrefix() + path,
-                Storage.BlobGetOption.fields(Storage.BlobField.values()));
+        // MB-65432: Storage.get is not interrupt-safe; we need to offload 
onto another thread
+        Blob blob = runOpInterruptibly(() -> gcsClient.get(bucket, 
config.getPrefix() + path,
+                Storage.BlobGetOption.fields(Storage.BlobField.values())));
         return blob != null && blob.exists();
     }

     @Override
-    public boolean isEmptyPrefix(String bucket, String path) {
+    public boolean isEmptyPrefix(String bucket, String path) throws 
HyracksDataException {
         guardian.checkReadAccess(bucket, path);
         profilerLimiter.objectsList();
-        Page<Blob> blobs = gcsClient.list(bucket, 
BlobListOption.prefix(config.getPrefix() + path));
+        // MB-65432: Storage.list is not interrupt-safe; we need to offload 
onto another thread
+        Page<Blob> blobs =
+                runOpInterruptibly(() -> gcsClient.list(bucket, 
BlobListOption.prefix(config.getPrefix() + path)));
         return !blobs.iterateAll().iterator().hasNext();
     }

@@ -275,10 +294,11 @@
     }

     @Override
-    public JsonNode listAsJson(ObjectMapper objectMapper, String bucket) {
+    public JsonNode listAsJson(ObjectMapper objectMapper, String bucket) 
throws HyracksDataException {
         guardian.checkReadAccess(bucket, "/");
         profilerLimiter.objectsList();
-        Page<Blob> blobs = gcsClient.list(bucket, 
BlobListOption.fields(Storage.BlobField.SIZE));
+        Page<Blob> blobs =
+                runOpInterruptibly(() -> gcsClient.list(bucket, 
BlobListOption.fields(Storage.BlobField.SIZE)));
         ArrayNode objectsInfo = objectMapper.createArrayNode();

         List<Blob> objects = new ArrayList<>();
@@ -313,4 +333,24 @@
     private String stripCloudPrefix(String objectName) {
         return objectName.substring(config.getPrefix().length());
     }
+
+    private void runOpInterruptibly(Runnable operation) throws 
HyracksDataException {
+        try {
+            executor.submit(operation).get();
+        } catch (InterruptedException e) {
+            throw HyracksDataException.create(e);
+        } catch (ExecutionException e) {
+            throw HyracksDataException.create(e.getCause());
+        }
+    }
+
+    private <T> T runOpInterruptibly(Supplier<T> operation) throws 
HyracksDataException {
+        try {
+            return executor.submit(operation::get).get();
+        } catch (InterruptedException e) {
+            throw HyracksDataException.create(e);
+        } catch (ExecutionException e) {
+            throw HyracksDataException.create(e.getCause());
+        }
+    }
 }
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
index 0d30120..cbbaf31 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
@@ -33,6 +33,7 @@
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.util.InvokeUtil;
 import org.apache.hyracks.control.nc.io.IOManager;

 import com.google.api.gax.paging.Page;
@@ -50,7 +51,6 @@

 public class GCSParallelDownloader implements IParallelDownloader {

-    //    private static final Logger LOGGER = LogManager.getLogger();
     private final String bucket;
     private final IOManager ioManager;
     private final Storage gcsClient;
@@ -95,6 +95,7 @@
             downloadJobs.add(transferManager.downloadBlobs(entry.getValue(),
                     downConfig.setDownloadDirectory(entry.getKey()).build()));
         }
+        // MB-65432: DownloadJob.getDownloadResults is interrupt-safe; no need 
to offload
         downloadJobs.forEach(DownloadJob::getDownloadResults);
     }

@@ -121,6 +122,7 @@
         }
         List<DownloadResult> results;
         for (DownloadJob job : downloadJobs) {
+            // MB-65432: DownloadJob.getDownloadResults is interrupt-safe; no 
need to offload
             results = job.getDownloadResults();
             for (DownloadResult result : results) {
                 if (result.getStatus() != TransferStatus.SUCCESS) {
@@ -134,12 +136,7 @@

     @Override
     public void close() throws HyracksDataException {
-        try {
-            transferManager.close();
-            gcsClient.close();
-        } catch (Exception e) {
-            throw HyracksDataException.create(e);
-        }
+        InvokeUtil.tryWithCleanupsAsHyracks(transferManager::close, 
gcsClient::close);
     }

     private <K, V> void addToMap(Map<K, List<V>> map, K key, V value) {
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
index 89d8fd5..f240ad4 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
@@ -108,9 +108,10 @@
         profiler.objectMultipartUpload();
         try {
             writer.close();
-            writer = null;
         } catch (IOException | RuntimeException e) {
             throw HyracksDataException.create(e);
+        } finally {
+            writer = null;
         }
         log("FINISHED");
     }
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
index 63a8366..0aa1f87 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
@@ -65,7 +65,8 @@
     ICloudClient createCloudClient(IApplicationContext appCtx) throws 
CompilationException {
         GCSClientConfig config = GCSClientConfig.of(configuration, 
writeBufferSize);
         return new GCSCloudClient(config, GCSUtils.buildClient(configuration),
-                ICloudGuardian.NoOpCloudGuardian.INSTANCE);
+                ICloudGuardian.NoOpCloudGuardian.INSTANCE,
+                
appCtx.getServiceContext().getControllerService().getExecutor());
     }

     @Override
diff --git 
a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
 
b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
index d89c872..2f64c37 100644
--- 
a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
+++ 
b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.cloud.gcs;

+import java.util.concurrent.Executors;
+
 import org.apache.asterix.cloud.AbstractLSMTest;
 import org.apache.asterix.cloud.clients.ICloudGuardian;
 import org.apache.asterix.cloud.clients.google.gcs.GCSClientConfig;
@@ -52,7 +54,8 @@
         int writeBufferSize = StorageUtil.getIntSizeInBytes(5, 
StorageUtil.StorageUnit.MEGABYTE);
         GCSClientConfig config =
                 new GCSClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME, 
true, 0, writeBufferSize, "");
-        CLOUD_CLIENT = new GCSCloudClient(config, 
ICloudGuardian.NoOpCloudGuardian.INSTANCE);
+        CLOUD_CLIENT =
+                new GCSCloudClient(config, 
ICloudGuardian.NoOpCloudGuardian.INSTANCE, Executors.newCachedThreadPool());
     }

     private static void cleanup() {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
index fdda793..46c8ec2 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
@@ -45,6 +45,8 @@
             return (HyracksDataException) cause;
         } else if (cause instanceof InterruptedException) {
             Thread.currentThread().interrupt();
+        } else if (cause instanceof Error) {
+            throw (Error) cause;
         }
         return new HyracksDataException(cause);
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
index e7970d3..ff0a3c5 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
@@ -216,6 +216,39 @@

     @SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions", 
"UnreachableCode" })
     // catching Throwable, instanceofs, false-positive unreachable code
+    public static void tryWithCleanupsAsHyracks(ThrowingAction action, 
ThrowingAction... cleanups)
+            throws HyracksDataException {
+        Throwable savedT = null;
+        boolean suppressedInterrupted = false;
+        try {
+            action.run();
+        } catch (Throwable t) {
+            savedT = t;
+        } finally {
+            for (ThrowingAction cleanup : cleanups) {
+                try {
+                    cleanup.run();
+                } catch (Throwable t) {
+                    if (savedT != null) {
+                        savedT.addSuppressed(t);
+                        suppressedInterrupted |= t instanceof 
InterruptedException;
+                    } else {
+                        savedT = t;
+                    }
+                }
+            }
+        }
+        if (savedT == null) {
+            return;
+        }
+        if (suppressedInterrupted) {
+            Thread.currentThread().interrupt();
+        }
+        throw HyracksDataException.create(savedT);
+    }
+
+    @SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions", 
"UnreachableCode" })
+    // catching Throwable, instanceofs, false-positive unreachable code
     public static void tryWithCleanups(ThrowingAction action, 
ThrowingAction... cleanups) throws Exception {
         Throwable savedT = null;
         boolean suppressedInterrupted = false;

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19464
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: ionic
Gerrit-Change-Id: Iab4b86fe3966a2d0b509c9bd871f792c3e35ff23
Gerrit-Change-Number: 19464
Gerrit-PatchSet: 1
Gerrit-Owner: Michael Blow <[email protected]>
Gerrit-MessageType: newchange

Reply via email to