Murtadha Hubail has submitted this change and it was merged.

Change subject: [ASTERIXDB-2195][REPL] Clean Masked Files
......................................................................
[ASTERIXDB-2195][REPL] Clean Masked Files

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Clean invalid masked files before promoting a partition or sending the
  partition files list to the master.
- Let the replica calculate the component id instead of receiving it from
  the master.
- Add tests for:
  - Deleting a masked component.
  - Deleting a masked file.

Change-Id: Ib0f0159159faf87b9f5fd2eca3956dd90633bcfa
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2268
Sonar-Qube: Jenkins <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Contrib: Jenkins <[email protected]>
Reviewed-by: Michael Blow <[email protected]>
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
A asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
9 files changed, 188 insertions(+), 33 deletions(-)

Approvals: Anon. E.
Moose #1000171: Jenkins: Verified; No violations found; ; Verified Michael Blow: Looks good to me, approved diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java index 155fa1d..4edae69 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java @@ -33,6 +33,7 @@ import org.apache.asterix.common.storage.ReplicaIdentifier; import org.apache.asterix.common.transactions.IRecoveryManager; import org.apache.asterix.replication.api.PartitionReplica; +import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; import org.apache.hyracks.api.exceptions.HyracksDataException; public class ReplicaManager implements IReplicaManager { @@ -85,6 +86,9 @@ @Override public synchronized void promote(int partition) throws HyracksDataException { + final PersistentLocalResourceRepository localResourceRepository = + (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository(); + localResourceRepository.cleanup(partition); final IRecoveryManager recoveryManager = appCtx.getTransactionSubsystem().getRecoveryManager(); recoveryManager.replayReplicaPartitionLogs(Stream.of(partition).collect(Collectors.toSet()), true); partitions.add(partition); diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java index 128aee6..6d114c6 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java @@ -23,6 +23,7 @@ import java.util.Arrays; import java.util.LinkedHashSet; import java.util.Map; +import java.util.Optional; import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil; 
import org.apache.asterix.app.active.ActiveNotificationHandler; @@ -214,4 +215,13 @@ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); } } + + public static String getIndexPath(AsterixHyracksIntegrationUtil integrationUtil, Dataset dataset, String nodeId) + throws Exception { + final FileSplit[] datasetSplits = TestDataUtil.getDatasetSplits(integrationUtil, dataset); + final Optional<FileSplit> nodeFileSplit = + Arrays.stream(datasetSplits).filter(s -> s.getNodeName().equals(nodeId)).findFirst(); + Assert.assertTrue(nodeFileSplit.isPresent()); + return nodeFileSplit.get().getPath(); + } } diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java new file mode 100644 index 0000000..6401d90 --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.test.storage; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; + +import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil; +import org.apache.asterix.common.TestDataUtil; +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.config.GlobalConfig; +import org.apache.asterix.common.dataflow.DatasetLocalResource; +import org.apache.asterix.common.utils.StorageConstants; +import org.apache.asterix.metadata.entities.Dataset; +import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; +import org.apache.hyracks.api.io.FileReference; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class PersistentLocalResourceRepositoryTest { + + protected static final String TEST_CONFIG_FILE_NAME = "src/main/resources/cc.conf"; + private static final AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil(); + + @Before + public void setUp() throws Exception { + System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, TEST_CONFIG_FILE_NAME); + integrationUtil.init(true, TEST_CONFIG_FILE_NAME); + } + + @After + public void tearDown() throws Exception { + integrationUtil.deinit(true); + } + + @Test + public void deleteMaskedFiles() throws Exception { + final INcApplicationContext ncAppCtx = (INcApplicationContext) integrationUtil.ncs[0].getApplicationContext(); + final String nodeId = ncAppCtx.getServiceContext().getNodeId(); + final String datasetName = "ds"; + TestDataUtil.createIdOnlyDataset(datasetName); + final Dataset dataset = TestDataUtil.getDataset(integrationUtil, datasetName); + final String indexPath = TestDataUtil.getIndexPath(integrationUtil, dataset, nodeId); + FileReference indexDirRef = ncAppCtx.getIoManager().resolve(indexPath); + // create masked component files + String indexDir = indexDirRef.getFile().getAbsolutePath(); + String 
componentId = "12345_12345"; + String btree = componentId + "_b"; + String filter = componentId + "_f"; + Path maskPath = Paths.get(indexDir, StorageConstants.COMPONENT_MASK_FILE_PREFIX + componentId); + Path btreePath = Paths.get(indexDir, btree); + Path filterPath = Paths.get(indexDir, filter); + Files.createFile(maskPath); + Files.createFile(btreePath); + Files.createFile(filterPath); + // clean up the dataset partition + PersistentLocalResourceRepository localResourceRepository = + (PersistentLocalResourceRepository) ncAppCtx.getLocalResourceRepository(); + DatasetLocalResource lr = (DatasetLocalResource) localResourceRepository.get(indexPath).getResource(); + int partition = lr.getPartition(); + localResourceRepository.cleanup(partition); + + // ensure all masked files and the mask were deleted + Assert.assertFalse(maskPath.toFile().exists()); + Assert.assertFalse(btreePath.toFile().exists()); + Assert.assertFalse(filterPath.toFile().exists()); + + // create single masked file + String fileName = "someFile"; + maskPath = Paths.get(indexDir, StorageConstants.MASK_FILE_PREFIX + fileName); + Path filePath = Paths.get(indexDir, fileName); + Files.createFile(maskPath); + Files.createFile(filePath); + localResourceRepository.cleanup(partition); + + // ensure the masked file and the mask were deleted + Assert.assertFalse(maskPath.toFile().exists()); + Assert.assertFalse(filePath.toFile().exists()); + } +} diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java index f43f3ff..b14d70b 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java @@ -79,7 +79,7 @@ final String datasetName = "ds"; TestDataUtil.createIdOnlyDataset(datasetName); final Dataset dataset = TestDataUtil.getDataset(integrationUtil, datasetName); - 
final String indexPath = getIndexPath(dataset, nodeId); + final String indexPath = TestDataUtil.getIndexPath(integrationUtil, dataset, nodeId); final IDatasetLifecycleManager dclm = ncAppCtx.getDatasetLifecycleManager(); dclm.open(indexPath); final ILSMIndex index = (ILSMIndex) dclm.get(indexPath); @@ -183,14 +183,6 @@ // make sure we can still log to the new file interruptedLogPageSwitch(); - } - - private static String getIndexPath(Dataset dataset, String nodeId) throws Exception { - final FileSplit[] datasetSplits = TestDataUtil.getDatasetSplits(integrationUtil, dataset); - final Optional<FileSplit> nodeFileSplit = - Arrays.stream(datasetSplits).filter(s -> s.getNodeName().equals(nodeId)).findFirst(); - Assert.assertTrue(nodeFileSplit.isPresent()); - return nodeFileSplit.get().getPath(); } private static ITransactionContext beingTransaction(INcApplicationContext ncAppCtx, ILSMIndex index, diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java index f59914d..265c9fd 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java @@ -36,6 +36,7 @@ public static final String INDEX_CHECKPOINT_FILE_PREFIX = ".idx_checkpoint_"; public static final String METADATA_FILE_NAME = ".metadata"; public static final String MASK_FILE_PREFIX = ".mask_"; + public static final String COMPONENT_MASK_FILE_PREFIX = MASK_FILE_PREFIX + "C_"; public static final String LEGACY_DATASET_INDEX_NAME_SEPARATOR = "_idx_"; /** diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java index 26c9577..d5dc51d 100644 --- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java @@ -28,9 +28,10 @@ import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.exceptions.ReplicationException; -import org.apache.asterix.replication.api.IReplicationWorker; import org.apache.asterix.common.utils.StorageConstants; import org.apache.asterix.replication.api.IReplicaTask; +import org.apache.asterix.replication.api.IReplicationWorker; +import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.api.io.IIOManager; @@ -40,13 +41,10 @@ */ public class ComponentMaskTask implements IReplicaTask { - private static final String COMPONENT_MASK_FILE_PREFIX = StorageConstants.MASK_FILE_PREFIX + "C_"; private final String file; - private final String componentId; - public ComponentMaskTask(String file, String componentId) { + public ComponentMaskTask(String file) { this.file = file; - this.componentId = componentId; } @Override @@ -61,11 +59,12 @@ } } - public static Path getComponentMaskPath(INcApplicationContext appCtx, String file) throws IOException { + public static Path getComponentMaskPath(INcApplicationContext appCtx, String componentFile) throws IOException { final IIOManager ioManager = appCtx.getIoManager(); - final FileReference localPath = ioManager.resolve(file); + final FileReference localPath = ioManager.resolve(componentFile); final Path resourceDir = Files.createDirectories(localPath.getFile().getParentFile().toPath()); - return Paths.get(resourceDir.toString(), COMPONENT_MASK_FILE_PREFIX + localPath.getFile().getName()); + final String componentId = PersistentLocalResourceRepository.getComponentId(componentFile); + return 
Paths.get(resourceDir.toString(), StorageConstants.COMPONENT_MASK_FILE_PREFIX + componentId); } @Override @@ -78,7 +77,6 @@ try { final DataOutputStream dos = new DataOutputStream(out); dos.writeUTF(file); - dos.writeUTF(componentId); } catch (IOException e) { throw HyracksDataException.create(e); } @@ -86,7 +84,6 @@ public static ComponentMaskTask create(DataInput input) throws IOException { String indexFile = input.readUTF(); - String componentId = input.readUTF(); - return new ComponentMaskTask(indexFile, componentId); + return new ComponentMaskTask(indexFile); } } \ No newline at end of file diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java index b972f32..54d3a02 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java @@ -45,9 +45,9 @@ @Override public void perform(INcApplicationContext appCtx, IReplicationWorker worker) throws HyracksDataException { - //TODO delete any invalid files with masks final PersistentLocalResourceRepository localResourceRepository = (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository(); + localResourceRepository.cleanup(partition); final List<String> partitionResources = localResourceRepository.getPartitionIndexesFiles(partition).stream() .map(StoragePathUtil::getFileRelativePath).collect(Collectors.toList()); final PartitionResourcesListResponse response = diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java index 74f38e2..95ae690 100644 --- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java @@ -22,11 +22,9 @@ import static org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation.REPLICATE; import java.io.IOException; -import java.nio.file.Paths; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback; -import org.apache.asterix.common.storage.ResourceReference; import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.asterix.replication.api.PartitionReplica; import org.apache.asterix.replication.messaging.ComponentMaskTask; @@ -39,7 +37,6 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob; import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; -import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -86,9 +83,8 @@ private void replicateComponent(PartitionReplica replica) throws IOException { // send component header final String anyFile = job.getAnyFile(); - final String lsmComponentID = getComponentId(anyFile); final String indexFile = StoragePathUtil.getFileRelativePath(anyFile); - final ComponentMaskTask maskTask = new ComponentMaskTask(indexFile, lsmComponentID); + final ComponentMaskTask maskTask = new ComponentMaskTask(indexFile); ReplicationProtocol.sendTo(replica, maskTask); ReplicationProtocol.waitForAck(replica); // send component files @@ -129,12 +125,5 @@ final ILSMIndexOperationContext ctx = indexReplJob.getLSMIndexOperationContext(); return ((AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback()) .getComponentLSN(ctx.getComponentsToBeReplicated()); - } - - private static 
String getComponentId(String filePath) { - final ResourceReference ref = ResourceReference.of(filePath); - final String fileUniqueTimestamp = - ref.getName().substring(0, ref.getName().lastIndexOf(AbstractLSMIndexFileManager.DELIMITER)); - return Paths.get(ref.getRelativePath().toString(), fileUniqueTimestamp).toString(); } } \ No newline at end of file diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java index 54d6268..6ffeb28 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java @@ -49,6 +49,7 @@ import org.apache.asterix.common.replication.ReplicationJob; import org.apache.asterix.common.storage.DatasetResourceReference; import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider; +import org.apache.asterix.common.storage.ResourceReference; import org.apache.asterix.common.utils.StorageConstants; import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.commons.io.FileUtils; @@ -62,8 +63,11 @@ import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation; import org.apache.hyracks.api.util.IoUtil; import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame; +import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager; import org.apache.hyracks.storage.common.ILocalResourceRepository; import org.apache.hyracks.storage.common.LocalResource; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; @@ -71,8 +75,11 @@ public 
class PersistentLocalResourceRepository implements ILocalResourceRepository { public static final Predicate<Path> INDEX_COMPONENTS = path -> !path.endsWith(StorageConstants.METADATA_FILE_NAME); + private static final Logger LOGGER = LogManager.getLogger(); private static final FilenameFilter LSM_INDEX_FILES_FILTER = (dir, name) -> !name.startsWith(INDEX_CHECKPOINT_FILE_PREFIX); + private static final FilenameFilter MASK_FILES_FILTER = + (dir, name) -> name.startsWith(StorageConstants.MASK_FILE_PREFIX); private static final int MAX_CACHED_RESOURCES = 1000; private static final IOFileFilter METADATA_FILES_FILTER = new IOFileFilter() { @Override @@ -349,4 +356,60 @@ } } } + + public void cleanup(int partition) throws HyracksDataException { + final Set<File> partitionIndexes = getPartitionIndexes(partition); + // find masks + for (File index : partitionIndexes) { + File[] masks = index.listFiles(MASK_FILES_FILTER); + if (masks != null) { + try { + for (File mask : masks) { + deleteIndexMaskedFiles(index, mask); + // delete the mask itself + Files.delete(mask.toPath()); + } + } catch (IOException e) { + throw HyracksDataException.create(e); + } + } + } + } + + private void deleteIndexMaskedFiles(File index, File mask) throws IOException { + if (!mask.getName().startsWith(StorageConstants.MASK_FILE_PREFIX)) { + throw new IllegalArgumentException("Unrecognized mask file: " + mask); + } + File[] maskedFiles; + if (isComponentMask(mask)) { + final String componentId = mask.getName().substring(StorageConstants.COMPONENT_MASK_FILE_PREFIX.length()); + maskedFiles = index.listFiles((dir, name) -> name.startsWith(componentId)); + } else { + final String maskedFileName = mask.getName().substring(StorageConstants.MASK_FILE_PREFIX.length()); + maskedFiles = index.listFiles((dir, name) -> name.equals(maskedFileName)); + } + if (maskedFiles != null) { + for (File maskedFile : maskedFiles) { + LOGGER.info(() -> "deleting masked file: " + maskedFile.getAbsolutePath()); + 
Files.delete(maskedFile.toPath()); + } + } + } + + /** + * Gets a component id based on its unique timestamp. + * e.g. a component file 2018-01-08-01-08-50-439_2018-01-08-01-08-50-439_b + * will return a component id 2018-01-08-01-08-50-439_2018-01-08-01-08-50-439 + * + * @param componentFile any component file + * @return The component id + */ + public static String getComponentId(String componentFile) { + final ResourceReference ref = ResourceReference.of(componentFile); + return ref.getName().substring(0, ref.getName().lastIndexOf(AbstractLSMIndexFileManager.DELIMITER)); + } + + private static boolean isComponentMask(File mask) { + return mask.getName().startsWith(StorageConstants.COMPONENT_MASK_FILE_PREFIX); + } } -- To view, visit https://asterix-gerrit.ics.uci.edu/2268 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: Ib0f0159159faf87b9f5fd2eca3956dd90633bcfa Gerrit-PatchSet: 2 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]>
