Murtadha Hubail has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/778
Change subject: Exclude Temporary Resources From Replication
......................................................................
Exclude Temporary Resources From Replication
- Exclude temporary resources from replication.
- Remove flush logs from temporary datasets.
- Ignore partition takeover requests if the NC is shutting down.
- Stop NCs on separate threads so that the replica shutdown
notification can be sent when replication is enabled.
Change-Id: I9a52557bf1f3e7632dd826384280abdaa186f672
---
M
asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
M asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
M
asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
M
asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
M
asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
5 files changed, 78 insertions(+), 32 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/78/778/1
diff --git
a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index c67eb70..cc50b75 100644
---
a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++
b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -19,6 +19,7 @@
package org.apache.asterix.api.common;
import java.io.File;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
@@ -138,11 +139,31 @@
}
public static void deinit(boolean deleteOldInstanceData) throws Exception {
+ //stop NCs
+ ArrayList<Thread> stopNCThreads = new ArrayList<>();
for (int n = 0; n < ncs.length; ++n) {
- if (ncs[n] != null)
- ncs[n].stop();
-
+ NodeControllerService nodeControllerService = ncs[n];
+ if (nodeControllerService != null) {
+ Thread ncStopThread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ nodeControllerService.stop();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ };
+ stopNCThreads.add(ncStopThread);
+ ncStopThread.start();
+ }
}
+
+ //make sure all NCs stopped
+ for (Thread stopNcTheard : stopNCThreads) {
+ stopNcTheard.join();
+ }
+
if (cc != null) {
cc.stop();
}
diff --git
a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
index 0a0a917..88ffe19 100644
---
a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
+++
b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
@@ -124,14 +124,17 @@
private void handleTakeoverPartitons(IMessage message) throws Exception {
TakeoverPartitionsRequestMessage msg =
(TakeoverPartitionsRequestMessage) message;
- try {
- IRemoteRecoveryManager remoteRecoeryManager =
appContext.getRemoteRecoveryManager();
- remoteRecoeryManager.takeoverPartitons(msg.getPartitions());
- } finally {
- //send response after takeover is completed
- TakeoverPartitionsResponseMessage reponse = new
TakeoverPartitionsResponseMessage(msg.getRequestId(),
- appContext.getTransactionSubsystem().getId(),
msg.getPartitions());
- sendMessage(reponse, null);
+ //if the NC is shutting down, it should ignore take partition takeover
request
+ if (!appContext.isShuttingdown()) {
+ try {
+ IRemoteRecoveryManager remoteRecoeryManager =
appContext.getRemoteRecoveryManager();
+ remoteRecoeryManager.takeoverPartitons(msg.getPartitions());
+ } finally {
+ //send response after takeover is completed
+ TakeoverPartitionsResponseMessage reponse = new
TakeoverPartitionsResponseMessage(msg.getRequestId(),
+ appContext.getTransactionSubsystem().getId(),
msg.getPartitions());
+ sendMessage(reponse, null);
+ }
}
}
diff --git
a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index f667bd8..c5f6915 100644
---
a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++
b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -65,8 +65,8 @@
private final int numPartitions;
public DatasetLifecycleManager(AsterixStorageProperties storageProperties,
- ILocalResourceRepository
resourceRepository, int firstAvilableUserDatasetID,
- ILogManager logManager, int numPartitions) {
+ ILocalResourceRepository resourceRepository, int
firstAvilableUserDatasetID, ILogManager logManager,
+ int numPartitions) {
this.logManager = logManager;
this.storageProperties = storageProperties;
this.resourceRepository = resourceRepository;
@@ -111,6 +111,7 @@
if (!dsInfo.isRegistered) {
dsInfo.isExternal = !index.hasMemoryComponents();
dsInfo.isRegistered = true;
+ dsInfo.durable = ((ILSMIndex) index).isDurable();
}
if (dsInfo.indexes.containsKey(resourceID)) {
@@ -338,6 +339,7 @@
return dvbcs;
}
}
+
@Override
public ILSMOperationTracker getOperationTracker(int datasetID) {
synchronized (datasetOpTrackers) {
@@ -400,6 +402,7 @@
private boolean isExternal;
private boolean isRegistered;
private boolean memoryAllocated;
+ private boolean durable;
public DatasetInfo(int datasetID) {
this.indexes = new HashMap<Long, IndexInfo>();
@@ -480,7 +483,11 @@
public String toString() {
return "DatasetID: " + datasetID + ", isOpen: " + isOpen + ",
refCount: " + referenceCount
+ ", lastAccess: " + lastAccess + ", isRegistered: " +
isRegistered + ", memoryAllocated: "
- + memoryAllocated;
+ + memoryAllocated + ", isDurable: " + durable;
+ }
+
+ public boolean isDurable() {
+ return durable;
}
}
@@ -536,7 +543,7 @@
* This method can only be called asynchronously safely if we're sure no
modify operation will take place until the flush is scheduled
*/
private void flushDatasetOpenIndexes(DatasetInfo dsInfo, boolean
asyncFlush) throws HyracksDataException {
- if (!dsInfo.isExternal) {
+ if (!dsInfo.isExternal && dsInfo.durable) {
synchronized (logRecord) {
TransactionUtil.formFlushLogRecord(logRecord,
dsInfo.datasetID, null, logManager.getNodeId(),
dsInfo.indexes.size());
@@ -731,8 +738,10 @@
List<IVirtualBufferCache> vbcs = new ArrayList<>();
for (int i = 0; i < storageProperties.getMemoryComponentsNum();
i++) {
MultitenantVirtualBufferCache vbc = new
MultitenantVirtualBufferCache(
- new VirtualBufferCache(new
ResourceHeapBufferAllocator(DatasetLifecycleManager.this,
- Integer.toString(datasetID)),
storageProperties.getMemoryComponentPageSize(),
+ new VirtualBufferCache(
+ new
ResourceHeapBufferAllocator(DatasetLifecycleManager.this,
+ Integer.toString(datasetID)),
+ storageProperties.getMemoryComponentPageSize(),
numPages /
storageProperties.getMemoryComponentsNum() / numPartitions));
vbcs.add(vbc);
}
diff --git
a/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
b/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index e5a3473..dac05e7 100644
---
a/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++
b/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -121,13 +121,15 @@
}
}
}
- LogRecord logRecord = new LogRecord();
- TransactionUtil.formFlushLogRecord(logRecord, datasetID, this,
logManager.getNodeId(),
- dsInfo.getDatasetIndexes().size());
- try {
- logManager.log(logRecord);
- } catch (ACIDException e) {
- throw new HyracksDataException("could not write flush log", e);
+ if (dsInfo.isDurable()) {
+ LogRecord logRecord = new LogRecord();
+ TransactionUtil.formFlushLogRecord(logRecord, datasetID, this,
logManager.getNodeId(),
+ dsInfo.getDatasetIndexes().size());
+ try {
+ logManager.log(logRecord);
+ } catch (ACIDException e) {
+ throw new HyracksDataException("could not write flush
log", e);
+ }
}
flushLogCreated = true;
diff --git
a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 3a1e729..314d41a 100644
---
a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -337,14 +337,25 @@
}
private void createReplicationJob(ReplicationOperation operation, String
filePath) throws HyracksDataException {
- filesToBeReplicated.clear();
- filesToBeReplicated.add(filePath);
- AsterixReplicationJob job = new
AsterixReplicationJob(ReplicationJobType.METADATA, operation,
- ReplicationExecutionType.SYNC, filesToBeReplicated);
- try {
- replicationManager.submitJob(job);
- } catch (IOException e) {
- throw new HyracksDataException(e);
+ /**
+ * Durable resources path format:
+ * /partiton/dataverse/idx/fileName
+ * Temporary resources path format:
+ * /partiton/TEMP_DATASETS_STORAGE_FOLDER/dataverse/idx/fileName
+ */
+ String[] fileNameTokens = filePath.split(File.separator);
+ String partitionDir = fileNameTokens[fileNameTokens.length - 4];
+ //exclude temporary datasets resources
+ if
(!partitionDir.equals(StoragePathUtil.TEMP_DATASETS_STORAGE_FOLDER)) {
+ filesToBeReplicated.clear();
+ filesToBeReplicated.add(filePath);
+ AsterixReplicationJob job = new
AsterixReplicationJob(ReplicationJobType.METADATA, operation,
+ ReplicationExecutionType.SYNC, filesToBeReplicated);
+ try {
+ replicationManager.submitJob(job);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
}
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/778
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I9a52557bf1f3e7632dd826384280abdaa186f672
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <[email protected]>