>From Wail Alkowaileet <[email protected]>:

Wail Alkowaileet has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18291 )


Change subject: [ASTERIXDB-3387][STO] Enable Selective caching policy
......................................................................

[ASTERIXDB-3387][STO] Enable Selective caching policy

- user model changes: yes
- storage format changes: no
- interface changes: no

Details:
Allow users to enable the 'selective' caching policy by selecting it as the cloud storage cache policy.

Change-Id: Ia212db8c1673f6b725eb21fe7035a1e20490d1f6
---
M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/HolePuncherProvider.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
A asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
A hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/NoOpDiskSpaceMaker.java
9 files changed, 383 insertions(+), 16 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/91/18291/1

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index cd52b9d..1d6a0d8 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -33,7 +33,7 @@

 import org.apache.asterix.active.ActiveManager;
 import org.apache.asterix.app.result.ResultReader;
-import org.apache.asterix.cloud.CloudManagerProvider;
+import org.apache.asterix.cloud.CloudConfigurator;
 import org.apache.asterix.cloud.LocalPartitionBootstrapper;
 import org.apache.asterix.common.api.IConfigValidator;
 import org.apache.asterix.common.api.IConfigValidatorFactory;
@@ -122,6 +122,10 @@
 import org.apache.hyracks.storage.common.buffercache.IPageReplacementStrategy;
 import 
org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
 import 
org.apache.hyracks.storage.common.buffercache.context.read.DefaultBufferCacheReadContextProvider;
+import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
+import org.apache.hyracks.storage.common.disk.IDiskResourceCacheLockNotifier;
+import org.apache.hyracks.storage.common.disk.NoOpDiskCacheMonitoringService;
+import 
org.apache.hyracks.storage.common.disk.NoOpDiskResourceCacheLockNotifier;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
 import org.apache.hyracks.storage.common.file.FileMapManager;
 import org.apache.hyracks.storage.common.file.ILocalResourceRepositoryFactory;
@@ -180,6 +184,7 @@
     private IPartitionBootstrapper partitionBootstrapper;
     private final INamespacePathResolver namespacePathResolver;
     private final INamespaceResolver namespaceResolver;
+    private IDiskCacheMonitoringService diskCacheService;

     public NCAppRuntimeContext(INCServiceContext ncServiceContext, 
NCExtensionManager extensionManager,
             IPropertiesFactory propertiesFactory, INamespaceResolver 
namespaceResolver,
@@ -211,16 +216,26 @@
             IConfigValidatorFactory configValidatorFactory, 
IReplicationStrategyFactory replicationStrategyFactory,
             boolean initialRun) throws IOException {
         ioManager = getServiceContext().getIoManager();
-        IDiskCachedPageAllocator pageAllocator = 
DefaultDiskCachedPageAllocator.INSTANCE;
-        IBufferCacheReadContext defaultContext = 
DefaultBufferCacheReadContextProvider.DEFAULT;
+        CloudConfigurator cloudConfigurator;
+        IDiskResourceCacheLockNotifier lockNotifier;
+        IDiskCachedPageAllocator pageAllocator;
+        IBufferCacheReadContext defaultContext;
         if (isCloudDeployment()) {
-            persistenceIOManager =
-                    CloudManagerProvider.createIOManager(cloudProperties, 
ioManager, namespacePathResolver);
-            partitionBootstrapper = 
CloudManagerProvider.getCloudPartitionBootstrapper(persistenceIOManager);
+            cloudConfigurator = CloudConfigurator.of(cloudProperties, 
ioManager, namespacePathResolver);
+            persistenceIOManager = cloudConfigurator.getCloudIoManager();
+            partitionBootstrapper = 
cloudConfigurator.getPartitionBootstrapper();
+            lockNotifier = cloudConfigurator.getLockNotifier();
+            pageAllocator = cloudConfigurator.getPageAllocator();
+            defaultContext = cloudConfigurator.getDefaultContext();
         } else {
+            cloudConfigurator = null;
             persistenceIOManager = ioManager;
             partitionBootstrapper = new LocalPartitionBootstrapper(ioManager);
+            lockNotifier = NoOpDiskResourceCacheLockNotifier.INSTANCE;
+            pageAllocator = DefaultDiskCachedPageAllocator.INSTANCE;
+            defaultContext = DefaultBufferCacheReadContextProvider.DEFAULT;
         }
+
         int ioQueueLen = 
getServiceContext().getAppConfig().getInt(NCConfig.Option.IO_QUEUE_SIZE);
         threadExecutor =
                 
MaintainedThreadNameExecutorService.newCachedThreadPool(getServiceContext().getThreadFactory());
@@ -261,9 +276,8 @@
         // Must start vbc now instead of by life cycle component manager 
(lccm) because lccm happens after
         // the metadata bootstrap task
         ((ILifeCycleComponent) virtualBufferCache).start();
-        datasetLifecycleManager =
-                new DatasetLifecycleManager(storageProperties, 
localResourceRepository, txnSubsystem.getLogManager(),
-                        virtualBufferCache, indexCheckpointManagerProvider, 
ioManager.getIODevices().size());
+        datasetLifecycleManager = new 
DatasetLifecycleManager(storageProperties, localResourceRepository,
+                txnSubsystem.getLogManager(), virtualBufferCache, 
indexCheckpointManagerProvider, lockNotifier);
         
localResourceRepository.setDatasetLifecycleManager(datasetLifecycleManager);
         final String nodeId = getServiceContext().getNodeId();
         final Set<Integer> nodePartitions = 
metadataProperties.getNodePartitions(nodeId);
@@ -300,6 +314,15 @@
                     fileInfoMap, defaultContext);
         }

