abdullah alamoudi has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1197
Change subject: Fix Failure Handling on Tasks Start
......................................................................
Fix Failure Handling on Tasks Start
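
Task start on an NC could fail (for example in createPushRuntime)
without the failure ever being reported back to the CC, leaving the job
hanging. Separate job creation from task start so such failures are
handled: the CC now sends the serialized ActivityClusterGraph once per
job through a new startJob call, and startTasks no longer carries the
deployment id or plan bytes. The CC-side flow in JobScheduler becomes:

    if (changed) {
        // create job once
        node.getNodeController().startJob(deploymentId, jobId, acgBytes);
    }
    node.getNodeController().startTasks(jobId, taskDescriptors,
            connectorPolicies, jobRun.getFlags());

On the NC side, StartTasksWork reports failures during task creation to
the CC through NotifyTaskFailureWork, and Joblet tracks an aborted flag
plus the task-initialization thread so that AbortTasksWork can
interrupt a task start that is still in progress. JobFailureTest and
ExceptionOnCreatePushRuntimeOperatorDescriptor are added to verify that
runtimes created before such a failure are cleaned up.
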
Change-Id: I2ec2c798b704ca426d5937f22e6d2bd394a9095a
---
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/JavaSerializationUtils.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java
A hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartJobWork.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
A hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
A hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java
13 files changed, 523 insertions(+), 199 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/97/1197/1
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/JavaSerializationUtils.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/JavaSerializationUtils.java
index a92e700..d7936e0 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/JavaSerializationUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/JavaSerializationUtils.java
@@ -81,8 +81,8 @@
    private static class ClassLoaderObjectInputStream extends ObjectInputStream {
        private ClassLoader classLoader;
-        protected ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader)
-                throws IOException, SecurityException {
+        protected ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException,
+                SecurityException {
super(in);
this.classLoader = classLoader;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java
index b377b1a..1541190 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/JobScheduler.java
@@ -31,9 +31,6 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.json.JSONException;
-import org.json.JSONObject;
-
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.constraints.Constraint;
import org.apache.hyracks.api.constraints.expressions.LValueConstraintExpression;
@@ -66,6 +63,8 @@
import org.apache.hyracks.control.cc.work.JobCleanupWork;
import org.apache.hyracks.control.common.job.PartitionState;
import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
+import org.json.JSONException;
+import org.json.JSONObject;
public class JobScheduler {
    private static final Logger LOGGER = Logger.getLogger(JobScheduler.class.getName());
@@ -160,7 +159,7 @@
}
private void startRunnableActivityClusters() throws HyracksException {
- Set<TaskCluster> taskClusterRoots = new HashSet<TaskCluster>();
+ Set<TaskCluster> taskClusterRoots = new HashSet<>();
        findRunnableTaskClusterRoots(taskClusterRoots, jobRun.getActivityClusterGraph().getActivityClusterMap()
                .values());
if (LOGGER.isLoggable(Level.FINE)) {
@@ -312,11 +311,11 @@
for (int i = 0; i < tasks.length; ++i) {
Task ts = tasks[i];
TaskId tid = ts.getTaskId();
-            TaskAttempt taskAttempt = new TaskAttempt(tcAttempt, new TaskAttemptId(new TaskId(tid.getActivityId(),
-                    tid.getPartition()), attempts), ts);
+            TaskAttempt taskAttempt = new TaskAttempt(tcAttempt, new TaskAttemptId(new TaskId(tid.getActivityId(), tid
+                    .getPartition()), attempts), ts);
taskAttempt.setStatus(TaskAttempt.TaskStatus.INITIALIZED, null);
-            locationMap.put(tid,
-                    new PartitionLocationExpression(tid.getActivityId().getOperatorDescriptorId(), tid.getPartition()));
+            locationMap.put(tid, new PartitionLocationExpression(tid.getActivityId().getOperatorDescriptorId(), tid
+                    .getPartition()));
taskAttempts.put(tid, taskAttempt);
}
tcAttempt.setTaskAttempts(taskAttempts);
@@ -337,8 +336,8 @@
            OperatorDescriptorId opId = tid.getActivityId().getOperatorDescriptorId();
            jobRun.registerOperatorLocation(opId, tid.getPartition(), nodeId);
            ActivityPartitionDetails apd = ts.getActivityPlan().getActivityPartitionDetails();
-            TaskAttemptDescriptor tad = new TaskAttemptDescriptor(taskAttempt.getTaskAttemptId(),
-                    apd.getPartitionCount(), apd.getInputPartitionCounts(), apd.getOutputPartitionCounts());
+            TaskAttemptDescriptor tad = new TaskAttemptDescriptor(taskAttempt.getTaskAttemptId(), apd
+                    .getPartitionCount(), apd.getInputPartitionCounts(), apd.getOutputPartitionCounts());
tads.add(tad);
}
tcAttempt.initializePendingTaskCounter();
@@ -409,8 +408,8 @@
Object location = solver.getValue(pLocationExpr);
if (location == null) {
// pick any
-                nodeId = liveNodes.toArray(new String[liveNodes.size()])[Math.abs(new Random().nextInt())
-                        % liveNodes.size()];
+                nodeId = liveNodes.toArray(new String[liveNodes.size()])[Math.abs(new Random().nextInt()) % liveNodes
+                        .size()];
} else if (location instanceof String) {
nodeId = (String) location;
} else if (location instanceof String[]) {
@@ -462,8 +461,8 @@
final DeploymentId deploymentId = jobRun.getDeploymentId();
final JobId jobId = jobRun.getJobId();
final ActivityClusterGraph acg = jobRun.getActivityClusterGraph();
-        final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = new HashMap<ConnectorDescriptorId, IConnectorPolicy>(
-                jobRun.getConnectorPolicyMap());
+        final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies =
+                new HashMap<ConnectorDescriptorId, IConnectorPolicy>(jobRun.getConnectorPolicyMap());
try {
byte[] acgBytes = JavaSerializationUtils.serialize(acg);
            for (Map.Entry<String, List<TaskAttemptDescriptor>> entry : taskAttemptMap.entrySet()) {
@@ -476,9 +475,11 @@
if (LOGGER.isLoggable(Level.FINE)) {
                    LOGGER.fine("Starting: " + taskDescriptors + " at " + entry.getKey());
}
- byte[] jagBytes = changed ? acgBytes : null;
-                    node.getNodeController().startTasks(deploymentId, jobId, jagBytes, taskDescriptors,
-                            connectorPolicies, jobRun.getFlags());
+                    if (changed) {
+                        // create job once
+                        node.getNodeController().startJob(deploymentId, jobId, acgBytes);
+                    }
+                    node.getNodeController().startTasks(jobId, taskDescriptors, connectorPolicies, jobRun.getFlags());
}
}
} catch (Exception e) {
@@ -588,8 +589,8 @@
ConnectorDescriptorId cdId = pid.getConnectorDescriptorId();
IConnectorPolicy cPolicy = connectorPolicyMap.get(cdId);
PartitionState maxState = pmm.getMaximumAvailableState(pid);
- if (maxState == null
-                        || (cPolicy.consumerWaitsForProducerToFinish() && maxState != PartitionState.COMMITTED)) {
+ if (maxState == null || (cPolicy.consumerWaitsForProducerToFinish()
+ && maxState != PartitionState.COMMITTED)) {
                    if (findDoomedTaskClusters(partitionProducingTaskClusterMap.get(pid), doomedTaskClusters)) {
doomed = true;
}
@@ -620,7 +621,8 @@
                LOGGER.warning("Spurious task complete notification: " + taId + " Current state = " + taStatus);
}
} else {
-            LOGGER.warning("Ignoring task complete notification: " + taId + " -- Current last attempt = " + lastAttempt);
+            LOGGER.warning("Ignoring task complete notification: " + taId + " -- Current last attempt = "
+                    + lastAttempt);
}
}
@@ -680,17 +682,17 @@
if (taskClusters != null) {
for (TaskCluster tc : taskClusters) {
                TaskClusterAttempt lastTaskClusterAttempt = findLastTaskClusterAttempt(tc);
-                    if (lastTaskClusterAttempt != null
-                            && (lastTaskClusterAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED || lastTaskClusterAttempt
+                    if (lastTaskClusterAttempt != null && (lastTaskClusterAttempt
+                            .getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED
+                            || lastTaskClusterAttempt
                                    .getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING)) {
boolean abort = false;
                        for (TaskAttempt ta : lastTaskClusterAttempt.getTaskAttempts().values()) {
-                            assert (ta.getStatus() == TaskAttempt.TaskStatus.COMPLETED || ta.getStatus() == TaskAttempt.TaskStatus.RUNNING);
+                            assert (ta.getStatus() == TaskAttempt.TaskStatus.COMPLETED || ta
+                                    .getStatus() == TaskAttempt.TaskStatus.RUNNING);
if (deadNodes.contains(ta.getNodeId())) {
-                                ta.setStatus(
-                                        TaskAttempt.TaskStatus.FAILED,
-                                        Collections.singletonList(new Exception("Node " + ta.getNodeId()
-                                                + " failed")));
+                                ta.setStatus(TaskAttempt.TaskStatus.FAILED, Collections.singletonList(
+                                        new Exception("Node " + ta.getNodeId() + " failed")));
ta.setEndTime(System.currentTimeMillis());
abort = true;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
index dff5827..f8b220b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
@@ -35,9 +35,8 @@
import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
public interface INodeController {
-    public void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes,
-            List<TaskAttemptDescriptor> taskDescriptors, Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies,
-            EnumSet<JobFlag> flags) throws Exception;
+    public void startTasks(JobId jobId, List<TaskAttemptDescriptor> taskDescriptors,
+            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, EnumSet<JobFlag> flags) throws Exception;
    public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception;
@@ -56,4 +55,6 @@
    public void sendApplicationMessageToNC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception;
public void takeThreadDump(String requestId) throws Exception;
+
+    public void startJob(DeploymentId deploymentId, JobId jobId, byte[] planBytes) throws Exception;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index aa9a4fe..96cc068 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -66,7 +66,6 @@
public class CCNCFunctions {
    private static final Logger LOGGER = Logger.getLogger(CCNCFunctions.class.getName());
-
private static final int FID_CODE_SIZE = 1;
public enum FunctionId {
@@ -84,6 +83,7 @@
REPORT_RESULT_PARTITION_FAILURE,
NODE_REGISTRATION_RESULT,
+ START_JOB,
START_TASKS,
ABORT_TASKS,
CLEANUP_JOBLET,
@@ -670,20 +670,14 @@
public static class StartTasksFunction extends Function {
private static final long serialVersionUID = 1L;
-
- private final DeploymentId deploymentId;
private final JobId jobId;
- private final byte[] planBytes;
private final List<TaskAttemptDescriptor> taskDescriptors;
        private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies;
private final EnumSet<JobFlag> flags;
-        public StartTasksFunction(DeploymentId deploymentId, JobId jobId, byte[] planBytes,
-                List<TaskAttemptDescriptor> taskDescriptors,
+        public StartTasksFunction(JobId jobId, List<TaskAttemptDescriptor> taskDescriptors,
                Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, EnumSet<JobFlag> flags) {
- this.deploymentId = deploymentId;
this.jobId = jobId;
- this.planBytes = planBytes;
this.taskDescriptors = taskDescriptors;
this.connectorPolicies = connectorPolicies;
this.flags = flags;
@@ -694,16 +688,8 @@
return FunctionId.START_TASKS;
}
- public DeploymentId getDeploymentId() {
- return deploymentId;
- }
-
public JobId getJobId() {
return jobId;
- }
-
- public byte[] getPlanBytes() {
- return planBytes;
}
public List<TaskAttemptDescriptor> getTaskDescriptors() {
@@ -716,6 +702,98 @@
public EnumSet<JobFlag> getFlags() {
return flags;
+ }
+
+        public static Object deserialize(ByteBuffer buffer, int length) throws Exception {
+            ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), length);
+ DataInputStream dis = new DataInputStream(bais);
+
+ //read jobId and taskId
+ JobId jobId = JobId.create(dis);
+
+ // read task attempt descriptors
+ int tadSize = dis.readInt();
+            List<TaskAttemptDescriptor> taskDescriptors = new ArrayList<TaskAttemptDescriptor>();
+ for (int i = 0; i < tadSize; i++) {
+ TaskAttemptDescriptor tad = TaskAttemptDescriptor.create(dis);
+ taskDescriptors.add(tad);
+ }
+
+ //read connector policies
+ int cpSize = dis.readInt();
+ Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies =
+ new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
+ for (int i = 0; i < cpSize; i++) {
+ ConnectorDescriptorId cid = ConnectorDescriptorId.create(dis);
+                IConnectorPolicy policy = ConnectorPolicyFactory.INSTANCE.getConnectorPolicy(dis);
+ connectorPolicies.put(cid, policy);
+ }
+
+ // read flags
+ int flagSize = dis.readInt();
+ EnumSet<JobFlag> flags = EnumSet.noneOf(JobFlag.class);
+ for (int i = 0; i < flagSize; i++) {
+ flags.add(JobFlag.values()[(dis.readInt())]);
+ }
+            return new StartTasksFunction(jobId, taskDescriptors, connectorPolicies, flags);
+ }
+
+        public static void serialize(OutputStream out, Object object) throws Exception {
+ StartTasksFunction fn = (StartTasksFunction) object;
+ DataOutputStream dos = new DataOutputStream(out);
+
+ //write jobId
+ fn.jobId.writeFields(dos);
+
+ //write task descriptors
+ dos.writeInt(fn.taskDescriptors.size());
+ for (int i = 0; i < fn.taskDescriptors.size(); i++) {
+ fn.taskDescriptors.get(i).writeFields(dos);
+ }
+
+ //write connector policies
+ dos.writeInt(fn.connectorPolicies.size());
+            for (Entry<ConnectorDescriptorId, IConnectorPolicy> entry : fn.connectorPolicies.entrySet()) {
+ entry.getKey().writeFields(dos);
+                ConnectorPolicyFactory.INSTANCE.writeConnectorPolicy(entry.getValue(), dos);
+ }
+
+ //write flags
+ dos.writeInt(fn.flags.size());
+ for (JobFlag flag : fn.flags) {
+ dos.writeInt(flag.ordinal());
+ }
+ }
+ }
+
+ public static class StartJobFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final DeploymentId deploymentId;
+ private final JobId jobId;
+ private final byte[] planBytes;
+
+        public StartJobFunction(DeploymentId deploymentId, JobId jobId, byte[] planBytes) {
+ this.deploymentId = deploymentId;
+ this.jobId = jobId;
+ this.planBytes = planBytes;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.START_JOB;
+ }
+
+ public DeploymentId getDeploymentId() {
+ return deploymentId;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public byte[] getPlanBytes() {
+ return planBytes;
}
        public static Object deserialize(ByteBuffer buffer, int length) throws Exception {
@@ -737,36 +815,11 @@
planBytes = new byte[planBytesSize];
dis.read(planBytes, 0, planBytesSize);
}
-
- // read task attempt descriptors
- int tadSize = dis.readInt();
-            List<TaskAttemptDescriptor> taskDescriptors = new ArrayList<TaskAttemptDescriptor>();
- for (int i = 0; i < tadSize; i++) {
- TaskAttemptDescriptor tad = TaskAttemptDescriptor.create(dis);
- taskDescriptors.add(tad);
- }
-
- //read connector policies
- int cpSize = dis.readInt();
-            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
- for (int i = 0; i < cpSize; i++) {
- ConnectorDescriptorId cid = ConnectorDescriptorId.create(dis);
-                IConnectorPolicy policy = ConnectorPolicyFactory.INSTANCE.getConnectorPolicy(dis);
- connectorPolicies.put(cid, policy);
- }
-
- // read flags
- int flagSize = dis.readInt();
- EnumSet<JobFlag> flags = EnumSet.noneOf(JobFlag.class);
- for (int i = 0; i < flagSize; i++) {
- flags.add(JobFlag.values()[(dis.readInt())]);
- }
-
-            return new StartTasksFunction(deploymentId, jobId, planBytes, taskDescriptors, connectorPolicies, flags);
+ return new StartJobFunction(deploymentId, jobId, planBytes);
}
        public static void serialize(OutputStream out, Object object) throws Exception {
- StartTasksFunction fn = (StartTasksFunction) object;
+ StartJobFunction fn = (StartJobFunction) object;
DataOutputStream dos = new DataOutputStream(out);
//write jobId and deploymentId
@@ -780,25 +833,6 @@
dos.writeInt(fn.planBytes == null ? -1 : fn.planBytes.length);
if (fn.planBytes != null) {
dos.write(fn.planBytes, 0, fn.planBytes.length);
- }
-
- //write task descriptors
- dos.writeInt(fn.taskDescriptors.size());
- for (int i = 0; i < fn.taskDescriptors.size(); i++) {
- fn.taskDescriptors.get(i).writeFields(dos);
- }
-
- //write connector policies
- dos.writeInt(fn.connectorPolicies.size());
-            for (Entry<ConnectorDescriptorId, IConnectorPolicy> entry : fn.connectorPolicies.entrySet()) {
- entry.getKey().writeFields(dos);
-                ConnectorPolicyFactory.INSTANCE.writeConnectorPolicy(entry.getValue(), dos);
- }
-
- //write flags
- dos.writeInt(fn.flags.size());
- for (JobFlag flag : fn.flags) {
- dos.writeInt(flag.ordinal());
}
}
}
@@ -1320,7 +1354,8 @@
int cdid = dis.readInt();
int senderIndex = dis.readInt();
int receiverIndex = dis.readInt();
-            PartitionId pid = new PartitionId(new JobId(jobId), new ConnectorDescriptorId(cdid), senderIndex, receiverIndex);
+            PartitionId pid = new PartitionId(new JobId(jobId), new ConnectorDescriptorId(cdid), senderIndex,
+                    receiverIndex);
return pid;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index c3376e6..600b5c1 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -44,11 +44,10 @@
}
@Override
-    public void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes,
-            List<TaskAttemptDescriptor> taskDescriptors, Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies,
-            EnumSet<JobFlag> flags) throws Exception {
-        CCNCFunctions.StartTasksFunction stf = new CCNCFunctions.StartTasksFunction(deploymentId, jobId, planBytes,
-                taskDescriptors, connectorPolicies, flags);
+    public void startTasks(JobId jobId, List<TaskAttemptDescriptor> taskDescriptors,
+            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, EnumSet<JobFlag> flags) throws Exception {
+        CCNCFunctions.StartTasksFunction stf = new CCNCFunctions.StartTasksFunction(jobId, taskDescriptors,
+                connectorPolicies, flags);
ipcHandle.send(-1, stf, null);
}
@@ -107,4 +106,10 @@
        CCNCFunctions.ThreadDumpRequestFunction fn = new CCNCFunctions.ThreadDumpRequestFunction(requestId);
ipcHandle.send(-1, fn, null);
}
+
+ @Override
+    public void startJob(DeploymentId deploymentId, JobId jobId, byte[] planBytes) throws Exception {
+        CCNCFunctions.StartJobFunction stf = new CCNCFunctions.StartJobFunction(deploymentId, jobId, planBytes);
+ ipcHandle.send(-1, stf, null);
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
index 0d9ff5d..21e252a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
@@ -24,6 +24,7 @@
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
@@ -97,6 +98,9 @@
private final AtomicLong memoryAllocation;
+ private final AtomicBoolean aborted;
+ private volatile Thread taskInitializationThreads;
+
private JobStatus cleanupStatus;
private boolean cleanupPending;
@@ -128,6 +132,7 @@
}
IGlobalJobDataFactory gjdf = acg.getGlobalJobDataFactory();
globalJobData = gjdf != null ? gjdf.createGlobalJobData(this) : null;
+ aborted = new AtomicBoolean(false);
}
@Override
@@ -165,6 +170,7 @@
this.nodeId = nodeId;
}
+ @Override
public String toString() {
return super.toString() + "@" + nodeId;
}
@@ -334,4 +340,20 @@
public ClassLoader getClassLoader() throws HyracksException {
return DeploymentUtils.getClassLoader(deploymentId, appCtx);
}
+
+ public boolean isAborted() {
+ return aborted.get();
+ }
+
+ public synchronized void abort() {
+ if (taskInitializationThreads != null) {
+ taskInitializationThreads.interrupt();
+ taskInitializationThreads = null;
+ }
+ aborted.set(true);
+ }
+
+    public void setTaskInitializationThreads(Thread taskInitializationThreads) {
+ this.taskInitializationThreads = taskInitializationThreads;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index d7facf0..ecde636 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -84,11 +84,12 @@
import org.apache.hyracks.control.nc.work.BuildJobProfilesWork;
import org.apache.hyracks.control.nc.work.CleanupJobletWork;
import org.apache.hyracks.control.nc.work.DeployBinaryWork;
+import org.apache.hyracks.control.nc.work.NodeThreadDumpWork;
import org.apache.hyracks.control.nc.work.ReportPartitionAvailabilityWork;
import org.apache.hyracks.control.nc.work.ShutdownWork;
+import org.apache.hyracks.control.nc.work.StartJobWork;
import org.apache.hyracks.control.nc.work.StartTasksWork;
import org.apache.hyracks.control.nc.work.StateDumpWork;
-import org.apache.hyracks.control.nc.work.NodeThreadDumpWork;
import org.apache.hyracks.control.nc.work.UnDeployBinaryWork;
import org.apache.hyracks.ipc.api.IIPCHandle;
import org.apache.hyracks.ipc.api.IIPCI;
@@ -184,8 +185,8 @@
        queue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread.
jobletMap = new Hashtable<JobId, Joblet>();
timer = new Timer(true);
- serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER,
- new File(new File(NodeControllerService.class.getName()), id));
+        serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER, new File(new File(
+                NodeControllerService.class.getName()), id));
memoryMXBean = ManagementFactory.getMemoryMXBean();
gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans();
threadMXBean = ManagementFactory.getThreadMXBean();
@@ -257,8 +258,8 @@
                ncConfig.resultPublicPort, FullFrameChannelInterfaceFactory.INSTANCE);
        if (ncConfig.messagingIPAddress != null && appCtx.getMessagingChannelInterfaceFactory() != null) {
            messagingNetManager = new MessagingNetworkManager(this, ncConfig.messagingIPAddress, ncConfig.messagingPort,
-                    ncConfig.nNetThreads, ncConfig.messagingPublicIPAddress, ncConfig.messagingPublicPort,
-                    appCtx.getMessagingChannelInterfaceFactory());
+                    ncConfig.nNetThreads, ncConfig.messagingPublicIPAddress, ncConfig.messagingPublicPort, appCtx
+                    .getMessagingChannelInterfaceFactory());
}
}
@@ -288,10 +289,10 @@
NetworkAddress netAddress = netManager.getPublicNetworkAddress();
        NetworkAddress meesagingPort = messagingNetManager != null ? messagingNetManager.getPublicNetworkAddress()
                : null;
-        ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress, datasetAddress,
-                osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean.getAvailableProcessors(),
-                runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(),
-                runtimeMXBean.getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(),
+        ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress, datasetAddress, osMXBean
+                .getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean.getAvailableProcessors(), runtimeMXBean
+                .getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean
+                .getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(),
                runtimeMXBean.getInputArguments(), runtimeMXBean.getSystemProperties(), hbSchema, meesagingPort, PidHelper.getPid()));
@@ -337,8 +338,8 @@
if (className != null) {
Class<?> c = Class.forName(className);
ncAppEntryPoint = (INCApplicationEntryPoint) c.newInstance();
-                String[] args = ncConfig.appArgs == null ? new String[0]
-                        : ncConfig.appArgs.toArray(new String[ncConfig.appArgs.size()]);
+                String[] args = ncConfig.appArgs == null ? new String[0] : ncConfig.appArgs.toArray(
+                        new String[ncConfig.appArgs.size()]);
ncAppEntryPoint.start(appCtx, args);
}
executor = Executors.newCachedThreadPool(appCtx.getThreadFactory());
@@ -517,14 +518,20 @@
case SEND_APPLICATION_MESSAGE:
                    CCNCFunctions.SendApplicationMessageFunction amf = (CCNCFunctions.SendApplicationMessageFunction) fn;
-                    queue.schedule(new ApplicationMessageWork(NodeControllerService.this, amf.getMessage(),
-                            amf.getDeploymentId(), amf.getNodeId()));
+                    queue.schedule(new ApplicationMessageWork(NodeControllerService.this, amf.getMessage(), amf
+                            .getDeploymentId(), amf.getNodeId()));
+ return;
+
+ case START_JOB:
+                    CCNCFunctions.StartJobFunction sjf = (CCNCFunctions.StartJobFunction) fn;
+                    queue.schedule(new StartJobWork(NodeControllerService.this, sjf.getDeploymentId(), sjf.getJobId(),
+                            sjf.getPlanBytes()));
return;
case START_TASKS:
                    CCNCFunctions.StartTasksFunction stf = (CCNCFunctions.StartTasksFunction) fn;
-                    queue.schedule(new StartTasksWork(NodeControllerService.this, stf.getDeploymentId(), stf.getJobId(),
-                            stf.getPlanBytes(), stf.getTaskDescriptors(), stf.getConnectorPolicies(), stf.getFlags()));
+                    queue.schedule(new StartTasksWork(NodeControllerService.this, stf.getJobId(), stf
+                            .getTaskDescriptors(), stf.getConnectorPolicies(), stf.getFlags()));
return;
case ABORT_TASKS:
@@ -540,8 +547,8 @@
case REPORT_PARTITION_AVAILABILITY:
                    CCNCFunctions.ReportPartitionAvailabilityFunction rpaf = (CCNCFunctions.ReportPartitionAvailabilityFunction) fn;
-                    queue.schedule(new ReportPartitionAvailabilityWork(NodeControllerService.this,
-                            rpaf.getPartitionId(), rpaf.getNetworkAddress()));
+                    queue.schedule(new ReportPartitionAvailabilityWork(NodeControllerService.this, rpaf
+                            .getPartitionId(), rpaf.getNetworkAddress()));
return;
case NODE_REGISTRATION_RESULT:
@@ -557,8 +564,8 @@
case DEPLOY_BINARY:
                    CCNCFunctions.DeployBinaryFunction dbf = (CCNCFunctions.DeployBinaryFunction) fn;
-                    queue.schedule(new DeployBinaryWork(NodeControllerService.this, dbf.getDeploymentId(),
-                            dbf.getBinaryURLs()));
+                    queue.schedule(new DeployBinaryWork(NodeControllerService.this, dbf.getDeploymentId(), dbf
+                            .getBinaryURLs()));
return;
case UNDEPLOY_BINARY:
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java
index 6566655..a8ca58e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java
@@ -40,10 +40,17 @@
private final List<TaskAttemptId> tasks;
+ private final Joblet ji;
+
    public AbortTasksWork(NodeControllerService ncs, JobId jobId, List<TaskAttemptId> tasks) {
this.ncs = ncs;
this.jobId = jobId;
this.tasks = tasks;
+ Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
+ ji = jobletMap.get(jobId);
+ if (ji != null) {
+ ji.abort();
+ }
}
@Override
@@ -55,9 +62,6 @@
if (dpm != null) {
ncs.getDatasetPartitionManager().abortReader(jobId);
}
-
- Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
- Joblet ji = jobletMap.get(jobId);
if (ji != null) {
Map<TaskAttemptId, Task> taskMap = ji.getTaskMap();
for (TaskAttemptId taId : tasks) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartJobWork.java
new file mode 100644
index 0000000..de72d1c
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartJobWork.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.work;
+
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.job.ActivityClusterGraph;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.common.deployment.DeploymentUtils;
+import org.apache.hyracks.control.common.work.AbstractWork;
+import org.apache.hyracks.control.nc.Joblet;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.control.nc.application.NCApplicationContext;
+
+public class StartJobWork extends AbstractWork {
+    private static final Logger LOGGER = Logger.getLogger(StartJobWork.class.getName());
+ private final NodeControllerService ncs;
+ private final DeploymentId deploymentId;
+ private final JobId jobId;
+ private final byte[] acgBytes;
+
+    public StartJobWork(NodeControllerService ncs, DeploymentId deploymentId, JobId jobId, byte[] acgBytes) {
+ this.ncs = ncs;
+ this.deploymentId = deploymentId;
+ this.jobId = jobId;
+ this.acgBytes = acgBytes;
+ }
+
+ @Override
+ public void run() {
+ try {
+ NCApplicationContext appCtx = ncs.getApplicationContext();
+ Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
+ Joblet ji = jobletMap.get(jobId);
+ if (ji != null) {
+ throw new RuntimeException("Job " + jobId + " already exists");
+ }
+ if (acgBytes == null) {
+ throw new NullPointerException("JobActivityGraph was null");
+ }
+            ActivityClusterGraph acg = (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, deploymentId,
+                    appCtx);
+ ji = new Joblet(ncs, deploymentId, jobId, appCtx, acg);
+ jobletMap.put(jobId, ji);
+ } catch (Exception e) {
+ // notify cc of start job failure
+            LOGGER.log(Level.SEVERE, "Failure during job " + jobId + " creation on NC " + ncs.getId(), e);
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index b585199..a760b32 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -28,7 +28,6 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.hyracks.api.application.INCApplicationContext;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.IPartitionCollector;
import org.apache.hyracks.api.comm.IPartitionWriterFactory;
@@ -45,7 +44,6 @@
import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicy;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.deployment.DeploymentId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.ActivityCluster;
import org.apache.hyracks.api.job.ActivityClusterGraph;
@@ -53,13 +51,12 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.partitions.PartitionId;
import org.apache.hyracks.comm.channels.NetworkInputChannel;
-import org.apache.hyracks.control.common.deployment.DeploymentUtils;
import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
+import org.apache.hyracks.control.common.utils.ExceptionUtils;
import org.apache.hyracks.control.common.work.AbstractWork;
import org.apache.hyracks.control.nc.Joblet;
import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.hyracks.control.nc.Task;
-import org.apache.hyracks.control.nc.application.NCApplicationContext;
import org.apache.hyracks.control.nc.partitions.MaterializedPartitionWriter;
import org.apache.hyracks.control.nc.partitions.MaterializingPipelinedPartition;
import org.apache.hyracks.control.nc.partitions.PipelinedPartition;
@@ -71,11 +68,7 @@
private final NodeControllerService ncs;
- private final DeploymentId deploymentId;
-
private final JobId jobId;
-
- private final byte[] acgBytes;
private final List<TaskAttemptDescriptor> taskDescriptors;
@@ -83,13 +76,10 @@
private final EnumSet<JobFlag> flags;
-    public StartTasksWork(NodeControllerService ncs, DeploymentId deploymentId, JobId jobId, byte[] acgBytes,
-            List<TaskAttemptDescriptor> taskDescriptors,
+    public StartTasksWork(NodeControllerService ncs, JobId jobId, List<TaskAttemptDescriptor> taskDescriptors,
            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap, EnumSet<JobFlag> flags) {
this.ncs = ncs;
- this.deploymentId = deploymentId;
this.jobId = jobId;
- this.acgBytes = acgBytes;
this.taskDescriptors = taskDescriptors;
this.connectorPoliciesMap = connectorPoliciesMap;
this.flags = flags;
@@ -97,9 +87,17 @@
@Override
public void run() {
+ Joblet joblet = ncs.getJobletMap().get(jobId);
+ if (joblet == null) {
+            LOGGER.log(Level.SEVERE, "Joblet not found for job " + jobId + " on Node " + ncs.getId());
+ return;
+ }
+ if (joblet.isAborted()) {
+ return;
+ }
+ Task task = null;
try {
- NCApplicationContext appCtx = ncs.getApplicationContext();
-            final Joblet joblet = getOrCreateLocalJoblet(deploymentId, jobId, appCtx, acgBytes);
+ joblet.setTaskInitializationThreads(Thread.currentThread());
final ActivityClusterGraph acg = joblet.getActivityClusterGraph();
IRecordDescriptorProvider rdp = new IRecordDescriptorProvider() {
@@ -129,8 +127,9 @@
}
final int partition = tid.getPartition();
                List<IConnectorDescriptor> inputs = ac.getActivityInputMap().get(aid);
-                Task task = new Task(joblet, taId, han.getClass().getName(), ncs.getExecutorService(), ncs,
+                task = new Task(joblet, taId, han.getClass().getName(), ncs.getExecutorService(), ncs,
                        createInputChannels(td, inputs));
+ // following call could throw an exception
                IOperatorNodePushable operator = han.createPushRuntime(task, rdp, partition, td.getPartitionCount());
                List<IPartitionCollector> collectors = new ArrayList<IPartitionCollector>();
@@ -161,47 +160,40 @@
if (LOGGER.isLoggable(Level.INFO)) {
                            LOGGER.info("output: " + i + ": " + conn.getConnectorId());
}
-                        IFrameWriter writer = conn.createPartitioner(task, recordDesc, pwFactory, partition,
-                                td.getPartitionCount(), td.getOutputPartitionCounts()[i]);
+                        IFrameWriter writer = conn.createPartitioner(task, recordDesc, pwFactory, partition, td
+                                .getPartitionCount(), td.getOutputPartitionCounts()[i]);
operator.setOutputFrameWriter(i, writer, recordDesc);
}
}
                task.setTaskRuntime(collectors.toArray(new IPartitionCollector[collectors.size()]), operator);
+ if (joblet.isAborted()) {
+ return;
+ }
joblet.addTask(task);
-
task.start();
}
} catch (Exception e) {
- e.printStackTrace();
- throw new RuntimeException(e);
- }
- }
-
-    private Joblet getOrCreateLocalJoblet(DeploymentId deploymentId, JobId jobId, INCApplicationContext appCtx,
-            byte[] acgBytes) throws Exception {
- Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
- Joblet ji = jobletMap.get(jobId);
- if (ji == null) {
- if (acgBytes == null) {
- throw new NullPointerException("JobActivityGraph was null");
+ LOGGER.log(Level.SEVERE, "Failure starting task", e);
+ // notify cc of start task failure
+ List<Exception> exceptions = new ArrayList<>();
+ ExceptionUtils.setNodeIds(exceptions, ncs.getId());
+            ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, task, exceptions));
+ } finally {
+ if (joblet != null) {
+ joblet.setTaskInitializationThreads(null);
}
-            ActivityClusterGraph acg = (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, deploymentId,
-                    appCtx);
- ji = new Joblet(ncs, deploymentId, jobId, appCtx, acg);
- jobletMap.put(jobId, ji);
}
- return ji;
}
    private IPartitionCollector createPartitionCollector(TaskAttemptDescriptor td, final int partition, Task task,
            int i, IConnectorDescriptor conn, RecordDescriptor recordDesc, IConnectorPolicy cPolicy)
            throws HyracksDataException {
-        IPartitionCollector collector = conn.createPartitionCollector(task, recordDesc, partition,
-                td.getInputPartitionCounts()[i], td.getPartitionCount());
+        IPartitionCollector collector = conn.createPartitionCollector(task, recordDesc, partition, td
+                .getInputPartitionCounts()[i], td.getPartitionCount());
if (cPolicy.materializeOnReceiveSide()) {
-            return new ReceiveSideMaterializingCollector(task, ncs.getPartitionManager(), collector,
-                    task.getTaskAttemptId(), ncs.getExecutorService());
+            return new ReceiveSideMaterializingCollector(task, ncs.getPartitionManager(), collector, task
+                    .getTaskAttemptId(), ncs.getExecutorService());
} else {
return collector;
}
@@ -225,7 +217,8 @@
@Override
                public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
                    return new MaterializingPipelinedPartition(ctx, ncs.getPartitionManager(), new PartitionId(
-                            jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId, ncs.getExecutorService());
+                            jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId, ncs
+                            .getExecutorService());
}
};
}
@@ -233,8 +226,8 @@
factory = new IPartitionWriterFactory() {
@Override
                public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
-                    return new PipelinedPartition(ctx, ncs.getPartitionManager(), new PartitionId(jobId,
-                            conn.getConnectorId(), senderIndex, receiverIndex), taId);
+                    return new PipelinedPartition(ctx, ncs.getPartitionManager(), new PartitionId(jobId, conn
+                            .getConnectorId(), senderIndex, receiverIndex), taId);
}
};
}
@@ -254,8 +247,8 @@
* @return a list of known channels, one for each connector
* @throws UnknownHostException
*/
-    private List<List<PartitionChannel>> createInputChannels(TaskAttemptDescriptor td, List<IConnectorDescriptor> inputs)
-            throws UnknownHostException {
+    private List<List<PartitionChannel>> createInputChannels(TaskAttemptDescriptor td,
+            List<IConnectorDescriptor> inputs) throws UnknownHostException {
NetworkAddress[][] inputAddresses = td.getInputPartitionLocations();
        List<List<PartitionChannel>> channelsForInputConnectors = new ArrayList<List<PartitionChannel>>();
if (inputAddresses != null) {
@@ -266,8 +259,8 @@
NetworkAddress networkAddress = inputAddresses[i][j];
                    PartitionId pid = new PartitionId(jobId, inputs.get(i).getConnectorId(), j, td
                            .getTaskAttemptId().getTaskId().getPartition());
-                    PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(
-                            ncs.getNetworkManager(), new InetSocketAddress(InetAddress.getByAddress(networkAddress
+                    PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(ncs
+                            .getNetworkManager(), new InetSocketAddress(InetAddress.getByAddress(networkAddress
                                    .lookupIpAddress()), networkAddress.getPort()), pid, 5));
channels.add(channel);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
index 9685837..72569f3 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -27,12 +27,6 @@
import java.util.logging.Logger;
import org.apache.commons.io.FileUtils;
-import org.json.JSONArray;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
-
import org.apache.hyracks.api.client.HyracksConnection;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
@@ -51,6 +45,11 @@
import org.apache.hyracks.control.nc.resources.memory.FrameManager;
import org.apache.hyracks.dataflow.common.comm.io.ResultFrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import org.json.JSONArray;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
public abstract class AbstractMultiNCIntegrationTest {
@@ -71,8 +70,7 @@
public TemporaryFolder outputFolder = new TemporaryFolder();
public AbstractMultiNCIntegrationTest() {
- outputFiles = new ArrayList<File>();
- ;
+        outputFiles = new ArrayList<File>();
}
@BeforeClass
@@ -135,37 +133,38 @@
        IFrameTupleAccessor frameTupleAccessor = new ResultFrameTupleAccessor();
-        IHyracksDataset hyracksDataset = new HyracksDataset(hcc, spec.getFrameSize(), nReaders);
-        IHyracksDatasetReader reader = hyracksDataset.createReader(jobId, spec.getResultSetIds().get(0));
+        if (!spec.getResultSetIds().isEmpty()) {
+            IHyracksDataset hyracksDataset = new HyracksDataset(hcc, spec.getFrameSize(), nReaders);
+            IHyracksDatasetReader reader = hyracksDataset.createReader(jobId, spec.getResultSetIds().get(0));
- JSONArray resultRecords = new JSONArray();
- ByteBufferInputStream bbis = new ByteBufferInputStream();
+ JSONArray resultRecords = new JSONArray();
+ ByteBufferInputStream bbis = new ByteBufferInputStream();
- int readSize = reader.read(resultFrame);
+ int readSize = reader.read(resultFrame);
- while (readSize > 0) {
+ while (readSize > 0) {
- try {
- frameTupleAccessor.reset(resultFrame.getBuffer());
-                for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
- int start = frameTupleAccessor.getTupleStartOffset(tIndex);
-                    int length = frameTupleAccessor.getTupleEndOffset(tIndex) - start;
- bbis.setByteBuffer(resultFrame.getBuffer(), start);
- byte[] recordBytes = new byte[length];
- bbis.read(recordBytes, 0, length);
- resultRecords.put(new String(recordBytes, 0, length));
- }
- } finally {
try {
- bbis.close();
- } catch (IOException e) {
- throw new HyracksDataException(e);
+ frameTupleAccessor.reset(resultFrame.getBuffer());
+                    for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
+                        int start = frameTupleAccessor.getTupleStartOffset(tIndex);
+                        int length = frameTupleAccessor.getTupleEndOffset(tIndex) - start;
+ bbis.setByteBuffer(resultFrame.getBuffer(), start);
+ byte[] recordBytes = new byte[length];
+ bbis.read(recordBytes, 0, length);
+ resultRecords.put(new String(recordBytes, 0, length));
+ }
+ } finally {
+ try {
+ bbis.close();
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
}
+
+ readSize = reader.read(resultFrame);
}
-
- readSize = reader.read(resultFrame);
}
-
hcc.waitForCompletion(jobId);
dumpOutputFiles();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
new file mode 100644
index 0000000..5646bbe
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.tests.integration;
+
+import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
+import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.misc.SinkOperatorDescriptor;
+import org.apache.hyracks.tests.util.ExceptionOnCreatePushRuntimeOperatorDescriptor;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JobFailureTest extends AbstractMultiNCIntegrationTest {
+
+ @Test
+ public void failureOnCreatePushRuntime() throws Exception {
+ JobSpecification spec = new JobSpecification();
+        AbstractSingleActivityOperatorDescriptor sourceOpDesc = new ExceptionOnCreatePushRuntimeOperatorDescriptor(spec,
+                0, 1, new int[] { 4 }, Integer.MAX_VALUE);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sourceOpDesc, ASTERIX_IDS);
+        SinkOperatorDescriptor sinkOpDesc = new SinkOperatorDescriptor(spec, 1);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sinkOpDesc, ASTERIX_IDS);
+ IConnectorDescriptor conn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn, sourceOpDesc, 0, sinkOpDesc, 0);
+ spec.addRoot(sinkOpDesc);
+ try {
+ runTest(spec);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+        Assert.assertTrue(ExceptionOnCreatePushRuntimeOperatorDescriptor.succeed());
+ // should also check the content of the different ncs
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java
new file mode 100644
index 0000000..9f0a6d9
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.tests.util;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+
+public class ExceptionOnCreatePushRuntimeOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
+ private static AtomicInteger createPushRuntime = new AtomicInteger();
+ private static AtomicInteger initializeCounter = new AtomicInteger();
+ private static AtomicInteger openCloseCounter = new AtomicInteger();
+ private final int[] exceptionPartitions;
+ private final int duration;
+
+    public ExceptionOnCreatePushRuntimeOperatorDescriptor(IOperatorDescriptorRegistry spec, int inputArity,
+            int outputArity, int[] exceptionPartitions, int duration) {
+ super(spec, inputArity, outputArity);
+ this.exceptionPartitions = exceptionPartitions;
+ this.duration = duration;
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+ createPushRuntime.incrementAndGet();
+ try {
+ if (exceptionPartitions != null) {
+ for (int p : exceptionPartitions) {
+ if (p == partition) {
+ throw new HyracksDataException("I throw exceptions");
+ }
+ }
+ }
+ return new IOperatorNodePushable() {
+ @Override
+                public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc)
+                        throws HyracksDataException {
+ }
+
+ @Override
+ public void initialize() throws HyracksDataException {
+ initializeCounter.incrementAndGet();
+ if (duration > 0) {
+ try {
+ Thread.sleep(duration);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+
+ @Override
+ public IFrameWriter getInputFrameWriter(int index) {
+ return new IFrameWriter() {
+ @Override
+ public void open() throws HyracksDataException {
+ openCloseCounter.incrementAndGet();
+ }
+
+ @Override
+                        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ openCloseCounter.decrementAndGet();
+ }
+ };
+ }
+
+ @Override
+ public int getInputArity() {
+ return inputArity;
+ }
+
+ @Override
+ public String getDisplayName() {
+                    return ExceptionOnCreatePushRuntimeOperatorDescriptor.class.getSimpleName()
+                            + ".OperatorNodePushable:" + partition;
+ }
+
+ @Override
+ public void deinitialize() throws HyracksDataException {
+ initializeCounter.decrementAndGet();
+ }
+ };
+ } finally {
+ createPushRuntime.decrementAndGet();
+ }
+ }
+
+ public static boolean succeed() {
+        boolean success = openCloseCounter.get() == 0 && createPushRuntime.get() == 0 && initializeCounter.get() == 0;
+ if (!success) {
+ System.err.println("Failure:");
+ System.err.println("CreatePushRuntime:" + createPushRuntime.get());
+ System.err.println("InitializeCounter:" + initializeCounter.get());
+ System.err.println("OpenCloseCounter:" + openCloseCounter.get());
+ }
+ return success;
+ }
+}
--
To view, visit https://asterix-gerrit.ics.uci.edu/1197
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I2ec2c798b704ca426d5937f22e6d2bd394a9095a
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>