Murtadha Hubail has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/778

Change subject: Exclude Temporary Resources From Replication
......................................................................

Exclude Temporary Resources From Replication

- Exclude temporary resources from replication.
- Remove flush logs from temporary datasets.
- Ignore takeover partitions requests if the NC is shutting down.
- Stop NCs on separate threads so that the replica shutting-down
  notification can be sent when replication is enabled.

Change-Id: I9a52557bf1f3e7632dd826384280abdaa186f672
---
M asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
M asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
M asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
M asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
5 files changed, 78 insertions(+), 32 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/78/778/1

diff --git 
a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
 
b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index c67eb70..cc50b75 100644
--- 
a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ 
b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.api.common;
 
 import java.io.File;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.List;
@@ -138,11 +139,31 @@
     }
 
     public static void deinit(boolean deleteOldInstanceData) throws Exception {
+        //stop NCs
+        ArrayList<Thread> stopNCThreads = new ArrayList<>();
         for (int n = 0; n < ncs.length; ++n) {
-            if (ncs[n] != null)
-                ncs[n].stop();
-
+            NodeControllerService nodeControllerService = ncs[n];
+            if (nodeControllerService != null) {
+                Thread ncStopThread = new Thread() {
+                    @Override
+                    public void run() {
+                        try {
+                            nodeControllerService.stop();
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        }
+                    }
+                };
+                stopNCThreads.add(ncStopThread);
+                ncStopThread.start();
+            }
         }
+
+        //make sure all NCs stopped
+        for (Thread stopNcTheard : stopNCThreads) {
+            stopNcTheard.join();
+        }
+
         if (cc != null) {
             cc.stop();
         }
diff --git 
a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java 
b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
index 0a0a917..88ffe19 100644
--- 
a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
+++ 
b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
@@ -124,14 +124,17 @@
 
     private void handleTakeoverPartitons(IMessage message) throws Exception {
         TakeoverPartitionsRequestMessage msg = 
(TakeoverPartitionsRequestMessage) message;
-        try {
-            IRemoteRecoveryManager remoteRecoeryManager = 
appContext.getRemoteRecoveryManager();
-            remoteRecoeryManager.takeoverPartitons(msg.getPartitions());
-        } finally {
-            //send response after takeover is completed
-            TakeoverPartitionsResponseMessage reponse = new 
TakeoverPartitionsResponseMessage(msg.getRequestId(),
-                    appContext.getTransactionSubsystem().getId(), 
msg.getPartitions());
-            sendMessage(reponse, null);
+        //if the NC is shutting down, it should ignore take partition takeover 
request
+        if (!appContext.isShuttingdown()) {
+            try {
+                IRemoteRecoveryManager remoteRecoeryManager = 
appContext.getRemoteRecoveryManager();
+                remoteRecoeryManager.takeoverPartitons(msg.getPartitions());
+            } finally {
+                //send response after takeover is completed
+                TakeoverPartitionsResponseMessage reponse = new 
TakeoverPartitionsResponseMessage(msg.getRequestId(),
+                        appContext.getTransactionSubsystem().getId(), 
msg.getPartitions());
+                sendMessage(reponse, null);
+            }
         }
     }
 
diff --git 
a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
 
b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index f667bd8..c5f6915 100644
--- 
a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ 
b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -65,8 +65,8 @@
     private final int numPartitions;
 
     public DatasetLifecycleManager(AsterixStorageProperties storageProperties,
-                                   ILocalResourceRepository 
resourceRepository, int firstAvilableUserDatasetID,
-                                   ILogManager logManager, int numPartitions) {
+            ILocalResourceRepository resourceRepository, int 
firstAvilableUserDatasetID, ILogManager logManager,
+            int numPartitions) {
         this.logManager = logManager;
         this.storageProperties = storageProperties;
         this.resourceRepository = resourceRepository;
@@ -111,6 +111,7 @@
         if (!dsInfo.isRegistered) {
             dsInfo.isExternal = !index.hasMemoryComponents();
             dsInfo.isRegistered = true;
+            dsInfo.durable = ((ILSMIndex) index).isDurable();
         }
 
         if (dsInfo.indexes.containsKey(resourceID)) {
@@ -338,6 +339,7 @@
             return dvbcs;
         }
     }
+
     @Override
     public ILSMOperationTracker getOperationTracker(int datasetID) {
         synchronized (datasetOpTrackers) {
@@ -400,6 +402,7 @@
         private boolean isExternal;
         private boolean isRegistered;
         private boolean memoryAllocated;
+        private boolean durable;
 
         public DatasetInfo(int datasetID) {
             this.indexes = new HashMap<Long, IndexInfo>();
@@ -480,7 +483,11 @@
         public String toString() {
             return "DatasetID: " + datasetID + ", isOpen: " + isOpen + ", 
refCount: " + referenceCount
                     + ", lastAccess: " + lastAccess + ", isRegistered: " + 
isRegistered + ", memoryAllocated: "
-                    + memoryAllocated;
+                    + memoryAllocated + ", isDurable: " + durable;
+        }
+
+        public boolean isDurable() {
+            return durable;
         }
     }
 
@@ -536,7 +543,7 @@
      * This method can only be called asynchronously safely if we're sure no 
modify operation will take place until the flush is scheduled
      */
     private void flushDatasetOpenIndexes(DatasetInfo dsInfo, boolean 
asyncFlush) throws HyracksDataException {
-        if (!dsInfo.isExternal) {
+        if (!dsInfo.isExternal && dsInfo.durable) {
             synchronized (logRecord) {
                 TransactionUtil.formFlushLogRecord(logRecord, 
dsInfo.datasetID, null, logManager.getNodeId(),
                         dsInfo.indexes.size());
@@ -731,8 +738,10 @@
             List<IVirtualBufferCache> vbcs = new ArrayList<>();
             for (int i = 0; i < storageProperties.getMemoryComponentsNum(); 
i++) {
                 MultitenantVirtualBufferCache vbc = new 
MultitenantVirtualBufferCache(
-                        new VirtualBufferCache(new 
ResourceHeapBufferAllocator(DatasetLifecycleManager.this,
-                                Integer.toString(datasetID)), 
storageProperties.getMemoryComponentPageSize(),
+                        new VirtualBufferCache(
+                                new 
ResourceHeapBufferAllocator(DatasetLifecycleManager.this,
+                                        Integer.toString(datasetID)),
+                                storageProperties.getMemoryComponentPageSize(),
                                 numPages / 
storageProperties.getMemoryComponentsNum() / numPartitions));
                 vbcs.add(vbc);
             }
diff --git 
a/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
 
b/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index e5a3473..dac05e7 100644
--- 
a/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ 
b/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -121,13 +121,15 @@
                     }
                 }
             }
-            LogRecord logRecord = new LogRecord();
-            TransactionUtil.formFlushLogRecord(logRecord, datasetID, this, 
logManager.getNodeId(),
-                    dsInfo.getDatasetIndexes().size());
-            try {
-                logManager.log(logRecord);
-            } catch (ACIDException e) {
-                throw new HyracksDataException("could not write flush log", e);
+            if (dsInfo.isDurable()) {
+                LogRecord logRecord = new LogRecord();
+                TransactionUtil.formFlushLogRecord(logRecord, datasetID, this, 
logManager.getNodeId(),
+                        dsInfo.getDatasetIndexes().size());
+                try {
+                    logManager.log(logRecord);
+                } catch (ACIDException e) {
+                    throw new HyracksDataException("could not write flush 
log", e);
+                }
             }
 
             flushLogCreated = true;
diff --git 
a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
 
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 3a1e729..314d41a 100644
--- 
a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ 
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -337,14 +337,25 @@
     }
 
     private void createReplicationJob(ReplicationOperation operation, String 
filePath) throws HyracksDataException {
-        filesToBeReplicated.clear();
-        filesToBeReplicated.add(filePath);
-        AsterixReplicationJob job = new 
AsterixReplicationJob(ReplicationJobType.METADATA, operation,
-                ReplicationExecutionType.SYNC, filesToBeReplicated);
-        try {
-            replicationManager.submitJob(job);
-        } catch (IOException e) {
-            throw new HyracksDataException(e);
+        /**
+         * Durable resources path format:
+         * /partiton/dataverse/idx/fileName
+         * Temporary resources path format:
+         * /partiton/TEMP_DATASETS_STORAGE_FOLDER/dataverse/idx/fileName
+         */
+        String[] fileNameTokens = filePath.split(File.separator);
+        String partitionDir = fileNameTokens[fileNameTokens.length - 4];
+        //exclude temporary datasets resources
+        if 
(!partitionDir.equals(StoragePathUtil.TEMP_DATASETS_STORAGE_FOLDER)) {
+            filesToBeReplicated.clear();
+            filesToBeReplicated.add(filePath);
+            AsterixReplicationJob job = new 
AsterixReplicationJob(ReplicationJobType.METADATA, operation,
+                    ReplicationExecutionType.SYNC, filesToBeReplicated);
+            try {
+                replicationManager.submitJob(job);
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
         }
     }
 

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/778
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I9a52557bf1f3e7632dd826384280abdaa186f672
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <[email protected]>

Reply via email to