abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2762

Change subject: [NO ISSUE][ING] Add timeout to stop active message
......................................................................

[NO ISSUE][ING] Add timeout to stop active message

Change-Id: Ie0416d76670e945cb958f5a1c235201a3e016009
---
M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
A asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StopRuntimeContent.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java
9 files changed, 100 insertions(+), 29 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/62/2762/1

diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
index 2aebc59..b44dfea 100644
--- 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
@@ -33,6 +33,7 @@
 import org.apache.asterix.active.message.ActiveManagerMessage;
 import org.apache.asterix.active.message.ActiveStatsRequestMessage;
 import org.apache.asterix.active.message.ActiveStatsResponse;
+import org.apache.asterix.active.message.StopRuntimeContent;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.memory.ConcurrentFramePool;
@@ -139,7 +140,7 @@
         shutdown = true;
         runtimes.forEach((runtimeId, runtime) -> stopFutures.put(runtimeId, 
executor.submit(() -> {
             // we may already have been stopped- only stop once
-            stopIfRunning(runtimeId, runtime);
+            stopIfRunning(runtime, SHUTDOWN_TIMEOUT_SECS, TimeUnit.SECONDS);
             return null;
         })));
         stopFutures.entrySet().parallelStream().forEach(entry -> {
@@ -158,7 +159,8 @@
     }
 
     private void stopRuntime(ActiveManagerMessage message) {
-        ActiveRuntimeId runtimeId = (ActiveRuntimeId) message.getPayload();
+        StopRuntimeContent content = (StopRuntimeContent) message.getPayload();
+        ActiveRuntimeId runtimeId = content.getRuntimeId();
         IActiveRuntime runtime = runtimes.get(runtimeId);
         if (runtime == null) {
             LOGGER.warn("Request to stop runtime: " + runtimeId
@@ -167,21 +169,21 @@
         } else {
             executor.execute(() -> {
                 try {
-                    stopIfRunning(runtimeId, runtime);
-                } catch (Exception e) {
+                    stopIfRunning(runtime, content.getTimeout(), 
content.getUnit());
+                } catch (Throwable e) {
                     // TODO(till) Figure out a better way to handle failure to 
stop a runtime
-                    LOGGER.log(Level.WARN, "Failed to stop runtime: " + 
runtimeId, e);
+                    LOGGER.warn("Failed to stop runtime: {}", runtimeId, e);
                 }
             });
         }
     }
 
-    private void stopIfRunning(ActiveRuntimeId runtimeId, IActiveRuntime 
runtime)
+    private void stopIfRunning(IActiveRuntime runtime, long timeout, TimeUnit 
unit)
             throws HyracksDataException, InterruptedException {
-        if (runtimes.containsKey(runtimeId)) {
-            runtime.stop();
+        if (runtimes.containsKey(runtime.getRuntimeId())) {
+            runtime.stop(timeout, unit);
         } else {
-            LOGGER.info("Not stopping already stopped runtime " + runtimeId);
+            LOGGER.info("Not stopping already stopped runtime {}", 
runtime.getRuntimeId());
         }
     }
 
diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
index 2f30df4..c3444ba 100644
--- 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.active;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.asterix.active.message.ActivePartitionMessage;
 import org.apache.asterix.active.message.ActivePartitionMessage.Event;
 import org.apache.asterix.common.api.INcApplicationContext;
@@ -62,10 +64,10 @@
     protected abstract void start() throws HyracksDataException, 
InterruptedException;
 
     @Override
-    public final void stop() throws HyracksDataException, InterruptedException 
{
+    public final void stop(long timeout, TimeUnit unit) throws 
HyracksDataException, InterruptedException {
         synchronized (this) {
             if (!done) {
-                abort();
+                abort(timeout, unit);
             }
             while (!done) {
                 wait();
@@ -75,11 +77,14 @@
 
     /**
      * called from a different thread. This method stops the active node and 
force the start() call to return
+     *
+     * @param timeout time allowed for a graceful stop
+     * @param unit the time unit of {@code timeout}
      *
      * @throws HyracksDataException
      * @throws InterruptedException
      */
-    protected abstract void abort() throws HyracksDataException, 
InterruptedException;
+    protected abstract void abort(long timeout, TimeUnit unit) throws 
HyracksDataException, InterruptedException;
 
     @Override
     public String toString() {
diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
index f37b2e8..2da7193 100644
--- 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.active;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
 
@@ -31,10 +33,15 @@
     /**
      * Stops the running activity
      *
+     * @param timeout
+     *            time allowed for a graceful stop; the runtime is interrupted after it elapses
+     * @param unit
+     *            unit of the timeout
+     *
      * @throws HyracksDataException
      * @throws InterruptedException
      */
-    void stop() throws HyracksDataException, InterruptedException;
+    void stop(long timeout, TimeUnit unit) throws HyracksDataException, 
InterruptedException;
 
     /**
      * @return the job id associated with this active runtime
diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StopRuntimeContent.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StopRuntimeContent.java
new file mode 100644
index 0000000..891ce25
--- /dev/null
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StopRuntimeContent.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active.message;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.active.ActiveRuntimeId;
+
+public class StopRuntimeContent implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+    private final ActiveRuntimeId runtimeId;
+    private final long timeout;
+    private final TimeUnit unit;
+
+    public StopRuntimeContent(ActiveRuntimeId runtimeId, long timeout, 
TimeUnit unit) {
+        this.runtimeId = runtimeId;
+        this.timeout = timeout;
+        this.unit = unit;
+    }
+
+    public ActiveRuntimeId getRuntimeId() {
+        return runtimeId;
+    }
+
+    public long getTimeout() {
+        return timeout;
+    }
+
+    public TimeUnit getUnit() {
+        return unit;
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index 66d3e81..e795830 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -40,6 +40,7 @@
 import org.apache.asterix.active.message.ActivePartitionMessage;
 import org.apache.asterix.active.message.ActivePartitionMessage.Event;
 import org.apache.asterix.active.message.ActiveStatsRequestMessage;
+import org.apache.asterix.active.message.StopRuntimeContent;
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.ErrorCode;
@@ -73,7 +74,9 @@
     private static final EnumSet<ActivityState> TRANSITION_STATES = 
EnumSet.of(ActivityState.RESUMING,
             ActivityState.STARTING, ActivityState.STOPPING, 
ActivityState.RECOVERING, ActivityState.CANCELLING);
     private static final String DEFAULT_ACTIVE_STATS = "{\"Stats\":\"N/A\"}";
-    // finals
+    protected static final long STOP_MESSAGE_TIMEOUT = 2L;
+    protected static final long SUSPEND_MESSAGE_TIMEOUT = 5L;
+    protected static final TimeUnit TIMEOUT_UNIT = TimeUnit.MINUTES;
     protected final IClusterStateManager clusterStateManager;
     protected final ActiveNotificationHandler handler;
     protected final List<IActiveEntityEventSubscriber> subscribers = new 
ArrayList<>();
@@ -431,7 +434,8 @@
     protected abstract JobId compileAndStartJob(MetadataProvider 
metadataProvider) throws HyracksDataException;
 
     @SuppressWarnings("squid:S1181")
-    protected synchronized void doStop(MetadataProvider metadataProvider) 
throws HyracksDataException {
+    protected synchronized void doStop(MetadataProvider metadataProvider, long 
timeout, TimeUnit unit)
+            throws HyracksDataException {
         ActivityState intention = state;
         Set<ActivityState> waitFor;
         if (intention == ActivityState.STOPPING) {
@@ -444,7 +448,7 @@
         WaitForStateSubscriber subscriber = new WaitForStateSubscriber(this, 
waitFor);
         // Note: once we start sending stop messages, we can't go back until 
the entity is stopped
         try {
-            sendStopMessages(metadataProvider);
+            sendStopMessages(metadataProvider, timeout, unit);
             LOGGER.log(Level.DEBUG, "Waiting for its state to become " + 
waitFor);
             subscriber.sync();
             LOGGER.log(Level.DEBUG, "Disconnect has been completed " + 
waitFor);
@@ -465,7 +469,7 @@
         LOGGER.warn("Failure encountered while stopping {}", this, e);
     }
 
-    protected void sendStopMessages(MetadataProvider metadataProvider) throws 
Exception {
+    protected void sendStopMessages(MetadataProvider metadataProvider, long 
timeout, TimeUnit unit) throws Exception {
         ICcApplicationContext applicationCtx = 
metadataProvider.getApplicationContext();
         ICCMessageBroker messageBroker = (ICCMessageBroker) 
applicationCtx.getServiceContext().getMessageBroker();
         AlgebricksAbsolutePartitionConstraint runtimeLocations = 
getLocations();
@@ -473,9 +477,9 @@
         LOGGER.log(Level.INFO, "Sending stop messages to " + runtimeLocations);
         for (String location : runtimeLocations.getLocations()) {
             LOGGER.log(Level.INFO, "Sending to " + location);
-            messageBroker.sendApplicationMessageToNC(
-                    new 
ActiveManagerMessage(ActiveManagerMessage.Kind.STOP_ACTIVITY, 
getActiveRuntimeId(partition++)),
-                    location);
+            ActiveRuntimeId runtimeId = getActiveRuntimeId(partition++);
+            messageBroker.sendApplicationMessageToNC(new 
ActiveManagerMessage(ActiveManagerMessage.Kind.STOP_ACTIVITY,
+                    new StopRuntimeContent(runtimeId, timeout, unit)), 
location);
         }
     }
 
@@ -510,7 +514,7 @@
         } else if (state == ActivityState.RUNNING) {
             setState(ActivityState.STOPPING);
             try {
-                doStop(metadataProvider);
+                doStop(metadataProvider, STOP_MESSAGE_TIMEOUT, TIMEOUT_UNIT);
             } catch (Exception e) {
                 setState(ActivityState.STOPPED);
                 LOGGER.log(Level.ERROR, "Failed to stop the entity " + 
entityId, e);
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java
index 47fc46f..88f1332 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java
@@ -20,6 +20,7 @@
 
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.asterix.active.ActivityState;
 import org.apache.asterix.active.EntityId;
@@ -62,7 +63,7 @@
     }
 
     @Override
-    protected void doStop(MetadataProvider metadataProvider) throws 
HyracksDataException {
+    protected void doStop(MetadataProvider metadataProvider, long timeout, 
TimeUnit unit) throws HyracksDataException {
         IActiveEntityEventSubscriber eventSubscriber =
                 new WaitForStateSubscriber(this, 
Collections.singleton(ActivityState.STOPPED));
         try {
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java
index c771a94..80806f3 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java
@@ -21,6 +21,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.asterix.active.ActiveRuntimeId;
 import org.apache.asterix.active.ActivityState;
@@ -172,7 +173,7 @@
     }
 
     @Override
-    protected void sendStopMessages(MetadataProvider metadataProvider) throws 
Exception {
+    protected void sendStopMessages(MetadataProvider metadataProvider, long 
timeout, TimeUnit unit) throws Exception {
         step(onStop);
         failCompile(onStop);
         if (onStop == Behavior.RUNNING_JOB_FAIL) {
@@ -214,7 +215,7 @@
 
     @Override
     protected void doSuspend(MetadataProvider metadataProvider) throws 
HyracksDataException {
-        doStop(metadataProvider);
+        doStop(metadataProvider, SUSPEND_MESSAGE_TIMEOUT, TIMEOUT_UNIT);
     }
 
     @Override
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index 867fb60..98f75df 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.external.operators;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.asterix.active.ActiveRuntimeId;
 import org.apache.asterix.active.ActiveSourceOperatorNodePushable;
 import org.apache.asterix.active.EntityId;
@@ -44,8 +46,6 @@
  */
 public class FeedIntakeOperatorNodePushable extends 
ActiveSourceOperatorNodePushable {
     private static final Logger LOGGER = LogManager.getLogger();
-    // TODO: Make configurable 
https://issues.apache.org/jira/browse/ASTERIXDB-2065
-    public static final int DEFAULT_ABORT_TIMEOUT = 60000;
     private final FeedIntakeOperatorDescriptor opDesc;
     private final FeedAdapter adapter;
     private boolean poisoned = false;
@@ -125,12 +125,12 @@
     }
 
     @Override
-    protected void abort() throws HyracksDataException, InterruptedException {
+    protected void abort(long timeout, TimeUnit unit) throws 
HyracksDataException, InterruptedException {
         LOGGER.info(runtimeId + " aborting...");
         synchronized (this) {
             poisoned = true;
             try {
-                if (!adapter.stop(DEFAULT_ABORT_TIMEOUT)) {
+                if (!adapter.stop(unit.toMillis(timeout))) {
                     LOGGER.info(runtimeId + " failed to stop adapter. 
interrupting the thread...");
                     taskThread.interrupt();
                 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java
index c5a6de2..a16f4ef 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java
@@ -65,7 +65,8 @@
                 writer.close();
             } catch (Throwable th) { // NOSONAR Will be suppressed
                 try {
-                    LOGGER.log(Level.WARN, "Failure closing a closeable 
resource", th);
+                    LOGGER.log(Level.WARN, "Failure closing a closeable 
resource of class {}",
+                            writer.getClass().getName(), th);
                 } catch (Throwable loggingFailure) { // NOSONAR: Ignore 
catching Throwable
                     // NOSONAR: Ignore logging failure
                 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2762
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ie0416d76670e945cb958f5a1c235201a3e016009
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>

Reply via email to