+        if (cloudConfigurator != null) {
+            diskCacheService =
+                    
cloudConfigurator.createDiskCacheMonitoringService(getServiceContext(), 
bufferCache, fileInfoMap);
+        } else {
+            diskCacheService = NoOpDiskCacheMonitoringService.INSTANCE;
+        }
+
+        diskCacheService.start();
+
         NodeControllerService ncs = (NodeControllerService) 
getServiceContext().getControllerService();
         FileReference appDir =
                 
ioManager.resolveAbsolutePath(getServiceContext().getServerCtx().getAppDir().getAbsolutePath());
@@ -351,6 +374,7 @@
     @Override
     public synchronized void preStop() throws Exception {
         activeManager.shutdown();
+        diskCacheService.stop();
         if (metadataNodeStub != null) {
             unexportMetadataNodeStub();
         }
@@ -695,6 +719,11 @@
     }

     @Override
+    public IDiskCacheMonitoringService getDiskCacheService() {
+        return diskCacheService;
+    }
+
+    @Override
     public boolean isCloudDeployment() {
         return ncServiceContext.getAppConfig().getBoolean(CLOUD_DEPLOYMENT);
     }
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
new file mode 100644
index 0000000..3ee59d2
--- /dev/null
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud;
+
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.asterix.common.api.INamespacePathResolver;
+import org.apache.asterix.common.cloud.CloudCachePolicy;
+import org.apache.asterix.common.cloud.IPartitionBootstrapper;
+import org.apache.asterix.common.config.CloudProperties;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.config.IApplicationConfig;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.cloud.buffercache.context.DefaultCloudReadContext;
+import org.apache.hyracks.cloud.buffercache.page.CloudDiskCachedPageAllocator;
+import 
org.apache.hyracks.cloud.cache.service.CloudDiskCacheMonitoringAndPrefetchingService;
+import 
org.apache.hyracks.cloud.cache.service.CloudDiskResourceCacheLockNotifier;
+import org.apache.hyracks.cloud.cache.service.DiskCacheSweeperThread;
+import org.apache.hyracks.cloud.filesystem.PhysicalDrive;
+import org.apache.hyracks.control.common.controllers.NCConfig;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.storage.common.buffercache.BufferCache;
+import 
org.apache.hyracks.storage.common.buffercache.DefaultDiskCachedPageAllocator;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.IDiskCachedPageAllocator;
+import 
org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
+import 
org.apache.hyracks.storage.common.buffercache.context.read.DefaultBufferCacheReadContextProvider;
+import org.apache.hyracks.storage.common.disk.DummyPhysicalDrive;
+import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
+import org.apache.hyracks.storage.common.disk.IDiskResourceCacheLockNotifier;
+import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
+import org.apache.hyracks.storage.common.disk.NoOpDiskCacheMonitoringService;
+import 
org.apache.hyracks.storage.common.disk.NoOpDiskResourceCacheLockNotifier;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+
+public final class CloudConfigurator {
+    private final IOManager localIoManager;
+    private final AbstractCloudIOManager cloudIOManager;
+    private final IPhysicalDrive physicalDrive;
+    private final IDiskResourceCacheLockNotifier lockNotifier;
+    private final IDiskCachedPageAllocator pageAllocator;
+    private final IBufferCacheReadContext defaultContext;
+    private final boolean diskCacheManagerRequired;
+    private final long diskCacheMonitoringInterval;
+
+    private CloudConfigurator(CloudProperties cloudProperties, IIOManager 
ioManager,
+            INamespacePathResolver nsPathResolver) throws HyracksDataException 
{
+        localIoManager = (IOManager) ioManager;
+        diskCacheManagerRequired = cloudProperties.getCloudCachePolicy() == 
CloudCachePolicy.SELECTIVE;
+        cloudIOManager = createIOManager(ioManager, cloudProperties, 
nsPathResolver);
+        physicalDrive = createPhysicalDrive(diskCacheManagerRequired, 
cloudProperties, ioManager);
+        lockNotifier = createLockNotifier(diskCacheManagerRequired);
+        pageAllocator = createPageAllocator(diskCacheManagerRequired);
+        defaultContext = 
createDefaultBufferCachePageOpContext(diskCacheManagerRequired, physicalDrive);
+        diskCacheMonitoringInterval = 
cloudProperties.getStorageDiskMonitorInterval();
+    }
+
+    public IPartitionBootstrapper getPartitionBootstrapper() {
+        return cloudIOManager;
+    }
+
+    public IIOManager getCloudIoManager() {
+        return cloudIOManager;
+    }
+
+    public IDiskResourceCacheLockNotifier getLockNotifier() {
+        return lockNotifier;
+    }
+
+    public IDiskCachedPageAllocator getPageAllocator() {
+        return pageAllocator;
+    }
+
+    public IBufferCacheReadContext getDefaultContext() {
+        return defaultContext;
+    }
+
+    public IDiskCacheMonitoringService 
createDiskCacheMonitoringService(INCServiceContext serviceContext,
+            IBufferCache bufferCache, Map<Integer, BufferedFileHandle> 
fileInfoMap) {
+        if (!diskCacheManagerRequired) {
+            return NoOpDiskCacheMonitoringService.INSTANCE;
+        }
+
+        CloudDiskResourceCacheLockNotifier resourceCacheManager = 
(CloudDiskResourceCacheLockNotifier) lockNotifier;
+        BufferCache diskBufferCache = (BufferCache) bufferCache;
+        int numOfIoDevices = localIoManager.getIODevices().size();
+        IApplicationConfig appConfig = serviceContext.getAppConfig();
+        int ioParallelism = 
appConfig.getInt(NCConfig.Option.IO_WORKERS_PER_PARTITION);
+        int sweepQueueSize = appConfig.getInt(NCConfig.Option.IO_QUEUE_SIZE);
+        int numOfSweepThreads = ioParallelism * numOfIoDevices;
+        // Ensure at least each sweep thread has one entry in the queue
+        int maxSweepQueueSize = Math.max(numOfSweepThreads, sweepQueueSize);
+        // +1 for the monitorThread
+        ExecutorService executor = 
Executors.newFixedThreadPool(numOfSweepThreads + 1);
+        DiskCacheSweeperThread monitorThread =
+                new DiskCacheSweeperThread(executor, 
diskCacheMonitoringInterval, resourceCacheManager, cloudIOManager,
+                        numOfSweepThreads, maxSweepQueueSize, physicalDrive, 
diskBufferCache, fileInfoMap);
+
+        IDiskCacheMonitoringService diskCacheService =
+                new CloudDiskCacheMonitoringAndPrefetchingService(executor, 
physicalDrive, monitorThread);
+        localIoManager.setSpaceMaker(monitorThread);
+        return diskCacheService;
+    }
+
+    public static CloudConfigurator of(CloudProperties cloudProperties, 
IIOManager ioManager,
+            INamespacePathResolver nsPathResolver) throws HyracksDataException 
{
+        return new CloudConfigurator(cloudProperties, ioManager, 
nsPathResolver);
+    }
+
+    public static AbstractCloudIOManager createIOManager(IIOManager ioManager, 
CloudProperties cloudProperties,
+            INamespacePathResolver nsPathResolver) throws HyracksDataException 
{
+        IOManager localIoManager = (IOManager) ioManager;
+        CloudCachePolicy policy = cloudProperties.getCloudCachePolicy();
+        if (policy == CloudCachePolicy.EAGER) {
+            return new EagerCloudIOManager(localIoManager, cloudProperties, 
nsPathResolver);
+        }
+
+        boolean selective = policy == CloudCachePolicy.SELECTIVE;
+        return new LazyCloudIOManager(localIoManager, cloudProperties, 
nsPathResolver, selective);
+    }
+
+    private static IPhysicalDrive createPhysicalDrive(boolean 
diskCacheManagerRequired, CloudProperties cloudProperties,
+            IIOManager ioManager) throws HyracksDataException {
+        if (diskCacheManagerRequired) {
+            double storagePercentage = 
cloudProperties.getStorageAllocationPercentage();
+            double pressureThreshold = 
cloudProperties.getStorageSweepThresholdPercentage();
+            long pressureDebugSize = 
cloudProperties.getStorageDebugSweepThresholdSize();
+            return new PhysicalDrive(ioManager.getIODevices(), 
pressureThreshold, storagePercentage, pressureDebugSize);
+        }
+
+        return DummyPhysicalDrive.INSTANCE;
+    }
+
+    private static IDiskResourceCacheLockNotifier createLockNotifier(boolean 
diskCacheManagerRequired) {
+        if (diskCacheManagerRequired) {
+            return new CloudDiskResourceCacheLockNotifier();
+        }
+
+        return NoOpDiskResourceCacheLockNotifier.INSTANCE;
+    }
+
+    private static IDiskCachedPageAllocator createPageAllocator(boolean 
diskCacheManagerRequired) {
+        if (diskCacheManagerRequired) {
+            return CloudDiskCachedPageAllocator.INSTANCE;
+        }
+        return DefaultDiskCachedPageAllocator.INSTANCE;
+    }
+
+    private static IBufferCacheReadContext 
createDefaultBufferCachePageOpContext(boolean diskCacheManagerRequired,
+            IPhysicalDrive drive) {
+        if (diskCacheManagerRequired) {
+            return new DefaultCloudReadContext(drive);
+        }
+
+        return DefaultBufferCacheReadContextProvider.DEFAULT;
+    }
+}
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
index 612237a..140fa6a 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
@@ -68,11 +68,11 @@
     private ILazyAccessor accessor;

     public LazyCloudIOManager(IOManager ioManager, CloudProperties 
cloudProperties,
-            INamespacePathResolver nsPathResolver, boolean 
replaceableAccessor) throws HyracksDataException {
+            INamespacePathResolver nsPathResolver, boolean selective) throws 
HyracksDataException {
         super(ioManager, cloudProperties, nsPathResolver);
         accessor = new InitialCloudAccessor(cloudClient, bucket, 
localIoManager);
         puncher = HolePuncherProvider.get(this, cloudProperties, 
writeBufferProvider);
-        if (replaceableAccessor) {
+        if (selective) {
             replacer = InitialCloudAccessor.NO_OP_REPLACER;
         } else {
             replacer = () -> {
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/HolePuncherProvider.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/HolePuncherProvider.java
index 4ea6c46..406f2f3 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/HolePuncherProvider.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/HolePuncherProvider.java
@@ -18,17 +18,21 @@
  */
 package org.apache.asterix.cloud.lazy.filesystem;

+import java.io.IOException;
 import java.nio.ByteBuffer;

 import org.apache.asterix.cloud.AbstractCloudIOManager;
+import org.apache.asterix.cloud.CloudFileHandle;
 import org.apache.asterix.cloud.IWriteBufferProvider;
 import org.apache.asterix.common.cloud.CloudCachePolicy;
 import org.apache.asterix.common.config.CloudProperties;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.cloud.filesystem.FileSystemOperationDispatcherUtil;
 
 public final class HolePuncherProvider {
     private static final IHolePuncher UNSUPPORTED = 
HolePuncherProvider::unsupported;
+    private static final IHolePuncher LINUX = 
HolePuncherProvider::linuxPunchHole;

     private HolePuncherProvider() {
     }
@@ -39,13 +43,35 @@
             return UNSUPPORTED;
         }

-        return new DebugHolePuncher(cloudIOManager, bufferProvider);
+        if (FileSystemOperationDispatcherUtil.isLinux()) {
+            return LINUX;
+        } else if (cloudProperties.isStorageDebugModeEnabled()) {
+            // Running on debug mode on a non-Linux box
+            return new DebugHolePuncher(cloudIOManager, bufferProvider);
+        }
+
+        throw new UnsupportedOperationException(
+                "Hole puncher is not supported using " + 
FileSystemOperationDispatcherUtil.getOSName());
     }

     private static int unsupported(IFileHandle fileHandle, long offset, long 
length) {
         throw new UnsupportedOperationException("punchHole is not supported");
     }

+    private static int linuxPunchHole(IFileHandle fileHandle, long offset, 
long length) throws HyracksDataException {
+        CloudFileHandle cloudFileHandle = (CloudFileHandle) fileHandle;
+        int fileDescriptor = cloudFileHandle.getFileDescriptor();
+        int blockSize = cloudFileHandle.getBlockSize();
+        int freedSpace = 
FileSystemOperationDispatcherUtil.punchHole(fileDescriptor, offset, length, 
blockSize);
+        try {
+            cloudFileHandle.getFileChannel().force(false);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+
+        return freedSpace;
+    }
+
     private static final class DebugHolePuncher implements IHolePuncher {
         private final AbstractCloudIOManager cloudIOManager;
         private final IWriteBufferProvider bufferProvider;
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index 8e6becf..888cea1 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -43,6 +43,7 @@
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
 import org.apache.hyracks.storage.common.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
 import org.apache.hyracks.storage.common.file.IResourceIdFactory;
 import org.apache.hyracks.util.cache.ICacheManager;

@@ -152,4 +153,9 @@
      * @return the disk write rate limiter provider
      */
     IDiskWriteRateLimiterProvider getDiskWriteRateLimiterProvider();
+
+    /**
+     * @return disk cache service
+     */
+    IDiskCacheMonitoringService getDiskCacheService();
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
index 4b7649a8..a5e73af 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
@@ -19,8 +19,12 @@
 package org.apache.asterix.common.config;

 import static org.apache.hyracks.control.common.config.OptionTypes.BOOLEAN;
+import static org.apache.hyracks.control.common.config.OptionTypes.DOUBLE;
+import static 
org.apache.hyracks.control.common.config.OptionTypes.LONG_BYTE_UNIT;
 import static 
org.apache.hyracks.control.common.config.OptionTypes.NONNEGATIVE_INTEGER;
+import static 
org.apache.hyracks.control.common.config.OptionTypes.POSITIVE_INTEGER;
 import static org.apache.hyracks.control.common.config.OptionTypes.STRING;
+import static org.apache.hyracks.util.StorageUtil.StorageUnit.GIGABYTE;

 import java.util.concurrent.TimeUnit;

@@ -28,6 +32,7 @@
 import org.apache.hyracks.api.config.IOption;
 import org.apache.hyracks.api.config.IOptionType;
 import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.util.StorageUtil;

 public class CloudProperties extends AbstractProperties {

@@ -43,6 +48,13 @@
         CLOUD_STORAGE_ENDPOINT(STRING, ""),
         CLOUD_STORAGE_ANONYMOUS_AUTH(BOOLEAN, false),
         CLOUD_STORAGE_CACHE_POLICY(STRING, "lazy"),
+        // 80% of the total disk space
+        CLOUD_STORAGE_ALLOCATION_PERCENTAGE(DOUBLE, 0.8d),
+        // 90% of the allocated space for storage (i.e., 90% of the 80% of the 
total disk space)
+        CLOUD_STORAGE_SWEEP_THRESHOLD_PERCENTAGE(DOUBLE, 0.9d),
+        CLOUD_STORAGE_DEBUG_MODE_ENABLED(BOOLEAN, true),
+        CLOUD_STORAGE_DEBUG_SWEEP_THRESHOLD_SIZE(LONG_BYTE_UNIT, 
StorageUtil.getLongSizeInBytes(0, GIGABYTE)),
+        CLOUD_STORAGE_DISK_MONITOR_INTERVAL(POSITIVE_INTEGER, 60),
         CLOUD_PROFILER_LOG_INTERVAL(NONNEGATIVE_INTEGER, 0);

         private final IOptionType interpreter;
@@ -63,6 +75,10 @@
                 case CLOUD_STORAGE_ENDPOINT:
                 case CLOUD_STORAGE_ANONYMOUS_AUTH:
                 case CLOUD_STORAGE_CACHE_POLICY:
+                case CLOUD_STORAGE_ALLOCATION_PERCENTAGE:
+                case CLOUD_STORAGE_SWEEP_THRESHOLD_PERCENTAGE:
+                case CLOUD_STORAGE_DISK_MONITOR_INTERVAL:
+                case CLOUD_STORAGE_DEBUG_SWEEP_THRESHOLD_SIZE:
                 case CLOUD_PROFILER_LOG_INTERVAL:
                     return Section.COMMON;
                 default:
@@ -86,9 +102,34 @@
                 case CLOUD_STORAGE_ANONYMOUS_AUTH:
                     return "Indicates whether or not anonymous auth should be 
used for the cloud storage";
                 case CLOUD_STORAGE_CACHE_POLICY:
-                    return "The caching policy (either eager or lazy). 'Eager' 
caching will download all partitions"
-                            + " upon booting, whereas lazy caching will 
download a file upon request to open it."
+                    return "The caching policy (either eager, lazy or 
selective). 'eager' caching will download"
+                            + "all partitions upon booting, whereas 'lazy' 
caching will download a file upon"
+                            + " request to open it. 'selective' caching will 
act as the 'lazy' policy; however, "
+                            + " it allows to use the local disk(s) as a cache, 
where pages and indexes can be "
+                            + " cached or evicted according to the pressure 
imposed on the local disks."
                             + " (default: 'lazy')";
+                case CLOUD_STORAGE_ALLOCATION_PERCENTAGE:
+                    return "The percentage of the total disk space that should 
be allocated for data storage when the"
+                            + " 'selective' caching policy is used. The 
remaining will act as a buffer for "
+                            + " query workspace (i.e., for query operations 
that require spilling to disk)."
+                            + " (default: 80% of the total disk space)";
+                case CLOUD_STORAGE_SWEEP_THRESHOLD_PERCENTAGE:
+                    return "The percentage of the used storage space at which 
the disk sweeper starts freeing space by"
+                            + " punching holes in stored indexes or by 
evicting them entirely, "
+                            + " when the 'selective' caching policy is used."
+                            + " (default: 90% of the allocated space for 
storage)";
+                case CLOUD_STORAGE_DISK_MONITOR_INTERVAL:
+                    return "The disk monitoring interval time (in seconds): 
determines how often the system"
+                            + " checks for pressure on disk space when using 
the 'selective' caching policy."
+                            + " (default : 60 seconds)";
+                case CLOUD_STORAGE_DEBUG_MODE_ENABLED:
+                    return "Whether or not the debug mode is enabled when 
using the 'selective' caching policy."
+                            + "(default: false)";
+                case CLOUD_STORAGE_DEBUG_SWEEP_THRESHOLD_SIZE:
+                    return "For debugging only. Pressure size will be the 
current used space + the additional bytes"
+                            + " provided by this configuration option instead 
of using "
+                            + " CLOUD_STORAGE_SWEEP_THRESHOLD_PERCENTAGE."
+                            + " (default: 0. I.e., 
CLOUD_STORAGE_SWEEP_THRESHOLD_PERCENTAGE will be used by default)";
                 case CLOUD_PROFILER_LOG_INTERVAL:
                     return "The waiting time (in minutes) to log cloud request 
statistics (default: 0, which means"
                             + " the profiler is disabled by default). The 
minimum is 1 minute."
@@ -134,6 +175,26 @@
         return accessor.getBoolean(Option.CLOUD_STORAGE_ANONYMOUS_AUTH);
     }

+    public double getStorageAllocationPercentage() {
+        return accessor.getDouble(Option.CLOUD_STORAGE_ALLOCATION_PERCENTAGE);
+    }
+
+    public double getStorageSweepThresholdPercentage() {
+        return 
accessor.getDouble(Option.CLOUD_STORAGE_SWEEP_THRESHOLD_PERCENTAGE);
+    }
+
+    public int getStorageDiskMonitorInterval() {
+        return accessor.getInt(Option.CLOUD_STORAGE_DISK_MONITOR_INTERVAL);
+    }
+
+    public boolean isStorageDebugModeEnabled() {
+        return accessor.getBoolean(Option.CLOUD_STORAGE_DEBUG_MODE_ENABLED);
+    }
+
+    public long getStorageDebugSweepThresholdSize() {
+        return isStorageDebugModeEnabled() ? 
accessor.getLong(Option.CLOUD_STORAGE_DEBUG_SWEEP_THRESHOLD_SIZE) : 0L;
+    }
+
     public CloudCachePolicy getCloudCachePolicy() {
         return 
CloudCachePolicy.fromName(accessor.getString(Option.CLOUD_STORAGE_CACHE_POLICY));
     }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 1a45155..09711fd 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -59,6 +59,7 @@
 import org.apache.hyracks.storage.common.LocalResource;
 import org.apache.hyracks.storage.common.buffercache.IRateLimiter;
 import org.apache.hyracks.storage.common.buffercache.SleepRateLimiter;
+import org.apache.hyracks.storage.common.disk.IDiskResourceCacheLockNotifier;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;

@@ -71,6 +72,7 @@
     private final IVirtualBufferCache vbc;
     private final ILogManager logManager;
     private final LogRecord waitLog;
+    private final IDiskResourceCacheLockNotifier lockNotifier;
     private volatile boolean stopped = false;
     private final IIndexCheckpointManagerProvider 
indexCheckpointManagerProvider;
     // all LSM-trees share the same virtual buffer cache list
@@ -78,7 +80,8 @@

     public DatasetLifecycleManager(StorageProperties storageProperties, 
ILocalResourceRepository resourceRepository,
             ILogManager logManager, IVirtualBufferCache vbc,
-            IIndexCheckpointManagerProvider indexCheckpointManagerProvider, 
int numPartitions) {
+            IIndexCheckpointManagerProvider indexCheckpointManagerProvider,
+            IDiskResourceCacheLockNotifier lockNotifier) {
         this.logManager = logManager;
         this.storageProperties = storageProperties;
         this.resourceRepository = resourceRepository;
@@ -89,6 +92,7 @@
             vbcs.add(vbc);
         }
         this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
+        this.lockNotifier = lockNotifier;
         waitLog = new LogRecord();
         waitLog.setLogType(LogType.WAIT_FOR_FLUSHES);
         waitLog.computeAndSetLogSize();
@@ -122,6 +126,7 @@
             datasetResource = getDatasetLifecycle(did);
         }
         datasetResource.register(resource, (ILSMIndex) index);
