Murtadha Hubail has submitted this change and it was merged. Change subject: Exclude Temporary Resources From Replication ......................................................................
Exclude Temporary Resources From Replication - Exclude temporary resources from replication. - Remove flush logs from temporary datasets. - Ignore takeover partitions request if NC is shutting down. - Stop NCs on different threads to allow replica shutting down notification to be sent when replication is enabled. Change-Id: I9a52557bf1f3e7632dd826384280abdaa186f672 Reviewed-on: https://asterix-gerrit.ics.uci.edu/778 Tested-by: Jenkins <[email protected]> Reviewed-by: abdullah alamoudi <[email protected]> --- M asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java M asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java M asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java M asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java M asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java M asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java 7 files changed, 83 insertions(+), 37 deletions(-) Approvals: abdullah alamoudi: Looks good to me, approved Jenkins: Verified diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java index c67eb70..cc50b75 100644 --- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java +++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java @@ -19,6 +19,7 @@ package org.apache.asterix.api.common; import java.io.File; +import java.util.ArrayList; import java.util.Arrays; import java.util.EnumSet; import java.util.List; @@ -138,11 +139,31 @@ } public static void deinit(boolean deleteOldInstanceData) throws Exception { + //stop NCs + ArrayList<Thread> stopNCThreads = new ArrayList<>(); for (int n = 0; n < ncs.length; ++n) { - if (ncs[n] != null) - ncs[n].stop(); - + NodeControllerService nodeControllerService = ncs[n]; + if (nodeControllerService != null) { + Thread ncStopThread = new Thread() { + @Override + public void run() { + try { + nodeControllerService.stop(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }; + stopNCThreads.add(ncStopThread); + ncStopThread.start(); + } } + + //make sure all NCs stopped + for (Thread stopNcTheard : stopNCThreads) { + stopNcTheard.join(); + } + if (cc != null) { cc.stop(); } diff --git a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java index 0a0a917..13b0189 100644 --- a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java +++ b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java @@ -124,14 +124,17 @@ private void handleTakeoverPartitons(IMessage message) throws Exception { TakeoverPartitionsRequestMessage msg = (TakeoverPartitionsRequestMessage) message; - try { - IRemoteRecoveryManager remoteRecoeryManager = appContext.getRemoteRecoveryManager(); - remoteRecoeryManager.takeoverPartitons(msg.getPartitions()); - } finally { - //send response after takeover is completed - TakeoverPartitionsResponseMessage reponse = new TakeoverPartitionsResponseMessage(msg.getRequestId(), - appContext.getTransactionSubsystem().getId(), msg.getPartitions()); - sendMessage(reponse, null); + //if the NC is shutting down, it should ignore takeover partitions request + if (!appContext.isShuttingdown()) { + try { + IRemoteRecoveryManager remoteRecoeryManager = appContext.getRemoteRecoveryManager(); + remoteRecoeryManager.takeoverPartitons(msg.getPartitions()); + } finally { + //send response after takeover is completed + TakeoverPartitionsResponseMessage reponse = new TakeoverPartitionsResponseMessage(msg.getRequestId(), + appContext.getTransactionSubsystem().getId(), msg.getPartitions()); + sendMessage(reponse, null); + } } } diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java index f667bd8..c5f6915 100644 --- a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java +++ b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java @@ -65,8 +65,8 @@ private final int numPartitions; public DatasetLifecycleManager(AsterixStorageProperties storageProperties, - ILocalResourceRepository resourceRepository, int firstAvilableUserDatasetID, - ILogManager logManager, int numPartitions) { + ILocalResourceRepository resourceRepository, int firstAvilableUserDatasetID, ILogManager logManager, + int numPartitions) { this.logManager = logManager; this.storageProperties = storageProperties; this.resourceRepository = resourceRepository; @@ -111,6 +111,7 @@ if (!dsInfo.isRegistered) { dsInfo.isExternal = !index.hasMemoryComponents(); dsInfo.isRegistered = true; + dsInfo.durable = ((ILSMIndex) index).isDurable(); } if (dsInfo.indexes.containsKey(resourceID)) { @@ -338,6 +339,7 @@ return dvbcs; } } + @Override public ILSMOperationTracker getOperationTracker(int datasetID) { synchronized (datasetOpTrackers) { @@ -400,6 +402,7 @@ private boolean isExternal; private boolean isRegistered; private boolean memoryAllocated; + private boolean durable; public DatasetInfo(int datasetID) { this.indexes = new HashMap<Long, IndexInfo>(); @@ -480,7 +483,11 @@ public String toString() { return "DatasetID: " + datasetID + ", isOpen: " + isOpen + ", refCount: " + referenceCount + ", lastAccess: " + lastAccess + ", isRegistered: " + isRegistered + ", memoryAllocated: " - + memoryAllocated; + + memoryAllocated + ", isDurable: " + durable; + } + + public boolean isDurable() { + return durable; } } @@ -536,7 +543,7 @@ * This method can only be called asynchronously safely if we're sure no modify operation will take place until the flush is scheduled */ private void flushDatasetOpenIndexes(DatasetInfo dsInfo, boolean asyncFlush) throws HyracksDataException { - if (!dsInfo.isExternal) { + if (!dsInfo.isExternal && dsInfo.durable) { synchronized (logRecord) { TransactionUtil.formFlushLogRecord(logRecord, dsInfo.datasetID, null, logManager.getNodeId(), dsInfo.indexes.size()); @@ -731,8 +738,10 @@ List<IVirtualBufferCache> vbcs = new ArrayList<>(); for (int i = 0; i < storageProperties.getMemoryComponentsNum(); i++) { MultitenantVirtualBufferCache vbc = new MultitenantVirtualBufferCache( - new VirtualBufferCache(new ResourceHeapBufferAllocator(DatasetLifecycleManager.this, - Integer.toString(datasetID)), storageProperties.getMemoryComponentPageSize(), + new VirtualBufferCache( + new ResourceHeapBufferAllocator(DatasetLifecycleManager.this, + Integer.toString(datasetID)), + storageProperties.getMemoryComponentPageSize(), numPages / storageProperties.getMemoryComponentsNum() / numPartitions)); vbcs.add(vbc); } diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java index e5a3473..dac05e7 100644 --- a/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java +++ b/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java @@ -121,13 +121,15 @@ } } } - LogRecord logRecord = new LogRecord(); - TransactionUtil.formFlushLogRecord(logRecord, datasetID, this, logManager.getNodeId(), - dsInfo.getDatasetIndexes().size()); - try { - logManager.log(logRecord); - } catch (ACIDException e) { - throw new HyracksDataException("could not write flush log", e); + if (dsInfo.isDurable()) { + LogRecord logRecord = new LogRecord(); + TransactionUtil.formFlushLogRecord(logRecord, datasetID, this, logManager.getNodeId(), + dsInfo.getDatasetIndexes().size()); + try { + logManager.log(logRecord); + } catch (ACIDException e) { + throw new HyracksDataException("could not write flush log", e); + } } flushLogCreated = true; diff --git a/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java index 5b4035c..78b06fb 100644 --- a/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java +++ b/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java @@ -67,7 +67,7 @@ return (datasetName + DATASET_INDEX_NAME_SEPARATOR + idxName); } - public static int getPartitonNumFromName(String name) { + public static int getPartitionNumFromName(String name) { return Integer.parseInt(name.substring(PARTITION_DIR_PREFIX.length())); } } diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java index 2bf5fa3..a349e51 100644 --- a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java +++ b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java @@ -69,7 +69,7 @@ this.fileName = tokens[arraySize - 1]; this.idxName = tokens[arraySize - 2]; this.dataverse = tokens[arraySize - 3]; - this.partition = StoragePathUtil.getPartitonNumFromName(tokens[arraySize - 4]); + this.partition = StoragePathUtil.getPartitionNumFromName(tokens[arraySize - 4]); } public void serialize(OutputStream out) throws IOException { diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java index 3a1e729..561b144 100644 --- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java +++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java @@ -337,14 +337,25 @@ } private void createReplicationJob(ReplicationOperation operation, String filePath) throws HyracksDataException { - filesToBeReplicated.clear(); - filesToBeReplicated.add(filePath); - AsterixReplicationJob job = new AsterixReplicationJob(ReplicationJobType.METADATA, operation, - ReplicationExecutionType.SYNC, filesToBeReplicated); - try { - replicationManager.submitJob(job); - } catch (IOException e) { - throw new HyracksDataException(e); + /** + * Durable resources path format: + * /partition/dataverse/idx/fileName + * Temporary resources path format: + * /partition/TEMP_DATASETS_STORAGE_FOLDER/dataverse/idx/fileName + */ + String[] fileNameTokens = filePath.split(File.separator); + String partitionDir = fileNameTokens[fileNameTokens.length - 4]; + //exclude temporary datasets resources + if (!partitionDir.equals(StoragePathUtil.TEMP_DATASETS_STORAGE_FOLDER)) { + filesToBeReplicated.clear(); + filesToBeReplicated.add(filePath); + AsterixReplicationJob job = new AsterixReplicationJob(ReplicationJobType.METADATA, operation, + ReplicationExecutionType.SYNC, filesToBeReplicated); + try { + replicationManager.submitJob(job); + } catch (IOException e) { + throw new HyracksDataException(e); + } } } @@ -454,14 +465,14 @@ */ public static String getResourceRelativePath(String resourceAbsolutePath) { String[] tokens = resourceAbsolutePath.split(File.separator); - //partiton/dataverse/idx/fileName + //partition/dataverse/idx/fileName return tokens[tokens.length - 4] + File.separator + tokens[tokens.length - 3] + File.separator + tokens[tokens.length - 2] + File.separator + tokens[tokens.length - 1]; } public static int getResourcePartition(String resourceAbsolutePath) { String[] tokens = resourceAbsolutePath.split(File.separator); - //partiton/dataverse/idx/fileName - return StoragePathUtil.getPartitonNumFromName(tokens[tokens.length - 4]); + //partition/dataverse/idx/fileName + return StoragePathUtil.getPartitionNumFromName(tokens[tokens.length - 4]); } } -- To view, visit https://asterix-gerrit.ics.uci.edu/778 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I9a52557bf1f3e7632dd826384280abdaa186f672 Gerrit-PatchSet: 3 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
