From Michael Blow <[email protected]>: Michael Blow has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19464 )
Change subject: [NO ISSUE][*DB][CLOUD] Ensure interrupts are not lost during GCS ops ...................................................................... [NO ISSUE][*DB][CLOUD] Ensure interrupts are not lost during GCS ops Ext-ref: MB-65432 Change-Id: Iab4b86fe3966a2d0b509c9bd871f792c3e35ff23 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19464 Integration-Tests: Jenkins <[email protected]> Reviewed-by: Ali Alsuliman <[email protected]> Tested-by: Michael Blow <[email protected]> --- M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/IPartitionBootstrapper.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java M asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java 21 files changed, 195 insertions(+), 88 deletions(-) Approvals: Ali Alsuliman: Looks good to me, approved Michael Blow: Verified Anon. E. Moose #1000171: Jenkins: Verified diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java index 343baf0..de4a066 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java @@ -225,13 +225,15 @@ IConfigValidatorFactory configValidatorFactory, IReplicationStrategyFactory replicationStrategyFactory, boolean initialRun) throws IOException { ioManager = getServiceContext().getIoManager(); + threadExecutor = + MaintainedThreadNameExecutorService.newCachedThreadPool(getServiceContext().getThreadFactory()); CloudConfigurator cloudConfigurator; IDiskResourceCacheLockNotifier lockNotifier; IDiskCachedPageAllocator pageAllocator; IBufferCacheReadContext defaultContext; if (isCloudDeployment()) { cloudConfigurator = CloudConfigurator.of(cloudProperties, ioManager, namespacePathResolver, - getCloudGuardian(cloudProperties)); + getCloudGuardian(cloudProperties), threadExecutor); persistenceIOManager = cloudConfigurator.getCloudIoManager(); partitionBootstrapper = cloudConfigurator.getPartitionBootstrapper(); 
lockNotifier = cloudConfigurator.getLockNotifier(); @@ -247,8 +249,6 @@ } int ioQueueLen = getServiceContext().getAppConfig().getInt(NCConfig.Option.IO_QUEUE_SIZE); - threadExecutor = - MaintainedThreadNameExecutorService.newCachedThreadPool(getServiceContext().getThreadFactory()); ICacheMemoryAllocator bufferAllocator = new HeapBufferAllocator(); IPageCleanerPolicy pcp = new DelayPageCleanerPolicy(600000); IPageReplacementStrategy prs = new ClockPageReplacementStrategy(bufferAllocator, pageAllocator, diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java index 78f574b..179b996 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java @@ -131,7 +131,7 @@ * not supported, yet. */ @Override - public SystemState getSystemState() throws ACIDException { + public SystemState getSystemState() throws ACIDException, HyracksDataException { //read checkpoint file Checkpoint checkpointObject = checkpointManager.getLatest(); if (checkpointObject == null) { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java index 6928b64..5eafe05 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java @@ -191,7 +191,7 @@ if (cloudDeployment) { cloudProperties = new CloudProperties(PropertiesAccessor.getInstance(ccServiceCtx.getAppConfig())); ioManager = CloudConfigurator.createIOManager(ioManager, cloudProperties, namespacePathResolver, - getCloudGuardian(cloudProperties)); + getCloudGuardian(cloudProperties), controllerService.getExecutor()); } 
IGlobalTxManager globalTxManager = createGlobalTxManager(ioManager); appCtx = createApplicationContext(null, globalRecoveryManager, lifecycleCoordinator, Receptionist::new, diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java index 6f8126b..0da4e0d 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java @@ -370,7 +370,7 @@ return nodeStatus == NodeStatus.IDLE && (primaryCc == null || primaryCc.equals(registeredCc)); } - private SystemState getCurrentSystemState() { + private SystemState getCurrentSystemState() throws HyracksDataException { final NodeProperties nodeProperties = runtimeContext.getNodeProperties(); IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager(); SystemState state = recoveryMgr.getSystemState(); diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java index 912ca47..516ede2 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java @@ -31,6 +31,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.ExecutorService; import java.util.function.Predicate; import org.apache.asterix.cloud.bulk.DeleteBulkCloudOperation; @@ -84,12 +85,13 @@ private final List<FileStore> drivePaths; public AbstractCloudIOManager(IOManager ioManager, CloudProperties cloudProperties, - INamespacePathResolver nsPathResolver, ICloudGuardian guardian) throws HyracksDataException { + INamespacePathResolver nsPathResolver, 
ICloudGuardian guardian, ExecutorService executor) + throws HyracksDataException { super(ioManager.getIODevices(), ioManager.getDeviceComputer(), ioManager.getIOParallelism(), ioManager.getQueueSize()); this.nsPathResolver = nsPathResolver; this.bucket = cloudProperties.getStorageBucket(); - cloudClient = CloudClientProvider.getClient(cloudProperties, guardian); + cloudClient = CloudClientProvider.getClient(cloudProperties, guardian, executor); this.guardian = guardian; int numOfThreads = getIODevices().size() * getIOParallelism(); writeBufferProvider = new WriteBufferProvider(numOfThreads, cloudClient.getWriteBufferSize()); @@ -106,7 +108,7 @@ */ @Override - public SystemState getSystemStateOnMissingCheckpoint() { + public SystemState getSystemStateOnMissingCheckpoint() throws HyracksDataException { Set<CloudFile> existingMetadataFiles = getCloudMetadataPartitionFiles(); CloudFile bootstrapMarkerPath = CloudFile.of(StoragePathUtil.getBootstrapMarkerRelativePath(nsPathResolver)); if (existingMetadataFiles.isEmpty() || existingMetadataFiles.contains(bootstrapMarkerPath)) { @@ -436,7 +438,7 @@ * @param key the key where the bytes will be written * @param bytes the bytes to write */ - public final void put(String key, byte[] bytes) { + public final void put(String key, byte[] bytes) throws HyracksDataException { cloudClient.write(bucket, key, bytes); } @@ -444,7 +446,7 @@ return cloudClient; } - private Set<CloudFile> getCloudMetadataPartitionFiles() { + private Set<CloudFile> getCloudMetadataPartitionFiles() throws HyracksDataException { String metadataNamespacePath = StoragePathUtil.getNamespacePath(nsPathResolver, MetadataConstants.METADATA_NAMESPACE, METADATA_PARTITION); return cloudClient.listObjects(bucket, metadataNamespacePath, IoUtil.NO_OP_FILTER); diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java index dced5b0..e494333 
100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java @@ -70,11 +70,12 @@ private final long diskCacheMonitoringInterval; private CloudConfigurator(CloudProperties cloudProperties, IIOManager ioManager, - INamespacePathResolver nsPathResolver, ICloudGuardian guardian) throws HyracksDataException { + INamespacePathResolver nsPathResolver, ICloudGuardian guardian, ExecutorService executor) + throws HyracksDataException { this.cloudProperties = cloudProperties; localIoManager = (IOManager) ioManager; diskCacheManagerRequired = cloudProperties.getCloudCachePolicy() == CloudCachePolicy.SELECTIVE; - cloudIOManager = createIOManager(ioManager, cloudProperties, nsPathResolver, guardian); + cloudIOManager = createIOManager(ioManager, cloudProperties, nsPathResolver, guardian, executor); physicalDrive = createPhysicalDrive(diskCacheManagerRequired, cloudProperties, ioManager); lockNotifier = createLockNotifier(diskCacheManagerRequired); pageAllocator = createPageAllocator(diskCacheManagerRequired); @@ -131,20 +132,22 @@ } public static CloudConfigurator of(CloudProperties cloudProperties, IIOManager ioManager, - INamespacePathResolver nsPathResolver, ICloudGuardian cloudGuardian) throws HyracksDataException { - return new CloudConfigurator(cloudProperties, ioManager, nsPathResolver, cloudGuardian); + INamespacePathResolver nsPathResolver, ICloudGuardian cloudGuardian, ExecutorService executor) + throws HyracksDataException { + return new CloudConfigurator(cloudProperties, ioManager, nsPathResolver, cloudGuardian, executor); } public static AbstractCloudIOManager createIOManager(IIOManager ioManager, CloudProperties cloudProperties, - INamespacePathResolver nsPathResolver, ICloudGuardian guardian) throws HyracksDataException { + INamespacePathResolver nsPathResolver, ICloudGuardian guardian, ExecutorService executor) + throws 
HyracksDataException { IOManager localIoManager = (IOManager) ioManager; CloudCachePolicy policy = cloudProperties.getCloudCachePolicy(); if (policy == CloudCachePolicy.EAGER) { - return new EagerCloudIOManager(localIoManager, cloudProperties, nsPathResolver, guardian); + return new EagerCloudIOManager(localIoManager, cloudProperties, nsPathResolver, guardian, executor); } boolean selective = policy == CloudCachePolicy.SELECTIVE; - return new LazyCloudIOManager(localIoManager, cloudProperties, nsPathResolver, selective, guardian); + return new LazyCloudIOManager(localIoManager, cloudProperties, nsPathResolver, selective, guardian, executor); } private static IPhysicalDrive createPhysicalDrive(boolean diskCacheManagerRequired, CloudProperties cloudProperties, diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java index b49f3ce..b944161 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java @@ -18,6 +18,8 @@ */ package org.apache.asterix.cloud; +import java.util.concurrent.ExecutorService; + import org.apache.asterix.cloud.clients.ICloudGuardian; import org.apache.asterix.common.api.INamespacePathResolver; import org.apache.asterix.common.cloud.CloudCachePolicy; @@ -32,13 +34,14 @@ } public static IIOManager createIOManager(CloudProperties cloudProperties, IIOManager ioManager, - INamespacePathResolver nsPathResolver, ICloudGuardian guardian) throws HyracksDataException { + INamespacePathResolver nsPathResolver, ICloudGuardian guardian, ExecutorService executor) + throws HyracksDataException { IOManager localIoManager = (IOManager) ioManager; if (cloudProperties.getCloudCachePolicy() == CloudCachePolicy.LAZY) { - return new LazyCloudIOManager(localIoManager, cloudProperties, nsPathResolver, 
false, guardian); + return new LazyCloudIOManager(localIoManager, cloudProperties, nsPathResolver, false, guardian, executor); } - return new EagerCloudIOManager(localIoManager, cloudProperties, nsPathResolver, guardian); + return new EagerCloudIOManager(localIoManager, cloudProperties, nsPathResolver, guardian, executor); } public static IPartitionBootstrapper getCloudPartitionBootstrapper(IIOManager ioManager) { diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java index 1cb6077..29e5da0 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java @@ -23,6 +23,7 @@ import java.io.File; import java.util.Collections; import java.util.Set; +import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; import org.apache.asterix.cloud.clients.ICloudGuardian; @@ -48,8 +49,9 @@ private static final Logger LOGGER = LogManager.getLogger(); public EagerCloudIOManager(IOManager ioManager, CloudProperties cloudProperties, - INamespacePathResolver nsPathResolver, ICloudGuardian guardian) throws HyracksDataException { - super(ioManager, cloudProperties, nsPathResolver, guardian); + INamespacePathResolver nsPathResolver, ICloudGuardian guardian, ExecutorService executor) + throws HyracksDataException { + super(ioManager, cloudProperties, nsPathResolver, guardian, executor); } /* diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java index 1c5efd9..7a7ea06 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java @@ -29,6 +29,7 @@ 
import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; import org.apache.asterix.cloud.bulk.DeleteBulkCloudOperation; @@ -70,9 +71,9 @@ private ILazyAccessor accessor; public LazyCloudIOManager(IOManager ioManager, CloudProperties cloudProperties, - INamespacePathResolver nsPathResolver, boolean selective, ICloudGuardian guardian) + INamespacePathResolver nsPathResolver, boolean selective, ICloudGuardian guardian, ExecutorService executor) throws HyracksDataException { - super(ioManager, cloudProperties, nsPathResolver, guardian); + super(ioManager, cloudProperties, nsPathResolver, guardian, executor); accessor = new InitialCloudAccessor(cloudClient, bucket, localIoManager); puncher = HolePuncherProvider.get(this, cloudProperties, writeBufferProvider); if (selective) { diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java index c98c6b4..a0c8bd0 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java @@ -18,6 +18,8 @@ */ package org.apache.asterix.cloud.clients; +import java.util.concurrent.ExecutorService; + import org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig; import org.apache.asterix.cloud.clients.aws.s3.S3CloudClient; import org.apache.asterix.cloud.clients.azure.blobstorage.AzBlobStorageClientConfig; @@ -38,8 +40,8 @@ throw new AssertionError("do not instantiate"); } - public static ICloudClient getClient(CloudProperties cloudProperties, ICloudGuardian guardian) - throws HyracksDataException { + public static ICloudClient getClient(CloudProperties cloudProperties, ICloudGuardian guardian, + ExecutorService executor) throws HyracksDataException { String 
storageScheme = cloudProperties.getStorageScheme(); ICloudClient cloudClient; if (S3.equalsIgnoreCase(storageScheme)) { @@ -47,7 +49,7 @@ cloudClient = new S3CloudClient(config, guardian); } else if (GCS.equalsIgnoreCase(storageScheme)) { GCSClientConfig config = GCSClientConfig.of(cloudProperties); - cloudClient = new GCSCloudClient(config, guardian); + cloudClient = new GCSCloudClient(config, guardian, executor); } else if (AZ_BLOB.equalsIgnoreCase(storageScheme)) { AzBlobStorageClientConfig config = AzBlobStorageClientConfig.of(cloudProperties); cloudClient = new AzBlobStorageCloudClient(config, guardian); diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java index fd82944..d332944 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java @@ -65,7 +65,7 @@ * @param filter filter to apply * @return file names returned after applying the file name filter */ - Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter); + Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter) throws HyracksDataException; /** * Performs a range-read from the specified bucket and path starting at the offset. 
The amount read is equal to the @@ -107,7 +107,7 @@ * @param path path * @param data data */ - void write(String bucket, String path, byte[] data); + void write(String bucket, String path, byte[] data) throws HyracksDataException; /** * Copies an object from the source path to the destination path @@ -116,7 +116,7 @@ * @param srcPath source path * @param destPath destination path */ - void copy(String bucket, String srcPath, FileReference destPath); + void copy(String bucket, String srcPath, FileReference destPath) throws HyracksDataException; /** * Deletes all objects at the specified bucket and paths @@ -162,7 +162,7 @@ * @param bucket bucket name * @return {@link JsonNode} with stored objects' information */ - JsonNode listAsJson(ObjectMapper objectMapper, String bucket); + JsonNode listAsJson(ObjectMapper objectMapper, String bucket) throws HyracksDataException; /** * Performs any necessary closing and cleaning up diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java index 28fa53e..2f61b31 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java @@ -66,7 +66,7 @@ } @Override - public Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter) { + public Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter) throws HyracksDataException { return cloudClient.listObjects(bucket, path, filter); } @@ -88,12 +88,12 @@ } @Override - public void write(String bucket, String path, byte[] data) { + public void write(String bucket, String path, byte[] data) throws HyracksDataException { cloudClient.write(bucket, path, data); } @Override - public void copy(String bucket, String srcPath, FileReference destPath) { + public void 
copy(String bucket, String srcPath, FileReference destPath) throws HyracksDataException { cloudClient.copy(bucket, srcPath, destPath); } @@ -127,7 +127,7 @@ } @Override - public JsonNode listAsJson(ObjectMapper objectMapper, String bucket) { + public JsonNode listAsJson(ObjectMapper objectMapper, String bucket) throws HyracksDataException { return cloudClient.listAsJson(objectMapper, bucket); } diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java index 924f34a..99cda9e 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java @@ -31,6 +31,9 @@ import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.function.Supplier; import org.apache.asterix.cloud.IWriteBufferProvider; import org.apache.asterix.cloud.clients.CloudFile; @@ -76,12 +79,15 @@ private final ICloudGuardian guardian; private final IRequestProfilerLimiter profilerLimiter; private final int writeBufferSize; + private final ExecutorService executor; - public GCSCloudClient(GCSClientConfig config, Storage gcsClient, ICloudGuardian guardian) { + public GCSCloudClient(GCSClientConfig config, Storage gcsClient, ICloudGuardian guardian, + ExecutorService executor) { this.gcsClient = gcsClient; this.config = config; this.guardian = guardian; this.writeBufferSize = config.getWriteBufferSize(); + this.executor = executor; long profilerInterval = config.getProfilerLogInterval(); GCSRequestRateLimiter limiter = new GCSRequestRateLimiter(config); if (profilerInterval > 0) { @@ -92,8 +98,9 @@ guardian.setCloudClient(this); } - public GCSCloudClient(GCSClientConfig 
config, ICloudGuardian guardian) throws HyracksDataException { - this(config, buildClient(config), guardian); + public GCSCloudClient(GCSClientConfig config, ICloudGuardian guardian, ExecutorService executor) + throws HyracksDataException { + this(config, buildClient(config), guardian, executor); } @Override @@ -112,12 +119,12 @@ } @Override - public Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter) { + public Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter) throws HyracksDataException { guardian.checkReadAccess(bucket, path); profilerLimiter.objectsList(); - Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + path), - BlobListOption.fields(Storage.BlobField.SIZE)); - + // MB-65432: Storage.list is not interrupt-safe; we need to offload onto another thread + Page<Blob> blobs = runOpInterruptibly(() -> gcsClient.list(bucket, + BlobListOption.prefix(config.getPrefix() + path), BlobListOption.fields(Storage.BlobField.SIZE))); Set<CloudFile> files = new HashSet<>(); for (Blob blob : blobs.iterateAll()) { if (filter.accept(null, IoUtil.getFileNameFromPath(blob.getName()))) { @@ -150,15 +157,21 @@ } @Override - public byte[] readAllBytes(String bucket, String path) { + public byte[] readAllBytes(String bucket, String path) throws HyracksDataException { guardian.checkReadAccess(bucket, path); profilerLimiter.objectGet(); BlobId blobId = BlobId.of(bucket, config.getPrefix() + path); - try { - return gcsClient.readAllBytes(blobId); - } catch (StorageException e) { - return null; - } + // MB-65432: Storage.readAllBytes is not interrupt-safe; we need to offload onto another thread + return runOpInterruptibly(() -> { + try { + return gcsClient.readAllBytes(blobId); + } catch (StorageException e) { + if (e.getCode() == 404) { + return null; + } + throw e; + } + }); } @Override @@ -176,27 +189,32 @@ } @Override - public void write(String bucket, String path, byte[] data) { + public void 
write(String bucket, String path, byte[] data) throws HyracksDataException { guardian.checkWriteAccess(bucket, path); profilerLimiter.objectWrite(); BlobInfo blobInfo = BlobInfo.newBuilder(bucket, config.getPrefix() + path).build(); - gcsClient.create(blobInfo, data); + // MB-65432: Storage.create is not interrupt-safe; we need to offload onto another thread + runOpInterruptibly(() -> gcsClient.create(blobInfo, data)); } @Override - public void copy(String bucket, String srcPath, FileReference destPath) { + public void copy(String bucket, String srcPath, FileReference destPath) throws HyracksDataException { guardian.checkReadAccess(bucket, srcPath); profilerLimiter.objectsList(); - Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + srcPath)); - for (Blob blob : blobs.iterateAll()) { - profilerLimiter.objectCopy(); - BlobId source = blob.getBlobId(); - String targetName = destPath.getChildPath(IoUtil.getFileNameFromPath(source.getName())); - BlobId target = BlobId.of(bucket, targetName); - guardian.checkWriteAccess(bucket, targetName); - CopyRequest copyReq = CopyRequest.newBuilder().setSource(source).setTarget(target).build(); - gcsClient.copy(copyReq); - } + // MB-65432: Storage.list & copy are not interrupt-safe; we need to offload onto another thread + runOpInterruptibly(() -> { + Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + srcPath)); + for (Blob blob : blobs.iterateAll()) { + profilerLimiter.objectCopy(); + BlobId source = blob.getBlobId(); + String targetName = destPath.getChildPath(IoUtil.getFileNameFromPath(source.getName())); + BlobId target = BlobId.of(bucket, targetName); + guardian.checkWriteAccess(bucket, targetName); + CopyRequest copyReq = CopyRequest.newBuilder().setSource(source).setTarget(target).build(); + gcsClient.copy(copyReq); + } + return null; + }); } @Override @@ -206,17 +224,16 @@ } List<StorageBatchResult<Boolean>> deleteResponses = new ArrayList<>(); - 
StorageBatch batchRequest; Iterator<String> pathIter = paths.iterator(); while (pathIter.hasNext()) { - batchRequest = gcsClient.batch(); + StorageBatch batchRequest = gcsClient.batch(); for (int i = 0; pathIter.hasNext() && i < DELETE_BATCH_SIZE; i++) { BlobId blobId = BlobId.of(bucket, config.getPrefix() + pathIter.next()); guardian.checkWriteAccess(bucket, blobId.getName()); deleteResponses.add(batchRequest.delete(blobId)); } - - batchRequest.submit(); + // MB-65432: StorageBatch.submit may not be interrupt-safe; we need to offload onto another thread + runOpInterruptibly(batchRequest::submit); Iterator<String> deletePathIter = paths.iterator(); for (StorageBatchResult<Boolean> deleteResponse : deleteResponses) { String deletedPath = deletePathIter.next(); @@ -240,11 +257,12 @@ } @Override - public long getObjectSize(String bucket, String path) { + public long getObjectSize(String bucket, String path) throws HyracksDataException { guardian.checkReadAccess(bucket, path); profilerLimiter.objectGet(); - Blob blob = - gcsClient.get(bucket, config.getPrefix() + path, Storage.BlobGetOption.fields(Storage.BlobField.SIZE)); + // MB-65432: Storage.get is not interrupt-safe; we need to offload onto another thread + Blob blob = runOpInterruptibly(() -> gcsClient.get(bucket, config.getPrefix() + path, + Storage.BlobGetOption.fields(Storage.BlobField.SIZE))); if (blob == null) { return 0; } @@ -252,19 +270,22 @@ } @Override - public boolean exists(String bucket, String path) { + public boolean exists(String bucket, String path) throws HyracksDataException { guardian.checkReadAccess(bucket, path); profilerLimiter.objectGet(); - Blob blob = gcsClient.get(bucket, config.getPrefix() + path, - Storage.BlobGetOption.fields(Storage.BlobField.values())); + // MB-65432: Storage.get is not interrupt-safe; we need to offload onto another thread + Blob blob = runOpInterruptibly(() -> gcsClient.get(bucket, config.getPrefix() + path, + 
Storage.BlobGetOption.fields(Storage.BlobField.values()))); return blob != null && blob.exists(); } @Override - public boolean isEmptyPrefix(String bucket, String path) { + public boolean isEmptyPrefix(String bucket, String path) throws HyracksDataException { guardian.checkReadAccess(bucket, path); profilerLimiter.objectsList(); - Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + path)); + // MB-65432: Storage.list is not interrupt-safe; we need to offload onto another thread + Page<Blob> blobs = + runOpInterruptibly(() -> gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + path))); return !blobs.iterateAll().iterator().hasNext(); } @@ -275,10 +296,12 @@ } @Override - public JsonNode listAsJson(ObjectMapper objectMapper, String bucket) { + public JsonNode listAsJson(ObjectMapper objectMapper, String bucket) throws HyracksDataException { guardian.checkReadAccess(bucket, "/"); profilerLimiter.objectsList(); - Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.fields(Storage.BlobField.SIZE)); + // MB-65432: Storage.list is not interrupt-safe; we need to offload onto another thread + Page<Blob> blobs = + runOpInterruptibly(() -> gcsClient.list(bucket, BlobListOption.fields(Storage.BlobField.SIZE))); ArrayNode objectsInfo = objectMapper.createArrayNode(); List<Blob> objects = new ArrayList<>(); @@ -313,4 +336,24 @@ private String stripCloudPrefix(String objectName) { return objectName.substring(config.getPrefix().length()); } + + private void runOpInterruptibly(Runnable operation) throws HyracksDataException { + try { + executor.submit(operation).get(); + } catch (InterruptedException e) { + throw HyracksDataException.create(e); + } catch (ExecutionException e) { + throw HyracksDataException.create(e.getCause()); + } + } + + private <T> T runOpInterruptibly(Supplier<T> operation) throws HyracksDataException { + try { + return executor.submit(operation::get).get(); + } catch (InterruptedException e) { + throw 
HyracksDataException.create(e); + } catch (ExecutionException e) { + throw HyracksDataException.create(e.getCause()); + } + } } diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java index 0d30120..cbbaf31 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java @@ -33,6 +33,7 @@ import org.apache.commons.io.FileUtils; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.api.util.InvokeUtil; import org.apache.hyracks.control.nc.io.IOManager; import com.google.api.gax.paging.Page; @@ -50,7 +51,6 @@ public class GCSParallelDownloader implements IParallelDownloader { - // private static final Logger LOGGER = LogManager.getLogger(); private final String bucket; private final IOManager ioManager; private final Storage gcsClient; @@ -95,6 +95,7 @@ downloadJobs.add(transferManager.downloadBlobs(entry.getValue(), downConfig.setDownloadDirectory(entry.getKey()).build())); } + // MB-65432: DownloadJob.getDownloadResults is interrupt-safe; no need to offload downloadJobs.forEach(DownloadJob::getDownloadResults); } @@ -121,6 +122,7 @@ } List<DownloadResult> results; for (DownloadJob job : downloadJobs) { + // MB-65432: DownloadJob.getDownloadResults is interrupt-safe; no need to offload results = job.getDownloadResults(); for (DownloadResult result : results) { if (result.getStatus() != TransferStatus.SUCCESS) { @@ -134,12 +136,7 @@ @Override public void close() throws HyracksDataException { - try { - transferManager.close(); - gcsClient.close(); - } catch (Exception e) { - throw HyracksDataException.create(e); - } + 
InvokeUtil.tryWithCleanupsAsHyracks(transferManager::close, gcsClient::close); } private <K, V> void addToMap(Map<K, List<V>> map, K key, V value) { diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java index 89d8fd5..f240ad4 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java @@ -108,9 +108,10 @@ profiler.objectMultipartUpload(); try { writer.close(); - writer = null; } catch (IOException | RuntimeException e) { throw HyracksDataException.create(e); + } finally { + writer = null; } log("FINISHED"); } diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java index 63a8366..0aa1f87 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java @@ -65,7 +65,8 @@ ICloudClient createCloudClient(IApplicationContext appCtx) throws CompilationException { GCSClientConfig config = GCSClientConfig.of(configuration, writeBufferSize); return new GCSCloudClient(config, GCSUtils.buildClient(configuration), - ICloudGuardian.NoOpCloudGuardian.INSTANCE); + ICloudGuardian.NoOpCloudGuardian.INSTANCE, + appCtx.getServiceContext().getControllerService().getExecutor()); } @Override diff --git a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java index d89c872..2f64c37 100644 --- 
a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java +++ b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java @@ -18,6 +18,8 @@ */ package org.apache.asterix.cloud.gcs; +import java.util.concurrent.Executors; + import org.apache.asterix.cloud.AbstractLSMTest; import org.apache.asterix.cloud.clients.ICloudGuardian; import org.apache.asterix.cloud.clients.google.gcs.GCSClientConfig; @@ -52,7 +54,8 @@ int writeBufferSize = StorageUtil.getIntSizeInBytes(5, StorageUtil.StorageUnit.MEGABYTE); GCSClientConfig config = new GCSClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME, true, 0, writeBufferSize, ""); - CLOUD_CLIENT = new GCSCloudClient(config, ICloudGuardian.NoOpCloudGuardian.INSTANCE); + CLOUD_CLIENT = + new GCSCloudClient(config, ICloudGuardian.NoOpCloudGuardian.INSTANCE, Executors.newCachedThreadPool()); } private static void cleanup() { diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/IPartitionBootstrapper.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/IPartitionBootstrapper.java index 73a1392..88e88ab 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/IPartitionBootstrapper.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/IPartitionBootstrapper.java @@ -34,7 +34,7 @@ * @return the systems state in case the checkpoint upon calling {@link IRecoveryManager#getSystemState()} * is missing the checkpoint file */ - IRecoveryManager.SystemState getSystemStateOnMissingCheckpoint(); + IRecoveryManager.SystemState getSystemStateOnMissingCheckpoint() throws HyracksDataException; /** * Bootstraps the node's partitions directory by doing the following: diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java index 
aef2215..a5f79ac 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java @@ -61,7 +61,7 @@ * @return SystemState The state of the system * @throws ACIDException */ - SystemState getSystemState() throws ACIDException; + SystemState getSystemState() throws ACIDException, HyracksDataException; /** * Rolls back a transaction. diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java index fdda793..46c8ec2 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java @@ -45,6 +45,8 @@ return (HyracksDataException) cause; } else if (cause instanceof InterruptedException) { Thread.currentThread().interrupt(); + } else if (cause instanceof Error) { + throw (Error) cause; } return new HyracksDataException(cause); } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java index e7970d3..ff0a3c5 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java @@ -216,6 +216,39 @@ @SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions", "UnreachableCode" }) // catching Throwable, instanceofs, false-positive unreachable code + public static void tryWithCleanupsAsHyracks(ThrowingAction action, ThrowingAction... 
cleanups) + throws HyracksDataException { + Throwable savedT = null; + boolean suppressedInterrupted = false; + try { + action.run(); + } catch (Throwable t) { + savedT = t; + } finally { + for (ThrowingAction cleanup : cleanups) { + try { + cleanup.run(); + } catch (Throwable t) { + if (savedT != null) { + savedT.addSuppressed(t); + suppressedInterrupted |= t instanceof InterruptedException; + } else { + savedT = t; + } + } + } + } + if (savedT == null) { + return; + } + if (suppressedInterrupted) { + Thread.currentThread().interrupt(); + } + throw HyracksDataException.create(savedT); + } + + @SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions", "UnreachableCode" }) + // catching Throwable, instanceofs, false-positive unreachable code public static void tryWithCleanups(ThrowingAction action, ThrowingAction... cleanups) throws Exception { Throwable savedT = null; boolean suppressedInterrupted = false; -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19464 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: ionic Gerrit-Change-Id: Iab4b86fe3966a2d0b509c9bd871f792c3e35ff23 Gerrit-Change-Number: 19464 Gerrit-PatchSet: 5 Gerrit-Owner: Michael Blow <[email protected]> Gerrit-Reviewer: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Hussain Towaileb <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-MessageType: merged
