Till Westmann has submitted this change, and it was merged.

Change subject: Fix potential hanging in op.initialize().
Fix potential hanging in op.initialize(). - Let initialize()/deinitialize() always pair up. Change-Id: I701b271bc6dc78e67274fa845dec013756843a70 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1243 Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Till Westmann <[email protected]> --- M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java 4 files changed, 57 insertions(+), 22 deletions(-) Approvals: Till Westmann: Looks good to me, approved Jenkins: Verified; Verified Objections: Jenkins: Violations found diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java index 4ee8303..3e557a2 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java @@ -20,12 +20,13 @@ package org.apache.hyracks.api.rewriter.runtime; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Queue; +import java.util.Map.Entry; import java.util.concurrent.Callable; import 
java.util.concurrent.Future; @@ -56,6 +57,7 @@ private final int partition; private final int nPartitions; private int inputArity = 0; + private boolean[] startedInitialization; public SuperActivityOperatorNodePushable(SuperActivity parent, Map<ActivityId, IActivity> startActivities, IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) { @@ -79,8 +81,11 @@ @Override public void initialize() throws HyracksDataException { - // Initializes all OperatorNodePushables in parallel. - runInParallel(op -> op.initialize()); + // Initializes all OperatorNodePushables in parallel and then finally deinitializes them. + runInParallel((op, index) -> { + startedInitialization[index] = true; + op.initialize(); + }); } public void init() throws HyracksDataException { @@ -150,12 +155,19 @@ } } } + + // Sets the startedInitialization flags to be false. + startedInitialization = new boolean[operatorNodePushablesBFSOrder.size()]; + Arrays.fill(startedInitialization, false); } @Override public void deinitialize() throws HyracksDataException { - // De-initialize all OperatorNodePushables in parallel. - runInParallel(op -> op.deinitialize()); + runInParallel((op, index) -> { + if (startedInitialization[index]) { + op.deinitialize(); + } + }); } @Override @@ -191,18 +203,21 @@ } interface OperatorNodePushableAction { - public void runAction(IOperatorNodePushable op) throws HyracksDataException; + void runAction(IOperatorNodePushable op, int opIndex) throws HyracksDataException; } - private void runInParallel(OperatorNodePushableAction opAction) throws HyracksDataException { - List<Future<Void>> initializationTasks = new ArrayList<Future<Void>>(); + private void runInParallel(OperatorNodePushableAction opAction) + throws HyracksDataException { + List<Future<Void>> initializationTasks = new ArrayList<>(); try { + int index = 0; // Run one action for all OperatorNodePushables in parallel through a thread pool. 
for (final IOperatorNodePushable op : operatorNodePushablesBFSOrder) { + final int opIndex = index++; initializationTasks.add(ctx.getExecutorService().submit(new Callable<Void>() { @Override public Void call() throws Exception { - opAction.runAction(op); + opAction.runAction(op, opIndex); return null; } })); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java index f463bfa..7c626c1 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java @@ -235,8 +235,12 @@ } } - private synchronized void addPendingThread(Thread t) { + private synchronized boolean addPendingThread(Thread t) { + if (aborted) { + return false; + } pendingThreads.add(t); + return true; } private synchronized void removePendingThread(Thread t) { @@ -256,9 +260,16 @@ public void run() { Thread ct = Thread.currentThread(); String threadName = ct.getName(); - addPendingThread(ct); + ct.setName(displayName + ":" + taskAttemptId + ":" + 0); + // Calls synchronized addPendingThread(..) to make sure that in the abort() method, + // the thread is not escaped from interruption. 
+ if (!addPendingThread(ct)) { + exceptions.add(new InterruptedException("Task " + getTaskAttemptId() + " was aborted!")); + ExceptionUtils.setNodeIds(exceptions, ncs.getId()); + ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, this, exceptions)); + return; + } try { - ct.setName(displayName + ":" + taskAttemptId + ":" + 0); try { operator.initialize(); if (collectors.length > 0) { @@ -271,11 +282,12 @@ executorService.execute(new Runnable() { @Override public void run() { - if (aborted) { + Thread thread = Thread.currentThread(); + // Calls synchronized addPendingThread(..) to make sure that in the abort() method, + // the thread is not escaped from interruption. + if (!addPendingThread(thread)) { return; } - Thread thread = Thread.currentThread(); - addPendingThread(thread); String oldName = thread.getName(); thread.setName(displayName + ":" + taskAttemptId + ":" + cIdx); thread.setPriority(Thread.MIN_PRIORITY); diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java index 452da88..b76e458 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java @@ -26,13 +26,18 @@ import org.apache.hyracks.dataflow.std.misc.SinkOperatorDescriptor; import org.apache.hyracks.tests.util.ExceptionOnCreatePushRuntimeOperatorDescriptor; import org.junit.Assert; +import org.junit.Test; public class JobFailureTest extends AbstractMultiNCIntegrationTest { - // commenting out due to intermittent hangs: - // 
https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-notopic/2877/artifact/target/threaddumps/jstack_28541.html - // @Test + @Test public void failureOnCreatePushRuntime() throws Exception { + for (int round = 0; round < 1000; ++round) { + execTest(); + } + } + + private void execTest() throws Exception { JobSpecification spec = new JobSpecification(); AbstractSingleActivityOperatorDescriptor sourceOpDesc = new ExceptionOnCreatePushRuntimeOperatorDescriptor(spec, 0, 1, new int[] { 4 }, true); diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java index 8c5bf48..14c644a 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java @@ -20,6 +20,8 @@ import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.logging.Logger; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.context.IHyracksTaskContext; @@ -32,6 +34,7 @@ public class ExceptionOnCreatePushRuntimeOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor { private static final long serialVersionUID = 1L; + private static Logger LOGGER = Logger.getLogger(ExceptionOnCreatePushRuntimeOperatorDescriptor.class.getName()); private static AtomicInteger createPushRuntime = new AtomicInteger(); private static AtomicInteger initializeCounter = new AtomicInteger(); private static AtomicInteger openCloseCounter = new AtomicInteger(); @@ 
-126,10 +129,10 @@ public static boolean succeed() { boolean success = openCloseCounter.get() == 0 && createPushRuntime.get() == 0 && initializeCounter.get() == 0; if (!success) { - System.err.println("Failure:"); - System.err.println("CreatePushRuntime:" + createPushRuntime.get()); - System.err.println("InitializeCounter:" + initializeCounter.get()); - System.err.println("OpenCloseCounter:" + openCloseCounter.get()); + LOGGER.log(Level.SEVERE, "Failure:"); + LOGGER.log(Level.SEVERE, "CreatePushRuntime:" + createPushRuntime.get()); + LOGGER.log(Level.SEVERE, "InitializeCounter:" + initializeCounter.get()); + LOGGER.log(Level.SEVERE, "OpenCloseCounter:" + openCloseCounter.get()); } return success; } -- To view, visit https://asterix-gerrit.ics.uci.edu/1243 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I701b271bc6dc78e67274fa845dec013756843a70 Gerrit-PatchSet: 8 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Yingyi Bu <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]> Gerrit-Reviewer: Yingyi Bu <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
