>From Wail Alkowaileet <[email protected]>:
Wail Alkowaileet has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18291 )
Change subject: [ASTERIXDB-3387][STO] Enable Selective caching policy
......................................................................
[ASTERIXDB-3387][STO] Enable Selective caching policy
- user model changes: yes
- storage format changes: no
- interface changes: no
Details:
Allow user to enable/select Selective caching policy
Change-Id: Ia212db8c1673f6b725eb21fe7035a1e20490d1f6
---
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/HolePuncherProvider.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
A
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
A
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/NoOpDiskSpaceMaker.java
9 files changed, 383 insertions(+), 16 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/91/18291/1
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index cd52b9d..1d6a0d8 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -33,7 +33,7 @@
import org.apache.asterix.active.ActiveManager;
import org.apache.asterix.app.result.ResultReader;
-import org.apache.asterix.cloud.CloudManagerProvider;
+import org.apache.asterix.cloud.CloudConfigurator;
import org.apache.asterix.cloud.LocalPartitionBootstrapper;
import org.apache.asterix.common.api.IConfigValidator;
import org.apache.asterix.common.api.IConfigValidatorFactory;
@@ -122,6 +122,10 @@
import org.apache.hyracks.storage.common.buffercache.IPageReplacementStrategy;
import
org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
import
org.apache.hyracks.storage.common.buffercache.context.read.DefaultBufferCacheReadContextProvider;
+import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
+import org.apache.hyracks.storage.common.disk.IDiskResourceCacheLockNotifier;
+import org.apache.hyracks.storage.common.disk.NoOpDiskCacheMonitoringService;
+import
org.apache.hyracks.storage.common.disk.NoOpDiskResourceCacheLockNotifier;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
import org.apache.hyracks.storage.common.file.FileMapManager;
import org.apache.hyracks.storage.common.file.ILocalResourceRepositoryFactory;
@@ -180,6 +184,7 @@
private IPartitionBootstrapper partitionBootstrapper;
private final INamespacePathResolver namespacePathResolver;
private final INamespaceResolver namespaceResolver;
+ private IDiskCacheMonitoringService diskCacheService;
public NCAppRuntimeContext(INCServiceContext ncServiceContext,
NCExtensionManager extensionManager,
IPropertiesFactory propertiesFactory, INamespaceResolver
namespaceResolver,
@@ -211,16 +216,26 @@
IConfigValidatorFactory configValidatorFactory,
IReplicationStrategyFactory replicationStrategyFactory,
boolean initialRun) throws IOException {
ioManager = getServiceContext().getIoManager();
- IDiskCachedPageAllocator pageAllocator =
DefaultDiskCachedPageAllocator.INSTANCE;
- IBufferCacheReadContext defaultContext =
DefaultBufferCacheReadContextProvider.DEFAULT;
+ CloudConfigurator cloudConfigurator;
+ IDiskResourceCacheLockNotifier lockNotifier;
+ IDiskCachedPageAllocator pageAllocator;
+ IBufferCacheReadContext defaultContext;
if (isCloudDeployment()) {
- persistenceIOManager =
- CloudManagerProvider.createIOManager(cloudProperties,
ioManager, namespacePathResolver);
- partitionBootstrapper =
CloudManagerProvider.getCloudPartitionBootstrapper(persistenceIOManager);
+ cloudConfigurator = CloudConfigurator.of(cloudProperties,
ioManager, namespacePathResolver);
+ persistenceIOManager = cloudConfigurator.getCloudIoManager();
+ partitionBootstrapper =
cloudConfigurator.getPartitionBootstrapper();
+ lockNotifier = cloudConfigurator.getLockNotifier();
+ pageAllocator = cloudConfigurator.getPageAllocator();
+ defaultContext = cloudConfigurator.getDefaultContext();
} else {
+ cloudConfigurator = null;
persistenceIOManager = ioManager;
partitionBootstrapper = new LocalPartitionBootstrapper(ioManager);
+ lockNotifier = NoOpDiskResourceCacheLockNotifier.INSTANCE;
+ pageAllocator = DefaultDiskCachedPageAllocator.INSTANCE;
+ defaultContext = DefaultBufferCacheReadContextProvider.DEFAULT;
}
+
int ioQueueLen =
getServiceContext().getAppConfig().getInt(NCConfig.Option.IO_QUEUE_SIZE);
threadExecutor =
MaintainedThreadNameExecutorService.newCachedThreadPool(getServiceContext().getThreadFactory());
@@ -261,9 +276,8 @@
// Must start vbc now instead of by life cycle component manager
(lccm) because lccm happens after
// the metadata bootstrap task
((ILifeCycleComponent) virtualBufferCache).start();
- datasetLifecycleManager =
- new DatasetLifecycleManager(storageProperties,
localResourceRepository, txnSubsystem.getLogManager(),
- virtualBufferCache, indexCheckpointManagerProvider,
ioManager.getIODevices().size());
+ datasetLifecycleManager = new
DatasetLifecycleManager(storageProperties, localResourceRepository,
+ txnSubsystem.getLogManager(), virtualBufferCache,
indexCheckpointManagerProvider, lockNotifier);
localResourceRepository.setDatasetLifecycleManager(datasetLifecycleManager);
final String nodeId = getServiceContext().getNodeId();
final Set<Integer> nodePartitions =
metadataProperties.getNodePartitions(nodeId);
@@ -300,6 +314,15 @@
fileInfoMap, defaultContext);
}
+ if (cloudConfigurator != null) {
+ diskCacheService =
+
cloudConfigurator.createDiskCacheMonitoringService(getServiceContext(),
bufferCache, fileInfoMap);
+ } else {
+ diskCacheService = NoOpDiskCacheMonitoringService.INSTANCE;
+ }
+
+ diskCacheService.start();
+
NodeControllerService ncs = (NodeControllerService)
getServiceContext().getControllerService();
FileReference appDir =
ioManager.resolveAbsolutePath(getServiceContext().getServerCtx().getAppDir().getAbsolutePath());
@@ -351,6 +374,7 @@
@Override
public synchronized void preStop() throws Exception {
activeManager.shutdown();
+ diskCacheService.stop();
if (metadataNodeStub != null) {
unexportMetadataNodeStub();
}
@@ -695,6 +719,11 @@
}
@Override
+ public IDiskCacheMonitoringService getDiskCacheService() {
+ return diskCacheService;
+ }
+
+ @Override
public boolean isCloudDeployment() {
return ncServiceContext.getAppConfig().getBoolean(CLOUD_DEPLOYMENT);
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
new file mode 100644
index 0000000..3ee59d2
--- /dev/null
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud;
+
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.asterix.common.api.INamespacePathResolver;
+import org.apache.asterix.common.cloud.CloudCachePolicy;
+import org.apache.asterix.common.cloud.IPartitionBootstrapper;
+import org.apache.asterix.common.config.CloudProperties;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.config.IApplicationConfig;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.cloud.buffercache.context.DefaultCloudReadContext;
+import org.apache.hyracks.cloud.buffercache.page.CloudDiskCachedPageAllocator;
+import
org.apache.hyracks.cloud.cache.service.CloudDiskCacheMonitoringAndPrefetchingService;
+import
org.apache.hyracks.cloud.cache.service.CloudDiskResourceCacheLockNotifier;
+import org.apache.hyracks.cloud.cache.service.DiskCacheSweeperThread;
+import org.apache.hyracks.cloud.filesystem.PhysicalDrive;
+import org.apache.hyracks.control.common.controllers.NCConfig;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.storage.common.buffercache.BufferCache;
+import
org.apache.hyracks.storage.common.buffercache.DefaultDiskCachedPageAllocator;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.IDiskCachedPageAllocator;
+import
org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
+import
org.apache.hyracks.storage.common.buffercache.context.read.DefaultBufferCacheReadContextProvider;
+import org.apache.hyracks.storage.common.disk.DummyPhysicalDrive;
+import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
+import org.apache.hyracks.storage.common.disk.IDiskResourceCacheLockNotifier;
+import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
+import org.apache.hyracks.storage.common.disk.NoOpDiskCacheMonitoringService;
+import
org.apache.hyracks.storage.common.disk.NoOpDiskResourceCacheLockNotifier;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+
+public final class CloudConfigurator {
+ private final IOManager localIoManager;
+ private final AbstractCloudIOManager cloudIOManager;
+ private final IPhysicalDrive physicalDrive;
+ private final IDiskResourceCacheLockNotifier lockNotifier;
+ private final IDiskCachedPageAllocator pageAllocator;
+ private final IBufferCacheReadContext defaultContext;
+ private final boolean diskCacheManagerRequired;
+ private final long diskCacheMonitoringInterval;
+
+ private CloudConfigurator(CloudProperties cloudProperties, IIOManager
ioManager,
+ INamespacePathResolver nsPathResolver) throws HyracksDataException
{
+ localIoManager = (IOManager) ioManager;
+ diskCacheManagerRequired = cloudProperties.getCloudCachePolicy() ==
CloudCachePolicy.SELECTIVE;
+ cloudIOManager = createIOManager(ioManager, cloudProperties,
nsPathResolver);
+ physicalDrive = createPhysicalDrive(diskCacheManagerRequired,
cloudProperties, ioManager);
+ lockNotifier = createLockNotifier(diskCacheManagerRequired);
+ pageAllocator = createPageAllocator(diskCacheManagerRequired);
+ defaultContext =
createDefaultBufferCachePageOpContext(diskCacheManagerRequired, physicalDrive);
+ diskCacheMonitoringInterval =
cloudProperties.getStorageDiskMonitorInterval();
+ }
+
+ public IPartitionBootstrapper getPartitionBootstrapper() {
+ return cloudIOManager;
+ }
+
+ public IIOManager getCloudIoManager() {
+ return cloudIOManager;
+ }
+
+ public IDiskResourceCacheLockNotifier getLockNotifier() {
+ return lockNotifier;
+ }
+
+ public IDiskCachedPageAllocator getPageAllocator() {
+ return pageAllocator;
+ }
+
+ public IBufferCacheReadContext getDefaultContext() {
+ return defaultContext;
+ }
+
+ public IDiskCacheMonitoringService
createDiskCacheMonitoringService(INCServiceContext serviceContext,
+ IBufferCache bufferCache, Map<Integer, BufferedFileHandle>
fileInfoMap) {
+ if (!diskCacheManagerRequired) {
+ return NoOpDiskCacheMonitoringService.INSTANCE;
+ }
+
+ CloudDiskResourceCacheLockNotifier resourceCacheManager =
(CloudDiskResourceCacheLockNotifier) lockNotifier;
+ BufferCache diskBufferCache = (BufferCache) bufferCache;
+ int numOfIoDevices = localIoManager.getIODevices().size();
+ IApplicationConfig appConfig = serviceContext.getAppConfig();
+ int ioParallelism =
appConfig.getInt(NCConfig.Option.IO_WORKERS_PER_PARTITION);
+ int sweepQueueSize = appConfig.getInt(NCConfig.Option.IO_QUEUE_SIZE);
+ int numOfSweepThreads = ioParallelism * numOfIoDevices;
+ // Ensure at least each sweep thread has one entry in the queue
+ int maxSweepQueueSize = Math.max(numOfSweepThreads, sweepQueueSize);
+ // +1 for the monitorThread
+ ExecutorService executor =
Executors.newFixedThreadPool(numOfSweepThreads + 1);
+ DiskCacheSweeperThread monitorThread =
+ new DiskCacheSweeperThread(executor,
diskCacheMonitoringInterval, resourceCacheManager, cloudIOManager,
+ numOfSweepThreads, maxSweepQueueSize, physicalDrive,
diskBufferCache, fileInfoMap);
+
+ IDiskCacheMonitoringService diskCacheService =
+ new CloudDiskCacheMonitoringAndPrefetchingService(executor,
physicalDrive, monitorThread);
+ localIoManager.setSpaceMaker(monitorThread);
+ return diskCacheService;
+ }
+
+ public static CloudConfigurator of(CloudProperties cloudProperties,
IIOManager ioManager,
+ INamespacePathResolver nsPathResolver) throws HyracksDataException
{
+ return new CloudConfigurator(cloudProperties, ioManager,
nsPathResolver);
+ }
+
+ public static AbstractCloudIOManager createIOManager(IIOManager ioManager,
CloudProperties cloudProperties,
+ INamespacePathResolver nsPathResolver) throws HyracksDataException
{
+ IOManager localIoManager = (IOManager) ioManager;
+ CloudCachePolicy policy = cloudProperties.getCloudCachePolicy();
+ if (policy == CloudCachePolicy.EAGER) {
+ return new EagerCloudIOManager(localIoManager, cloudProperties,
nsPathResolver);
+ }
+
+ boolean selective = policy == CloudCachePolicy.SELECTIVE;
+ return new LazyCloudIOManager(localIoManager, cloudProperties,
nsPathResolver, selective);
+ }
+
+ private static IPhysicalDrive createPhysicalDrive(boolean
diskCacheManagerRequired, CloudProperties cloudProperties,
+ IIOManager ioManager) throws HyracksDataException {
+ if (diskCacheManagerRequired) {
+ double storagePercentage =
cloudProperties.getStorageAllocationPercentage();
+ double pressureThreshold =
cloudProperties.getStorageSweepThresholdPercentage();
+ long pressureDebugSize =
cloudProperties.getStorageDebugSweepThresholdSize();
+ return new PhysicalDrive(ioManager.getIODevices(),
pressureThreshold, storagePercentage, pressureDebugSize);
+ }
+
+ return DummyPhysicalDrive.INSTANCE;
+ }
+
+ private static IDiskResourceCacheLockNotifier createLockNotifier(boolean
diskCacheManagerRequired) {
+ if (diskCacheManagerRequired) {
+ return new CloudDiskResourceCacheLockNotifier();
+ }
+
+ return NoOpDiskResourceCacheLockNotifier.INSTANCE;
+ }
+
+ private static IDiskCachedPageAllocator createPageAllocator(boolean
diskCacheManagerRequired) {
+ if (diskCacheManagerRequired) {
+ return CloudDiskCachedPageAllocator.INSTANCE;
+ }
+ return DefaultDiskCachedPageAllocator.INSTANCE;
+ }
+
+ private static IBufferCacheReadContext
createDefaultBufferCachePageOpContext(boolean diskCacheManagerRequired,
+ IPhysicalDrive drive) {
+ if (diskCacheManagerRequired) {
+ return new DefaultCloudReadContext(drive);
+ }
+
+ return DefaultBufferCacheReadContextProvider.DEFAULT;
+ }
+}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
index 612237a..140fa6a 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
@@ -68,11 +68,11 @@
private ILazyAccessor accessor;
public LazyCloudIOManager(IOManager ioManager, CloudProperties
cloudProperties,
- INamespacePathResolver nsPathResolver, boolean
replaceableAccessor) throws HyracksDataException {
+ INamespacePathResolver nsPathResolver, boolean selective) throws
HyracksDataException {
super(ioManager, cloudProperties, nsPathResolver);
accessor = new InitialCloudAccessor(cloudClient, bucket,
localIoManager);
puncher = HolePuncherProvider.get(this, cloudProperties,
writeBufferProvider);
- if (replaceableAccessor) {
+ if (selective) {
replacer = InitialCloudAccessor.NO_OP_REPLACER;
} else {
replacer = () -> {
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/HolePuncherProvider.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/HolePuncherProvider.java
index 4ea6c46..406f2f3 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/HolePuncherProvider.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/HolePuncherProvider.java
@@ -18,17 +18,21 @@
*/
package org.apache.asterix.cloud.lazy.filesystem;
+import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.asterix.cloud.AbstractCloudIOManager;
+import org.apache.asterix.cloud.CloudFileHandle;
import org.apache.asterix.cloud.IWriteBufferProvider;
import org.apache.asterix.common.cloud.CloudCachePolicy;
import org.apache.asterix.common.config.CloudProperties;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.cloud.filesystem.FileSystemOperationDispatcherUtil;
public final class HolePuncherProvider {
private static final IHolePuncher UNSUPPORTED =
HolePuncherProvider::unsupported;
+ private static final IHolePuncher LINUX =
HolePuncherProvider::linuxPunchHole;
private HolePuncherProvider() {
}
@@ -39,13 +43,35 @@
return UNSUPPORTED;
}
- return new DebugHolePuncher(cloudIOManager, bufferProvider);
+ if (FileSystemOperationDispatcherUtil.isLinux()) {
+ return LINUX;
+ } else if (cloudProperties.isStorageDebugModeEnabled()) {
+ // Running on debug mode on a non-Linux box
+ return new DebugHolePuncher(cloudIOManager, bufferProvider);
+ }
+
+ throw new UnsupportedOperationException(
+ "Hole puncher is not supported using " +
FileSystemOperationDispatcherUtil.getOSName());
}
private static int unsupported(IFileHandle fileHandle, long offset, long
length) {
throw new UnsupportedOperationException("punchHole is not supported");
}
+ private static int linuxPunchHole(IFileHandle fileHandle, long offset,
long length) throws HyracksDataException {
+ CloudFileHandle cloudFileHandle = (CloudFileHandle) fileHandle;
+ int fileDescriptor = cloudFileHandle.getFileDescriptor();
+ int blockSize = cloudFileHandle.getBlockSize();
+ int freedSpace =
FileSystemOperationDispatcherUtil.punchHole(fileDescriptor, offset, length,
blockSize);
+ try {
+ cloudFileHandle.getFileChannel().force(false);
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+
+ return freedSpace;
+ }
+
private static final class DebugHolePuncher implements IHolePuncher {
private final AbstractCloudIOManager cloudIOManager;
private final IWriteBufferProvider bufferProvider;
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index 8e6becf..888cea1 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -43,6 +43,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import org.apache.hyracks.storage.common.ILocalResourceRepository;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
import org.apache.hyracks.storage.common.file.IResourceIdFactory;
import org.apache.hyracks.util.cache.ICacheManager;
@@ -152,4 +153,9 @@
* @return the disk write rate limiter provider
*/
IDiskWriteRateLimiterProvider getDiskWriteRateLimiterProvider();
+
+ /**
+ * @return disk cache service
+ */
+ IDiskCacheMonitoringService getDiskCacheService();
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
index 4b7649a8..a5e73af 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
@@ -19,8 +19,12 @@
package org.apache.asterix.common.config;
import static org.apache.hyracks.control.common.config.OptionTypes.BOOLEAN;
+import static org.apache.hyracks.control.common.config.OptionTypes.DOUBLE;
+import static
org.apache.hyracks.control.common.config.OptionTypes.LONG_BYTE_UNIT;
import static
org.apache.hyracks.control.common.config.OptionTypes.NONNEGATIVE_INTEGER;
+import static
org.apache.hyracks.control.common.config.OptionTypes.POSITIVE_INTEGER;
import static org.apache.hyracks.control.common.config.OptionTypes.STRING;
+import static org.apache.hyracks.util.StorageUtil.StorageUnit.GIGABYTE;
import java.util.concurrent.TimeUnit;
@@ -28,6 +32,7 @@
import org.apache.hyracks.api.config.IOption;
import org.apache.hyracks.api.config.IOptionType;
import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.util.StorageUtil;
public class CloudProperties extends AbstractProperties {
@@ -43,6 +48,13 @@
CLOUD_STORAGE_ENDPOINT(STRING, ""),
CLOUD_STORAGE_ANONYMOUS_AUTH(BOOLEAN, false),
CLOUD_STORAGE_CACHE_POLICY(STRING, "lazy"),
+ // 80% of the total disk space
+ CLOUD_STORAGE_ALLOCATION_PERCENTAGE(DOUBLE, 0.8d),
+ // 90% of the allocated space for storage (i.e., 90% of the 80% of the
total disk space)
+ CLOUD_STORAGE_SWEEP_THRESHOLD_PERCENTAGE(DOUBLE, 0.9d),
+ CLOUD_STORAGE_DEBUG_MODE_ENABLED(BOOLEAN, true),
+ CLOUD_STORAGE_DEBUG_SWEEP_THRESHOLD_SIZE(LONG_BYTE_UNIT,
StorageUtil.getLongSizeInBytes(0, GIGABYTE)),
+ CLOUD_STORAGE_DISK_MONITOR_INTERVAL(POSITIVE_INTEGER, 60),
CLOUD_PROFILER_LOG_INTERVAL(NONNEGATIVE_INTEGER, 0);
private final IOptionType interpreter;
@@ -63,6 +75,10 @@
case CLOUD_STORAGE_ENDPOINT:
case CLOUD_STORAGE_ANONYMOUS_AUTH:
case CLOUD_STORAGE_CACHE_POLICY:
+ case CLOUD_STORAGE_ALLOCATION_PERCENTAGE:
+ case CLOUD_STORAGE_SWEEP_THRESHOLD_PERCENTAGE:
+ case CLOUD_STORAGE_DISK_MONITOR_INTERVAL:
+ case CLOUD_STORAGE_DEBUG_SWEEP_THRESHOLD_SIZE:
case CLOUD_PROFILER_LOG_INTERVAL:
return Section.COMMON;
default:
@@ -86,9 +102,34 @@
case CLOUD_STORAGE_ANONYMOUS_AUTH:
return "Indicates whether or not anonymous auth should be
used for the cloud storage";
case CLOUD_STORAGE_CACHE_POLICY:
- return "The caching policy (either eager or lazy). 'Eager'
caching will download all partitions"
- + " upon booting, whereas lazy caching will
download a file upon request to open it."
+ return "The caching policy (either eager, lazy or
selective). 'eager' caching will download"
+ + "all partitions upon booting, whereas 'lazy'
caching will download a file upon"
+ + " request to open it. 'selective' caching will
act as the 'lazy' policy; however, "
+ + " it allows to use the local disk(s) as a cache,
where pages and indexes can be "
+ + " cached or evicted according to the pressure
imposed on the local disks."
+ " (default: 'lazy')";
+ case CLOUD_STORAGE_ALLOCATION_PERCENTAGE:
+ return "The percentage of the total disk space that should
be allocated for data storage when the"
+ + " 'selective' caching policy is used. The
remaining will act as a buffer for "
+ + " query workspace (i.e., for query operations
that require spilling to disk)."
+ + " (default: 80% of the total disk space)";
+ case CLOUD_STORAGE_SWEEP_THRESHOLD_PERCENTAGE:
+ return "The percentage of the used storage space at which
the disk sweeper starts freeing space by"
+ + " punching holes in stored indexes or by
evicting them entirely, "
+ + " when the 'selective' caching policy is used."
+ + " (default: 90% of the allocated space for
storage)";
+ case CLOUD_STORAGE_DISK_MONITOR_INTERVAL:
+ return "The disk monitoring interval time (in seconds):
determines how often the system"
+ + " checks for pressure on disk space when using
the 'selective' caching policy."
+ + " (default : 60 seconds)";
+ case CLOUD_STORAGE_DEBUG_MODE_ENABLED:
+ return "Whether or not the debug mode is enabled when
using the 'selective' caching policy."
+ + "(default: false)";
+ case CLOUD_STORAGE_DEBUG_SWEEP_THRESHOLD_SIZE:
+ return "For debugging only. Pressure size will be the
current used space + the additional bytes"
+ + " provided by this configuration option instead
of using "
+ + " CLOUD_STORAGE_SWEEP_THRESHOLD_PERCENTAGE."
+ + " (default: 0. I.e.,
CLOUD_STORAGE_SWEEP_THRESHOLD_PERCENTAGE will be used by default)";
case CLOUD_PROFILER_LOG_INTERVAL:
return "The waiting time (in minutes) to log cloud request
statistics (default: 0, which means"
+ " the profiler is disabled by default). The
minimum is 1 minute."
@@ -134,6 +175,26 @@
return accessor.getBoolean(Option.CLOUD_STORAGE_ANONYMOUS_AUTH);
}
+ public double getStorageAllocationPercentage() {
+ return accessor.getDouble(Option.CLOUD_STORAGE_ALLOCATION_PERCENTAGE);
+ }
+
+ public double getStorageSweepThresholdPercentage() {
+ return
accessor.getDouble(Option.CLOUD_STORAGE_SWEEP_THRESHOLD_PERCENTAGE);
+ }
+
+ public int getStorageDiskMonitorInterval() {
+ return accessor.getInt(Option.CLOUD_STORAGE_DISK_MONITOR_INTERVAL);
+ }
+
+ public boolean isStorageDebugModeEnabled() {
+ return accessor.getBoolean(Option.CLOUD_STORAGE_DEBUG_MODE_ENABLED);
+ }
+
+ public long getStorageDebugSweepThresholdSize() {
+ return isStorageDebugModeEnabled() ?
accessor.getLong(Option.CLOUD_STORAGE_DEBUG_SWEEP_THRESHOLD_SIZE) : 0L;
+ }
+
public CloudCachePolicy getCloudCachePolicy() {
return
CloudCachePolicy.fromName(accessor.getString(Option.CLOUD_STORAGE_CACHE_POLICY));
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 1a45155..09711fd 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -59,6 +59,7 @@
import org.apache.hyracks.storage.common.LocalResource;
import org.apache.hyracks.storage.common.buffercache.IRateLimiter;
import org.apache.hyracks.storage.common.buffercache.SleepRateLimiter;
+import org.apache.hyracks.storage.common.disk.IDiskResourceCacheLockNotifier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -71,6 +72,7 @@
private final IVirtualBufferCache vbc;
private final ILogManager logManager;
private final LogRecord waitLog;
+ private final IDiskResourceCacheLockNotifier lockNotifier;
private volatile boolean stopped = false;
private final IIndexCheckpointManagerProvider
indexCheckpointManagerProvider;
// all LSM-trees share the same virtual buffer cache list
@@ -78,7 +80,8 @@
public DatasetLifecycleManager(StorageProperties storageProperties,
ILocalResourceRepository resourceRepository,
ILogManager logManager, IVirtualBufferCache vbc,
- IIndexCheckpointManagerProvider indexCheckpointManagerProvider,
int numPartitions) {
+ IIndexCheckpointManagerProvider indexCheckpointManagerProvider,
+ IDiskResourceCacheLockNotifier lockNotifier) {
this.logManager = logManager;
this.storageProperties = storageProperties;
this.resourceRepository = resourceRepository;
@@ -89,6 +92,7 @@
vbcs.add(vbc);
}
this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
+ this.lockNotifier = lockNotifier;
waitLog = new LogRecord();
waitLog.setLogType(LogType.WAIT_FOR_FLUSHES);
waitLog.computeAndSetLogSize();
@@ -122,6 +126,7 @@
datasetResource = getDatasetLifecycle(did);
}
datasetResource.register(resource, (ILSMIndex) index);
+ lockNotifier.onRegister(did, resource, index);
}
private int getDIDfromResourcePath(String resourcePath) throws
HyracksDataException {
@@ -145,7 +150,7 @@
validateDatasetLifecycleManagerState();
int did = getDIDfromResourcePath(resourcePath);
long resourceID = getResourceIDfromResourcePath(resourcePath);
-
+ lockNotifier.onUnregister(did, resourceID);
DatasetResource dsr = datasets.get(did);
IndexInfo iInfo = dsr == null ? null : dsr.getIndexInfo(resourceID);
@@ -190,6 +195,9 @@
int did = getDIDfromResourcePath(resourcePath);
long resourceID = getResourceIDfromResourcePath(resourcePath);
+ // Notify first before opening a resource
+ lockNotifier.onOpen(did, resourceID);
+
DatasetResource dsr = datasets.get(did);
DatasetInfo dsInfo = dsr.getDatasetInfo();
if (dsInfo == null || !dsInfo.isRegistered()) {
@@ -253,6 +261,7 @@
if (iInfo == null) {
throw
HyracksDataException.create(ErrorCode.NO_INDEX_FOUND_WITH_RESOURCE_ID,
resourceID);
}
+ lockNotifier.onClose(did, resourceID);
} finally {
// Regardless of what exception is thrown in the try-block (e.g.,
line 279),
// we have to un-touch the index and dataset.
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index b9f277a..9909e97 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -45,6 +45,7 @@
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IDiskSpaceMaker;
import org.apache.hyracks.api.io.IFileDeviceResolver;
import org.apache.hyracks.api.io.IFileHandle;
import org.apache.hyracks.api.io.IIOBulkOperation;
@@ -82,6 +83,8 @@
* Mutables
*/
private int workspaceIndex;
+ // TODO use space make on write
+ private IDiskSpaceMaker spaceMaker;
public IOManager(List<IODeviceHandle> devices, IFileDeviceResolver
deviceComputer, int ioParallelism, int queueSize)
throws HyracksDataException {
@@ -112,6 +115,7 @@
for (int i = 0; i < numIoThreads; i++) {
executor.execute(new IoRequestHandler(i, submittedRequests));
}
+ spaceMaker = NoOpDiskSpaceMaker.INSTANCE;
}
public int getQueueSize() {
@@ -596,4 +600,8 @@
public void performBulkOperation(IIOBulkOperation bulkOperation) throws
HyracksDataException {
((AbstractBulkOperation) bulkOperation).performOperation();
}
+
+ public void setSpaceMaker(IDiskSpaceMaker spaceMaker) {
+ this.spaceMaker = spaceMaker;
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/NoOpDiskSpaceMaker.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/NoOpDiskSpaceMaker.java
new file mode 100644
index 0000000..1527dde
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/NoOpDiskSpaceMaker.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.io;
+
+import java.io.IOException;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IDiskSpaceMaker;
+
+final class NoOpDiskSpaceMaker implements IDiskSpaceMaker {
+ static final IDiskSpaceMaker INSTANCE = new NoOpDiskSpaceMaker();
+
+ private NoOpDiskSpaceMaker() {
+ }
+
+ @Override
+ public void makeSpaceOrThrow(IOException e) throws HyracksDataException {
+ throw HyracksDataException.create(e);
+ }
+}
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18291
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: Ia212db8c1673f6b725eb21fe7035a1e20490d1f6
Gerrit-Change-Number: 18291
Gerrit-PatchSet: 1
Gerrit-Owner: Wail Alkowaileet <[email protected]>
Gerrit-MessageType: newchange