Michael Blow has submitted this change and it was merged.

Change subject: Run Active Shutdown On Separate Thread
......................................................................
Run Active Shutdown On Separate Thread - As feed shutdown can be slow, do it on another thread to not tie up worker. - use nc thread executor for feed adapter thread - error handling Change-Id: I8fd9bc454b290420682160364ac78e4b91a9abc3 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1223 Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Till Westmann <[email protected]> --- M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java 5 files changed, 37 insertions(+), 40 deletions(-) Approvals: Till Westmann: Looks good to me, approved Jenkins: Verified; No violations found; Verified diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java index bd6dae9..b15cfca 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java @@ -20,6 +20,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; import org.apache.asterix.active.message.ActiveManagerMessage; import org.apache.asterix.common.memory.ConcurrentFramePool; @@ -29,11 +30,14 @@ public class ActiveManager { private static final Logger LOGGER = Logger.getLogger(ActiveManager.class.getName()); + private final Executor executor; private final Map<ActiveRuntimeId, 
IActiveRuntime> runtimes; private final ConcurrentFramePool activeFramePool; private final String nodeId; - public ActiveManager(String nodeId, long activeMemoryBudget, int frameSize) throws HyracksDataException { + public ActiveManager(Executor executor, String nodeId, long activeMemoryBudget, int frameSize) + throws HyracksDataException { + this.executor = executor; this.nodeId = nodeId; this.activeFramePool = new ConcurrentFramePool(nodeId, activeMemoryBudget, frameSize); this.runtimes = new ConcurrentHashMap<>(); @@ -69,7 +73,7 @@ stopRuntime(message); break; default: - LOGGER.warn("Unknown message type received"); + LOGGER.warn("Unknown message type received: " + message.getKind()); } } @@ -79,12 +83,14 @@ if (runtime == null) { LOGGER.warn("Request to stop a runtime that is not registered " + runtimeId); } else { - try { - runtime.stop(); - } catch (HyracksDataException | InterruptedException e) { - // TODO(till) Figure out a better way to handle failure to stop a runtime - LOGGER.warn("Failed to stop runtime: " + runtimeId, e); - } + executor.execute(() -> { + try { + runtime.stop(); + } catch (Exception e) { + // TODO(till) Figure out a better way to handle failure to stop a runtime + LOGGER.warn("Failed to stop runtime: " + runtimeId, e); + } + }); } } } diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java index 1cda298..7f25896 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java @@ -55,15 +55,10 @@ @Override public final void stop() throws HyracksDataException, InterruptedException { - try { - abort(); - } finally { - if (!done) { - synchronized (this) { - while (!done) { - wait(); - } - } + abort(); + synchronized 
(this) { + while (!done) { + wait(); } } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java index 343fdb3..ed081b5 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java @@ -216,7 +216,7 @@ isShuttingdown = false; - activeManager = new ActiveManager(ncApplicationContext.getNodeId(), + activeManager = new ActiveManager(threadExecutor, ncApplicationContext.getNodeId(), feedProperties.getMemoryComponentGlobalBudget(), compilerProperties.getFrameSize()); if (ClusterProperties.INSTANCE.isReplicationEnabled()) { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java index 424f2dc..7f5372b 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java @@ -18,13 +18,12 @@ */ package org.apache.asterix.external.feed.runtime; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.Future; import org.apache.asterix.active.EntityId; import org.apache.asterix.external.dataset.adapter.FeedAdapter; import org.apache.hyracks.api.comm.IFrameWriter; +import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.log4j.Logger; /** @@ -42,45 +41,42 @@ private final int partition; // The partition number - private final ExecutorService executorService; // Executor service to run/shutdown the adapter executor + 
private final IHyracksTaskContext ctx; private IngestionRuntime ingestionRuntime; // Runtime representing the ingestion stage of a feed + + private Future<?> execution; private volatile boolean done = false; private volatile boolean failed = false; - public AdapterRuntimeManager(EntityId entityId, FeedAdapter feedAdapter, IFrameWriter writer, int partition) { + public AdapterRuntimeManager(IHyracksTaskContext ctx, EntityId entityId, FeedAdapter feedAdapter, + IFrameWriter writer, int partition) { + this.ctx = ctx; this.feedId = entityId; this.feedAdapter = feedAdapter; this.partition = partition; this.adapterExecutor = new AdapterExecutor(writer, feedAdapter, this); - this.executorService = Executors.newSingleThreadExecutor(); } public void start() { - executorService.execute(adapterExecutor); + execution = ctx.getExecutorService().submit(adapterExecutor); } public void stop() throws InterruptedException { - boolean stopped = false; try { - stopped = feedAdapter.stop(); - } catch (Exception exception) { - LOGGER.error("Unable to stop adapter " + feedAdapter, exception); - } finally { - if (stopped) { + if (feedAdapter.stop()) { // stop() returned true, we wait for the process termination - executorService.shutdown(); - try { - executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); - } catch (InterruptedException e) { - LOGGER.error("Interrupted while waiting for feed adapter to finish its work", e); - throw e; - } + execution.get(); } else { // stop() returned false, we try to force shutdown - executorService.shutdownNow(); + execution.cancel(true); } + } catch (InterruptedException e) { + LOGGER.error("Interrupted while waiting for feed adapter to finish its work", e); + throw e; + } catch (Exception exception) { + LOGGER.error("Unable to stop adapter " + feedAdapter, exception); } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java index 7c8fe14..afe87c0 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java @@ -74,7 +74,7 @@ // create the distributor frameDistributor = new DistributeFeedFrameWriter(feedId, writer, FeedRuntimeType.INTAKE, partition); // create adapter runtime manager - adapterRuntimeManager = new AdapterRuntimeManager(feedId, adapter, frameDistributor, partition); + adapterRuntimeManager = new AdapterRuntimeManager(ctx, feedId, adapter, frameDistributor, partition); // create and register the runtime ActiveRuntimeId runtimeId = new ActiveRuntimeId(feedId, FeedRuntimeType.INTAKE.toString(), partition); ingestionRuntime = new IngestionRuntime(feedId, runtimeId, frameDistributor, adapterRuntimeManager, ctx); -- To view, visit https://asterix-gerrit.ics.uci.edu/1223 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I8fd9bc454b290420682160364ac78e4b91a9abc3 Gerrit-PatchSet: 2 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Michael Blow <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Steven Jacobs <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]>
