Michael Blow has submitted this change and it was merged.

Change subject: [NO ISSUE][ING] Halt on active suspend or resume failures
......................................................................


[NO ISSUE][ING] Halt on active suspend or resume failures

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Failures during active handler suspend or resume leave the
  system in an inconsistent state.
- When that happens, we halt the process and rely on recovery
  to return the system to a consistent state.

Change-Id: I00d31f704f2fa22a5e14c711b6771345ca7d000a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2889
Reviewed-by: Michael Blow <mb...@apache.org>
Tested-by: Michael Blow <mb...@apache.org>
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
M hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
2 files changed, 41 insertions(+), 28 deletions(-)

Approvals:
  Michael Blow: Looks good to me, approved; Verified



diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
index 5faa980..6eba4ea 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
@@ -44,6 +44,7 @@
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.api.util.SingleThreadEventProcessor;
+import org.apache.hyracks.util.ExitUtil;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -259,41 +260,51 @@
             LOGGER.log(level, "Suspending active events handler");
             suspended = true;
         }
-        IMetadataLockManager lockManager = 
mdProvider.getApplicationContext().getMetadataLockManager();
-        Collection<IActiveEntityEventsListener> registeredListeners = 
entityEventListeners.values();
-        for (IActiveEntityEventsListener listener : registeredListeners) {
-            // write lock the listener
-            // exclusive lock all the datasets
-            String dataverseName = listener.getEntityId().getDataverse();
-            String entityName = listener.getEntityId().getEntityName();
-            if (LOGGER.isEnabled(level)) {
-                LOGGER.log(level, "Suspending " + listener.getEntityId());
+        try {
+            IMetadataLockManager lockManager = 
mdProvider.getApplicationContext().getMetadataLockManager();
+            Collection<IActiveEntityEventsListener> registeredListeners = 
entityEventListeners.values();
+            for (IActiveEntityEventsListener listener : registeredListeners) {
+                // write lock the listener
+                // exclusive lock all the datasets
+                String dataverseName = listener.getEntityId().getDataverse();
+                String entityName = listener.getEntityId().getEntityName();
+                if (LOGGER.isEnabled(level)) {
+                    LOGGER.log(level, "Suspending " + listener.getEntityId());
+                }
+                LOGGER.log(level, "Acquiring locks");
+                
lockManager.acquireActiveEntityWriteLock(mdProvider.getLocks(), dataverseName + 
'.' + entityName);
+                List<Dataset> datasets = ((ActiveEntityEventsListener) 
listener).getDatasets();
+                for (Dataset dataset : datasets) {
+                    
lockManager.acquireDatasetExclusiveModificationLock(mdProvider.getLocks(),
+                            DatasetUtil.getFullyQualifiedName(dataset));
+                }
+                LOGGER.log(level, "locks acquired");
+                ((ActiveEntityEventsListener) listener).suspend(mdProvider);
+                if (LOGGER.isEnabled(level)) {
+                    LOGGER.log(level, listener.getEntityId() + " suspended");
+                }
             }
-            LOGGER.log(level, "Acquiring locks");
-            lockManager.acquireActiveEntityWriteLock(mdProvider.getLocks(), 
dataverseName + '.' + entityName);
-            List<Dataset> datasets = ((ActiveEntityEventsListener) 
listener).getDatasets();
-            for (Dataset dataset : datasets) {
-                
lockManager.acquireDatasetExclusiveModificationLock(mdProvider.getLocks(),
-                        DatasetUtil.getFullyQualifiedName(dataset));
-            }
-            LOGGER.log(level, "locks acquired");
-            ((ActiveEntityEventsListener) listener).suspend(mdProvider);
-            if (LOGGER.isEnabled(level)) {
-                LOGGER.log(level, listener.getEntityId() + " suspended");
-            }
+        } catch (Throwable th) {
+            LOGGER.error("Suspend active failed", th);
+            ExitUtil.halt(ExitUtil.EC_ACTIVE_SUSPEND_FAILURE);
         }
     }
 
     public void resume(MetadataProvider mdProvider) throws 
HyracksDataException {
         LOGGER.log(level, "Resuming active events handler");
-        for (IActiveEntityEventsListener listener : 
entityEventListeners.values()) {
-            if (LOGGER.isEnabled(level)) {
-                LOGGER.log(level, "Resuming " + listener.getEntityId());
+        try {
+            for (IActiveEntityEventsListener listener : 
entityEventListeners.values()) {
+                if (LOGGER.isEnabled(level)) {
+                    LOGGER.log(level, "Resuming " + listener.getEntityId());
+                }
+                ((ActiveEntityEventsListener) listener).resume(mdProvider);
+                if (LOGGER.isEnabled(level)) {
+                    LOGGER.log(level, listener.getEntityId() + " resumed");
+                }
             }
-            ((ActiveEntityEventsListener) listener).resume(mdProvider);
-            if (LOGGER.isEnabled(level)) {
-                LOGGER.log(level, listener.getEntityId() + " resumed");
-            }
+        } catch (Throwable th) {
+            LOGGER.error("Resume active failed", th);
+            ExitUtil.halt(ExitUtil.EC_ACTIVE_RESUME_FAILURE);
         }
         synchronized (this) {
             suspended = false;
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
index 9604c30..f9d9b1b 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
@@ -48,6 +48,8 @@
     public static final int EC_TXN_LOG_FLUSHER_FAILURE = 14;
     public static final int EC_NODE_REGISTRATION_FAILURE = 15;
     public static final int EC_NETWORK_FAILURE = 16;
+    public static final int EC_ACTIVE_SUSPEND_FAILURE = 17;
+    public static final int EC_ACTIVE_RESUME_FAILURE = 18;
     public static final int EC_FAILED_TO_CANCEL_ACTIVE_START_STOP = 22;
     public static final int EC_IMMEDIATE_HALT = 33;
     public static final int EC_HALT_ABNORMAL_RESERVED_44 = 44;

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2889
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I00d31f704f2fa22a5e14c711b6771345ca7d000a
Gerrit-PatchSet: 5
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mb...@apache.org>

Reply via email to