This is an automated email from the ASF dual-hosted git repository. dimuthuupe pushed a commit to branch helix-integration in repository https://gitbox.apache.org/repos/asf/airavata.git
commit 573dbab1a29f1bf2f1fdb8c9cacdb7ad42b105ad Author: dimuthu <dimuthu.upeks...@gmail.com> AuthorDate: Fri Mar 2 13:16:49 2018 -0500 Fixing bugs in pre workflow --- .../airavata/helix/agent/ssh/SshAgentAdaptor.java | 4 +- .../apache/airavata/helix/core/AbstractTask.java | 10 ++++ .../airavata/helix/workflow/WorkflowManager.java | 2 +- .../airavata/helix/impl/task/EnvSetupTask.java | 2 +- .../airavata/helix/impl/task/TaskContext.java | 68 +++++++++++++++++++++- .../impl/task/submission/GroovyMapBuilder.java | 4 +- .../submission/task/DefaultJobSubmissionTask.java | 6 +- .../task/submission/task/JobSubmissionTask.java | 4 +- .../helix/impl/workflow/SimpleWorkflow.java | 5 +- .../src/main/resources/application.properties | 2 +- 10 files changed, 92 insertions(+), 15 deletions(-) diff --git a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java index 2ad2415..5392ab5 100644 --- a/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java +++ b/modules/airavata-helix/agent-impl/ssh-agent/src/main/java/org/apache/airavata/helix/agent/ssh/SshAgentAdaptor.java @@ -132,13 +132,12 @@ public class SshAgentAdaptor implements AgentAdaptor { ChannelExec channelExec = null; try { channelExec = ((ChannelExec) session.openChannel("exec")); - channelExec.setCommand(command); + channelExec.setCommand("cd " + workingDirectory + "; " + command); channelExec.setInputStream(null); InputStream out = channelExec.getInputStream(); InputStream err = channelExec.getErrStream(); channelExec.connect(); - commandOutput.setExitCode(channelExec.getExitStatus()); commandOutput.readStdOutFromStream(out); commandOutput.readStdErrFromStream(err); return commandOutput; @@ -150,6 +149,7 @@ public class SshAgentAdaptor implements AgentAdaptor { throw new AgentException(e); } finally { if (channelExec != null) { + commandOutput.setExitCode(channelExec.getExitStatus()); channelExec.disconnect(); } } diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/AbstractTask.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/AbstractTask.java index 04fa37f..5aca9cd 100644 --- a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/AbstractTask.java +++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/AbstractTask.java @@ -26,6 +26,8 @@ public abstract class AbstractTask extends UserContentStore implements Task { private TaskCallbackContext callbackContext; private TaskHelper taskHelper; + private int retryCount = 3; + @Override public void init(HelixManager manager, String workflowName, String jobName, String taskName) { super.init(manager, workflowName, jobName, taskName); @@ -105,4 +107,12 @@ public abstract class AbstractTask extends UserContentStore implements Task { this.taskHelper = taskHelper; return this; } + + public int getRetryCount() { + return retryCount; + } + + public void setRetryCount(int retryCount) { + this.retryCount = retryCount; + } } diff --git a/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/WorkflowManager.java b/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/WorkflowManager.java index ab7e3c4..9ecafb9 100644 --- a/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/WorkflowManager.java +++ b/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/WorkflowManager.java @@ -61,7 +61,7 @@ public class WorkflowManager { JobConfig.Builder job = new JobConfig.Builder() .addTaskConfigs(taskBuilds) .setFailureThreshold(0) - .setMaxAttemptsPerTask(3); + .setMaxAttemptsPerTask(data.getRetryCount()); if (!globalParticipant) { job.setInstanceGroupTag(taskType); diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java index eafa53d..ddba5f2 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/EnvSetupTask.java @@ -28,7 +28,7 @@ public class EnvSetupTask extends AiravataTask { logger.info("Creating directory " + getTaskContext().getWorkingDir() + " on compute resource " + getTaskContext().getComputeResourceId()); adaptor.createDirectory(getTaskContext().getWorkingDir()); publishTaskState(TaskState.COMPLETED); - return onSuccess("Successfully completed"); + return onSuccess("Envi setup task successfully completed " + getTaskId()); } catch (Exception e) { try { publishTaskState(TaskState.FAILED); diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java index 64a7de8..489a196 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java @@ -13,6 +13,8 @@ import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescr import org.apache.airavata.model.appcatalog.userresourceprofile.UserComputeResourcePreference; import org.apache.airavata.model.appcatalog.userresourceprofile.UserResourceProfile; import org.apache.airavata.model.appcatalog.userresourceprofile.UserStoragePreference; +import org.apache.airavata.model.application.io.DataType; +import org.apache.airavata.model.application.io.OutputDataObjectType; import org.apache.airavata.model.data.movement.DataMovementProtocol; import org.apache.airavata.model.job.JobModel; import org.apache.airavata.model.process.ProcessModel; @@ -23,11 +25,13 @@ import org.apache.airavata.model.task.TaskModel; import org.apache.airavata.registry.cpi.AppCatalog; import org.apache.airavata.registry.cpi.AppCatalogException; import org.apache.airavata.registry.cpi.ExperimentCatalog; +import org.apache.airavata.registry.cpi.ExperimentCatalogModelType; import org.apache.curator.framework.CuratorFramework; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; import java.util.*; public class TaskContext { @@ -436,8 +440,38 @@ public class TaskContext { this.resourceJobManager = resourceJobManager; } - public ResourceJobManager getResourceJobManager() { - return resourceJobManager; + public ResourceJobManager getResourceJobManager() throws Exception { + + if (this.resourceJobManager == null) { + JobSubmissionInterface jsInterface = getPreferredJobSubmissionInterface(); + + if (jsInterface == null) { + throw new Exception("Job Submission interface cannot be empty at this point"); + } else if (jsInterface.getJobSubmissionProtocol() == JobSubmissionProtocol.SSH) { + SSHJobSubmission sshJobSubmission = getAppCatalog().getComputeResource().getSSHJobSubmission + (jsInterface.getJobSubmissionInterfaceId()); + // context method. + resourceJobManager = sshJobSubmission.getResourceJobManager(); + } else if (jsInterface.getJobSubmissionProtocol() == JobSubmissionProtocol.LOCAL) { + LOCALSubmission localSubmission = getAppCatalog().getComputeResource().getLocalJobSubmission + (jsInterface.getJobSubmissionInterfaceId()); + resourceJobManager = localSubmission.getResourceJobManager(); + } else if (jsInterface.getJobSubmissionProtocol() == JobSubmissionProtocol.SSH_FORK) { + SSHJobSubmission sshJobSubmission = getAppCatalog().getComputeResource().getSSHJobSubmission + (jsInterface.getJobSubmissionInterfaceId()); + resourceJobManager = sshJobSubmission.getResourceJobManager(); + } else if (jsInterface.getJobSubmissionProtocol() == JobSubmissionProtocol.CLOUD) { + return null; + } else { + throw new Exception("Unsupported JobSubmissionProtocol - " + jsInterface.getJobSubmissionProtocol() + .name()); + } + + if (resourceJobManager == null) { + throw new Exception("Resource Job Manager is empty."); + } + } + return this.resourceJobManager; } public String getLocalWorkingDir() { @@ -794,6 +828,36 @@ public class TaskContext { .getApplicationInterface(processModel.getApplicationInterfaceId())); ctx.setComputeResourceDescription(appCatalog.getComputeResource().getComputeResource (ctx.getComputeResourceId())); + + List<OutputDataObjectType> applicationOutputs = ctx.getApplicationInterfaceDescription().getApplicationOutputs(); + if (applicationOutputs != null && !applicationOutputs.isEmpty()) { + for (OutputDataObjectType outputDataObjectType : applicationOutputs) { + if (outputDataObjectType.getType().equals(DataType.STDOUT)) { + if (outputDataObjectType.getValue() == null || outputDataObjectType.getValue().equals("")) { + String stdOut = (ctx.getWorkingDir().endsWith(File.separator) ? ctx.getWorkingDir() : ctx.getWorkingDir() + File.separator) + + ctx.getApplicationInterfaceDescription().getApplicationName() + ".stdout"; + outputDataObjectType.setValue(stdOut); + ctx.setStdoutLocation(stdOut); + } else { + ctx.setStdoutLocation(outputDataObjectType.getValue()); + } + } + if (outputDataObjectType.getType().equals(DataType.STDERR)) { + if (outputDataObjectType.getValue() == null || outputDataObjectType.getValue().equals("")) { + String stderrLocation = (ctx.getWorkingDir().endsWith(File.separator) ? ctx.getWorkingDir() : ctx.getWorkingDir() + File.separator) + + ctx.getApplicationInterfaceDescription().getApplicationName() + ".stderr"; + outputDataObjectType.setValue(stderrLocation); + ctx.setStderrLocation(stderrLocation); + } else { + ctx.setStderrLocation(outputDataObjectType.getValue()); + } + } + } + } + + // TODO move this to some where else as this is not the correct place to do so + experimentCatalog.update(ExperimentCatalogModelType.PROCESS, processModel, processId); + processModel.setProcessOutputs(applicationOutputs); return ctx; } diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapBuilder.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapBuilder.java index e4267ce..2119755 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapBuilder.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/GroovyMapBuilder.java @@ -64,7 +64,7 @@ public class GroovyMapBuilder { mapData.setInputs(inputValues); List<String> inputValuesAll = getProcessInputValues(taskContext.getProcessModel().getProcessInputs(), false); - inputValues.addAll(getProcessOutputValues(taskContext.getProcessModel().getProcessOutputs(), false)); + inputValuesAll.addAll(getProcessOutputValues(taskContext.getProcessModel().getProcessOutputs(), false)); mapData.setInputsAll(inputValuesAll); mapData.setUserName(taskContext.getComputeResourceLoginUserName()); @@ -103,7 +103,7 @@ public class GroovyMapBuilder { mapData.setQueueName(scheduling.getQueueName()); } if (totalNodeCount > 0) { - mapData.setNodes(totalCPUCount); + mapData.setNodes(totalNodeCount); } if (totalCPUCount > 0) { int ppn = totalCPUCount / totalNodeCount; diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java index c85e18b..e21f200 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java @@ -46,6 +46,7 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask { jobModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime()); jobModel.setTaskId(getTaskId()); jobModel.setJobName(mapData.getJobName()); + jobModel.setJobDescription("Sample description"); if (mapData != null) { //jobModel.setJobDescription(FileUtils.readFileToString(jobFile)); @@ -71,10 +72,11 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask { statusList.add(new JobStatus(JobState.FAILED)); statusList.get(0).setReason(submissionOutput.getFailureReason()); jobModel.setJobStatuses(statusList); - jobModel.setJobDescription("Sample description"); saveJobModel(jobModel); logger.error("expId: " + getExperimentId() + ", processid: " + getProcessId()+ ", taskId: " + - getTaskId() + " :- Job submission failed for job name " + jobModel.getJobName()); + getTaskId() + " :- Job submission failed for job name " + jobModel.getJobName() + + ". Exit code : " + submissionOutput.getExitCode() + ", Submission failed : " + + submissionOutput.isJobSubmissionFailed()); ErrorModel errorModel = new ErrorModel(); errorModel.setUserFriendlyMessage(submissionOutput.getFailureReason()); diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java index b517af1..ac314e9 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/JobSubmissionTask.java @@ -181,9 +181,9 @@ public abstract class JobSubmissionTask extends AiravataTask { MessageContext msgCtx = new MessageContext(jobStatusChangeEvent, MessageType.JOB, AiravataUtils.getId (MessageType.JOB.name()), getGatewayId()); msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); - getStatusPublisher().publish(msgCtx); + //getStatusPublisher().publish(msgCtx); } catch (Exception e) { - throw new Exception("Error persisting job status" + e.getLocalizedMessage(), e); + throw new Exception("Error persisting job status " + e.getLocalizedMessage(), e); } } diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java index 63921db..abd36e1 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/SimpleWorkflow.java @@ -27,7 +27,7 @@ public class SimpleWorkflow { public static void main(String[] args) throws Exception { - String processId = "PROCESS_438a87cc-2dec-4edc-bfeb-31128df91bb6"; + String processId = "PROCESS_5b252ad9-d630-4cf9-80e3-0c30c55d1001"; AppCatalog appCatalog = RegistryFactory.getAppCatalog(); ExperimentCatalog experimentCatalog = RegistryFactory.getDefaultExpCatalog(); @@ -51,10 +51,11 @@ public class SimpleWorkflow { airavataTask = new EnvSetupTask(); } else if (taskModel.getTaskType() == TaskTypes.JOB_SUBMISSION) { airavataTask = new DefaultJobSubmissionTask(); + airavataTask.setRetryCount(1); jobSubmissionFound = true; } else if (taskModel.getTaskType() == TaskTypes.DATA_STAGING) { if (jobSubmissionFound) { - airavataTask = new OutputDataStagingTask(); + //airavataTask = new OutputDataStagingTask(); } else { airavataTask = new InputDataStagingTask(); } diff --git a/modules/helix-spectator/src/main/resources/application.properties b/modules/helix-spectator/src/main/resources/application.properties index a9b0969..b4b8048 100644 --- a/modules/helix-spectator/src/main/resources/application.properties +++ b/modules/helix-spectator/src/main/resources/application.properties @@ -1,3 +1,3 @@ zookeeper.connection.url=localhost:2199 helix.cluster.name=AiravataDemoCluster -participant.name=all-p2 \ No newline at end of file +participant.name=all-p3 \ No newline at end of file -- To stop receiving notification emails like this one, please contact dimuthu...@apache.org.