abdullah alamoudi has submitted this change and it was merged. Change subject: [NO ISSUE][ING] Add timeout to stop active message ......................................................................
[NO ISSUE][ING] Add timeout to stop active message Change-Id: Ie0416d76670e945cb958f5a1c235201a3e016009 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2762 Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Contrib: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Till Westmann <[email protected]> --- M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java A asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StopRuntimeParameters.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java M hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java 10 files changed, 106 insertions(+), 29 deletions(-) Approvals: Anon. E. Moose #1000171: Till Westmann: Looks good to me, approved Jenkins: Verified; No violations found; ; Verified diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java index 2aebc59..6373d6c 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java @@ -33,6 +33,7 @@ import org.apache.asterix.active.message.ActiveManagerMessage; import org.apache.asterix.active.message.ActiveStatsRequestMessage; import org.apache.asterix.active.message.ActiveStatsResponse; +import org.apache.asterix.active.message.StopRuntimeParameters; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.common.memory.ConcurrentFramePool; @@ -40,6 +41,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.util.JavaSerializationUtils; import org.apache.hyracks.control.nc.NodeControllerService; +import org.apache.hyracks.util.ExitUtil; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -139,7 +141,7 @@ shutdown = true; runtimes.forEach((runtimeId, runtime) -> stopFutures.put(runtimeId, executor.submit(() -> { // we may already have been stopped- only stop once - stopIfRunning(runtimeId, runtime); + stopIfRunning(runtime, SHUTDOWN_TIMEOUT_SECS, TimeUnit.SECONDS); return null; }))); stopFutures.entrySet().parallelStream().forEach(entry -> { @@ -157,8 +159,10 @@ LOGGER.warn("Shutdown ActiveManager on node " + nodeId + " complete"); } + @SuppressWarnings("squid:S1181") // Catch Error private void stopRuntime(ActiveManagerMessage message) { - ActiveRuntimeId runtimeId = (ActiveRuntimeId) message.getPayload(); + StopRuntimeParameters content = (StopRuntimeParameters) message.getPayload(); + ActiveRuntimeId runtimeId = content.getRuntimeId(); IActiveRuntime runtime = runtimes.get(runtimeId); if (runtime == null) { LOGGER.warn("Request to stop runtime: " + runtimeId @@ -167,21 +171,23 @@ } else { executor.execute(() -> { try { - stopIfRunning(runtimeId, runtime); + stopIfRunning(runtime, content.getTimeout(), content.getUnit()); } catch (Exception e) { - // TODO(till) Figure out a better way to handle failure to stop a runtime - LOGGER.log(Level.WARN, "Failed to stop runtime: " + runtimeId, e); + LOGGER.warn("Failed to stop runtime: {}", runtimeId, e); + } catch (Throwable th) { + LOGGER.warn("Failed to stop runtime: {}", runtimeId, th); + ExitUtil.halt(ExitUtil.EC_UNCAUGHT_THROWABLE); } }); } } - private void stopIfRunning(ActiveRuntimeId runtimeId, IActiveRuntime runtime) + private void stopIfRunning(IActiveRuntime runtime, long timeout, TimeUnit unit) throws HyracksDataException, InterruptedException { - if (runtimes.containsKey(runtimeId)) { - runtime.stop(); + if (runtimes.containsKey(runtime.getRuntimeId())) { + runtime.stop(timeout, unit); } else { - LOGGER.info("Not stopping already stopped runtime " + runtimeId); + LOGGER.info("Not stopping already stopped runtime {}", runtime.getRuntimeId()); } } diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java index 2f30df4..af9d109 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java @@ -18,6 +18,8 @@ */ package org.apache.asterix.active; +import java.util.concurrent.TimeUnit; + import org.apache.asterix.active.message.ActivePartitionMessage; import org.apache.asterix.active.message.ActivePartitionMessage.Event; import org.apache.asterix.common.api.INcApplicationContext; @@ -62,10 +64,10 @@ protected abstract void start() throws HyracksDataException, InterruptedException; @Override - public final void stop() throws HyracksDataException, InterruptedException { + public final void stop(long timeout, TimeUnit unit) throws HyracksDataException, InterruptedException { synchronized (this) { if (!done) { - abort(); + abort(timeout, unit); } while (!done) { wait(); @@ -76,10 +78,13 @@ /** * called from a different thread. This method stops the active node and force the start() call to return * + * @param unit + * @param timeout + * * @throws HyracksDataException * @throws InterruptedException */ - protected abstract void abort() throws HyracksDataException, InterruptedException; + protected abstract void abort(long timeout, TimeUnit unit) throws HyracksDataException, InterruptedException; @Override public String toString() { diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java index f37b2e8..2da7193 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java @@ -18,6 +18,8 @@ */ package org.apache.asterix.active; +import java.util.concurrent.TimeUnit; + import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.JobId; @@ -31,10 +33,15 @@ /** * Stops the running activity * + * @param timeout + * time for graceful stop. interrupt the runtime after that + * @param unit + * unit of the timeout + * * @throws HyracksDataException * @throws InterruptedException */ - void stop() throws HyracksDataException, InterruptedException; + void stop(long timeout, TimeUnit unit) throws HyracksDataException, InterruptedException; /** * @return the job id associated with this active runtime diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StopRuntimeParameters.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StopRuntimeParameters.java new file mode 100644 index 0000000..fbc41a1 --- /dev/null +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StopRuntimeParameters.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.active.message; + +import java.io.Serializable; +import java.util.concurrent.TimeUnit; + +import org.apache.asterix.active.ActiveRuntimeId; + +public class StopRuntimeParameters implements Serializable { + + private static final long serialVersionUID = 1L; + private final ActiveRuntimeId runtimeId; + private final long timeout; + private final TimeUnit unit; + + public StopRuntimeParameters(ActiveRuntimeId runtimeId, long timeout, TimeUnit unit) { + this.runtimeId = runtimeId; + this.timeout = timeout; + this.unit = unit; + } + + public ActiveRuntimeId getRuntimeId() { + return runtimeId; + } + + public long getTimeout() { + return timeout; + } + + public TimeUnit getUnit() { + return unit; + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java index 66d3e81..be43a4e 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java @@ -40,6 +40,7 @@ import org.apache.asterix.active.message.ActivePartitionMessage; import org.apache.asterix.active.message.ActivePartitionMessage.Event; import org.apache.asterix.active.message.ActiveStatsRequestMessage; +import org.apache.asterix.active.message.StopRuntimeParameters; import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.ErrorCode; @@ -73,7 +74,10 @@ private static final EnumSet<ActivityState> TRANSITION_STATES = EnumSet.of(ActivityState.RESUMING, ActivityState.STARTING, ActivityState.STOPPING, ActivityState.RECOVERING, ActivityState.CANCELLING); private static final String DEFAULT_ACTIVE_STATS = "{\"Stats\":\"N/A\"}"; - // finals + // TODO: Make configurable https://issues.apache.org/jira/browse/ASTERIXDB-2065 + protected static final long STOP_MESSAGE_TIMEOUT = 2L; + protected static final long SUSPEND_MESSAGE_TIMEOUT = 5L; + protected static final TimeUnit TIMEOUT_UNIT = TimeUnit.MINUTES; protected final IClusterStateManager clusterStateManager; protected final ActiveNotificationHandler handler; protected final List<IActiveEntityEventSubscriber> subscribers = new ArrayList<>(); @@ -431,7 +435,8 @@ protected abstract JobId compileAndStartJob(MetadataProvider metadataProvider) throws HyracksDataException; @SuppressWarnings("squid:S1181") - protected synchronized void doStop(MetadataProvider metadataProvider) throws HyracksDataException { + protected synchronized void doStop(MetadataProvider metadataProvider, long timeout, TimeUnit unit) + throws HyracksDataException { ActivityState intention = state; Set<ActivityState> waitFor; if (intention == ActivityState.STOPPING) { @@ -444,7 +449,7 @@ WaitForStateSubscriber subscriber = new WaitForStateSubscriber(this, waitFor); // Note: once we start sending stop messages, we can't go back until the entity is stopped try { - sendStopMessages(metadataProvider); + sendStopMessages(metadataProvider, timeout, unit); LOGGER.log(Level.DEBUG, "Waiting for its state to become " + waitFor); subscriber.sync(); LOGGER.log(Level.DEBUG, "Disconnect has been completed " + waitFor); @@ -465,7 +470,7 @@ LOGGER.warn("Failure encountered while stopping {}", this, e); } - protected void sendStopMessages(MetadataProvider metadataProvider) throws Exception { + protected void sendStopMessages(MetadataProvider metadataProvider, long timeout, TimeUnit unit) throws Exception { ICcApplicationContext applicationCtx = metadataProvider.getApplicationContext(); ICCMessageBroker messageBroker = (ICCMessageBroker) applicationCtx.getServiceContext().getMessageBroker(); AlgebricksAbsolutePartitionConstraint runtimeLocations = getLocations(); @@ -473,9 +478,9 @@ LOGGER.log(Level.INFO, "Sending stop messages to " + runtimeLocations); for (String location : runtimeLocations.getLocations()) { LOGGER.log(Level.INFO, "Sending to " + location); - messageBroker.sendApplicationMessageToNC( - new ActiveManagerMessage(ActiveManagerMessage.Kind.STOP_ACTIVITY, getActiveRuntimeId(partition++)), - location); + ActiveRuntimeId runtimeId = getActiveRuntimeId(partition++); + messageBroker.sendApplicationMessageToNC(new ActiveManagerMessage(ActiveManagerMessage.Kind.STOP_ACTIVITY, + new StopRuntimeParameters(runtimeId, timeout, unit)), location); } } @@ -510,7 +515,7 @@ } else if (state == ActivityState.RUNNING) { setState(ActivityState.STOPPING); try { - doStop(metadataProvider); + doStop(metadataProvider, STOP_MESSAGE_TIMEOUT, TIMEOUT_UNIT); } catch (Exception e) { setState(ActivityState.STOPPED); LOGGER.log(Level.ERROR, "Failed to stop the entity " + entityId, e); diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java index 47fc46f..88f1332 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java @@ -20,6 +20,7 @@ import java.util.Collections; import java.util.List; +import java.util.concurrent.TimeUnit; import org.apache.asterix.active.ActivityState; import org.apache.asterix.active.EntityId; @@ -62,7 +63,7 @@ } @Override - protected void doStop(MetadataProvider metadataProvider) throws HyracksDataException { + protected void doStop(MetadataProvider metadataProvider, long timeout, TimeUnit unit) throws HyracksDataException { IActiveEntityEventSubscriber eventSubscriber = new WaitForStateSubscriber(this, Collections.singleton(ActivityState.STOPPED)); try { diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java index c771a94..80806f3 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java @@ -21,6 +21,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import org.apache.asterix.active.ActiveRuntimeId; import org.apache.asterix.active.ActivityState; @@ -172,7 +173,7 @@ } @Override - protected void sendStopMessages(MetadataProvider metadataProvider) throws Exception { + protected void sendStopMessages(MetadataProvider metadataProvider, long timeout, TimeUnit unit) throws Exception { step(onStop); failCompile(onStop); if (onStop == Behavior.RUNNING_JOB_FAIL) { @@ -214,7 +215,7 @@ @Override protected void doSuspend(MetadataProvider metadataProvider) throws HyracksDataException { - doStop(metadataProvider); + doStop(metadataProvider, SUSPEND_MESSAGE_TIMEOUT, TIMEOUT_UNIT); } @Override diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java index 867fb60..98f75df 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java @@ -18,6 +18,8 @@ */ package org.apache.asterix.external.operators; +import java.util.concurrent.TimeUnit; + import org.apache.asterix.active.ActiveRuntimeId; import org.apache.asterix.active.ActiveSourceOperatorNodePushable; import org.apache.asterix.active.EntityId; @@ -44,8 +46,6 @@ */ public class FeedIntakeOperatorNodePushable extends ActiveSourceOperatorNodePushable { private static final Logger LOGGER = LogManager.getLogger(); - // TODO: Make configurable https://issues.apache.org/jira/browse/ASTERIXDB-2065 - public static final int DEFAULT_ABORT_TIMEOUT = 60000; private final FeedIntakeOperatorDescriptor opDesc; private final FeedAdapter adapter; private boolean poisoned = false; @@ -125,12 +125,12 @@ } @Override - protected void abort() throws HyracksDataException, InterruptedException { + protected void abort(long timeout, TimeUnit unit) throws HyracksDataException, InterruptedException { LOGGER.info(runtimeId + " aborting..."); synchronized (this) { poisoned = true; try { - if (!adapter.stop(DEFAULT_ABORT_TIMEOUT)) { + if (!adapter.stop(unit.toMillis(timeout))) { LOGGER.info(runtimeId + " failed to stop adapter. interrupting the thread..."); taskThread.interrupt(); } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java index c5a6de2..ea86298 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java @@ -65,7 +65,8 @@ writer.close(); } catch (Throwable th) { // NOSONAR Will be suppressed try { - LOGGER.log(Level.WARN, "Failure closing a closeable resource", th); + LOGGER.log(Level.WARN, "Failure closing a closeable resource of class {}", + writer.getClass().getSimpleName(), th); } catch (Throwable loggingFailure) { // NOSONAR: Ignore catching Throwable // NOSONAR: Ignore logging failure } diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java index 79d3fac..14cfc59 100644 --- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java +++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java @@ -38,6 +38,7 @@ public static final int EC_FAILED_TO_COMMIT_METADATA_TXN = 6; public static final int EC_FAILED_TO_ABORT_METADATA_TXN = 7; public static final int EC_INCONSISTENT_METADATA = 8; + public static final int EC_UNCAUGHT_THROWABLE = 9; public static final int EC_UNHANDLED_EXCEPTION = 11; public static final int EC_FAILED_TO_CANCEL_ACTIVE_START_STOP = 22; public static final int EC_IMMEDIATE_HALT = 33; -- To view, visit https://asterix-gerrit.ics.uci.edu/2762 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: Ie0416d76670e945cb958f5a1c235201a3e016009 Gerrit-PatchSet: 4 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