+        lockNotifier.onRegister(did, resource, index);
     }

     private int getDIDfromResourcePath(String resourcePath) throws HyracksDataException {
@@ -145,7 +150,7 @@
         validateDatasetLifecycleManagerState();
         int did = getDIDfromResourcePath(resourcePath);
         long resourceID = getResourceIDfromResourcePath(resourcePath);
-
+        lockNotifier.onUnregister(did, resourceID);
         DatasetResource dsr = datasets.get(did);
         IndexInfo iInfo = dsr == null ? null : dsr.getIndexInfo(resourceID);

@@ -190,6 +195,9 @@
         int did = getDIDfromResourcePath(resourcePath);
         long resourceID = getResourceIDfromResourcePath(resourcePath);

+        // Notify first before opening a resource
+        lockNotifier.onOpen(did, resourceID);
+
         DatasetResource dsr = datasets.get(did);
         DatasetInfo dsInfo = dsr.getDatasetInfo();
         if (dsInfo == null || !dsInfo.isRegistered()) {
@@ -253,6 +261,7 @@
             if (iInfo == null) {
                 throw HyracksDataException.create(ErrorCode.NO_INDEX_FOUND_WITH_RESOURCE_ID, resourceID);
             }
+            lockNotifier.onClose(did, resourceID);
         } finally {
             // Regardless of what exception is thrown in the try-block (e.g., line 279),
             // we have to un-touch the index and dataset.
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index b9f277a..9909e97 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -45,6 +45,7 @@
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IDiskSpaceMaker;
 import org.apache.hyracks.api.io.IFileDeviceResolver;
 import org.apache.hyracks.api.io.IFileHandle;
 import org.apache.hyracks.api.io.IIOBulkOperation;
@@ -82,6 +83,8 @@
      * Mutables
      */
     private int workspaceIndex;
+    // TODO use space maker on write
+    private IDiskSpaceMaker spaceMaker;

     public IOManager(List<IODeviceHandle> devices, IFileDeviceResolver deviceComputer, int ioParallelism, int queueSize)
             throws HyracksDataException {
@@ -112,6 +115,7 @@
         for (int i = 0; i < numIoThreads; i++) {
             executor.execute(new IoRequestHandler(i, submittedRequests));
         }
+        spaceMaker = NoOpDiskSpaceMaker.INSTANCE;
     }

     public int getQueueSize() {
@@ -596,4 +600,8 @@
     public void performBulkOperation(IIOBulkOperation bulkOperation) throws HyracksDataException {
         ((AbstractBulkOperation) bulkOperation).performOperation();
     }
+
+    public void setSpaceMaker(IDiskSpaceMaker spaceMaker) {
+        this.spaceMaker = spaceMaker;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/NoOpDiskSpaceMaker.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/NoOpDiskSpaceMaker.java
new file mode 100644
index 0000000..1527dde
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/NoOpDiskSpaceMaker.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.io;
+
+import java.io.IOException;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IDiskSpaceMaker;
+
+final class NoOpDiskSpaceMaker implements IDiskSpaceMaker {
+    static final IDiskSpaceMaker INSTANCE = new NoOpDiskSpaceMaker();
+
+    private NoOpDiskSpaceMaker() {
+    }
+
+    @Override
+    public void makeSpaceOrThrow(IOException e) throws HyracksDataException {
+        throw HyracksDataException.create(e);
+    }
+}

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18291
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: Ia212db8c1673f6b725eb21fe7035a1e20490d1f6
Gerrit-Change-Number: 18291
Gerrit-PatchSet: 1
Gerrit-Owner: Wail Alkowaileet <[email protected]>
Gerrit-MessageType: newchange

Reply via email to