abdullah alamoudi has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/2762
Change subject: [NO ISSUE][ING] Add timeout to stop active message
......................................................................
[NO ISSUE][ING] Add timeout to stop active message
Change-Id: Ie0416d76670e945cb958f5a1c235201a3e016009
---
M
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
M
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
M
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
A
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StopRuntimeContent.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java
9 files changed, 100 insertions(+), 29 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/62/2762/1
diff --git
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
index 2aebc59..b44dfea 100644
---
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
+++
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
@@ -33,6 +33,7 @@
import org.apache.asterix.active.message.ActiveManagerMessage;
import org.apache.asterix.active.message.ActiveStatsRequestMessage;
import org.apache.asterix.active.message.ActiveStatsResponse;
+import org.apache.asterix.active.message.StopRuntimeContent;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.memory.ConcurrentFramePool;
@@ -139,7 +140,7 @@
shutdown = true;
runtimes.forEach((runtimeId, runtime) -> stopFutures.put(runtimeId,
executor.submit(() -> {
// we may already have been stopped- only stop once
- stopIfRunning(runtimeId, runtime);
+ stopIfRunning(runtime, SHUTDOWN_TIMEOUT_SECS, TimeUnit.SECONDS);
return null;
})));
stopFutures.entrySet().parallelStream().forEach(entry -> {
@@ -158,7 +159,8 @@
}
private void stopRuntime(ActiveManagerMessage message) {
- ActiveRuntimeId runtimeId = (ActiveRuntimeId) message.getPayload();
+ StopRuntimeContent content = (StopRuntimeContent) message.getPayload();
+ ActiveRuntimeId runtimeId = content.getRuntimeId();
IActiveRuntime runtime = runtimes.get(runtimeId);
if (runtime == null) {
LOGGER.warn("Request to stop runtime: " + runtimeId
@@ -167,21 +169,21 @@
} else {
executor.execute(() -> {
try {
- stopIfRunning(runtimeId, runtime);
- } catch (Exception e) {
+ stopIfRunning(runtime, content.getTimeout(),
content.getUnit());
+ } catch (Throwable e) {
// TODO(till) Figure out a better way to handle failure to
stop a runtime
- LOGGER.log(Level.WARN, "Failed to stop runtime: " +
runtimeId, e);
+ LOGGER.warn("Failed to stop runtime: {}", runtimeId, e);
}
});
}
}
- private void stopIfRunning(ActiveRuntimeId runtimeId, IActiveRuntime
runtime)
+ private void stopIfRunning(IActiveRuntime runtime, long timeout, TimeUnit
unit)
throws HyracksDataException, InterruptedException {
- if (runtimes.containsKey(runtimeId)) {
- runtime.stop();
+ if (runtimes.containsKey(runtime.getRuntimeId())) {
+ runtime.stop(timeout, unit);
} else {
- LOGGER.info("Not stopping already stopped runtime " + runtimeId);
+ LOGGER.info("Not stopping already stopped runtime {}",
runtime.getRuntimeId());
}
}
diff --git
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
index 2f30df4..c3444ba 100644
---
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
+++
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.active;
+import java.util.concurrent.TimeUnit;
+
import org.apache.asterix.active.message.ActivePartitionMessage;
import org.apache.asterix.active.message.ActivePartitionMessage.Event;
import org.apache.asterix.common.api.INcApplicationContext;
@@ -62,10 +64,10 @@
protected abstract void start() throws HyracksDataException,
InterruptedException;
@Override
- public final void stop() throws HyracksDataException, InterruptedException
{
+ public final void stop(long timeout, TimeUnit unit) throws
HyracksDataException, InterruptedException {
synchronized (this) {
if (!done) {
- abort();
+ abort(timeout, unit);
}
while (!done) {
wait();
@@ -75,11 +77,14 @@
/**
* called from a different thread. This method stops the active node and
force the start() call to return
+ *
+ * @param timeout time allowed for a graceful stop
+ * @param unit unit of the timeout
*
* @throws HyracksDataException
* @throws InterruptedException
*/
- protected abstract void abort() throws HyracksDataException,
InterruptedException;
+ protected abstract void abort(long timeout, TimeUnit unit) throws
HyracksDataException, InterruptedException;
@Override
public String toString() {
diff --git
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
index f37b2e8..2da7193 100644
---
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
+++
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.active;
+import java.util.concurrent.TimeUnit;
+
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
@@ -31,10 +33,15 @@
/**
* Stops the running activity
*
+ * @param timeout
+ * time allowed for a graceful stop; the runtime is interrupted after that
+ * @param unit
+ * unit of the timeout
+ *
* @throws HyracksDataException
* @throws InterruptedException
*/
- void stop() throws HyracksDataException, InterruptedException;
+ void stop(long timeout, TimeUnit unit) throws HyracksDataException,
InterruptedException;
/**
* @return the job id associated with this active runtime
diff --git
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StopRuntimeContent.java
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StopRuntimeContent.java
new file mode 100644
index 0000000..891ce25
--- /dev/null
+++
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StopRuntimeContent.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active.message;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.active.ActiveRuntimeId;
+
+public class StopRuntimeContent implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+ private final ActiveRuntimeId runtimeId;
+ private final long timeout;
+ private final TimeUnit unit;
+
+ public StopRuntimeContent(ActiveRuntimeId runtimeId, long timeout,
TimeUnit unit) {
+ this.runtimeId = runtimeId;
+ this.timeout = timeout;
+ this.unit = unit;
+ }
+
+ public ActiveRuntimeId getRuntimeId() {
+ return runtimeId;
+ }
+
+ public long getTimeout() {
+ return timeout;
+ }
+
+ public TimeUnit getUnit() {
+ return unit;
+ }
+}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index 66d3e81..e795830 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -40,6 +40,7 @@
import org.apache.asterix.active.message.ActivePartitionMessage;
import org.apache.asterix.active.message.ActivePartitionMessage.Event;
import org.apache.asterix.active.message.ActiveStatsRequestMessage;
+import org.apache.asterix.active.message.StopRuntimeContent;
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.ErrorCode;
@@ -73,7 +74,9 @@
private static final EnumSet<ActivityState> TRANSITION_STATES =
EnumSet.of(ActivityState.RESUMING,
ActivityState.STARTING, ActivityState.STOPPING,
ActivityState.RECOVERING, ActivityState.CANCELLING);
private static final String DEFAULT_ACTIVE_STATS = "{\"Stats\":\"N/A\"}";
- // finals
+ protected static final long STOP_MESSAGE_TIMEOUT = 2L;
+ protected static final long SUSPEND_MESSAGE_TIMEOUT = 5L;
+ protected static final TimeUnit TIMEOUT_UNIT = TimeUnit.MINUTES;
protected final IClusterStateManager clusterStateManager;
protected final ActiveNotificationHandler handler;
protected final List<IActiveEntityEventSubscriber> subscribers = new
ArrayList<>();
@@ -431,7 +434,8 @@
protected abstract JobId compileAndStartJob(MetadataProvider
metadataProvider) throws HyracksDataException;
@SuppressWarnings("squid:S1181")
- protected synchronized void doStop(MetadataProvider metadataProvider)
throws HyracksDataException {
+ protected synchronized void doStop(MetadataProvider metadataProvider, long
timeout, TimeUnit unit)
+ throws HyracksDataException {
ActivityState intention = state;
Set<ActivityState> waitFor;
if (intention == ActivityState.STOPPING) {
@@ -444,7 +448,7 @@
WaitForStateSubscriber subscriber = new WaitForStateSubscriber(this,
waitFor);
// Note: once we start sending stop messages, we can't go back until
the entity is stopped
try {
- sendStopMessages(metadataProvider);
+ sendStopMessages(metadataProvider, timeout, unit);
LOGGER.log(Level.DEBUG, "Waiting for its state to become " +
waitFor);
subscriber.sync();
LOGGER.log(Level.DEBUG, "Disconnect has been completed " +
waitFor);
@@ -465,7 +469,7 @@
LOGGER.warn("Failure encountered while stopping {}", this, e);
}
- protected void sendStopMessages(MetadataProvider metadataProvider) throws
Exception {
+ protected void sendStopMessages(MetadataProvider metadataProvider, long
timeout, TimeUnit unit) throws Exception {
ICcApplicationContext applicationCtx =
metadataProvider.getApplicationContext();
ICCMessageBroker messageBroker = (ICCMessageBroker)
applicationCtx.getServiceContext().getMessageBroker();
AlgebricksAbsolutePartitionConstraint runtimeLocations =
getLocations();
@@ -473,9 +477,9 @@
LOGGER.log(Level.INFO, "Sending stop messages to " + runtimeLocations);
for (String location : runtimeLocations.getLocations()) {
LOGGER.log(Level.INFO, "Sending to " + location);
- messageBroker.sendApplicationMessageToNC(
- new
ActiveManagerMessage(ActiveManagerMessage.Kind.STOP_ACTIVITY,
getActiveRuntimeId(partition++)),
- location);
+ ActiveRuntimeId runtimeId = getActiveRuntimeId(partition++);
+ messageBroker.sendApplicationMessageToNC(new
ActiveManagerMessage(ActiveManagerMessage.Kind.STOP_ACTIVITY,
+ new StopRuntimeContent(runtimeId, timeout, unit)),
location);
}
}
@@ -510,7 +514,7 @@
} else if (state == ActivityState.RUNNING) {
setState(ActivityState.STOPPING);
try {
- doStop(metadataProvider);
+ doStop(metadataProvider, STOP_MESSAGE_TIMEOUT, TIMEOUT_UNIT);
} catch (Exception e) {
setState(ActivityState.STOPPED);
LOGGER.log(Level.ERROR, "Failed to stop the entity " +
entityId, e);
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java
index 47fc46f..88f1332 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java
@@ -20,6 +20,7 @@
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.apache.asterix.active.ActivityState;
import org.apache.asterix.active.EntityId;
@@ -62,7 +63,7 @@
}
@Override
- protected void doStop(MetadataProvider metadataProvider) throws
HyracksDataException {
+ protected void doStop(MetadataProvider metadataProvider, long timeout,
TimeUnit unit) throws HyracksDataException {
IActiveEntityEventSubscriber eventSubscriber =
new WaitForStateSubscriber(this,
Collections.singleton(ActivityState.STOPPED));
try {
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java
index c771a94..80806f3 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java
@@ -21,6 +21,7 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import org.apache.asterix.active.ActiveRuntimeId;
import org.apache.asterix.active.ActivityState;
@@ -172,7 +173,7 @@
}
@Override
- protected void sendStopMessages(MetadataProvider metadataProvider) throws
Exception {
+ protected void sendStopMessages(MetadataProvider metadataProvider, long
timeout, TimeUnit unit) throws Exception {
step(onStop);
failCompile(onStop);
if (onStop == Behavior.RUNNING_JOB_FAIL) {
@@ -214,7 +215,7 @@
@Override
protected void doSuspend(MetadataProvider metadataProvider) throws
HyracksDataException {
- doStop(metadataProvider);
+ doStop(metadataProvider, SUSPEND_MESSAGE_TIMEOUT, TIMEOUT_UNIT);
}
@Override
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index 867fb60..98f75df 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.external.operators;
+import java.util.concurrent.TimeUnit;
+
import org.apache.asterix.active.ActiveRuntimeId;
import org.apache.asterix.active.ActiveSourceOperatorNodePushable;
import org.apache.asterix.active.EntityId;
@@ -44,8 +46,6 @@
*/
public class FeedIntakeOperatorNodePushable extends
ActiveSourceOperatorNodePushable {
private static final Logger LOGGER = LogManager.getLogger();
- // TODO: Make configurable
https://issues.apache.org/jira/browse/ASTERIXDB-2065
- public static final int DEFAULT_ABORT_TIMEOUT = 60000;
private final FeedIntakeOperatorDescriptor opDesc;
private final FeedAdapter adapter;
private boolean poisoned = false;
@@ -125,12 +125,12 @@
}
@Override
- protected void abort() throws HyracksDataException, InterruptedException {
+ protected void abort(long timeout, TimeUnit unit) throws
HyracksDataException, InterruptedException {
LOGGER.info(runtimeId + " aborting...");
synchronized (this) {
poisoned = true;
try {
- if (!adapter.stop(DEFAULT_ABORT_TIMEOUT)) {
+ if (!adapter.stop(unit.toMillis(timeout))) {
LOGGER.info(runtimeId + " failed to stop adapter.
interrupting the thread...");
taskThread.interrupt();
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java
index c5a6de2..a16f4ef 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java
@@ -65,7 +65,8 @@
writer.close();
} catch (Throwable th) { // NOSONAR Will be suppressed
try {
- LOGGER.log(Level.WARN, "Failure closing a closeable
resource", th);
+ LOGGER.log(Level.WARN, "Failure closing a closeable
resource of class {}",
+ writer.getClass().getName(), th);
} catch (Throwable loggingFailure) { // NOSONAR: Ignore
catching Throwable
// NOSONAR: Ignore logging failure
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/2762
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ie0416d76670e945cb958f5a1c235201a3e016009
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>