Murtadha Hubail has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/2181
Change subject: [NO ISSUE][STO] Adapt Storage Structure To Rebalance ...................................................................... [NO ISSUE][STO] Adapt Storage Structure To Rebalance - user model changes: no - storage format changes: no - interface changes: yes -- Added IResource#setPath to use for the resource storage migration. Details: - Unify storage structure to support dataset rebalance: Old format: ./storage/partition_#/dataverse/datasetName_idx_indexName New format: ./storage/partition_#/dataverse/datasetName/rebalanceNum/indexName - Adapt recovery and replication to new storage structure. - Add old structure -> new structure NC migration task. - Add CompatibilityUtil to ensure NC can be upgraded during NC startup. - Centralize the logic for parsing file path to its components in ResourceReference/DatasetResourceReference. - Add storage structure migration test case. - Add test case for recovery after rebalance. Change-Id: I0f968b9f493bf5aa2d49f503afe21f0d438bb7f0 --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MigrateStorageResourcesTask.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java A asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/CompatibilityUtil.java A asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java A asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/MigrateStorageResourcesTaskTest.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/RecoveryManagerTest.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/DatasetLocalResource.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java A 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/DatasetResourceReference.java D asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexFileProperties.java A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexPathElements.java A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java M asterixdb/asterix-replication/pom.xml M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/ReplicationCheckpointManager.java M hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeResource.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LsmResource.java M hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeResource.java M hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResource.java 28 files 
changed, 882 insertions(+), 422 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/81/2181/1 diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java index 19966fe..e29e3fe 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java @@ -443,7 +443,7 @@ return minFirstLSN; } - private long getRemoteMinFirstLSN() { + private long getRemoteMinFirstLSN() throws HyracksDataException { IReplicaResourcesManager remoteResourcesManager = txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext().getReplicaResourcesManager(); return remoteResourcesManager.getPartitionsMinLSN(localResourceRepository.getInactivePartitions()); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java index f922832..47f9315 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java @@ -81,11 +81,6 @@ checkpointManager = CheckpointManagerFactory.create(this, checkpointProperties, replicationEnabled); final Checkpoint latestCheckpoint = checkpointManager.getLatest(); if (latestCheckpoint != null) { - if (latestCheckpoint.getStorageVersion() != StorageConstants.VERSION) { - throw new IllegalStateException( - String.format("Storage version mismatch. Current version (%s). 
On disk version: (%s)", - StorageConstants.VERSION, latestCheckpoint.getStorageVersion())); - } transactionManager.ensureMaxTxnId(latestCheckpoint.getMaxTxnId()); } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MigrateStorageResourcesTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MigrateStorageResourcesTask.java new file mode 100644 index 0000000..7bfb35c --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MigrateStorageResourcesTask.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.app.nc.task; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.function.Predicate; +import java.util.logging.Logger; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.asterix.common.api.INCLifecycleTask; +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.dataflow.DatasetLocalResource; +import org.apache.asterix.common.storage.DatasetResourceReference; +import org.apache.asterix.common.transactions.Checkpoint; +import org.apache.asterix.common.transactions.ICheckpointManager; +import org.apache.asterix.common.utils.StorageConstants; +import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; +import org.apache.commons.io.FileUtils; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.api.io.IIOManager; +import org.apache.hyracks.api.io.IODeviceHandle; +import org.apache.hyracks.api.service.IControllerService; +import org.apache.hyracks.storage.common.ILocalResourceRepository; +import org.apache.hyracks.storage.common.LocalResource; + +/** + * Migrates a legacy storage structure to the current one + */ +public class MigrateStorageResourcesTask implements INCLifecycleTask { + + private static final Logger LOGGER = Logger.getLogger(MigrateStorageResourcesTask.class.getName()); + private static final long serialVersionUID = 1L; + + @Override + public void perform(IControllerService cs) throws HyracksDataException { + INcApplicationContext appCtx = (INcApplicationContext) cs.getApplicationContext(); + ICheckpointManager checkpointMgr = appCtx.getTransactionSubsystem().getCheckpointManager(); + final Checkpoint latestCheckpoint = checkpointMgr.getLatest(); + if (latestCheckpoint == null) { + // nothing to migrate + return; + } + 
final IIOManager ioManager = appCtx.getIoManager(); + final List<IODeviceHandle> ioDevices = ioManager.getIODevices(); + for (IODeviceHandle ioDeviceHandle : ioDevices) { + final Path root = Paths.get(ioDeviceHandle.getMount().getAbsolutePath()); + if (!root.toFile().exists()) { + continue; + } + // all legacy resources are expected to be 5 levels below the storage root + try (Stream<Path> stream = Files.find(root, 5, (path, attr) -> path.getFileName().toString() + .equals(StorageConstants.METADATA_FILE_NAME))) { + final List<Path> resourceToMigrate = stream.map(Path::getParent).collect(Collectors.toList()); + for (Path src : resourceToMigrate) { + final Path dest = + migrateResourceMetadata(root.relativize(src), appCtx, latestCheckpoint.getStorageVersion()); + copyResourceFiles(root.resolve(src), root.resolve(dest), + PersistentLocalResourceRepository.INDEX_COMPONENTS); + FileUtils.deleteDirectory(root.resolve(src).toFile()); + } + } catch (IOException e) { + throw HyracksDataException.create(e); + } + } + } + + /** + * Migrates the resource metadata file at {@code resourcePath} to the new storage structure + * and updates the migrated version's metadata to reflect the new path. 
+ * + * @param resourcePath + * @param appCtx + * @param resourceVersion + * @return The migrated resource relative path + * @throws HyracksDataException + */ + private Path migrateResourceMetadata(Path resourcePath, INcApplicationContext appCtx, int resourceVersion) + throws HyracksDataException { + final ILocalResourceRepository localResourceRepository = appCtx.getLocalResourceRepository(); + final LocalResource srcResource = localResourceRepository.get(resourcePath.toFile().getPath()); + final DatasetLocalResource lsmResource = (DatasetLocalResource) srcResource.getResource(); + // recreate the resource with the new path and version + final DatasetResourceReference lrr = DatasetResourceReference.of(srcResource, resourceVersion); + final Path destPath = lrr.getRelativePath(); + final FileReference destDir = appCtx.getIoManager().resolve(destPath.toString()); + // ensure the new dest dir is empty + if (destDir.getFile().exists()) { + FileUtils.deleteQuietly(destDir.getFile()); + } + lsmResource.setPath(destPath.toString()); + + final LocalResource destResource = + new LocalResource(srcResource.getId(), srcResource.getVersion(), srcResource.isDurable(), lsmResource); + LOGGER.info(() -> "Migrating resource from: " + srcResource.getPath() + " to " + destResource.getPath()); + localResourceRepository.insert(destResource); + return destPath; + } + + /** + * Copies the files matching {@code filter} at {@code src} path to {@code dest} + * + * @param src + * @param dest + * @param filter + * @throws IOException + */ + private void copyResourceFiles(Path src, Path dest, Predicate<Path> filter) throws IOException { + try (Stream<Path> stream = Files.list(src)) { + final List<Path> srcFiles = stream.filter(filter).collect(Collectors.toList()); + for (Path srcFile : srcFiles) { + Path fileDest = Paths.get(dest.toString(), srcFile.getFileName().toString()); + Files.copy(srcFile, fileDest); + } + } + } + + @Override + public String toString() { + return "{ \"class\" : \"" + 
getClass().getSimpleName() + "\" }"; + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java index 63f5bfc..47e5ac9 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java @@ -37,6 +37,7 @@ import org.apache.asterix.common.config.NodeProperties; import org.apache.asterix.common.config.StorageProperties; import org.apache.asterix.common.config.TransactionProperties; +import org.apache.asterix.common.transactions.Checkpoint; import org.apache.asterix.common.transactions.IRecoveryManager; import org.apache.asterix.common.transactions.IRecoveryManager.SystemState; import org.apache.asterix.common.utils.PrintUtil; @@ -46,6 +47,7 @@ import org.apache.asterix.messaging.MessagingChannelInterfaceFactory; import org.apache.asterix.messaging.NCMessageBroker; import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; +import org.apache.asterix.utils.CompatibilityUtil; import org.apache.hyracks.api.application.INCServiceContext; import org.apache.hyracks.api.application.IServiceContext; import org.apache.hyracks.api.client.NodeStatus; @@ -54,7 +56,6 @@ import org.apache.hyracks.api.io.IFileDeviceResolver; import org.apache.hyracks.api.job.resource.NodeCapacity; import org.apache.hyracks.api.messages.IMessageBroker; -import org.apache.hyracks.api.util.IoUtil; import org.apache.hyracks.control.common.controllers.NCConfig; import org.apache.hyracks.control.nc.BaseNCApplication; import org.apache.hyracks.control.nc.NodeControllerService; @@ -115,7 +116,10 @@ MessagingChannelInterfaceFactory interfaceFactory = new MessagingChannelInterfaceFactory((NCMessageBroker) messageBroker, messagingProperties); 
this.ncServiceCtx.setMessagingChannelInterfaceFactory(interfaceFactory); - + final Checkpoint latestCheckpoint = runtimeContext.getTransactionSubsystem().getCheckpointManager().getLatest(); + if (latestCheckpoint != null) { + CompatibilityUtil.ensureCompatibility(controllerService, latestCheckpoint.getStorageVersion()); + } IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager(); final SystemState stateOnStartup = recoveryMgr.getSystemState(); if (stateOnStartup == SystemState.PERMANENT_DATA_LOSS) { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/CompatibilityUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/CompatibilityUtil.java new file mode 100644 index 0000000..5d44fc9 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/CompatibilityUtil.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.utils; + +import java.util.ArrayList; +import java.util.List; +import java.util.logging.Logger; + +import org.apache.asterix.app.nc.task.MigrateStorageResourcesTask; +import org.apache.asterix.common.api.INCLifecycleTask; +import org.apache.asterix.common.utils.StorageConstants; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.control.nc.NodeControllerService; + +public class CompatibilityUtil { + + private static final Logger LOGGER = Logger.getLogger(CompatibilityUtil.class.getName()); + private static final int MIN_COMPATIBLE_VERSION = 1; + + private CompatibilityUtil() { + } + + public static void ensureCompatibility(NodeControllerService ncs, int onDiskVerson) throws HyracksDataException { + if (onDiskVerson == StorageConstants.VERSION) { + return; + } + ensureUpgradability(onDiskVerson); + LOGGER.info(() -> "Upgrading from storage version " + onDiskVerson + " to " + StorageConstants.VERSION); + final List<INCLifecycleTask> upgradeTasks = getUpgradeTasks(onDiskVerson); + for (INCLifecycleTask task : upgradeTasks) { + task.perform(ncs); + } + } + + private static void ensureUpgradability(int onDiskVerson) { + if (onDiskVerson < MIN_COMPATIBLE_VERSION) { + throw new IllegalStateException(String.format( + "Storage cannot be upgraded to new version. Current version (%s). 
On disk version: (%s)", + StorageConstants.VERSION, onDiskVerson)); + } + } + + private static List<INCLifecycleTask> getUpgradeTasks(int fromVersion) { + List<INCLifecycleTask> upgradeTasks = new ArrayList<>(); + if (fromVersion < StorageConstants.REBALANCE_STORAGE_VERSION) { + upgradeTasks.add(new MigrateStorageResourcesTask()); + } + return upgradeTasks; + } +} \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java new file mode 100644 index 0000000..e24ef2d --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.common; + +import java.io.InputStream; +import java.util.Arrays; +import java.util.LinkedHashSet; + +import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil; +import org.apache.asterix.app.active.ActiveNotificationHandler; +import org.apache.asterix.common.api.IMetadataLockManager; +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.utils.Servlets; +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.asterix.rebalance.NoOpDatasetRebalanceCallback; +import org.apache.asterix.test.common.TestExecutor; +import org.apache.asterix.testframework.context.TestCaseContext; +import org.apache.asterix.utils.RebalanceUtil; +import org.junit.Assert; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + +public class TestDataUtil { + + private static final TestExecutor TEST_EXECUTOR = new TestExecutor(); + private static final TestCaseContext.OutputFormat OUTPUT_FORMAT = TestCaseContext.OutputFormat.CLEAN_JSON; + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private TestDataUtil() { + } + + /** + * Creates dataset with a single field called id as its primary key. 
+ * + * @param dataset + * @throws Exception + */ + public static void createIdOnlyDataset(String dataset) throws Exception { + TEST_EXECUTOR.executeSqlppUpdateOrDdl("CREATE TYPE KeyType IF NOT EXISTS AS { id: int };", OUTPUT_FORMAT); + TEST_EXECUTOR.executeSqlppUpdateOrDdl("CREATE DATASET " + dataset + "(KeyType) PRIMARY KEY id;", OUTPUT_FORMAT); + } + + /** + * Upserts {@code count} ids into {@code dataset} + * + * @param dataset + * @param count + * @throws Exception + */ + public static void upsertData(String dataset, long count) throws Exception { + for (int i = 0; i < count; i++) { + TEST_EXECUTOR.executeSqlppUpdateOrDdl("UPSERT INTO " + dataset + " ({\"id\": " + i + "});", + TestCaseContext.OutputFormat.CLEAN_JSON); + } + } + + /** + * Gets the number of records in dataset {@code dataset} + * + * @param datasetName + * @return The count + * @throws Exception + */ + public static long getDatasetCount(String datasetName) throws Exception { + final String query = "SELECT VALUE COUNT(*) FROM `" + datasetName + "`;"; + final InputStream responseStream = TEST_EXECUTOR + .executeQueryService(query, TEST_EXECUTOR.getEndpoint(Servlets.QUERY_SERVICE), OUTPUT_FORMAT); + final ObjectNode response = OBJECT_MAPPER.readValue(responseStream, ObjectNode.class); + final JsonNode result = response.get("results"); + // make sure there is a single value in result + Assert.assertEquals(1, result.size()); + return result.get(0).asInt(); + } + + /** + * Rebalances a dataset to {@code targetNodes} + * + * @param integrationUtil + * @param dataverseName + * @param datasetName + * @param targetNodes + * @throws Exception + */ + public static void rebalanceDataset(AsterixHyracksIntegrationUtil integrationUtil, String dataverseName, + String datasetName, String[] targetNodes) throws Exception { + ICcApplicationContext ccAppCtx = + (ICcApplicationContext) integrationUtil.getClusterControllerService().getApplicationContext(); + MetadataProvider metadataProvider = new 
MetadataProvider(ccAppCtx, null); + try { + ActiveNotificationHandler activeNotificationHandler = + (ActiveNotificationHandler) ccAppCtx.getActiveNotificationHandler(); + activeNotificationHandler.suspend(metadataProvider); + try { + IMetadataLockManager lockManager = ccAppCtx.getMetadataLockManager(); + lockManager.acquireDatasetExclusiveModificationLock(metadataProvider.getLocks(), + dataverseName + '.' + datasetName); + RebalanceUtil.rebalance(dataverseName, datasetName, new LinkedHashSet<>(Arrays.asList(targetNodes)), + metadataProvider, ccAppCtx.getHcc(), NoOpDatasetRebalanceCallback.INSTANCE); + } finally { + activeNotificationHandler.resume(metadataProvider); + } + } finally { + metadataProvider.getLocks().unlock(); + } + } +} diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/MigrateStorageResourcesTaskTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/MigrateStorageResourcesTaskTest.java new file mode 100644 index 0000000..7b86c56 --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/MigrateStorageResourcesTaskTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.test.storage; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.function.Function; +import java.util.logging.ConsoleHandler; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil; +import org.apache.asterix.common.TestDataUtil; +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.config.GlobalConfig; +import org.apache.asterix.common.storage.IndexPathElements; +import org.apache.asterix.common.transactions.Checkpoint; +import org.apache.asterix.common.utils.StorageConstants; +import org.apache.asterix.common.utils.StoragePathUtil; +import org.apache.asterix.transaction.management.service.recovery.AbstractCheckpointManager; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class MigrateStorageResourcesTaskTest { + + private static final String DEFAULT_TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml"; + private static final AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil(); + + @Before + public void setUp() throws Exception { + Logger logger = Logger.getLogger("org.apache"); + logger.setLevel(Level.INFO); + ConsoleHandler handler = new ConsoleHandler(); + handler.setLevel(Level.INFO); + logger.addHandler(handler); + System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, DEFAULT_TEST_CONFIG_FILE_NAME); + } + + @After + public void tearDown() throws Exception { + integrationUtil.deinit(true); + } + + @Test + public void storageStructureMigration() throws Exception { + Function<IndexPathElements, String> legacyIndexPathProvider = (pathElements) -> + (pathElements.getRebalanceCount().equals("0") ? 
"" : pathElements.getRebalanceCount() + File.separator) + + pathElements.getDatasetName() + StoragePathUtil.DATASET_INDEX_NAME_SEPARATOR + pathElements + .getIndexName(); + StoragePathUtil.indexPathProvider = legacyIndexPathProvider; + integrationUtil.init(true); + // create dataset and insert data using legacy structure + String datasetName = "ds"; + TestDataUtil.createIdOnlyDataset(datasetName); + TestDataUtil.upsertData(datasetName, 100); + final long countBeforeMigration = TestDataUtil.getDatasetCount(datasetName); + // stop NCs + integrationUtil.deinit(false); + // forge a checkpoint with old version to force migration to new storage structure on all ncs + final INcApplicationContext nc1AppCtx = (INcApplicationContext) integrationUtil.ncs[0].getApplicationContext(); + final AbstractCheckpointManager nc1CheckpointManager = + (AbstractCheckpointManager) nc1AppCtx.getTransactionSubsystem().getCheckpointManager(); + forgeOldVersionCheckpoint(nc1CheckpointManager); + final INcApplicationContext nc2AppCtx = (INcApplicationContext) integrationUtil.ncs[1].getApplicationContext(); + final AbstractCheckpointManager nc2CheckpointManager = + (AbstractCheckpointManager) nc2AppCtx.getTransactionSubsystem().getCheckpointManager(); + forgeOldVersionCheckpoint(nc2CheckpointManager); + + // remove the legacy path provider to use the new default structure + StoragePathUtil.indexPathProvider = null; + // start the NCs to do the migration + integrationUtil.init(false); + final long countAfterMigration = TestDataUtil.getDatasetCount(datasetName); + // ensure data migrated to new structure without issues + Assert.assertEquals(countBeforeMigration, countAfterMigration); + } + + private void forgeOldVersionCheckpoint(AbstractCheckpointManager manger) throws HyracksDataException { + Checkpoint cp = new Checkpoint(-1, -1, 0, System.currentTimeMillis(), true, + StorageConstants.REBALANCE_STORAGE_VERSION - 1); + Path path = manger.getCheckpointPath(cp.getTimeStamp()); + // Write 
checkpoint file to disk + try (BufferedWriter writer = Files.newBufferedWriter(path)) { + writer.write(cp.asJson()); + writer.flush(); + } catch (IOException e) { + throw HyracksDataException.create(e); + } + } +} diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/RecoveryManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/RecoveryManagerTest.java index 723786c..29efa47 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/RecoveryManagerTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/RecoveryManagerTest.java @@ -19,38 +19,30 @@ package org.apache.asterix.test.txn; import java.io.File; -import java.io.InputStream; -import java.util.Random; +import java.util.logging.ConsoleHandler; +import java.util.logging.Level; +import java.util.logging.Logger; import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil; +import org.apache.asterix.common.TestDataUtil; import org.apache.asterix.common.config.GlobalConfig; import org.apache.asterix.common.configuration.AsterixConfiguration; import org.apache.asterix.common.configuration.Property; -import org.apache.asterix.common.utils.Servlets; -import org.apache.asterix.test.common.TestExecutor; +import org.apache.asterix.metadata.bootstrap.MetadataBuiltinEntities; import org.apache.asterix.test.common.TestHelper; -import org.apache.asterix.testframework.context.TestCaseContext; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; - public class RecoveryManagerTest { - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final String DEFAULT_TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml"; private static final String TEST_CONFIG_FILE_NAME = "asterix-test-configuration.xml"; 
private static final String TEST_CONFIG_PATH = System.getProperty("user.dir") + File.separator + "target" + File.separator + "config"; private static final String TEST_CONFIG_FILE_PATH = TEST_CONFIG_PATH + File.separator + TEST_CONFIG_FILE_NAME; - private static final TestExecutor testExecutor = new TestExecutor(); private static final AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil(); - private static final Random random = new Random(); - private static final int numRecords = 1; @Before public void setUp() throws Exception { @@ -74,52 +66,53 @@ @Test public void multiDatasetRecovery() throws Exception { String datasetNamePrefix = "ds_"; - final TestCaseContext.OutputFormat format = TestCaseContext.OutputFormat.CLEAN_JSON; - testExecutor.executeSqlppUpdateOrDdl("CREATE TYPE KeyType AS { id: int };", format); int numDatasets = 50; String datasetName = null; for (int i = 1; i <= numDatasets; i++) { datasetName = datasetNamePrefix + i; - testExecutor.executeSqlppUpdateOrDdl("CREATE DATASET " + datasetName + "(KeyType) PRIMARY KEY id;", format); - insertData(datasetName); + TestDataUtil.createIdOnlyDataset(datasetName); + TestDataUtil.upsertData(datasetName, 10); } + final long countBeforeFirstRecovery = TestDataUtil.getDatasetCount(datasetName); // do ungraceful shutdown to enforce recovery integrationUtil.deinit(false); integrationUtil.init(false); - validateRecovery(datasetName); - + final long countAfterFirstRecovery = TestDataUtil.getDatasetCount(datasetName); + Assert.assertEquals(countBeforeFirstRecovery, countAfterFirstRecovery); // create more datasets after recovery numDatasets = 100; for (int i = 51; i <= numDatasets; i++) { datasetName = datasetNamePrefix + i; - testExecutor.executeSqlppUpdateOrDdl("CREATE DATASET " + datasetName + "(KeyType) PRIMARY KEY id;", format); - insertData(datasetName); + TestDataUtil.createIdOnlyDataset(datasetName); + TestDataUtil.upsertData(datasetName, 1); } + final long 
countBeforeSecondRecovery = TestDataUtil.getDatasetCount(datasetName); // do ungraceful shutdown to enforce recovery again integrationUtil.deinit(false); integrationUtil.init(false); - validateRecovery(datasetName); + final long countAfterSecondRecovery = TestDataUtil.getDatasetCount(datasetName); + Assert.assertEquals(countBeforeSecondRecovery, countAfterSecondRecovery); } - private void insertData(String datasetName) throws Exception { - for (int i = 0; i < numRecords; i++) { - testExecutor.executeSqlppUpdateOrDdl("UPSERT INTO " + datasetName + " ({\"id\": " + random.nextInt() + "})", - TestCaseContext.OutputFormat.CLEAN_JSON); - } - } - - private void validateRecovery(String datasetName) throws Exception { - final String query = "select value count(*) from `" + datasetName + "`;"; - final InputStream inputStream = testExecutor - .executeQueryService(query, testExecutor.getEndpoint(Servlets.QUERY_SERVICE), - TestCaseContext.OutputFormat.CLEAN_JSON); - final ObjectNode jsonNodes = OBJECT_MAPPER.readValue(inputStream, ObjectNode.class); - JsonNode result = jsonNodes.get("results"); - // make sure there is result - Assert.assertEquals(1, result.size()); - for (int i = 0; i < result.size(); i++) { - JsonNode json = result.get(i); - Assert.assertEquals(numRecords, json.asInt()); - } + @Test + public void recoveryAfterRebalance() throws Exception { + String datasetName = "ds"; + TestDataUtil.createIdOnlyDataset(datasetName); + TestDataUtil.upsertData(datasetName, 10); + final long countBeforeRebalance = TestDataUtil.getDatasetCount(datasetName); + // rebalance dataset to single nc + TestDataUtil.rebalanceDataset(integrationUtil, MetadataBuiltinEntities.DEFAULT_DATAVERSE.getDataverseName(), + datasetName, new String[] { "asterix_nc2" }); + // check data after rebalance + final long countAfterRebalance = TestDataUtil.getDatasetCount(datasetName); + Assert.assertEquals(countBeforeRebalance, countAfterRebalance); + // insert data after rebalance + 
TestDataUtil.upsertData(datasetName, 20); + final long countBeforeRecovery = TestDataUtil.getDatasetCount(datasetName); + // do ungraceful shutdown to enforce recovery + integrationUtil.deinit(false); + integrationUtil.init(false); + final long countAfterRecovery = TestDataUtil.getDatasetCount(datasetName); + Assert.assertEquals(countBeforeRecovery, countAfterRecovery); } } \ No newline at end of file diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/DatasetLocalResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/DatasetLocalResource.java index 78b5cb2..48bbf00 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/DatasetLocalResource.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/DatasetLocalResource.java @@ -59,6 +59,11 @@ } @Override + public void setPath(String path) { + resource.setPath(path); + } + + @Override public IIndex createInstance(INCServiceContext ncServiceCtx) throws HyracksDataException { return resource.createInstance(ncServiceCtx); } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java index 6ffa095..492a393 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java @@ -20,11 +20,13 @@ import java.util.Set; +import org.apache.hyracks.api.exceptions.HyracksDataException; + public interface IReplicaResourcesManager { /** * @param partitions * @return the minimum LSN of all indexes that belong to {@code partitions}. 
*/ - public long getPartitionsMinLSN(Set<Integer> partitions); + long getPartitionsMinLSN(Set<Integer> partitions) throws HyracksDataException; } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/DatasetResourceReference.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/DatasetResourceReference.java new file mode 100644 index 0000000..d05321e --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/DatasetResourceReference.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.common.storage; + +import java.nio.file.Paths; + +import org.apache.asterix.common.dataflow.DatasetLocalResource; +import org.apache.asterix.common.utils.StorageConstants; +import org.apache.hyracks.storage.common.LocalResource; + +public class DatasetResourceReference extends ResourceReference { + + private int datasetId; + private int partitionId; + + private DatasetResourceReference() { + super(); + } + + public static DatasetResourceReference of(LocalResource localResource) { + return of(localResource, StorageConstants.VERSION); + } + + public static DatasetResourceReference of(LocalResource localResource, int version) { + if (version < StorageConstants.REBALANCE_STORAGE_VERSION) { + // to support legacy storage migration + return parseLegacyPath(localResource); + } + return parse(localResource); + } + + public int getDatasetId() { + return datasetId; + } + + public int getPartitionId() { + return partitionId; + } + + private static DatasetResourceReference parse(LocalResource localResource) { + final DatasetResourceReference datasetResourceReference = new DatasetResourceReference(); + final String filePath = Paths.get(localResource.getPath(), StorageConstants.METADATA_FILE_NAME).toString(); + parse(datasetResourceReference, filePath); + assignIds(localResource, datasetResourceReference); + return datasetResourceReference; + } + + private static DatasetResourceReference parseLegacyPath(LocalResource localResource) { + final DatasetResourceReference datasetResourceReference = new DatasetResourceReference(); + final String filePath = Paths.get(localResource.getPath(), StorageConstants.METADATA_FILE_NAME).toString(); + parseLegacyPath(datasetResourceReference, filePath); + assignIds(localResource, datasetResourceReference); + return datasetResourceReference; + } + + private static void assignIds(LocalResource localResource, DatasetResourceReference lrr) { + final DatasetLocalResource dsResource = (DatasetLocalResource) 
localResource.getResource(); + lrr.datasetId = dsResource.getDatasetId(); + lrr.partitionId = dsResource.getPartition(); + } +} diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexFileProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexFileProperties.java deleted file mode 100644 index ca6968f..0000000 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexFileProperties.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.common.storage; - -import java.io.File; - -import org.apache.asterix.common.utils.StoragePathUtil; - -/** - * A holder class for an index file properties. 
- */ -public class IndexFileProperties { - - private final String fileName; - private final String idxName; - private final String dataverseName; - private final int partitionId; - private final int datasetId; - - public IndexFileProperties(int partitionId, String dataverseName, String idxName, String fileName, int datasetId) { - this.partitionId = partitionId; - this.dataverseName = dataverseName; - this.idxName = idxName; - this.fileName = fileName; - this.datasetId = datasetId; - } - - public String getFileName() { - return fileName; - } - - public String getIdxName() { - return idxName; - } - - public String getDataverseName() { - return dataverseName; - } - - public int getPartitionId() { - return partitionId; - } - - public int getDatasetId() { - return datasetId; - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append(StoragePathUtil.PARTITION_DIR_PREFIX + partitionId + File.separator); - sb.append(dataverseName + File.separator); - sb.append(idxName + File.separator); - sb.append(fileName); - sb.append(" [Dataset ID: " + datasetId + "]"); - return sb.toString(); - } -} \ No newline at end of file diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexPathElements.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexPathElements.java new file mode 100644 index 0000000..4d0f3dd --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexPathElements.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.storage; + +public class IndexPathElements { + + private final String datasetName; + private final String indexName; + private final String rebalanceCount; + + public IndexPathElements(String datasetName, String indexName, String rebalanceCount) { + this.datasetName = datasetName; + this.indexName = indexName; + this.rebalanceCount = rebalanceCount; + } + + public String getDatasetName() { + return datasetName; + } + + public String getIndexName() { + return indexName; + } + + public String getRebalanceCount() { + return rebalanceCount; + } +} diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java new file mode 100644 index 0000000..0d65067 --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.storage; + +import java.io.File; +import java.nio.file.Path; +import java.nio.file.Paths; + +import org.apache.asterix.common.utils.StoragePathUtil; + +public class ResourceReference { + + protected String root; + protected String partition; + protected String dataverse; + protected String dataset; + protected String rebalance; + protected String index; + protected String name; + + protected ResourceReference() { + } + + public static ResourceReference of(String localResourcePath) { + ResourceReference lrr = new ResourceReference(); + parse(lrr, localResourcePath); + return lrr; + } + + public String getPartition() { + return partition; + } + + public String getDataverse() { + return dataverse; + } + + public String getDataset() { + return dataset; + } + + public String getRebalance() { + return rebalance; + } + + public String getIndex() { + return index; + } + + public String getName() { + return name; + } + + public Path getRelativePath() { + return Paths.get(root, partition, dataverse, dataset, rebalance, index); + } + + protected static void parse(ResourceReference ref, String path) { + // format: root/partition/dataverse/dataset/rebalanceCount/index/fileName + final String[] tokens = path.split(File.separator); + if (tokens.length < 7) { + throw new IllegalStateException("Unrecognized path structure: " + path); + } + int offset = tokens.length; + ref.name = tokens[--offset]; + ref.index = tokens[--offset]; + ref.rebalance = tokens[--offset]; + ref.dataset = tokens[--offset]; + ref.dataverse = 
tokens[--offset]; + ref.partition = tokens[--offset]; + ref.root = tokens[--offset]; + } + + protected static void parseLegacyPath(ResourceReference ref, String path) { + // old format: root/partition/dataverse/datasetName_idx_IndexName/fileName + final String[] tokens = path.split(File.separator); + if (tokens.length < 5) { + throw new IllegalStateException("Unrecognized legacy path structure: " + path); + } + int offset = tokens.length; + ref.name = tokens[--offset]; + // split combined dataset/index name + final String[] indexTokens = tokens[--offset].split(StoragePathUtil.DATASET_INDEX_NAME_SEPARATOR); + if (indexTokens.length != 2) { + throw new IllegalStateException("Unrecognized legacy path structure: " + path); + } + ref.dataset = indexTokens[0]; + ref.index = indexTokens[1]; + ref.dataverse = tokens[--offset]; + ref.partition = tokens[--offset]; + ref.root = tokens[--offset]; + ref.rebalance = String.valueOf(0); + } +} diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java index 49d64d6..48769d4 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java @@ -24,13 +24,25 @@ * A static class that stores storage constants */ public class StorageConstants { - public static final String METADATA_ROOT = "root_metadata"; - /** The storage version of AsterixDB related artifacts (e.g. log files, checkpoint files, etc..). */ - private static final int LOCAL_STORAGE_VERSION = 1; - /** The storage version of AsterixDB stack. */ + public static final String METADATA_ROOT = "root_metadata"; + public static final String METADATA_FILE_NAME = ".metadata"; + + /** + * The storage version of AsterixDB related artifacts (e.g. log files, checkpoint files, etc..). 
+ */ + private static final int LOCAL_STORAGE_VERSION = 2; + + /** + * The storage version of AsterixDB stack. + */ public static final int VERSION = LOCAL_STORAGE_VERSION + ITreeIndexFrame.Constants.VERSION; + /** + * The storage version in which the rebalance storage structure was introduced + */ + public static final int REBALANCE_STORAGE_VERSION = 8; + private StorageConstants() { } } \ No newline at end of file diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java index 027f72c..07b9359 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java @@ -19,8 +19,12 @@ package org.apache.asterix.common.utils; import java.io.File; +import java.nio.file.Paths; +import java.util.function.Function; import org.apache.asterix.common.cluster.ClusterPartition; +import org.apache.asterix.common.storage.IndexPathElements; +import org.apache.asterix.common.storage.ResourceReference; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; import org.apache.hyracks.algebricks.common.utils.Pair; @@ -36,6 +40,7 @@ private static final Logger LOGGER = Logger.getLogger(StoragePathUtil.class.getName()); public static final String PARTITION_DIR_PREFIX = "partition_"; public static final String DATASET_INDEX_NAME_SEPARATOR = "_idx_"; + public static Function<IndexPathElements, String> indexPathProvider; private StoragePathUtil() { } @@ -69,8 +74,10 @@ } private static String prepareFullIndexName(String datasetName, String idxName, long rebalanceCount) { - return (rebalanceCount == 0 ? 
"" : rebalanceCount + File.separator) + datasetName + DATASET_INDEX_NAME_SEPARATOR - + idxName; + if (indexPathProvider != null) { + return indexPathProvider.apply(new IndexPathElements(datasetName, idxName, String.valueOf(rebalanceCount))); + } + return datasetName + File.separator + rebalanceCount + File.separator + idxName; } public static int getPartitionNumFromName(String name) { @@ -88,10 +95,7 @@ * @return the file relative path starting from the partition directory */ public static String getIndexFileRelativePath(String fileAbsolutePath) { - String[] tokens = fileAbsolutePath.split(File.separator); - //partition/dataverse/idx/fileName - return tokens[tokens.length - 4] + File.separator + tokens[tokens.length - 3] + File.separator - + tokens[tokens.length - 2] + File.separator + tokens[tokens.length - 1]; + return ResourceReference.of(fileAbsolutePath).getRelativePath().toString(); } /** @@ -136,7 +140,6 @@ * @return The index name */ public static String getIndexNameFromPath(String path) { - int idx = path.lastIndexOf(DATASET_INDEX_NAME_SEPARATOR); - return idx != -1 ? 
path.substring(idx + DATASET_INDEX_NAME_SEPARATOR.length()) : path; + return Paths.get(path).getFileName().toString(); } } diff --git a/asterixdb/asterix-replication/pom.xml b/asterixdb/asterix-replication/pom.xml index 3138806..f209aae 100644 --- a/asterixdb/asterix-replication/pom.xml +++ b/asterixdb/asterix-replication/pom.xml @@ -43,11 +43,6 @@ <version>${project.version}</version> </dependency> <dependency> - <groupId>org.apache.asterix</groupId> - <artifactId>asterix-metadata</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> <groupId>org.apache.hyracks</groupId> <artifactId>hyracks-api</artifactId> </dependency> diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java index 9d8c351..3143284 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java @@ -54,7 +54,7 @@ import org.apache.asterix.common.replication.IReplicationStrategy; import org.apache.asterix.common.replication.IReplicationThread; import org.apache.asterix.common.replication.ReplicaEvent; -import org.apache.asterix.common.storage.IndexFileProperties; +import org.apache.asterix.common.storage.DatasetResourceReference; import org.apache.asterix.common.transactions.IAppRuntimeContextProvider; import org.apache.asterix.common.transactions.ILogManager; import org.apache.asterix.common.transactions.LogRecord; @@ -392,7 +392,7 @@ //start sending files for (String filePath : filesList) { // Send only files of datasets that are replciated. 
- IndexFileProperties indexFileRef = localResourceRep.getIndexFileRef(filePath); + DatasetResourceReference indexFileRef = localResourceRep.getLocalResourceReference(filePath); if (!repStrategy.isMatch(indexFileRef.getDatasetId())) { continue; } diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java index b0aa0fb..48c7083 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java @@ -62,7 +62,7 @@ import org.apache.asterix.common.replication.Replica.ReplicaState; import org.apache.asterix.common.replication.ReplicaEvent; import org.apache.asterix.common.replication.ReplicationJob; -import org.apache.asterix.common.storage.IndexFileProperties; +import org.apache.asterix.common.storage.DatasetResourceReference; import org.apache.asterix.common.transactions.IAppRuntimeContextProvider; import org.apache.asterix.common.transactions.ILogManager; import org.apache.asterix.common.transactions.ILogRecord; @@ -280,7 +280,7 @@ //all of the job's files belong to a single storage partition. //get any of them to determine the partition from the file path. 
String jobFile = job.getJobFiles().iterator().next(); - IndexFileProperties indexFileRef = localResourceRepo.getIndexFileRef(jobFile); + DatasetResourceReference indexFileRef = localResourceRepo.getLocalResourceReference(jobFile); if (!replicationStrategy.isMatch(indexFileRef.getDatasetId())) { return; } diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java index a8b15d2..7ca6f2f 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java @@ -23,9 +23,11 @@ import java.io.File; import java.io.IOException; import java.io.OutputStream; +import java.nio.file.Paths; import java.util.concurrent.atomic.AtomicInteger; import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback; +import org.apache.asterix.common.storage.ResourceReference; import org.apache.asterix.replication.logging.TxnLogUtil; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; @@ -93,18 +95,16 @@ return lsmCompProp; } - public String getMaskPath(ReplicaResourcesManager resourceManager) { + public String getMaskPath(ReplicaResourcesManager resourceManager) throws HyracksDataException { if (maskPath == null) { LSMIndexFileProperties afp = new LSMIndexFileProperties(this); - //split the index file path to get the LSM component file name - afp.splitFileName(); maskPath = getReplicaComponentPath(resourceManager) + File.separator + afp.getFileName() + ReplicaResourcesManager.LSM_COMPONENT_MASK_SUFFIX; } return maskPath; } - public String getReplicaComponentPath(ReplicaResourcesManager resourceManager) { + public String 
getReplicaComponentPath(ReplicaResourcesManager resourceManager) throws HyracksDataException { if (replicaPath == null) { LSMIndexFileProperties afp = new LSMIndexFileProperties(this); replicaPath = resourceManager.getIndexPath(afp); @@ -118,23 +118,10 @@ * @return a unique id based on the timestamp of the component */ public static String getLSMComponentID(String filePath) { - String[] tokens = filePath.split(File.separator); - - int arraySize = tokens.length; - String fileName = tokens[arraySize - 1]; - String idxName = tokens[arraySize - 2]; - String dataverse = tokens[arraySize - 3]; - String partitionName = tokens[arraySize - 4]; - - StringBuilder componentId = new StringBuilder(); - componentId.append(partitionName); - componentId.append(File.separator); - componentId.append(dataverse); - componentId.append(File.separator); - componentId.append(idxName); - componentId.append(File.separator); - componentId.append(fileName.substring(0, fileName.lastIndexOf(AbstractLSMIndexFileManager.DELIMITER))); - return componentId.toString(); + final ResourceReference ref = ResourceReference.of(filePath); + final String fileUniqueTimestamp = + ref.getName().substring(0, ref.getName().lastIndexOf(AbstractLSMIndexFileManager.DELIMITER)); + return Paths.get(ref.getRelativePath().toString(), fileUniqueTimestamp).toString(); } public String getComponentId() { @@ -149,16 +136,8 @@ return nodeId; } - public int getNumberOfFiles() { - return numberOfFiles.get(); - } - public int markFileComplete() { return numberOfFiles.decrementAndGet(); - } - - public void setNumberOfFiles(AtomicInteger numberOfFiles) { - this.numberOfFiles = numberOfFiles; } public Long getReplicaLSN() { @@ -171,10 +150,6 @@ public LSMOperationType getOpType() { return opType; - } - - public void setOpType(LSMOperationType opType) { - this.opType = opType; } public String getNodeUniqueLSN() { diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java index eb9e82d..f2747fe 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java @@ -20,24 +20,18 @@ import java.io.DataInput; import java.io.DataOutputStream; -import java.io.File; import java.io.IOException; import java.io.OutputStream; - -import org.apache.asterix.common.utils.StoragePathUtil; +import java.nio.file.Paths; public class LSMIndexFileProperties { - private String fileName; private long fileSize; private String nodeId; - private String dataverse; - private String idxName; private boolean lsmComponentFile; private String filePath; private boolean requiresAck = false; private long LSNByteOffset; - private int partition; public LSMIndexFileProperties() { } @@ -59,15 +53,6 @@ this.lsmComponentFile = lsmComponentFile; this.LSNByteOffset = LSNByteOffset; this.requiresAck = requiresAck; - } - - public void splitFileName() { - String[] tokens = filePath.split(File.separator); - int arraySize = tokens.length; - this.fileName = tokens[arraySize - 1]; - this.idxName = tokens[arraySize - 2]; - this.dataverse = tokens[arraySize - 3]; - this.partition = StoragePathUtil.getPartitionNumFromName(tokens[arraySize - 4]); } public void serialize(OutputStream out) throws IOException { @@ -100,24 +85,8 @@ return fileSize; } - public String getFileName() { - return fileName; - } - public String getNodeId() { return nodeId; - } - - public String getDataverse() { - return dataverse; - } - - public void setDataverse(String dataverse) { - this.dataverse = dataverse; - } - - public String getIdxName() { - return idxName; } public boolean isLSMComponentFile() { @@ -128,25 
+97,22 @@ return requiresAck; } + public String getFileName() { + return Paths.get(filePath).toFile().getName(); + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); - sb.append("File Name: " + fileName + " "); + sb.append("File Path: " + filePath + " "); sb.append("File Size: " + fileSize + " "); sb.append("Node ID: " + nodeId + " "); - sb.append("Partition: " + partition + " "); - sb.append("IDX Name: " + idxName + " "); sb.append("isLSMComponentFile : " + lsmComponentFile + " "); - sb.append("Dataverse: " + dataverse); sb.append("LSN Byte Offset: " + LSNByteOffset); return sb.toString(); } public long getLSNByteOffset() { return LSNByteOffset; - } - - public int getPartition() { - return partition; } } diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java index cf8e001..7eea4a4 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java @@ -38,14 +38,14 @@ import java.util.logging.Logger; import org.apache.asterix.common.cluster.ClusterPartition; -import org.apache.asterix.common.config.ClusterProperties; import org.apache.asterix.common.config.MetadataProperties; import org.apache.asterix.common.replication.IReplicaResourcesManager; +import org.apache.asterix.common.utils.StorageConstants; import org.apache.asterix.common.utils.StoragePathUtil; -import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil; import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; import org.apache.commons.io.FileUtils; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.io.FileReference; import 
org.apache.hyracks.storage.common.ILocalResourceRepository; import org.apache.hyracks.storage.common.LocalResource; @@ -63,7 +63,7 @@ nodePartitions = metadataProperties.getNodePartitions(); } - public void deleteIndexFile(LSMIndexFileProperties afp) { + public void deleteIndexFile(LSMIndexFileProperties afp) throws HyracksDataException { String indexPath = getIndexPath(afp); if (indexPath != null) { if (afp.isLSMComponentFile()) { @@ -78,20 +78,12 @@ } } - public String getIndexPath(LSMIndexFileProperties fileProperties) { - fileProperties.splitFileName(); - //get partition path in this node - String partitionPath = localRepository.getPartitionPath(fileProperties.getPartition()); - //get index path - String indexPath = SplitsAndConstraintsUtil.getIndexPath(partitionPath, fileProperties.getPartition(), - fileProperties.getDataverse(), fileProperties.getIdxName()); - - Path path = Paths.get(indexPath); - if (!Files.exists(path)) { - File indexFolder = new File(indexPath); - indexFolder.mkdirs(); + public String getIndexPath(LSMIndexFileProperties fileProperties) throws HyracksDataException { + final FileReference indexPath = localRepository.getIndexPath(Paths.get(fileProperties.getFilePath())); + if (!indexPath.getFile().exists()) { + indexPath.getFile().mkdirs(); } - return indexPath; + return indexPath.toString(); } public void initializeReplicaIndexLSNMap(String indexPath, long currentLSN) throws IOException { @@ -123,21 +115,21 @@ updateReplicaIndexLSNMap(lsmComponentProperties.getReplicaComponentPath(this), lsnMap); } - public Set<File> getReplicaIndexes(String replicaId) { + public Set<File> getReplicaIndexes(String replicaId) throws HyracksDataException { Set<File> remoteIndexesPaths = new HashSet<File>(); ClusterPartition[] partitions = nodePartitions.get(replicaId); for (ClusterPartition partition : partitions) { - remoteIndexesPaths.addAll(getPartitionIndexes(partition.getPartitionId())); + 
remoteIndexesPaths.addAll(localRepository.getPartitionIndexes(partition.getPartitionId())); } return remoteIndexesPaths; } @Override - public long getPartitionsMinLSN(Set<Integer> partitions) { + public long getPartitionsMinLSN(Set<Integer> partitions) throws HyracksDataException { long minRemoteLSN = Long.MAX_VALUE; for (Integer partition : partitions) { //for every index in replica - Set<File> remoteIndexes = getPartitionIndexes(partition); + Set<File> remoteIndexes = localRepository.getPartitionIndexes(partition); for (File indexFolder : remoteIndexes) { //read LSN map try { @@ -164,7 +156,7 @@ for (File indexFolder : remoteIndexes) { if (getReplicaIndexMaxLSN(indexFolder) < targetLSN) { File localResource = new File( - indexFolder + File.separator + PersistentLocalResourceRepository.METADATA_FILE_NAME); + indexFolder + File.separator + StorageConstants.METADATA_FILE_NAME); LocalResource resource = PersistentLocalResourceRepository.readLocalResource(localResource); laggingReplicaIndexes.put(resource.getId(), indexFolder.getAbsolutePath()); } @@ -190,7 +182,12 @@ public void cleanInvalidLSMComponents(String replicaId) { //for every index in replica - Set<File> remoteIndexes = getReplicaIndexes(replicaId); + Set<File> remoteIndexes = null; + try { + remoteIndexes = getReplicaIndexes(replicaId); + } catch (HyracksDataException e) { + throw new IllegalStateException(e); + } for (File remoteIndexFile : remoteIndexes) { //search for any mask File[] masks = remoteIndexFile.listFiles(LSM_COMPONENTS_MASKS_FILTER); @@ -241,41 +238,11 @@ /** * @param partition - * @return Set of file references to each index in the partition - */ - public Set<File> getPartitionIndexes(int partition) { - Set<File> partitionIndexes = new HashSet<File>(); - String storageDirName = ClusterProperties.INSTANCE.getStorageDirectoryName(); - String partitionStoragePath = localRepository.getPartitionPath(partition) - + StoragePathUtil.prepareStoragePartitionPath(storageDirName, partition); - File 
partitionRoot = new File(partitionStoragePath); - if (partitionRoot.exists() && partitionRoot.isDirectory()) { - File[] dataverseFileList = partitionRoot.listFiles(); - if (dataverseFileList != null) { - for (File dataverseFile : dataverseFileList) { - if (dataverseFile.isDirectory()) { - File[] indexFileList = dataverseFile.listFiles(); - if (indexFileList != null) { - for (File indexFile : indexFileList) { - if (indexFile.isDirectory()) { - partitionIndexes.add(indexFile); - } - } - } - } - } - } - } - return partitionIndexes; - } - - /** - * @param partition * @return Absolute paths to all partition files */ - public List<String> getPartitionIndexesFiles(int partition, boolean relativePath) { + public List<String> getPartitionIndexesFiles(int partition, boolean relativePath) throws HyracksDataException { List<String> partitionFiles = new ArrayList<String>(); - Set<File> partitionIndexes = getPartitionIndexes(partition); + Set<File> partitionIndexes = localRepository.getPartitionIndexes(partition); for (File indexDir : partitionIndexes) { if (indexDir.isDirectory()) { File[] indexFiles = indexDir.listFiles(LSM_INDEX_FILES_FILTER); @@ -284,8 +251,7 @@ if (!relativePath) { partitionFiles.add(file.getAbsolutePath()); } else { - partitionFiles.add( - StoragePathUtil.getIndexFileRelativePath(file.getAbsolutePath())); + partitionFiles.add(StoragePathUtil.getIndexFileRelativePath(file.getAbsolutePath())); } } } @@ -311,7 +277,7 @@ private static final FilenameFilter LSM_INDEX_FILES_FILTER = new FilenameFilter() { @Override public boolean accept(File dir, String name) { - return name.equalsIgnoreCase(PersistentLocalResourceRepository.METADATA_FILE_NAME) || !name.startsWith("."); + return name.equalsIgnoreCase(StorageConstants.METADATA_FILE_NAME) || !name.startsWith("."); } }; } diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java index db3647e..587e8c1 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java @@ -23,22 +23,26 @@ import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; -import java.io.FilenameFilter; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.nio.charset.StandardCharsets; import java.nio.file.Files; +import java.nio.file.Path; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.SortedMap; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Predicate; import java.util.logging.Level; import java.util.logging.Logger; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.asterix.common.cluster.ClusterPartition; import org.apache.asterix.common.config.MetadataProperties; @@ -47,7 +51,8 @@ import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.replication.IReplicationManager; import org.apache.asterix.common.replication.ReplicationJob; -import org.apache.asterix.common.storage.IndexFileProperties; +import org.apache.asterix.common.storage.DatasetResourceReference; +import org.apache.asterix.common.storage.ResourceReference; import org.apache.asterix.common.utils.StorageConstants; import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.commons.io.FileUtils; @@ -68,15 +73,13 @@ public class PersistentLocalResourceRepository implements ILocalResourceRepository { - // Public constants - 
public static final String METADATA_FILE_NAME = ".metadata"; + public static final Predicate<Path> INDEX_COMPONENTS = path -> !path.endsWith(StorageConstants.METADATA_FILE_NAME); // Private constants private static final Logger LOGGER = Logger.getLogger(PersistentLocalResourceRepository.class.getName()); private static final String STORAGE_METADATA_DIRECTORY = StorageConstants.METADATA_ROOT; private static final String STORAGE_METADATA_FILE_NAME_PREFIX = "." + StorageConstants.METADATA_ROOT; private static final int MAX_CACHED_RESOURCES = 1000; - private static final FilenameFilter METADATA_FILES_FILTER = - (File dir, String name) -> name.equalsIgnoreCase(METADATA_FILE_NAME); + // Finals private final IIOManager ioManager; private final String[] mountPoints; @@ -157,8 +160,9 @@ //make dirs for the storage metadata file boolean success = storageMetadataDir.mkdirs(); if (!success) { - throw HyracksDataException.create(ErrorCode.ROOT_LOCAL_RESOURCE_COULD_NOT_BE_CREATED, - getClass().getSimpleName(), storageMetadataDir.getAbsolutePath()); + throw HyracksDataException + .create(ErrorCode.ROOT_LOCAL_RESOURCE_COULD_NOT_BE_CREATED, getClass().getSimpleName(), + storageMetadataDir.getAbsolutePath()); } LOGGER.log(Level.INFO, "created the root-metadata-file's directory: " + storageMetadataDir.getAbsolutePath()); @@ -198,8 +202,8 @@ throw HyracksDataException.create(CANNOT_CREATE_FILE, parent.getAbsolutePath()); } - try (FileOutputStream fos = new FileOutputStream(resourceFile.getFile()); - ObjectOutputStream oosToFos = new ObjectOutputStream(fos)) { + try (FileOutputStream fos = new FileOutputStream( + resourceFile.getFile()); ObjectOutputStream oosToFos = new ObjectOutputStream(fos)) { oosToFos.writeObject(resource); oosToFos.flush(); } catch (IOException e) { @@ -226,27 +230,23 @@ } finally { // Regardless of successfully deleted or not, the operation should be replicated. 
//if replication enabled, delete resource from remote replicas - if (isReplicationEnabled - && !resourceFile.getFile().getName().startsWith(STORAGE_METADATA_FILE_NAME_PREFIX)) { + if (isReplicationEnabled && !resourceFile.getFile().getName() + .startsWith(STORAGE_METADATA_FILE_NAME_PREFIX)) { createReplicationJob(ReplicationOperation.DELETE, resourceFile); } } } else { - throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.RESOURCE_DOES_NOT_EXIST, - relativePath); + throw HyracksDataException + .create(org.apache.hyracks.api.exceptions.ErrorCode.RESOURCE_DOES_NOT_EXIST, relativePath); } } private static FileReference getLocalResourceFileByName(IIOManager ioManager, String resourcePath) throws HyracksDataException { - String fileName = resourcePath + File.separator + METADATA_FILE_NAME; + String fileName = resourcePath + File.separator + StorageConstants.METADATA_FILE_NAME; return ioManager.resolve(fileName); } - - public Map<Long, LocalResource> loadAndGetAllResources() throws IOException { - //TODO During recovery, the memory usage currently is proportional to the number of resources available. - //This could be fixed by traversing all resources on disk until the required resource is found. - LOGGER.log(Level.INFO, "Loading all resources"); + public Map<Long, LocalResource> getResources(Predicate<LocalResource> filter) throws HyracksDataException { Map<Long, LocalResource> resourcesMap = new HashMap<>(); for (int i = 0; i < mountPoints.length; i++) { File storageRootDir = getStorageRootDirectoryIfExists(ioManager, nodeId, i); @@ -254,109 +254,47 @@ LOGGER.log(Level.INFO, "Getting storage root dir returned null. Returning"); continue; } + // storage/partition/dataverse/dataset/rebalance/idx LOGGER.log(Level.INFO, "Getting storage root dir returned " + storageRootDir.getAbsolutePath()); - //load all local resources. 
- File[] partitions = storageRootDir.listFiles(); - LOGGER.log(Level.INFO, "Number of partitions found = " + partitions.length); - for (File partition : partitions) { - File[] dataverseFileList = partition.listFiles(); - LOGGER.log(Level.INFO, "Reading partition = " + partition.getName() + ". Number of dataverses found: " - + dataverseFileList.length); - if (dataverseFileList != null) { - for (File dataverseFile : dataverseFileList) { - loadDataverse(dataverseFile, resourcesMap); + try { + try (Stream<Path> stream = Files.find(storageRootDir.toPath(), 6, + (path, attr) -> path.getFileName().toString().equals(StorageConstants.METADATA_FILE_NAME))) { + final List<File> resourceMetadataFiles = stream.map(Path::toFile).collect(Collectors.toList()); + for (File file : resourceMetadataFiles) { + final LocalResource localResource = PersistentLocalResourceRepository.readLocalResource(file); + if (filter.test(localResource)) { + resourcesMap.put(localResource.getId(), localResource); + } } } + } catch (IOException e) { + throw HyracksDataException.create(e); } } return resourcesMap; + } - private void loadDataverse(File dataverseFile, Map<Long, LocalResource> resourcesMap) throws HyracksDataException { - LOGGER.log(Level.INFO, "Loading dataverse:" + dataverseFile.getName()); - if (dataverseFile.isDirectory()) { - File[] indexFileList = dataverseFile.listFiles(); - if (indexFileList != null) { - for (File indexFile : indexFileList) { - loadIndex(indexFile, resourcesMap); - } - } - } - } - - private void loadIndex(File indexFile, Map<Long, LocalResource> resourcesMap) throws HyracksDataException { - LOGGER.log(Level.INFO, "Loading index:" + indexFile.getName()); - if (indexFile.isDirectory()) { - File[] metadataFiles = indexFile.listFiles(METADATA_FILES_FILTER); - if (metadataFiles != null) { - for (File metadataFile : metadataFiles) { - LocalResource localResource = readLocalResource(metadataFile); - LOGGER.log(Level.INFO, "Resource loaded " + localResource.getId() + ":" + 
localResource.getPath()); - resourcesMap.put(localResource.getId(), localResource); - } - } - } + public Map<Long, LocalResource> loadAndGetAllResources() throws HyracksDataException { + return getResources(p -> true); } @Override public long maxId() throws HyracksDataException { - long maxResourceId = 0; - for (int i = 0; i < mountPoints.length; i++) { - File storageRootDir = getStorageRootDirectoryIfExists(ioManager, nodeId, i); - if (storageRootDir == null) { - continue; - } - - //load all local resources. - File[] partitions = storageRootDir.listFiles(); - for (File partition : partitions) { - //traverse all local resources. - File[] dataverseFileList = partition.listFiles(); - if (dataverseFileList != null) { - for (File dataverseFile : dataverseFileList) { - maxResourceId = getMaxResourceIdForDataverse(dataverseFile, maxResourceId); - } - } - } - } - return maxResourceId; - } - - private long getMaxResourceIdForDataverse(File dataverseFile, long maxSoFar) throws HyracksDataException { - long maxResourceId = maxSoFar; - if (dataverseFile.isDirectory()) { - File[] indexFileList = dataverseFile.listFiles(); - if (indexFileList != null) { - for (File indexFile : indexFileList) { - maxResourceId = getMaxResourceIdForIndex(indexFile, maxResourceId); - } - } - } - return maxResourceId; - } - - private long getMaxResourceIdForIndex(File indexFile, long maxSoFar) throws HyracksDataException { - long maxResourceId = maxSoFar; - if (indexFile.isDirectory()) { - File[] metadataFiles = indexFile.listFiles(METADATA_FILES_FILTER); - if (metadataFiles != null) { - for (File metadataFile : metadataFiles) { - LocalResource localResource = readLocalResource(metadataFile); - maxResourceId = Math.max(maxResourceId, localResource.getId()); - } - } - } - return maxResourceId; + final Map<Long, LocalResource> allResources = loadAndGetAllResources(); + final Optional<Long> max = allResources.keySet().stream().max(Long::compare); + return max.isPresent() ? 
max.get() : 0; } private static String getFileName(String path) { - return path.endsWith(File.separator) ? (path + METADATA_FILE_NAME) - : (path + File.separator + METADATA_FILE_NAME); + return path.endsWith(File.separator) ? + (path + StorageConstants.METADATA_FILE_NAME) : + (path + File.separator + StorageConstants.METADATA_FILE_NAME); } public static LocalResource readLocalResource(File file) throws HyracksDataException { - try (FileInputStream fis = new FileInputStream(file); - ObjectInputStream oisFromFis = new ObjectInputStream(fis)) { + try (FileInputStream fis = new FileInputStream(file); ObjectInputStream oisFromFis = new ObjectInputStream( + fis)) { LocalResource resource = (LocalResource) oisFromFis.readObject(); if (resource.getVersion() == ITreeIndexFrame.Constants.VERSION) { return resource; @@ -425,8 +363,9 @@ * @return A file reference to the storage metadata file. */ private static FileReference getStorageMetadataFile(IIOManager ioManager, String nodeId, int ioDeviceId) { - String storageMetadataFileName = STORAGE_METADATA_DIRECTORY + File.separator + nodeId + "_" + "iodevice" - + ioDeviceId + File.separator + STORAGE_METADATA_FILE_NAME_PREFIX; + String storageMetadataFileName = + STORAGE_METADATA_DIRECTORY + File.separator + nodeId + "_" + "iodevice" + ioDeviceId + File.separator + + STORAGE_METADATA_FILE_NAME_PREFIX; return new FileReference(ioManager.getIODevices().get(ioDeviceId), storageMetadataFileName); } @@ -483,10 +422,6 @@ return Collections.unmodifiableSet(nodeInactivePartitions); } - public Set<Integer> getNodeOrignalPartitions() { - return Collections.unmodifiableSet(nodeOriginalPartitions); - } - public synchronized void addActivePartition(int partitonId) { nodeActivePartitions.add(partitonId); nodeInactivePartitions.remove(partitonId); @@ -497,27 +432,27 @@ nodeActivePartitions.remove(partitonId); } - private static String getLocalResourceRelativePath(String absolutePath) { - final String[] tokens = 
absolutePath.split(File.separator); - // Format: storage_dir/partition/dataverse/idx - return tokens[tokens.length - 5] + File.separator + tokens[tokens.length - 4] + File.separator - + tokens[tokens.length - 3] + File.separator + tokens[tokens.length - 2]; + public DatasetResourceReference getLocalResourceReference(String absoluteFilePath) throws HyracksDataException { + //TODO pass relative path + final String localResourcePath = StoragePathUtil.getIndexFileRelativePath(absoluteFilePath); + final LocalResource lr = get(localResourcePath); + return DatasetResourceReference.of(lr); } - public IndexFileProperties getIndexFileRef(String absoluteFilePath) throws HyracksDataException { - //TODO pass relative path - final String[] tokens = absoluteFilePath.split(File.separator); - if (tokens.length < 5) { - throw new HyracksDataException("Invalid file format"); + public Set<File> getPartitionIndexes(int partition) throws HyracksDataException { + final Map<Long, LocalResource> partitionResourcesMap = getResources(resource -> { + DatasetLocalResource dsResource = (DatasetLocalResource) resource.getResource(); + return dsResource.getPartition() == partition; + }); + Set<File> indexes = new HashSet<>(); + for (LocalResource localResource : partitionResourcesMap.values()) { + indexes.add(ioManager.resolve(localResource.getPath()).getFile().getParentFile()); } - String fileName = tokens[tokens.length - 1]; - String index = tokens[tokens.length - 2]; - String dataverse = tokens[tokens.length - 3]; - String partition = tokens[tokens.length - 4]; - int partitionId = StoragePathUtil.getPartitionNumFromName(partition); - String relativePath = getLocalResourceRelativePath(absoluteFilePath); - final LocalResource lr = get(relativePath); - int datasetId = lr == null ? 
-1 : ((DatasetLocalResource) lr.getResource()).getDatasetId(); - return new IndexFileProperties(partitionId, dataverse, index, fileName, datasetId); + return indexes; + } + + public FileReference getIndexPath(Path indexFile) throws HyracksDataException { + final ResourceReference ref = ResourceReference.of(indexFile.toString()); + return ioManager.resolve(ref.getRelativePath().toString()); } } diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/ReplicationCheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/ReplicationCheckpointManager.java index 6ce543b..9f5b83c 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/ReplicationCheckpointManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/ReplicationCheckpointManager.java @@ -119,7 +119,7 @@ return minFirstLSN; } - private long getDeadReplicasMinFirstLSN(Set<String> deadReplicaIds) { + private long getDeadReplicasMinFirstLSN(Set<String> deadReplicaIds) throws HyracksDataException { final IReplicaResourcesManager remoteResourcesManager = txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext().getReplicaResourcesManager(); final IApplicationContext propertiesProvider = diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeResource.java index b9ad1b1..4cf145b 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeResource.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeResource.java @@ 
-35,7 +35,7 @@ public class BTreeResource implements IResource { private static final long serialVersionUID = 1L; - private final String path; + private String path; private final IStorageManager storageManager; private final ITypeTraits[] typeTraits; private final IBinaryComparatorFactory[] comparatorFactories; @@ -63,4 +63,9 @@ public String getPath() { return path; } + + @Override + public void setPath(String path) { + this.path = path; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LsmResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LsmResource.java index 6255c1d..b541750 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LsmResource.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LsmResource.java @@ -18,13 +18,10 @@ */ package org.apache.hyracks.storage.am.lsm.common.dataflow; -import java.util.List; import java.util.Map; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.dataflow.value.ITypeTraits; -import org.apache.hyracks.api.io.IIOManager; -import org.apache.hyracks.api.io.IODeviceHandle; import org.apache.hyracks.storage.am.common.api.IMetadataPageManagerFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider; @@ -43,7 +40,7 @@ public abstract class LsmResource implements IResource { private static final long serialVersionUID = 1L; - protected final String path; + protected String path; protected final IStorageManager storageManager; protected final ITypeTraits[] typeTraits; protected final IBinaryComparatorFactory[] cmpFactories; @@ -88,14 +85,8 
@@ return path; } - public static int getIoDeviceNum(IIOManager ioManager, IODeviceHandle deviceHandle) { - List<IODeviceHandle> ioDevices = ioManager.getIODevices(); - for (int i = 0; i < ioDevices.size(); i++) { - IODeviceHandle device = ioDevices.get(i); - if (device == deviceHandle) { - return i; - } - } - return -1; + @Override + public void setPath(String path) { + this.path = path; } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeResource.java index df4fbf2..f9eb844 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeResource.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeResource.java @@ -35,7 +35,7 @@ public class RTreeResource implements IResource { private static final long serialVersionUID = 1L; - private final String path; + private String path; private final IStorageManager storageManager; private final ITypeTraits[] typeTraits; private final IBinaryComparatorFactory[] comparatorFactories; @@ -68,4 +68,8 @@ return path; } + @Override + public void setPath(String path) { + this.path = path; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResource.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResource.java index bb27023..7b9166d 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResource.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResource.java @@ -28,4 +28,11 @@ IIndex createInstance(INCServiceContext ncServiceCtx) throws 
HyracksDataException; String getPath(); + + /** + * Sets the path of {@link IResource}. + * + * @param path + */ + void setPath(String path); } -- To view, visit https://asterix-gerrit.ics.uci.edu/2181 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I0f968b9f493bf5aa2d49f503afe21f0d438bb7f0 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Murtadha Hubail <mhub...@apache.org>