abdullah alamoudi has submitted this change and it was merged. Change subject: ASTERIXDB-1642,ASTERIXDB-1657,ASTERIXDB-1658 Fix Task Failure Handling ......................................................................
ASTERIXDB-1642,ASTERIXDB-1657,ASTERIXDB-1658 Fix Task Failure Handling Change-Id: I2ec2c798b704ca426d5937f22e6d2bd394a9095a Reviewed-on: https://asterix-gerrit.ics.uci.edu/1197 Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Yingyi Bu <[email protected]> --- M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java A hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java A hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java 8 files changed, 254 insertions(+), 66 deletions(-) Approvals: Yingyi Bu: Looks good to me, approved Jenkins: Verified; No violations found; Verified diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java index b377b1a..ab026eb 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java @@ -626,15 +626,10 @@ /** * Indicates that a single task attempt has encountered a failure. - * - * @param ta - * - Failed Task Attempt - * @param ac - * - Activity Cluster that owns this Task - * @param details - * - Cause of the failure + * @param ta Failed Task Attempt + * @param exceptions exceptions thrown during the failure */ - public void notifyTaskFailure(TaskAttempt ta, ActivityCluster ac, List<Exception> exceptions) { + public void notifyTaskFailure(TaskAttempt ta, List<Exception> exceptions) { try { LOGGER.fine("Received failure notification for TaskAttempt " + ta.getTaskAttemptId()); TaskAttemptId taId = ta.getTaskAttemptId(); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java index 6db3700..8bca4e7 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java @@ -21,7 +21,6 @@ import java.util.List; import org.apache.hyracks.api.dataflow.TaskAttemptId; -import org.apache.hyracks.api.job.ActivityCluster; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.cc.job.JobRun; @@ -40,8 +39,7 @@ protected void performEvent(TaskAttempt ta) { JobRun run = ccs.getActiveRunMap().get(jobId); ccs.getDatasetDirectoryService().reportJobFailure(jobId, exceptions); - 
ActivityCluster ac = ta.getTask().getTaskCluster().getActivityCluster(); - run.getScheduler().notifyTaskFailure(ta, ac, exceptions); + run.getScheduler().notifyTaskFailure(ta, exceptions); } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java index 0d9ff5d..c91ebd4 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java @@ -165,6 +165,7 @@ this.nodeId = nodeId; } + @Override public String toString() { return super.toString() + "@" + nodeId; } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java index 6566655..07e1ad2 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java @@ -55,9 +55,7 @@ if (dpm != null) { ncs.getDatasetPartitionManager().abortReader(jobId); } - - Map<JobId, Joblet> jobletMap = ncs.getJobletMap(); - Joblet ji = jobletMap.get(jobId); + Joblet ji = ncs.getJobletMap().get(jobId); if (ji != null) { Map<TaskAttemptId, Task> taskMap = ji.getTaskMap(); for (TaskAttemptId taId : tasks) { @@ -66,6 +64,9 @@ task.abort(); } } + } else { + LOGGER.log(Level.WARNING, "Joblet couldn't be found. 
Tasks of job " + jobId + + " have all either completed or failed"); } } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java index 98f2097..ad9481d 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java @@ -47,6 +47,7 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.deployment.DeploymentId; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.job.ActivityCluster; import org.apache.hyracks.api.job.ActivityClusterGraph; import org.apache.hyracks.api.job.JobFlag; @@ -55,6 +56,7 @@ import org.apache.hyracks.comm.channels.NetworkInputChannel; import org.apache.hyracks.control.common.deployment.DeploymentUtils; import org.apache.hyracks.control.common.job.TaskAttemptDescriptor; +import org.apache.hyracks.control.common.utils.ExceptionUtils; import org.apache.hyracks.control.common.work.AbstractWork; import org.apache.hyracks.control.nc.Joblet; import org.apache.hyracks.control.nc.NodeControllerService; @@ -97,11 +99,11 @@ @Override public void run() { + Task task = null; try { NCApplicationContext appCtx = ncs.getApplicationContext(); - final Joblet joblet = getOrCreateLocalJoblet(deploymentId, jobId, appCtx, acgBytes); + Joblet joblet = getOrCreateLocalJoblet(deploymentId, jobId, appCtx, acgBytes); final ActivityClusterGraph acg = joblet.getActivityClusterGraph(); - IRecordDescriptorProvider rdp = new IRecordDescriptorProvider() { @Override public RecordDescriptor 
getOutputRecordDescriptor(ActivityId aid, int outputIndex) { @@ -117,7 +119,6 @@ return ac.getConnectorRecordDescriptorMap().get(conn.getConnectorId()); } }; - for (TaskAttemptDescriptor td : taskDescriptors) { TaskAttemptId taId = td.getTaskAttemptId(); TaskId tid = taId.getTaskId(); @@ -129,7 +130,7 @@ } final int partition = tid.getPartition(); List<IConnectorDescriptor> inputs = ac.getActivityInputMap().get(aid); - Task task = new Task(joblet, taId, han.getClass().getName(), ncs.getExecutorService(), ncs, + task = new Task(joblet, taId, han.getClass().getName(), ncs.getExecutorService(), ncs, createInputChannels(td, inputs)); IOperatorNodePushable operator = han.createPushRuntime(task, rdp, partition, td.getPartitionCount()); @@ -161,30 +162,32 @@ if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("output: " + i + ": " + conn.getConnectorId()); } - IFrameWriter writer = conn.createPartitioner(task, recordDesc, pwFactory, partition, - td.getPartitionCount(), td.getOutputPartitionCounts()[i]); + IFrameWriter writer = conn.createPartitioner(task, recordDesc, pwFactory, partition, td + .getPartitionCount(), td.getOutputPartitionCounts()[i]); operator.setOutputFrameWriter(i, writer, recordDesc); } } task.setTaskRuntime(collectors.toArray(new IPartitionCollector[collectors.size()]), operator); joblet.addTask(task); - task.start(); } } catch (Exception e) { - e.printStackTrace(); - throw new RuntimeException(e); + LOGGER.log(Level.WARNING, "Failure starting a task", e); + // notify cc of start task failure + List<Exception> exceptions = new ArrayList<>(); + ExceptionUtils.setNodeIds(exceptions, ncs.getId()); + ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, task, exceptions)); } } private Joblet getOrCreateLocalJoblet(DeploymentId deploymentId, JobId jobId, INCApplicationContext appCtx, - byte[] acgBytes) throws Exception { + byte[] acgBytes) throws HyracksException { Map<JobId, Joblet> jobletMap = ncs.getJobletMap(); Joblet ji = jobletMap.get(jobId); if (ji 
== null) { if (acgBytes == null) { - throw new NullPointerException("JobActivityGraph was null"); + throw new HyracksException("Joblet was not found. This job was most likely aborted."); } ActivityClusterGraph acg = (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, deploymentId, appCtx); @@ -197,11 +200,11 @@ private IPartitionCollector createPartitionCollector(TaskAttemptDescriptor td, final int partition, Task task, int i, IConnectorDescriptor conn, RecordDescriptor recordDesc, IConnectorPolicy cPolicy) throws HyracksDataException { - IPartitionCollector collector = conn.createPartitionCollector(task, recordDesc, partition, - td.getInputPartitionCounts()[i], td.getPartitionCount()); + IPartitionCollector collector = conn.createPartitionCollector(task, recordDesc, partition, td + .getInputPartitionCounts()[i], td.getPartitionCount()); if (cPolicy.materializeOnReceiveSide()) { - return new ReceiveSideMaterializingCollector(task, ncs.getPartitionManager(), collector, - task.getTaskAttemptId(), ncs.getExecutorService()); + return new ReceiveSideMaterializingCollector(task, ncs.getPartitionManager(), collector, task + .getTaskAttemptId(), ncs.getExecutorService()); } else { return collector; } @@ -222,10 +225,12 @@ }; } else { factory = new IPartitionWriterFactory() { + @Override public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException { return new MaterializingPipelinedPartition(ctx, ncs.getPartitionManager(), new PartitionId( - jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId, ncs.getExecutorService()); + jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId, ncs + .getExecutorService()); } }; } @@ -233,8 +238,8 @@ factory = new IPartitionWriterFactory() { @Override public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException { - return new PipelinedPartition(ctx, ncs.getPartitionManager(), new PartitionId(jobId, - conn.getConnectorId(), senderIndex, receiverIndex), taId); + 
return new PipelinedPartition(ctx, ncs.getPartitionManager(), new PartitionId(jobId, conn + .getConnectorId(), senderIndex, receiverIndex), taId); } }; } @@ -254,8 +259,8 @@ * @return a list of known channels, one for each connector * @throws UnknownHostException */ - private List<List<PartitionChannel>> createInputChannels(TaskAttemptDescriptor td, List<IConnectorDescriptor> inputs) - throws UnknownHostException { + private List<List<PartitionChannel>> createInputChannels(TaskAttemptDescriptor td, + List<IConnectorDescriptor> inputs) throws UnknownHostException { NetworkAddress[][] inputAddresses = td.getInputPartitionLocations(); List<List<PartitionChannel>> channelsForInputConnectors = new ArrayList<List<PartitionChannel>>(); if (inputAddresses != null) { @@ -266,8 +271,8 @@ NetworkAddress networkAddress = inputAddresses[i][j]; PartitionId pid = new PartitionId(jobId, inputs.get(i).getConnectorId(), j, td .getTaskAttemptId().getTaskId().getPartition()); - PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel( - ncs.getNetworkManager(), new InetSocketAddress(InetAddress.getByAddress(networkAddress + PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(ncs + .getNetworkManager(), new InetSocketAddress(InetAddress.getByAddress(networkAddress .lookupIpAddress()), networkAddress.getPort()), pid, 5)); channels.add(channel); } diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java index 9685837..dc0b6f7 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java +++ 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java @@ -27,12 +27,6 @@ import java.util.logging.Logger; import org.apache.commons.io.FileUtils; -import org.json.JSONArray; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.rules.TemporaryFolder; - import org.apache.hyracks.api.client.HyracksConnection; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.comm.IFrameTupleAccessor; @@ -51,6 +45,11 @@ import org.apache.hyracks.control.nc.resources.memory.FrameManager; import org.apache.hyracks.dataflow.common.comm.io.ResultFrameTupleAccessor; import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream; +import org.json.JSONArray; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; public abstract class AbstractMultiNCIntegrationTest { @@ -72,7 +71,6 @@ public AbstractMultiNCIntegrationTest() { outputFiles = new ArrayList<File>(); - ; } @BeforeClass @@ -135,37 +133,38 @@ IFrameTupleAccessor frameTupleAccessor = new ResultFrameTupleAccessor(); - IHyracksDataset hyracksDataset = new HyracksDataset(hcc, spec.getFrameSize(), nReaders); - IHyracksDatasetReader reader = hyracksDataset.createReader(jobId, spec.getResultSetIds().get(0)); + if (!spec.getResultSetIds().isEmpty()) { + IHyracksDataset hyracksDataset = new HyracksDataset(hcc, spec.getFrameSize(), nReaders); + IHyracksDatasetReader reader = hyracksDataset.createReader(jobId, spec.getResultSetIds().get(0)); - JSONArray resultRecords = new JSONArray(); - ByteBufferInputStream bbis = new ByteBufferInputStream(); + JSONArray resultRecords = new JSONArray(); + ByteBufferInputStream bbis = new ByteBufferInputStream(); - int readSize = reader.read(resultFrame); + int readSize = reader.read(resultFrame); - while (readSize > 0) { + while 
(readSize > 0) { - try { - frameTupleAccessor.reset(resultFrame.getBuffer()); - for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) { - int start = frameTupleAccessor.getTupleStartOffset(tIndex); - int length = frameTupleAccessor.getTupleEndOffset(tIndex) - start; - bbis.setByteBuffer(resultFrame.getBuffer(), start); - byte[] recordBytes = new byte[length]; - bbis.read(recordBytes, 0, length); - resultRecords.put(new String(recordBytes, 0, length)); - } - } finally { try { - bbis.close(); - } catch (IOException e) { - throw new HyracksDataException(e); + frameTupleAccessor.reset(resultFrame.getBuffer()); + for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) { + int start = frameTupleAccessor.getTupleStartOffset(tIndex); + int length = frameTupleAccessor.getTupleEndOffset(tIndex) - start; + bbis.setByteBuffer(resultFrame.getBuffer(), start); + byte[] recordBytes = new byte[length]; + bbis.read(recordBytes, 0, length); + resultRecords.put(new String(recordBytes, 0, length)); + } + } finally { + try { + bbis.close(); + } catch (IOException e) { + throw new HyracksDataException(e); + } } + + readSize = reader.read(resultFrame); } - - readSize = reader.read(resultFrame); } - hcc.waitForCompletion(jobId); dumpOutputFiles(); } diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java new file mode 100644 index 0000000..6a7a6a7 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.tests.integration; + +import org.apache.hyracks.api.constraints.PartitionConstraintHelper; +import org.apache.hyracks.api.dataflow.IConnectorDescriptor; +import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor; +import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor; +import org.apache.hyracks.dataflow.std.misc.SinkOperatorDescriptor; +import org.apache.hyracks.tests.util.ExceptionOnCreatePushRuntimeOperatorDescriptor; +import org.junit.Assert; +import org.junit.Test; + +public class JobFailureTest extends AbstractMultiNCIntegrationTest { + + @Test + public void failureOnCreatePushRuntime() throws Exception { + JobSpecification spec = new JobSpecification(); + AbstractSingleActivityOperatorDescriptor sourceOpDesc = new ExceptionOnCreatePushRuntimeOperatorDescriptor(spec, + 0, 1, new int[] { 4 }, true); + PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sourceOpDesc, ASTERIX_IDS); + SinkOperatorDescriptor sinkOpDesc = new SinkOperatorDescriptor(spec, 1); + PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sinkOpDesc, ASTERIX_IDS); + IConnectorDescriptor conn = new OneToOneConnectorDescriptor(spec); + 
spec.connect(conn, sourceOpDesc, 0, sinkOpDesc, 0); + spec.addRoot(sinkOpDesc); + try { + runTest(spec); + } catch (Exception e) { + e.printStackTrace(); + throw e; + } + Assert.assertTrue(ExceptionOnCreatePushRuntimeOperatorDescriptor.succeed()); + // should also check the content of the different ncs + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java new file mode 100644 index 0000000..8c5bf48 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.tests.util; + +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hyracks.api.comm.IFrameWriter; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.IOperatorNodePushable; +import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; +import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor; + +public class ExceptionOnCreatePushRuntimeOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor { + private static final long serialVersionUID = 1L; + private static AtomicInteger createPushRuntime = new AtomicInteger(); + private static AtomicInteger initializeCounter = new AtomicInteger(); + private static AtomicInteger openCloseCounter = new AtomicInteger(); + private final int[] exceptionPartitions; + private final boolean sleepOnInitialize; + + public ExceptionOnCreatePushRuntimeOperatorDescriptor(IOperatorDescriptorRegistry spec, int inputArity, + int outputArity, int[] exceptionPartitions, boolean sleepOnInitialize) { + super(spec, inputArity, outputArity); + this.exceptionPartitions = exceptionPartitions; + this.sleepOnInitialize = sleepOnInitialize; + } + + @Override + public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, + IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException { + createPushRuntime.incrementAndGet(); + try { + if (exceptionPartitions != null) { + for (int p : exceptionPartitions) { + if (p == partition) { + throw new HyracksDataException("I throw exceptions"); + } + } + } + return new IOperatorNodePushable() { + @Override + public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor 
recordDesc) + throws HyracksDataException { + } + + @Override + public void initialize() throws HyracksDataException { + initializeCounter.incrementAndGet(); + if (sleepOnInitialize) { + try { + synchronized (this) { + wait(); + } + } catch (InterruptedException e) { + // can safely interrupt thread since this is a task thread + Thread.currentThread().interrupt(); + throw new HyracksDataException(e); + } + } + } + + @Override + public IFrameWriter getInputFrameWriter(int index) { + return new IFrameWriter() { + @Override + public void open() throws HyracksDataException { + openCloseCounter.incrementAndGet(); + } + + @Override + public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + } + + @Override + public void fail() throws HyracksDataException { + } + + @Override + public void close() throws HyracksDataException { + openCloseCounter.decrementAndGet(); + } + }; + } + + @Override + public int getInputArity() { + return inputArity; + } + + @Override + public String getDisplayName() { + return ExceptionOnCreatePushRuntimeOperatorDescriptor.class.getSimpleName() + + ".OperatorNodePushable:" + partition; + } + + @Override + public void deinitialize() throws HyracksDataException { + initializeCounter.decrementAndGet(); + } + }; + } finally { + createPushRuntime.decrementAndGet(); + } + } + + public static boolean succeed() { + boolean success = openCloseCounter.get() == 0 && createPushRuntime.get() == 0 && initializeCounter.get() == 0; + if (!success) { + System.err.println("Failure:"); + System.err.println("CreatePushRuntime:" + createPushRuntime.get()); + System.err.println("InitializeCounter:" + initializeCounter.get()); + System.err.println("OpenCloseCounter:" + openCloseCounter.get()); + } + return success; + } +} -- To view, visit https://asterix-gerrit.ics.uci.edu/1197 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I2ec2c798b704ca426d5937f22e6d2bd394a9095a Gerrit-PatchSet: 10 
Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]> Gerrit-Reviewer: Yingyi Bu <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
