Murtadha Hubail has submitted this change and it was merged. Change subject: [ASTERIXDB-1995][STO] Abort write txn when index cannot be flushed ......................................................................
[ASTERIXDB-1995][STO] Abort write txn when index cannot be flushed - user model changes: no - storage format changes: no - interface changes: no Details: - Fix LSM memory component state transition on flush/merge failure - When index cannot be flushed, abort waiting threads - Prevent NPE in MaterializerTaskState when file creation fails - Check parent dirs creation for index metadata file Change-Id: I28592c30c788f4a6f44db8b47a84bc77f6b3f8f3 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1896 Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> BAD: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: abdullah alamoudi <[email protected]> --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java A asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java M hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java M hyracks-fullstack/hyracks/hyracks-util/pom.xml A hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/DiskUtil.java M hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/StorageUtil.java 11 files changed, 392 insertions(+), 24 deletions(-) Approvals: abdullah alamoudi: Looks good to me, approved Jenkins: 
Verified; No violations found; No violations found; Verified diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java index dea5259..2799765 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java @@ -64,6 +64,8 @@ public NodeControllerService[] ncs = new NodeControllerService[0]; public IHyracksClientConnection hcc; + private static final String DEFAULT_STORAGE_PATH = joinPath("target", "io", "dir"); + private static String storagePath = DEFAULT_STORAGE_PATH; private ConfigManager configManager; private List<String> nodeNames; @@ -217,8 +219,16 @@ } } + public static void setStoragePath(String path) { + storagePath = path; + } + + public static void restoreDefaultStoragePath() { + storagePath = DEFAULT_STORAGE_PATH; + } + protected String getDefaultStoragePath() { - return joinPath("target", "io", "dir"); + return storagePath; } public void removeTestStorageFiles() { diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java new file mode 100644 index 0000000..58697a9 --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.test.storage; + +import static org.apache.hyracks.util.StorageUtil.StorageUnit.MEGABYTE; + +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil; +import org.apache.asterix.app.bootstrap.TestNodeController; +import org.apache.asterix.app.data.gen.TupleGenerator; +import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction; +import org.apache.asterix.common.config.DatasetConfig.DatasetType; +import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable; +import org.apache.asterix.common.exceptions.ExceptionUtils; +import org.apache.asterix.common.transactions.ITransactionContext; +import org.apache.asterix.external.util.DataflowUtils; +import org.apache.asterix.file.StorageComponentProvider; +import org.apache.asterix.metadata.entities.Dataset; +import org.apache.asterix.metadata.entities.Index; +import org.apache.asterix.metadata.entities.InternalDatasetDetails; +import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy; +import org.apache.asterix.om.types.ARecordType; +import org.apache.asterix.om.types.BuiltinType; +import org.apache.asterix.om.types.IAType; +import org.apache.asterix.test.common.TestHelper; +import org.apache.commons.lang3.SystemUtils; +import org.apache.hyracks.api.comm.VSizeFrame; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import 
org.apache.hyracks.api.exceptions.ErrorCode; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; +import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; +import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory; +import org.apache.hyracks.util.DiskUtil; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class DiskIsFullTest { + + private static final IAType[] KEY_TYPES = { BuiltinType.AINT32 }; + private static final ARecordType RECORD_TYPE = new ARecordType("TestRecordType", new String[] { "key", "value" }, + new IAType[] { BuiltinType.AINT32, BuiltinType.AINT64 }, false); + private static final GenerationFunction[] RECORD_GEN_FUNCTION = + { GenerationFunction.DETERMINISTIC, GenerationFunction.DETERMINISTIC }; + private static final boolean[] UNIQUE_RECORD_FIELDS = { true, false }; + private static final ARecordType META_TYPE = null; + private static final GenerationFunction[] META_GEN_FUNCTION = null; + private static final boolean[] UNIQUE_META_FIELDS = null; + private static final int[] KEY_INDEXES = { 0 }; + private static final int[] KEY_INDICATOR = { Index.RECORD_INDICATOR }; + private static final List<Integer> KEY_INDICATOR_LIST = Arrays.asList(new Integer[] { Index.RECORD_INDICATOR }); + private static final int DATASET_ID = 101; + private static final String DATAVERSE_NAME = "TestDV"; + private static final String DATASET_NAME = "TestDS"; + private static final String DATA_TYPE_NAME = "DUMMY"; + private static final String NODE_GROUP_NAME = "DEFAULT"; + private static final String TEST_DISK_NAME = "asterixdb_ram_disk"; + private boolean shouldRun = true; + + @Before + public void setUp() throws Exception { + if (!SystemUtils.IS_OS_MAC) { + System.out.println("Skipping test " + DiskIsFullTest.class.getName() + " due to unsupported OS"); + shouldRun = false; + return; + } + 
System.out.println("SetUp: "); + TestHelper.deleteExistingInstanceFiles(); + // create RAM disk + final Path ramDiskRoot = DiskUtil.mountRamDisk(TEST_DISK_NAME, 4, MEGABYTE); + // Use RAM disk for storage + AsterixHyracksIntegrationUtil.setStoragePath(ramDiskRoot.toAbsolutePath().toString()); + } + + @After + public void tearDown() throws Exception { + if (!shouldRun) { + return; + } + System.out.println("TearDown"); + TestHelper.deleteExistingInstanceFiles(); + DiskUtil.unmountRamDisk(TEST_DISK_NAME); + AsterixHyracksIntegrationUtil.restoreDefaultStoragePath(); + } + + @Test + public void testDiskIsFull() { + if (!shouldRun) { + return; + } + HyracksDataException expectedException = + HyracksDataException.create(ErrorCode.CANNOT_MODIFY_INDEX_DISK_IS_FULL); + try { + TestNodeController nc = new TestNodeController(null, false); + nc.init(); + StorageComponentProvider storageManager = new StorageComponentProvider(); + List<List<String>> partitioningKeys = new ArrayList<>(); + partitioningKeys.add(Collections.singletonList("key")); + Dataset dataset = + new Dataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME, null, + null, + new InternalDatasetDetails(null, PartitioningStrategy.HASH, partitioningKeys, null, null, + null, false, null, false), null, DatasetType.INTERNAL, DATASET_ID, 0); + try { + nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null, + null, storageManager, KEY_INDEXES, KEY_INDICATOR_LIST); + IHyracksTaskContext ctx = nc.createTestContext(false); + nc.newJobId(); + ITransactionContext txnCtx = nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true); + // Prepare insert operation + LSMInsertDeleteOperatorNodePushable insertOp = + nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, + new NoMergePolicyFactory(), null, null, KEY_INDEXES, KEY_INDICATOR_LIST, storageManager) + .getLeft(); + insertOp.open(); + TupleGenerator tupleGenerator = + new 
TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATOR, RECORD_GEN_FUNCTION, + UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS); + VSizeFrame frame = new VSizeFrame(ctx); + FrameTupleAppender tupleAppender = new FrameTupleAppender(frame); + // Insert records until disk becomes full + int tupleCount = 100000; + while (tupleCount > 0) { + ITupleReference tuple = tupleGenerator.next(); + try { + DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp); + } catch (Throwable t) { + final Throwable rootCause = ExceptionUtils.getRootCause(t); + rootCause.printStackTrace(); + if (rootCause instanceof HyracksDataException) { + HyracksDataException cause = (HyracksDataException) rootCause; + Assert.assertEquals(cause.getErrorCode(), expectedException.getErrorCode()); + Assert.assertEquals(cause.getMessage(), expectedException.getMessage()); + return; + } else { + break; + } + } + tupleCount--; + } + Assert.fail("Expected exception (" + expectedException + ") was not thrown"); + } finally { + nc.deInit(); + } + } catch (Throwable e) { + e.printStackTrace(); + Assert.fail("Expected exception (" + expectedException + ") was not thrown"); + } + } +} \ No newline at end of file diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java index e530bc3..b117cf1 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java @@ -18,6 +18,8 @@ */ package org.apache.asterix.transaction.management.resource; +import static org.apache.hyracks.api.exceptions.ErrorCode.CANNOT_CREATE_FILE; + import java.io.File; import 
java.io.FileInputStream; import java.io.FileOutputStream; @@ -190,10 +192,12 @@ FileReference resourceFile = ioManager.resolve(relativePath); if (resourceFile.getFile().exists()) { throw new HyracksDataException("Duplicate resource: " + resourceFile.getAbsolutePath()); - } else { - resourceFile.getFile().getParentFile().mkdirs(); } - resourceCache.put(resource.getPath(), resource); + + final File parent = resourceFile.getFile().getParentFile(); + if (!parent.exists() && !parent.mkdirs()) { + throw HyracksDataException.create(CANNOT_CREATE_FILE, parent.getAbsolutePath()); + } try (FileOutputStream fos = new FileOutputStream(resourceFile.getFile()); ObjectOutputStream oosToFos = new ObjectOutputStream(fos)) { @@ -203,6 +207,8 @@ throw new HyracksDataException(e); } + resourceCache.put(resource.getPath(), resource); + //if replication enabled, send resource metadata info to remote nodes if (isReplicationEnabled) { createReplicationJob(ReplicationOperation.REPLICATE, resourceFile); diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java index 7dbade2..e6fbc6f 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java @@ -121,6 +121,7 @@ public static final int FOUND_MULTIPLE_TRANSACTIONS = 85; public static final int UNRECOGNIZED_INDEX_COMPONENT_FILE = 86; public static final int UNEQUAL_NUM_FILTERS_TREES = 87; + public static final int CANNOT_MODIFY_INDEX_DISK_IS_FULL = 88; // Compilation error codes. 
public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000; diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties index cd38917..d2e05e3 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties @@ -104,5 +104,6 @@ 85 = Found more than one transaction file in %1$s 86 = Found an unrecognized index file %1$s 87 = Unequal number of trees and filters found in %1$s +88 = Cannot modify index (Disk is full) 10000 = The given rule collection %1$s is not an instance of the List class. diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java index 918155d..31cbaad 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java @@ -66,7 +66,9 @@ } public void close() throws HyracksDataException { - out.close(); + if (out != null) { + out.close(); + } } public void appendFrame(ByteBuffer buffer) throws HyracksDataException { @@ -74,20 +76,25 @@ } public void writeOut(IFrameWriter writer, IFrame frame, boolean failed) throws HyracksDataException { - RunFileReader in = out.createReader(); + RunFileReader in = null; + if (out != null) { + in = out.createReader(); + } writer.open(); try { if (failed) { writer.fail(); return; } - in.open(); - try { - while (in.nextFrame(frame)) { - writer.nextFrame(frame.getBuffer()); + if (in != null) { + in.open(); + try { + while (in.nextFrame(frame)) { + writer.nextFrame(frame.getBuffer()); + 
} + } finally { + in.close(); } - } finally { - in.close(); } } catch (Exception e) { writer.fail(); @@ -96,10 +103,10 @@ try { writer.close(); } finally { - if (numConsumers.decrementAndGet() == 0) { + if (numConsumers.decrementAndGet() == 0 && out != null) { out.getFileReference().delete(); } } } } -} +} \ No newline at end of file diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java index 7cbe35f..1ee68d9 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java @@ -144,6 +144,11 @@ throw new IllegalStateException("Flush sees an illegal LSM memory compoenent state: " + state); } readerCount--; + if (failedOperation) { + // if flush failed, return the component state to READABLE_UNWRITABLE + state = ComponentState.READABLE_UNWRITABLE; + return; + } if (readerCount == 0) { state = ComponentState.INACTIVE; } else { diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java index 50eac67..8ff907a 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java @@ -47,6 +47,7 @@ import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; @@ -131,6 +132,10 @@ // Flush and merge operations should never reach this wait call, because they are always try operations. // If they fail to enter the components, then it means that there are an ongoing flush/merge operation on // the same components, so they should not proceed. + if (opType == LSMOperationType.MODIFICATION) { + // before waiting, make sure the index is in a modifiable state to avoid waiting forever. + ensureIndexModifiable(); + } opTracker.wait(); } catch (InterruptedException e) { throw new HyracksDataException(e); @@ -186,6 +191,7 @@ break; case MERGE: lsmIndex.getIOOperationCallback().beforeOperation(LSMOperationType.MERGE); + break; default: break; } @@ -498,15 +504,17 @@ } ILSMDiskComponent newComponent = null; + boolean failedOperation = false; try { newComponent = lsmIndex.flush(operation); operation.getCallback().afterOperation(LSMOperationType.FLUSH, null, newComponent); lsmIndex.markAsValid(newComponent); } catch (Throwable e) { + failedOperation = true; e.printStackTrace(); throw e; } finally { - exitComponents(ctx, LSMOperationType.FLUSH, newComponent, false); + exitComponents(ctx, LSMOperationType.FLUSH, newComponent, failedOperation); operation.getCallback().afterFinalize(LSMOperationType.FLUSH, newComponent); } if (LOGGER.isLoggable(Level.INFO)) { @@ -545,15 +553,17 @@ } ILSMDiskComponent newComponent = null; + boolean failedOperation = false; try { newComponent = lsmIndex.merge(operation); 
operation.getCallback().afterOperation(LSMOperationType.MERGE, ctx.getComponentHolder(), newComponent); lsmIndex.markAsValid(newComponent); } catch (Throwable e) { + failedOperation = true; e.printStackTrace(); throw e; } finally { - exitComponents(ctx, LSMOperationType.MERGE, newComponent, false); + exitComponents(ctx, LSMOperationType.MERGE, newComponent, failedOperation); operation.getCallback().afterFinalize(LSMOperationType.MERGE, newComponent); } if (LOGGER.isLoggable(Level.INFO)) { @@ -660,4 +670,23 @@ exit(ctx); } } + + /*** + * Ensures the index is in a modifiable state + * @throws HyracksDataException if the index is not in a modifiable state + */ + private void ensureIndexModifiable() throws HyracksDataException { + // find if there is any memory component which is in a writable state or eventually will be in a writable state + for (ILSMMemoryComponent memoryComponent : lsmIndex.getMemoryComponents()) { + switch (memoryComponent.getState()) { + case INACTIVE: + case READABLE_WRITABLE: + case READABLE_UNWRITABLE_FLUSHING: + return; + default: + // continue to the next component + } + } + throw HyracksDataException.create(ErrorCode.CANNOT_MODIFY_INDEX_DISK_IS_FULL); + } } diff --git a/hyracks-fullstack/hyracks/hyracks-util/pom.xml b/hyracks-fullstack/hyracks/hyracks-util/pom.xml index 5a68df0..3b03fce 100644 --- a/hyracks-fullstack/hyracks/hyracks-util/pom.xml +++ b/hyracks-fullstack/hyracks/hyracks-util/pom.xml @@ -64,6 +64,10 @@ <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + </dependency> </dependencies> </project> diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/DiskUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/DiskUtil.java new file mode 100644 index 0000000..9a65d72 --- /dev/null +++ 
b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/DiskUtil.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.util; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.commons.lang3.SystemUtils; + +public class DiskUtil { + + private static final Logger LOGGER = Logger.getLogger(DiskUtil.class.getName()); + + private DiskUtil() { + throw new AssertionError("Util class should not be initialized."); + } + + /** + * Mounts a RAM disk + * + * @param name + * @param size + * @param unit + * @return The root of the mounted disk + * @throws IOException + * @throws InterruptedException + */ + public static Path mountRamDisk(String name, int size, StorageUtil.StorageUnit unit) + throws IOException, InterruptedException { + if (SystemUtils.IS_OS_MAC) { + return mountMacRamDisk(name, (StorageUtil.getIntSizeInBytes(size, unit) * 2) / StorageUtil.BASE); + } else if (SystemUtils.IS_OS_LINUX) { + return mountLinuxRamDisk(name, size 
+ unit.getLinuxUnitTypeInLetter()); + } + throw new UnsupportedOperationException("Unsupported OS: " + System.getProperty("os.name")); + } + + /** + * Unmounts a disk + * + * @param name + * @throws IOException + * @throws InterruptedException + */ + public static void unmountRamDisk(String name) throws IOException, InterruptedException { + if (SystemUtils.IS_OS_MAC) { + unmountMacRamDisk(name); + } else if (SystemUtils.IS_OS_LINUX) { + unmountLinuxRamDisk(name); + } + } + + private static Path mountMacRamDisk(String name, long size) throws IOException, InterruptedException { + final String cmd = "diskutil erasevolume HFS+ '" + name + "' `hdiutil attach -nomount ram://" + size + "`"; + final ProcessBuilder pb = new ProcessBuilder("/bin/sh", "-c", cmd); + final Process p = pb.start(); + watchProcess(p); + p.waitFor(); + return Paths.get("/Volumes", name); + } + + private static void unmountMacRamDisk(String name) throws InterruptedException, IOException { + final String cmd = "diskutil unmount " + name; + final ProcessBuilder pb = new ProcessBuilder("/bin/sh", "-c", cmd); + final Process p = pb.start(); + watchProcess(p); + p.waitFor(); + } + + private static Path mountLinuxRamDisk(String name, String size) throws IOException, InterruptedException { + Path root = Paths.get("/tmp", name); + if (!Files.exists(root)) { + Files.createFile(root); + } + final String cmd = "mount -o size=" + size + " -t tmpfs none /tmp/" + name; + final ProcessBuilder pb = new ProcessBuilder("bash", "-c", cmd); + final Process p = pb.start(); + watchProcess(p); + p.waitFor(); + return root; + } + + private static void unmountLinuxRamDisk(String name) throws InterruptedException, IOException { + final String cmd = "umount /tmp/" + name; + final ProcessBuilder pb = new ProcessBuilder("bash", "-c", cmd); + final Process p = pb.start(); + watchProcess(p); + p.waitFor(); + } + + private static void watchProcess(Process p) { + new Thread(() -> { + final BufferedReader input = new 
BufferedReader(new InputStreamReader(p.getInputStream())); + String line; + try { + while ((line = input.readLine()) != null) { + LOGGER.info(line); + } + } catch (IOException e) { + LOGGER.log(Level.WARNING, e.getMessage(), e); + } + }).start(); + } +} \ No newline at end of file diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/StorageUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/StorageUtil.java index 9001e1b..dbfe6f9 100644 --- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/StorageUtil.java +++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/StorageUtil.java @@ -23,17 +23,18 @@ public class StorageUtil { - private static final int BASE = 1024; + public static final int BASE = 1024; public enum StorageUnit { - BYTE("B", 1), - KILOBYTE("KB", BASE), - MEGABYTE("MB", KILOBYTE.multiplier * BASE), - GIGABYTE("GB", MEGABYTE.multiplier * BASE), - TERABYTE("TB", GIGABYTE.multiplier * BASE), - PETABYTE("PB", TERABYTE.multiplier * BASE); + BYTE("B", "b", 1), + KILOBYTE("KB", "kb", BASE), + MEGABYTE("MB", "m", KILOBYTE.multiplier * BASE), + GIGABYTE("GB", "g", MEGABYTE.multiplier * BASE), + TERABYTE("TB", "t", GIGABYTE.multiplier * BASE), + PETABYTE("PB", "p", TERABYTE.multiplier * BASE); private final String unitTypeInLetter; + private final String linuxUnitTypeInLetter; private final long multiplier; private static final Map<String, StorageUnit> SUFFIX_TO_UNIT_MAP = new HashMap<>(); @@ -43,8 +44,9 @@ } } - StorageUnit(String unitTypeInLetter, long multiplier) { + StorageUnit(String unitTypeInLetter, String linuxUnitTypeInLetter, long multiplier) { this.unitTypeInLetter = unitTypeInLetter; + this.linuxUnitTypeInLetter = linuxUnitTypeInLetter; this.multiplier = multiplier; } @@ -57,6 +59,10 @@ return value * multiplier; } + public String getLinuxUnitTypeInLetter() { + return linuxUnitTypeInLetter; + } + public static StorageUnit 
lookupBySuffix(String name) { return SUFFIX_TO_UNIT_MAP.get(name); } -- To view, visit https://asterix-gerrit.ics.uci.edu/1896 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I28592c30c788f4a6f44db8b47a84bc77f6b3f8f3 Gerrit-PatchSet: 7 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Dmitry Lychagin <[email protected]> Gerrit-Reviewer: Ian Maxon <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]> Gerrit-Reviewer: Yingyi Bu <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
