From Michael Blow <[email protected]>:

Michael Blow has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19464 )

Change subject: [NO ISSUE][*DB][CLOUD] Ensure interrupts are not lost during GCS ops
......................................................................

[NO ISSUE][*DB][CLOUD] Ensure interrupts are not lost during GCS ops

Ext-ref: MB-65432
Change-Id: Iab4b86fe3966a2d0b509c9bd871f792c3e35ff23
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19464
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: Ali Alsuliman <[email protected]>
Tested-by: Michael Blow <[email protected]>
---
M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/IPartitionBootstrapper.java
M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
M asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java
M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java
M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
21 files changed, 195 insertions(+), 88 deletions(-)

Approvals:
  Ali Alsuliman: Looks good to me, approved
  Michael Blow: Verified
  Anon. E. Moose #1000171:
  Jenkins: Verified
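
For readers unfamiliar with the pattern this change applies: a blocking client call that does not respond to interrupts can be handed to an executor, while the calling thread waits on the resulting Future, which does respond to interrupts. The following is only a minimal sketch of that idea and is not the code from this change. The class name InterruptibleOffload, the helper's signature, and the choice to rethrow InterruptedException are all assumptions made for illustration; the helper in the change is invoked with a single argument and its callers declare HyracksDataException.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Supplier;

// Hypothetical illustration; names and signatures are not taken from the change.
final class InterruptibleOffload {

    private InterruptibleOffload() {
    }

    // Runs op on the executor and waits for its result on the calling thread.
    // Future.get() is interruptible, so an interrupt delivered to the caller is
    // observed here even if op itself never checks the interrupt flag.
    static <T> T runOpInterruptibly(ExecutorService executor, Supplier<T> op) throws InterruptedException {
        Callable<T> task = op::get;
        Future<T> future = executor.submit(task);
        try {
            return future.get();
        } catch (InterruptedException e) {
            // Best-effort cancellation of the offloaded work, then propagate the interrupt.
            future.cancel(true);
            throw e;
        } catch (ExecutionException e) {
            // Unwrap so callers see the original failure from op.
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            if (cause instanceof Error) {
                throw (Error) cause;
            }
            throw new IllegalStateException(cause);
        }
    }
}
```

In this sketch the caller stops waiting as soon as it is interrupted; whether the offloaded call itself stops depends on how the underlying client reacts to the cancellation.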




diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 343baf0..de4a066 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -225,13 +225,15 @@
             IConfigValidatorFactory configValidatorFactory, IReplicationStrategyFactory replicationStrategyFactory,
             boolean initialRun) throws IOException {
         ioManager = getServiceContext().getIoManager();
+        threadExecutor =
+                MaintainedThreadNameExecutorService.newCachedThreadPool(getServiceContext().getThreadFactory());
         CloudConfigurator cloudConfigurator;
         IDiskResourceCacheLockNotifier lockNotifier;
         IDiskCachedPageAllocator pageAllocator;
         IBufferCacheReadContext defaultContext;
         if (isCloudDeployment()) {
             cloudConfigurator = CloudConfigurator.of(cloudProperties, ioManager, namespacePathResolver,
-                    getCloudGuardian(cloudProperties));
+                    getCloudGuardian(cloudProperties), threadExecutor);
             persistenceIOManager = cloudConfigurator.getCloudIoManager();
             partitionBootstrapper = cloudConfigurator.getPartitionBootstrapper();
             lockNotifier = cloudConfigurator.getLockNotifier();
@@ -247,8 +249,6 @@
         }

         int ioQueueLen = getServiceContext().getAppConfig().getInt(NCConfig.Option.IO_QUEUE_SIZE);
-        threadExecutor =
-                MaintainedThreadNameExecutorService.newCachedThreadPool(getServiceContext().getThreadFactory());
         ICacheMemoryAllocator bufferAllocator = new HeapBufferAllocator();
         IPageCleanerPolicy pcp = new DelayPageCleanerPolicy(600000);
         IPageReplacementStrategy prs = new ClockPageReplacementStrategy(bufferAllocator, pageAllocator,
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 78f574b..179b996 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -131,7 +131,7 @@
      * not supported, yet.
      */
     @Override
-    public SystemState getSystemState() throws ACIDException {
+    public SystemState getSystemState() throws ACIDException, HyracksDataException {
         //read checkpoint file
         Checkpoint checkpointObject = checkpointManager.getLatest();
         if (checkpointObject == null) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 6928b64..5eafe05 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -191,7 +191,7 @@
         if (cloudDeployment) {
             cloudProperties = new CloudProperties(PropertiesAccessor.getInstance(ccServiceCtx.getAppConfig()));
             ioManager = CloudConfigurator.createIOManager(ioManager, cloudProperties, namespacePathResolver,
-                    getCloudGuardian(cloudProperties));
+                    getCloudGuardian(cloudProperties), controllerService.getExecutor());
         }
         IGlobalTxManager globalTxManager = createGlobalTxManager(ioManager);
         appCtx = createApplicationContext(null, globalRecoveryManager, lifecycleCoordinator, Receptionist::new,
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 6f8126b..0da4e0d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -370,7 +370,7 @@
         return nodeStatus == NodeStatus.IDLE && (primaryCc == null || primaryCc.equals(registeredCc));
     }

-    private SystemState getCurrentSystemState() {
+    private SystemState getCurrentSystemState() throws HyracksDataException {
         final NodeProperties nodeProperties = runtimeContext.getNodeProperties();
         IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
         SystemState state = recoveryMgr.getSystemState();
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index 912ca47..516ede2 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -31,6 +31,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.function.Predicate;

 import org.apache.asterix.cloud.bulk.DeleteBulkCloudOperation;
@@ -84,12 +85,13 @@
     private final List<FileStore> drivePaths;

     public AbstractCloudIOManager(IOManager ioManager, CloudProperties cloudProperties,
-            INamespacePathResolver nsPathResolver, ICloudGuardian guardian) throws HyracksDataException {
+            INamespacePathResolver nsPathResolver, ICloudGuardian guardian, ExecutorService executor)
+            throws HyracksDataException {
         super(ioManager.getIODevices(), ioManager.getDeviceComputer(), ioManager.getIOParallelism(),
                 ioManager.getQueueSize());
         this.nsPathResolver = nsPathResolver;
         this.bucket = cloudProperties.getStorageBucket();
-        cloudClient = CloudClientProvider.getClient(cloudProperties, guardian);
+        cloudClient = CloudClientProvider.getClient(cloudProperties, guardian, executor);
         this.guardian = guardian;
         int numOfThreads = getIODevices().size() * getIOParallelism();
         writeBufferProvider = new WriteBufferProvider(numOfThreads, cloudClient.getWriteBufferSize());
@@ -106,7 +108,7 @@
      */

     @Override
-    public SystemState getSystemStateOnMissingCheckpoint() {
+    public SystemState getSystemStateOnMissingCheckpoint() throws HyracksDataException {
         Set<CloudFile> existingMetadataFiles = getCloudMetadataPartitionFiles();
         CloudFile bootstrapMarkerPath = CloudFile.of(StoragePathUtil.getBootstrapMarkerRelativePath(nsPathResolver));
         if (existingMetadataFiles.isEmpty() || existingMetadataFiles.contains(bootstrapMarkerPath)) {
@@ -436,7 +438,7 @@
      * @param key   the key where the bytes will be written
      * @param bytes the bytes to write
      */
-    public final void put(String key, byte[] bytes) {
+    public final void put(String key, byte[] bytes) throws HyracksDataException {
         cloudClient.write(bucket, key, bytes);
     }

@@ -444,7 +446,7 @@
         return cloudClient;
     }

-    private Set<CloudFile> getCloudMetadataPartitionFiles() {
+    private Set<CloudFile> getCloudMetadataPartitionFiles() throws HyracksDataException {
         String metadataNamespacePath = StoragePathUtil.getNamespacePath(nsPathResolver,
                 MetadataConstants.METADATA_NAMESPACE, METADATA_PARTITION);
         return cloudClient.listObjects(bucket, metadataNamespacePath, IoUtil.NO_OP_FILTER);
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
index dced5b0..e494333 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
@@ -70,11 +70,12 @@
     private final long diskCacheMonitoringInterval;

     private CloudConfigurator(CloudProperties cloudProperties, IIOManager ioManager,
-            INamespacePathResolver nsPathResolver, ICloudGuardian guardian) throws HyracksDataException {
+            INamespacePathResolver nsPathResolver, ICloudGuardian guardian, ExecutorService executor)
+            throws HyracksDataException {
         this.cloudProperties = cloudProperties;
         localIoManager = (IOManager) ioManager;
         diskCacheManagerRequired = cloudProperties.getCloudCachePolicy() == CloudCachePolicy.SELECTIVE;
-        cloudIOManager = createIOManager(ioManager, cloudProperties, nsPathResolver, guardian);
+        cloudIOManager = createIOManager(ioManager, cloudProperties, nsPathResolver, guardian, executor);
         physicalDrive = createPhysicalDrive(diskCacheManagerRequired, cloudProperties, ioManager);
         lockNotifier = createLockNotifier(diskCacheManagerRequired);
         pageAllocator = createPageAllocator(diskCacheManagerRequired);
@@ -131,20 +132,22 @@
     }

     public static CloudConfigurator of(CloudProperties cloudProperties, IIOManager ioManager,
-            INamespacePathResolver nsPathResolver, ICloudGuardian cloudGuardian) throws HyracksDataException {
-        return new CloudConfigurator(cloudProperties, ioManager, nsPathResolver, cloudGuardian);
+            INamespacePathResolver nsPathResolver, ICloudGuardian cloudGuardian, ExecutorService executor)
+            throws HyracksDataException {
+        return new CloudConfigurator(cloudProperties, ioManager, nsPathResolver, cloudGuardian, executor);
     }

     public static AbstractCloudIOManager createIOManager(IIOManager ioManager, CloudProperties cloudProperties,
-            INamespacePathResolver nsPathResolver, ICloudGuardian guardian) throws HyracksDataException {
+            INamespacePathResolver nsPathResolver, ICloudGuardian guardian, ExecutorService executor)
+            throws HyracksDataException {
         IOManager localIoManager = (IOManager) ioManager;
         CloudCachePolicy policy = cloudProperties.getCloudCachePolicy();
         if (policy == CloudCachePolicy.EAGER) {
-            return new EagerCloudIOManager(localIoManager, cloudProperties, nsPathResolver, guardian);
+            return new EagerCloudIOManager(localIoManager, cloudProperties, nsPathResolver, guardian, executor);
         }

         boolean selective = policy == CloudCachePolicy.SELECTIVE;
-        return new LazyCloudIOManager(localIoManager, cloudProperties, nsPathResolver, selective, guardian);
+        return new LazyCloudIOManager(localIoManager, cloudProperties, nsPathResolver, selective, guardian, executor);
     }
 
     private static IPhysicalDrive createPhysicalDrive(boolean diskCacheManagerRequired, CloudProperties cloudProperties,
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java
index b49f3ce..b944161 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.cloud;

+import java.util.concurrent.ExecutorService;
+
 import org.apache.asterix.cloud.clients.ICloudGuardian;
 import org.apache.asterix.common.api.INamespacePathResolver;
 import org.apache.asterix.common.cloud.CloudCachePolicy;
@@ -32,13 +34,14 @@
     }

     public static IIOManager createIOManager(CloudProperties cloudProperties, IIOManager ioManager,
-            INamespacePathResolver nsPathResolver, ICloudGuardian guardian) throws HyracksDataException {
+            INamespacePathResolver nsPathResolver, ICloudGuardian guardian, ExecutorService executor)
+            throws HyracksDataException {
         IOManager localIoManager = (IOManager) ioManager;
         if (cloudProperties.getCloudCachePolicy() == CloudCachePolicy.LAZY) {
-            return new LazyCloudIOManager(localIoManager, cloudProperties, nsPathResolver, false, guardian);
+            return new LazyCloudIOManager(localIoManager, cloudProperties, nsPathResolver, false, guardian, executor);
         }

-        return new EagerCloudIOManager(localIoManager, cloudProperties, nsPathResolver, guardian);
+        return new EagerCloudIOManager(localIoManager, cloudProperties, nsPathResolver, guardian, executor);
     }

     public static IPartitionBootstrapper getCloudPartitionBootstrapper(IIOManager ioManager) {
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
index 1cb6077..29e5da0 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
@@ -23,6 +23,7 @@
 import java.io.File;
 import java.util.Collections;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;

 import org.apache.asterix.cloud.clients.ICloudGuardian;
@@ -48,8 +49,9 @@
     private static final Logger LOGGER = LogManager.getLogger();

     public EagerCloudIOManager(IOManager ioManager, CloudProperties cloudProperties,
-            INamespacePathResolver nsPathResolver, ICloudGuardian guardian) throws HyracksDataException {
-        super(ioManager, cloudProperties, nsPathResolver, guardian);
+            INamespacePathResolver nsPathResolver, ICloudGuardian guardian, ExecutorService executor)
+            throws HyracksDataException {
+        super(ioManager, cloudProperties, nsPathResolver, guardian, executor);
     }

     /*
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
index 1c5efd9..7a7ea06 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
@@ -29,6 +29,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;

 import org.apache.asterix.cloud.bulk.DeleteBulkCloudOperation;
@@ -70,9 +71,9 @@
     private ILazyAccessor accessor;

     public LazyCloudIOManager(IOManager ioManager, CloudProperties cloudProperties,
-            INamespacePathResolver nsPathResolver, boolean selective, ICloudGuardian guardian)
+            INamespacePathResolver nsPathResolver, boolean selective, ICloudGuardian guardian, ExecutorService executor)
             throws HyracksDataException {
-        super(ioManager, cloudProperties, nsPathResolver, guardian);
+        super(ioManager, cloudProperties, nsPathResolver, guardian, executor);
         accessor = new InitialCloudAccessor(cloudClient, bucket, localIoManager);
         puncher = HolePuncherProvider.get(this, cloudProperties, writeBufferProvider);
         if (selective) {
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
index c98c6b4..a0c8bd0 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.cloud.clients;

+import java.util.concurrent.ExecutorService;
+
 import org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig;
 import org.apache.asterix.cloud.clients.aws.s3.S3CloudClient;
 import org.apache.asterix.cloud.clients.azure.blobstorage.AzBlobStorageClientConfig;
@@ -38,8 +40,8 @@
         throw new AssertionError("do not instantiate");
     }
 
-    public static ICloudClient getClient(CloudProperties cloudProperties, ICloudGuardian guardian)
-            throws HyracksDataException {
+    public static ICloudClient getClient(CloudProperties cloudProperties, ICloudGuardian guardian,
+            ExecutorService executor) throws HyracksDataException {
         String storageScheme = cloudProperties.getStorageScheme();
         ICloudClient cloudClient;
         if (S3.equalsIgnoreCase(storageScheme)) {
@@ -47,7 +49,7 @@
             cloudClient = new S3CloudClient(config, guardian);
         } else if (GCS.equalsIgnoreCase(storageScheme)) {
             GCSClientConfig config = GCSClientConfig.of(cloudProperties);
-            cloudClient = new GCSCloudClient(config, guardian);
+            cloudClient = new GCSCloudClient(config, guardian, executor);
         } else if (AZ_BLOB.equalsIgnoreCase(storageScheme)) {
             AzBlobStorageClientConfig config = AzBlobStorageClientConfig.of(cloudProperties);
             cloudClient = new AzBlobStorageCloudClient(config, guardian);
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
index fd82944..d332944 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
@@ -65,7 +65,7 @@
      * @param filter filter to apply
      * @return file names returned after applying the file name filter
      */
-    Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter);
+    Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter) throws HyracksDataException;

     /**
      * Performs a range-read from the specified bucket and path starting at the offset. The amount read is equal to the
@@ -107,7 +107,7 @@
      * @param path   path
      * @param data   data
      */
-    void write(String bucket, String path, byte[] data);
+    void write(String bucket, String path, byte[] data) throws HyracksDataException;

     /**
      * Copies an object from the source path to the destination path
@@ -116,7 +116,7 @@
      * @param srcPath  source path
      * @param destPath destination path
      */
-    void copy(String bucket, String srcPath, FileReference destPath);
+    void copy(String bucket, String srcPath, FileReference destPath) throws HyracksDataException;

     /**
      * Deletes all objects at the specified bucket and paths
@@ -162,7 +162,7 @@
      * @param bucket       bucket name
      * @return {@link JsonNode} with stored objects' information
      */
-    JsonNode listAsJson(ObjectMapper objectMapper, String bucket);
+    JsonNode listAsJson(ObjectMapper objectMapper, String bucket) throws HyracksDataException;

     /**
      * Performs any necessary closing and cleaning up
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java
index 28fa53e..2f61b31 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java
@@ -66,7 +66,7 @@
     }

     @Override
-    public Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter) {
+    public Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter) throws HyracksDataException {
         return cloudClient.listObjects(bucket, path, filter);
     }

@@ -88,12 +88,12 @@
     }

     @Override
-    public void write(String bucket, String path, byte[] data) {
+    public void write(String bucket, String path, byte[] data) throws HyracksDataException {
         cloudClient.write(bucket, path, data);
     }

     @Override
-    public void copy(String bucket, String srcPath, FileReference destPath) {
+    public void copy(String bucket, String srcPath, FileReference destPath) throws HyracksDataException {
         cloudClient.copy(bucket, srcPath, destPath);
     }

@@ -127,7 +127,7 @@
     }

     @Override
-    public JsonNode listAsJson(ObjectMapper objectMapper, String bucket) {
+    public JsonNode listAsJson(ObjectMapper objectMapper, String bucket) throws HyracksDataException {
         return cloudClient.listAsJson(objectMapper, bucket);
     }

diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
index 924f34a..99cda9e 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
@@ -31,6 +31,9 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;

 import org.apache.asterix.cloud.IWriteBufferProvider;
 import org.apache.asterix.cloud.clients.CloudFile;
@@ -76,12 +79,15 @@
     private final ICloudGuardian guardian;
     private final IRequestProfilerLimiter profilerLimiter;
     private final int writeBufferSize;
+    private final ExecutorService executor;

-    public GCSCloudClient(GCSClientConfig config, Storage gcsClient, ICloudGuardian guardian) {
+    public GCSCloudClient(GCSClientConfig config, Storage gcsClient, ICloudGuardian guardian,
+            ExecutorService executor) {
         this.gcsClient = gcsClient;
         this.config = config;
         this.guardian = guardian;
         this.writeBufferSize = config.getWriteBufferSize();
+        this.executor = executor;
         long profilerInterval = config.getProfilerLogInterval();
         GCSRequestRateLimiter limiter = new GCSRequestRateLimiter(config);
         if (profilerInterval > 0) {
@@ -92,8 +98,9 @@
         guardian.setCloudClient(this);
     }

-    public GCSCloudClient(GCSClientConfig config, ICloudGuardian guardian) throws HyracksDataException {
-        this(config, buildClient(config), guardian);
+    public GCSCloudClient(GCSClientConfig config, ICloudGuardian guardian, ExecutorService executor)
+            throws HyracksDataException {
+        this(config, buildClient(config), guardian, executor);
     }

     @Override
@@ -112,12 +119,12 @@
     }

     @Override
-    public Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter) {
+    public Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter) throws HyracksDataException {
         guardian.checkReadAccess(bucket, path);
         profilerLimiter.objectsList();
-        Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + path),
-                BlobListOption.fields(Storage.BlobField.SIZE));
-
+        // MB-65432: Storage.list is not interrupt-safe; we need to offload onto another thread
+        Page<Blob> blobs = runOpInterruptibly(() -> gcsClient.list(bucket,
+                BlobListOption.prefix(config.getPrefix() + path), BlobListOption.fields(Storage.BlobField.SIZE)));
         Set<CloudFile> files = new HashSet<>();
         for (Blob blob : blobs.iterateAll()) {
             if (filter.accept(null, IoUtil.getFileNameFromPath(blob.getName()))) {
@@ -150,15 +157,21 @@
     }

     @Override
-    public byte[] readAllBytes(String bucket, String path) {
+    public byte[] readAllBytes(String bucket, String path) throws HyracksDataException {
         guardian.checkReadAccess(bucket, path);
         profilerLimiter.objectGet();
         BlobId blobId = BlobId.of(bucket, config.getPrefix() + path);
-        try {
-            return gcsClient.readAllBytes(blobId);
-        } catch (StorageException e) {
-            return null;
-        }
+        // MB-65432: Storage.readAllBytes is not interrupt-safe; we need to offload onto another thread
+        return runOpInterruptibly(() -> {
+            try {
+                return gcsClient.readAllBytes(blobId);
+            } catch (StorageException e) {
+                if (e.getCode() == 404) {
+                    return null;
+                }
+                throw e;
+            }
+        });
     }

     @Override
@@ -176,27 +189,32 @@
     }

     @Override
-    public void write(String bucket, String path, byte[] data) {
+    public void write(String bucket, String path, byte[] data) throws HyracksDataException {
         guardian.checkWriteAccess(bucket, path);
         profilerLimiter.objectWrite();
         BlobInfo blobInfo = BlobInfo.newBuilder(bucket, config.getPrefix() + path).build();
-        gcsClient.create(blobInfo, data);
+        // MB-65432: Storage.create is not interrupt-safe; we need to offload onto another thread
+        runOpInterruptibly(() -> gcsClient.create(blobInfo, data));
     }

     @Override
-    public void copy(String bucket, String srcPath, FileReference destPath) {
+    public void copy(String bucket, String srcPath, FileReference destPath) throws HyracksDataException {
         guardian.checkReadAccess(bucket, srcPath);
         profilerLimiter.objectsList();
-        Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + srcPath));
-        for (Blob blob : blobs.iterateAll()) {
-            profilerLimiter.objectCopy();
-            BlobId source = blob.getBlobId();
-            String targetName = destPath.getChildPath(IoUtil.getFileNameFromPath(source.getName()));
-            BlobId target = BlobId.of(bucket, targetName);
-            guardian.checkWriteAccess(bucket, targetName);
-            CopyRequest copyReq = CopyRequest.newBuilder().setSource(source).setTarget(target).build();
-            gcsClient.copy(copyReq);
-        }
+        // MB-65432: Storage.list & copy are not interrupt-safe; we need to offload onto another thread
+        runOpInterruptibly(() -> {
+            Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + srcPath));
+            for (Blob blob : blobs.iterateAll()) {
+                profilerLimiter.objectCopy();
+                BlobId source = blob.getBlobId();
+                String targetName = destPath.getChildPath(IoUtil.getFileNameFromPath(source.getName()));
+                BlobId target = BlobId.of(bucket, targetName);
+                guardian.checkWriteAccess(bucket, targetName);
+                CopyRequest copyReq = CopyRequest.newBuilder().setSource(source).setTarget(target).build();
+                gcsClient.copy(copyReq);
+            }
+            return null;
+        });
     }
 
     @Override
@@ -206,17 +224,16 @@
         }

         List<StorageBatchResult<Boolean>> deleteResponses = new ArrayList<>();
-        StorageBatch batchRequest;
         Iterator<String> pathIter = paths.iterator();
         while (pathIter.hasNext()) {
-            batchRequest = gcsClient.batch();
+            StorageBatch batchRequest = gcsClient.batch();
             for (int i = 0; pathIter.hasNext() && i < DELETE_BATCH_SIZE; i++) {
                 BlobId blobId = BlobId.of(bucket, config.getPrefix() + pathIter.next());
                 guardian.checkWriteAccess(bucket, blobId.getName());
                 deleteResponses.add(batchRequest.delete(blobId));
             }
-
-            batchRequest.submit();
+            // MB-65432: StorageBatch.submit may not be interrupt-safe; we need to offload onto another thread
+            runOpInterruptibly(batchRequest::submit);
             Iterator<String> deletePathIter = paths.iterator();
             for (StorageBatchResult<Boolean> deleteResponse : deleteResponses) {
                 String deletedPath = deletePathIter.next();
@@ -240,11 +257,12 @@
     }

     @Override
-    public long getObjectSize(String bucket, String path) {
+    public long getObjectSize(String bucket, String path) throws HyracksDataException {
         guardian.checkReadAccess(bucket, path);
         profilerLimiter.objectGet();
-        Blob blob =
-                gcsClient.get(bucket, config.getPrefix() + path, Storage.BlobGetOption.fields(Storage.BlobField.SIZE));
+        // MB-65432: Storage.get is not interrupt-safe; we need to offload onto another thread
+        Blob blob = runOpInterruptibly(() -> gcsClient.get(bucket, config.getPrefix() + path,
+                Storage.BlobGetOption.fields(Storage.BlobField.SIZE)));
         if (blob == null) {
             return 0;
         }
@@ -252,19 +270,22 @@
     }

     @Override
-    public boolean exists(String bucket, String path) {
+    public boolean exists(String bucket, String path) throws HyracksDataException {
         guardian.checkReadAccess(bucket, path);
         profilerLimiter.objectGet();
-        Blob blob = gcsClient.get(bucket, config.getPrefix() + path,
-                Storage.BlobGetOption.fields(Storage.BlobField.values()));
+        // MB-65432: Storage.get is not interrupt-safe; we need to offload onto another thread
+        Blob blob = runOpInterruptibly(() -> gcsClient.get(bucket, config.getPrefix() + path,
+                Storage.BlobGetOption.fields(Storage.BlobField.values())));
         return blob != null && blob.exists();
     }

     @Override
-    public boolean isEmptyPrefix(String bucket, String path) {
+    public boolean isEmptyPrefix(String bucket, String path) throws HyracksDataException {
         guardian.checkReadAccess(bucket, path);
         profilerLimiter.objectsList();
-        Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + path));
+        // MB-65432: Storage.list is not interrupt-safe; we need to offload onto another thread
+        Page<Blob> blobs =
+                runOpInterruptibly(() -> gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + path)));
         return !blobs.iterateAll().iterator().hasNext();
     }

@@ -275,10 +296,12 @@
     }

     @Override
-    public JsonNode listAsJson(ObjectMapper objectMapper, String bucket) {
+    public JsonNode listAsJson(ObjectMapper objectMapper, String bucket) throws HyracksDataException {
         guardian.checkReadAccess(bucket, "/");
         profilerLimiter.objectsList();
-        Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.fields(Storage.BlobField.SIZE));
+        // MB-65432: Storage.list is not interrupt-safe; we need to offload onto another thread
+        Page<Blob> blobs =
+                runOpInterruptibly(() -> gcsClient.list(bucket, BlobListOption.fields(Storage.BlobField.SIZE)));
         ArrayNode objectsInfo = objectMapper.createArrayNode();

         List<Blob> objects = new ArrayList<>();
@@ -313,4 +336,24 @@
     private String stripCloudPrefix(String objectName) {
         return objectName.substring(config.getPrefix().length());
     }
+
+    private void runOpInterruptibly(Runnable operation) throws HyracksDataException {
+        try {
+            executor.submit(operation).get();
+        } catch (InterruptedException e) {
+            throw HyracksDataException.create(e);
+        } catch (ExecutionException e) {
+            throw HyracksDataException.create(e.getCause());
+        }
+    }
+
+    private <T> T runOpInterruptibly(Supplier<T> operation) throws HyracksDataException {
+        try {
+            return executor.submit(operation::get).get();
+        } catch (InterruptedException e) {
+            throw HyracksDataException.create(e);
+        } catch (ExecutionException e) {
+            throw HyracksDataException.create(e.getCause());
+        }
+    }
 }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
index 0d30120..cbbaf31 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
@@ -33,6 +33,7 @@
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.util.InvokeUtil;
 import org.apache.hyracks.control.nc.io.IOManager;

 import com.google.api.gax.paging.Page;
@@ -50,7 +51,6 @@

 public class GCSParallelDownloader implements IParallelDownloader {

-    //    private static final Logger LOGGER = LogManager.getLogger();
     private final String bucket;
     private final IOManager ioManager;
     private final Storage gcsClient;
@@ -95,6 +95,7 @@
             downloadJobs.add(transferManager.downloadBlobs(entry.getValue(),
                     downConfig.setDownloadDirectory(entry.getKey()).build()));
         }
+        // MB-65432: DownloadJob.getDownloadResults is interrupt-safe; no need to offload
         downloadJobs.forEach(DownloadJob::getDownloadResults);
     }

@@ -121,6 +122,7 @@
         }
         List<DownloadResult> results;
         for (DownloadJob job : downloadJobs) {
+            // MB-65432: DownloadJob.getDownloadResults is interrupt-safe; no need to offload
             results = job.getDownloadResults();
             for (DownloadResult result : results) {
                 if (result.getStatus() != TransferStatus.SUCCESS) {
@@ -134,12 +136,7 @@

     @Override
     public void close() throws HyracksDataException {
-        try {
-            transferManager.close();
-            gcsClient.close();
-        } catch (Exception e) {
-            throw HyracksDataException.create(e);
-        }
+        InvokeUtil.tryWithCleanupsAsHyracks(transferManager::close, gcsClient::close);
     }

     private <K, V> void addToMap(Map<K, List<V>> map, K key, V value) {
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
index 89d8fd5..f240ad4 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
@@ -108,9 +108,10 @@
         profiler.objectMultipartUpload();
         try {
             writer.close();
-            writer = null;
         } catch (IOException | RuntimeException e) {
             throw HyracksDataException.create(e);
+        } finally {
+            writer = null;
         }
         log("FINISHED");
     }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
index 63a8366..0aa1f87 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
@@ -65,7 +65,8 @@
     ICloudClient createCloudClient(IApplicationContext appCtx) throws CompilationException {
         GCSClientConfig config = GCSClientConfig.of(configuration, writeBufferSize);
         return new GCSCloudClient(config, GCSUtils.buildClient(configuration),
-                ICloudGuardian.NoOpCloudGuardian.INSTANCE);
+                ICloudGuardian.NoOpCloudGuardian.INSTANCE,
+                appCtx.getServiceContext().getControllerService().getExecutor());
     }

     @Override
diff --git a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
index d89c872..2f64c37 100644
--- a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
+++ b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.cloud.gcs;

+import java.util.concurrent.Executors;
+
 import org.apache.asterix.cloud.AbstractLSMTest;
 import org.apache.asterix.cloud.clients.ICloudGuardian;
 import org.apache.asterix.cloud.clients.google.gcs.GCSClientConfig;
@@ -52,7 +54,8 @@
         int writeBufferSize = StorageUtil.getIntSizeInBytes(5, StorageUtil.StorageUnit.MEGABYTE);
         GCSClientConfig config =
                 new GCSClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME, true, 0, writeBufferSize, "");
-        CLOUD_CLIENT = new GCSCloudClient(config, ICloudGuardian.NoOpCloudGuardian.INSTANCE);
+        CLOUD_CLIENT =
+                new GCSCloudClient(config, ICloudGuardian.NoOpCloudGuardian.INSTANCE, Executors.newCachedThreadPool());
     }

     private static void cleanup() {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/IPartitionBootstrapper.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/IPartitionBootstrapper.java
index 73a1392..88e88ab 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/IPartitionBootstrapper.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/IPartitionBootstrapper.java
@@ -34,7 +34,7 @@
      * @return the systems state in case the checkpoint upon calling {@link IRecoveryManager#getSystemState()}
      * is missing the checkpoint file
      */
-    IRecoveryManager.SystemState getSystemStateOnMissingCheckpoint();
+    IRecoveryManager.SystemState getSystemStateOnMissingCheckpoint() throws HyracksDataException;

     /**
      * Bootstraps the node's partitions directory by doing the following:
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
index aef2215..a5f79ac 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
@@ -61,7 +61,7 @@
      * @return SystemState The state of the system
      * @throws ACIDException
      */
-    SystemState getSystemState() throws ACIDException;
+    SystemState getSystemState() throws ACIDException, HyracksDataException;

     /**
      * Rolls back a transaction.
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
index fdda793..46c8ec2 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
@@ -45,6 +45,8 @@
             return (HyracksDataException) cause;
         } else if (cause instanceof InterruptedException) {
             Thread.currentThread().interrupt();
+        } else if (cause instanceof Error) {
+            throw (Error) cause;
         }
         return new HyracksDataException(cause);
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
index e7970d3..ff0a3c5 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
@@ -216,6 +216,39 @@

     @SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions", "UnreachableCode" })
     // catching Throwable, instanceofs, false-positive unreachable code
+    public static void tryWithCleanupsAsHyracks(ThrowingAction action, ThrowingAction... cleanups)
+            throws HyracksDataException {
+        Throwable savedT = null;
+        boolean suppressedInterrupted = false;
+        try {
+            action.run();
+        } catch (Throwable t) {
+            savedT = t;
+        } finally {
+            for (ThrowingAction cleanup : cleanups) {
+                try {
+                    cleanup.run();
+                } catch (Throwable t) {
+                    if (savedT != null) {
+                        savedT.addSuppressed(t);
+                        suppressedInterrupted |= t instanceof InterruptedException;
+                    } else {
+                        savedT = t;
+                    }
+                }
+            }
+        }
+        if (savedT == null) {
+            return;
+        }
+        if (suppressedInterrupted) {
+            Thread.currentThread().interrupt();
+        }
+        throw HyracksDataException.create(savedT);
+    }
+
+    @SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions", "UnreachableCode" })
+    // catching Throwable, instanceofs, false-positive unreachable code
     public static void tryWithCleanups(ThrowingAction action, ThrowingAction... cleanups) throws Exception {
         Throwable savedT = null;
         boolean suppressedInterrupted = false;
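
Note on the offloading pattern used by runOpInterruptibly in GCSCloudClient: the blocking client call runs on an executor thread, and the calling thread waits only in Future.get(), which does respond to interrupts. The following is a minimal standalone sketch of that idea, not the code from this change. The class name InterruptibleOffload, the helper interruptReleasesCaller, and the latch-based stand-in for a GCS call are all hypothetical, and the sketch throws the raw InterruptedException instead of wrapping it in HyracksDataException.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

class InterruptibleOffload {

    /** Runs the operation on a pool thread; the caller blocks only in Future.get(). */
    static <T> T runOpInterruptibly(ExecutorService executor, Supplier<T> operation)
            throws InterruptedException, ExecutionException {
        Callable<T> task = operation::get;
        return executor.submit(task).get();
    }

    /** Returns true if an interrupted caller is released while the offloaded call is still blocked. */
    static boolean interruptReleasesCaller() {
        ExecutorService pool = Executors.newCachedThreadPool();
        CountDownLatch release = new CountDownLatch(1);
        // Hypothetical stand-in for a client call that swallows interrupts and keeps blocking.
        Supplier<String> stubbornCall = () -> {
            while (true) {
                try {
                    release.await();
                    return "done";
                } catch (InterruptedException ignored) {
                    // keep waiting, as a non-interrupt-safe call might
                }
            }
        };
        // Simulate an interrupt that is already pending on the calling thread.
        Thread.currentThread().interrupt();
        try {
            runOpInterruptibly(pool, stubbornCall);
            return false; // not expected: the call cannot finish before the latch opens
        } catch (InterruptedException e) {
            return true; // the caller saw the interrupt; the worker is still blocked
        } catch (ExecutionException e) {
            return false;
        } finally {
            release.countDown(); // let the worker finish so the pool can shut down
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(interruptReleasesCaller());
    }
}
```

As with the helper in the diff, the sketch does not cancel the in-flight task when the caller is interrupted; the offloaded call keeps running on the pool thread until it returns on its own.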

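Note on the close() change in GCSParallelDownloader: in the old try/catch, a failure in transferManager.close() skipped gcsClient.close(). With tryWithCleanupsAsHyracks the second close still runs, and any failure it raises is attached as a suppressed exception. Below is a simplified sketch of that behaviour under stated assumptions: CleanupChainSketch, its nested ThrowingAction interface, and demo are hypothetical names, and the sketch catches Exception only, without the Throwable and interrupt handling present in InvokeUtil.

```java
import java.util.ArrayList;
import java.util.List;

class CleanupChainSketch {

    /** Hypothetical stand-in for the Hyracks ThrowingAction interface. */
    interface ThrowingAction {
        void run() throws Exception;
    }

    /** Runs the action, then every cleanup; the first failure is thrown, later ones are suppressed. */
    static void tryWithCleanups(ThrowingAction action, ThrowingAction... cleanups) throws Exception {
        Exception saved = null;
        try {
            action.run();
        } catch (Exception e) {
            saved = e;
        }
        for (ThrowingAction cleanup : cleanups) {
            try {
                cleanup.run();
            } catch (Exception e) {
                if (saved != null) {
                    saved.addSuppressed(e);
                } else {
                    saved = e;
                }
            }
        }
        if (saved != null) {
            throw saved;
        }
    }

    /** Closes two fake resources where the first close fails; returns the observed events in order. */
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        try {
            tryWithCleanups(() -> {
                events.add("first close attempted");
                throw new IllegalStateException("first close failed");
            }, () -> events.add("second close ran"));
        } catch (Exception e) {
            events.add("thrown: " + e.getMessage());
        }
        return events;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The events list shows that the second close runs even though the first one threw, which is the property the one-line close() in the diff relies on.
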
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19464

Gerrit-Project: asterixdb
Gerrit-Branch: ionic
Gerrit-Change-Id: Iab4b86fe3966a2d0b509c9bd871f792c3e35ff23
Gerrit-Change-Number: 19464
Gerrit-PatchSet: 5
Gerrit-Owner: Michael Blow <[email protected]>
Gerrit-Reviewer: Ali Alsuliman <[email protected]>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Hussain Towaileb <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Michael Blow <[email protected]>
Gerrit-MessageType: merged
