This is an automated email from the ASF dual-hosted git repository. dimuthuupe pushed a commit to branch helix-integration in repository https://gitbox.apache.org/repos/asf/airavata.git
commit 71b294ed957cdddf6a08bf32f690c8204fedf3af Author: dimuthu <dimuthu.upeks...@gmail.com> AuthorDate: Wed Mar 7 16:09:50 2018 -0500 Refactoring --- .../airavata/agents/api/JobSubmissionOutput.java | 9 + .../helix/impl/controller/HelixController.java | 54 +---- .../helix/impl/participant/GlobalParticipant.java | 68 +++--- .../airavata/helix/impl/task/AiravataTask.java | 69 +++--- .../airavata/helix/impl/task/TaskContext.java | 22 +- .../airavata/helix/impl/task/env/EnvSetupTask.java | 12 - .../helix/impl/task/staging/DataStagingTask.java | 6 + .../impl/task/staging/InputDataStagingTask.java | 4 - .../impl/task/staging/OutputDataStagingTask.java | 37 +-- .../task/submission/DefaultJobSubmissionTask.java | 260 +++++++++------------ .../task/submission/ForkJobSubmissionTask.java | 78 ++++--- .../impl/task/submission/JobSubmissionTask.java | 38 ++- .../helix/impl/workflow/PostWorkflowManager.java | 29 +-- .../helix/impl/workflow/PreWorkflowManager.java | 4 +- .../src/main/resources/airavata-server.properties | 79 +------ .../helix/core/participant/HelixParticipant.java | 17 +- 16 files changed, 314 insertions(+), 472 deletions(-) diff --git a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/JobSubmissionOutput.java b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/JobSubmissionOutput.java index 1858826..e1d0a80 100644 --- a/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/JobSubmissionOutput.java +++ b/modules/airavata-helix/agent-api/src/main/java/org/apache/airavata/agents/api/JobSubmissionOutput.java @@ -8,6 +8,7 @@ public class JobSubmissionOutput { private String jobId; private boolean isJobSubmissionFailed; private String failureReason; + private String description; public int getExitCode() { return exitCode; @@ -71,4 +72,12 @@ public class JobSubmissionOutput { this.failureReason = failureReason; return this; } + + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } } diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/controller/HelixController.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/controller/HelixController.java index 11d7129..f5e2137 100644 --- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/controller/HelixController.java +++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/controller/HelixController.java @@ -1,12 +1,11 @@ package org.apache.airavata.helix.impl.controller; -import org.apache.airavata.helix.core.util.PropertyResolver; +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.ServerSettings; import org.apache.helix.controller.HelixControllerMain; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import java.io.File; -import java.io.IOException; import java.util.concurrent.CountDownLatch; /** @@ -27,18 +26,11 @@ public class HelixController implements Runnable { private CountDownLatch startLatch = new CountDownLatch(1); private CountDownLatch stopLatch = new CountDownLatch(1); - public HelixController(String propertyFile, boolean readPropertyFromFile) throws IOException { - - PropertyResolver propertyResolver = new PropertyResolver(); - if (readPropertyFromFile) { - propertyResolver.loadFromFile(new File(propertyFile)); - } else { - propertyResolver.loadInputStream(this.getClass().getClassLoader().getResourceAsStream(propertyFile)); - } - - this.clusterName = propertyResolver.get("helix.cluster.name"); - this.controllerName = propertyResolver.get("helix.controller.name"); - this.zkAddress = propertyResolver.get("zookeeper.connection.url"); + @SuppressWarnings("WeakerAccess") + public HelixController() throws ApplicationSettingsException { + this.clusterName = ServerSettings.getSetting("helix.cluster.name"); + this.controllerName = ServerSettings.getSetting("helix.controller.name"); + this.zkAddress = ServerSettings.getZookeeperConnection(); } public void run() { @@ -64,12 +56,7 @@ public class HelixController implements Runnable { logger.info("Controller: " + controllerName + ", has connected to cluster: " + clusterName); Runtime.getRuntime().addShutdownHook( - new Thread() { - @Override - public void run() { - disconnect(); - } - } + new Thread(this::disconnect) ); } catch (InterruptedException ex) { @@ -77,6 +64,7 @@ public class HelixController implements Runnable { } } + @SuppressWarnings({"WeakerAccess", "unused"}) public void stop() { stopLatch.countDown(); } @@ -92,29 +80,11 @@ public class HelixController implements Runnable { try { logger.info("Starting helix controller"); - String confDir = null; - if (args != null) { - for (String arg : args) { - if (arg.startsWith("--confDir=")) { - confDir = arg.substring("--confDir=".length()); - } - } - } - - String propertiesFile = "application.properties"; - boolean readPropertyFromFile = false; - - if (confDir != null && !confDir.isEmpty()) { - propertiesFile = confDir.endsWith(File.separator)? confDir + propertiesFile : confDir + File.separator + propertiesFile; - readPropertyFromFile = true; - } - - logger.info("Using configuration file " + propertiesFile); - - HelixController helixController = new HelixController(propertiesFile, readPropertyFromFile); + + HelixController helixController = new HelixController(); helixController.start(); - } catch (IOException e) { + } catch (Exception e) { logger.error("Failed to start the helix controller", e); } } diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java index 7dd5c99..7c86f42 100644 --- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java +++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java @@ -1,17 +1,14 @@ package org.apache.airavata.helix.impl.participant; +import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.helix.core.AbstractTask; import org.apache.airavata.helix.core.participant.HelixParticipant; import org.apache.airavata.helix.core.support.TaskHelperImpl; import org.apache.airavata.helix.task.api.annotation.TaskDef; -import org.apache.helix.task.Task; -import org.apache.helix.task.TaskCallbackContext; import org.apache.helix.task.TaskFactory; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import java.io.File; -import java.io.IOException; import java.util.HashMap; import java.util.Map; @@ -30,26 +27,24 @@ public class GlobalParticipant extends HelixParticipant { }; public Map<String, TaskFactory> getTaskFactory() { - Map<String, TaskFactory> taskRegistry = new HashMap<String, TaskFactory>(); + Map<String, TaskFactory> taskRegistry = new HashMap<>(); for (String taskClass : taskClasses) { - TaskFactory taskFac = new TaskFactory() { - public Task createNewTask(TaskCallbackContext context) { - try { - return AbstractTask.class.cast(Class.forName(taskClass).newInstance()) - .setCallbackContext(context) - .setTaskHelper(new TaskHelperImpl()); - } catch (InstantiationException | IllegalAccessException e) { - e.printStackTrace(); - return null; - } catch (ClassNotFoundException e) { - e.printStackTrace(); - return null; - } + TaskFactory taskFac = context -> { + try { + return AbstractTask.class.cast(Class.forName(taskClass).newInstance()) + .setCallbackContext(context) + .setTaskHelper(new TaskHelperImpl()); + } catch (InstantiationException | IllegalAccessException e) { + logger.error("Failed to initialize the task", e); + return null; + } catch (ClassNotFoundException e) { + logger.error("Task class can not be found in the class path", e); + return null; } }; - TaskDef taskDef = null; + TaskDef taskDef; try { taskDef = Class.forName(taskClass).getAnnotation(TaskDef.class); taskRegistry.put(taskDef.name(), taskFac); @@ -60,34 +55,23 @@ public class GlobalParticipant extends HelixParticipant { return taskRegistry; } - public GlobalParticipant(String propertyFile, Class taskClass, String taskTypeName, boolean readPropertyFromFile) throws IOException { - super(propertyFile, taskClass, taskTypeName, readPropertyFromFile); + @SuppressWarnings("WeakerAccess") + public GlobalParticipant(Class taskClass, String taskTypeName) throws ApplicationSettingsException { + super(taskClass, taskTypeName); } - public static void main(String args[]) throws IOException { + public static void main(String args[]) { + logger.info("Starting global participant"); - String confDir = null; - if (args != null) { - for (String arg : args) { - if (arg.startsWith("--confDir=")) { - confDir = arg.substring("--confDir=".length()); - } - } - } - - String propertiesFile = "application.properties"; - boolean readPropertyFromFile = false; - - if (confDir != null && !confDir.isEmpty()) { - propertiesFile = confDir.endsWith(File.separator)? confDir + propertiesFile : confDir + File.separator + propertiesFile; - readPropertyFromFile = true; + GlobalParticipant participant; + try { + participant = new GlobalParticipant(null, null); + Thread t = new Thread(participant); + t.start(); + } catch (Exception e) { + logger.error("Failed to start global participant", e); } - logger.info("Using configuration file " + propertiesFile); - - GlobalParticipant participant = new GlobalParticipant(propertiesFile, null, null, readPropertyFromFile); - Thread t = new Thread(participant); - t.start(); } } diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java index 289cfc5..4f6d6ec 100644 --- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java +++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/AiravataTask.java @@ -53,6 +53,7 @@ public abstract class AiravataTask extends AbstractTask { private OutPort nextTask; protected TaskResult onSuccess(String message) { + publishTaskState(TaskState.COMPLETED); String successMessage = "Task " + getTaskId() + " completed." + (message != null ? " Message : " + message : ""); logger.info(successMessage); return nextTask.invoke(new TaskResult(TaskResult.Status.COMPLETED, message)); @@ -80,13 +81,15 @@ public abstract class AiravataTask extends AbstractTask { getTaskContext().setProcessStatus(status); ErrorModel errorModel = new ErrorModel(); - errorModel.setUserFriendlyMessage("GFac Worker throws an exception"); + errorModel.setUserFriendlyMessage(reason); errorModel.setActualErrorMessage(errors.toString()); errorModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime()); + publishTaskState(TaskState.FAILED); saveAndPublishProcessStatus(); saveExperimentError(errorModel); saveProcessError(errorModel); + saveTaskError(errorModel); return new TaskResult(fatal ? TaskResult.Status.FATAL_FAILED : TaskResult.Status.FAILED, errorMessage); } @@ -97,6 +100,7 @@ public abstract class AiravataTask extends AbstractTask { saveAndPublishProcessStatus(); } + @SuppressWarnings("WeakerAccess") protected void saveAndPublishProcessStatus() { try { ProcessStatus status = taskContext.getProcessStatus(); @@ -117,6 +121,7 @@ public abstract class AiravataTask extends AbstractTask { } } + @SuppressWarnings("WeakerAccess") protected void saveAndPublishTaskStatus() { try { TaskState state = getTaskContext().getTaskState(); @@ -140,6 +145,7 @@ public abstract class AiravataTask extends AbstractTask { } } + @SuppressWarnings("WeakerAccess") protected void saveExperimentError(ErrorModel errorModel) { try { errorModel.setErrorId(AiravataUtils.getId("EXP_ERROR")); @@ -150,6 +156,7 @@ public abstract class AiravataTask extends AbstractTask { } } + @SuppressWarnings("WeakerAccess") protected void saveProcessError(ErrorModel errorModel) { try { errorModel.setErrorId(AiravataUtils.getId("PROCESS_ERROR")); @@ -161,14 +168,15 @@ public abstract class AiravataTask extends AbstractTask { } } - protected void saveTaskError(ErrorModel errorModel) throws Exception { + @SuppressWarnings("WeakerAccess") + protected void saveTaskError(ErrorModel errorModel) { try { errorModel.setErrorId(AiravataUtils.getId("TASK_ERROR")); getExperimentCatalog().add(ExpCatChildDataType.TASK_ERROR, errorModel, getTaskId()); } catch (RegistryException e) { String msg = "expId: " + getExperimentId() + " processId: " + getProcessId() + " taskId: " + getTaskId() + " : - Error while updating task errors"; - throw new Exception(msg, e); + logger.error(msg, e); } } @@ -191,6 +199,7 @@ public abstract class AiravataTask extends AbstractTask { MDC.put("process", getProcessId()); MDC.put("gateway", getGatewayId()); MDC.put("task", getTaskId()); + publishTaskState(TaskState.EXECUTING); return onRun(helper, getTaskContext()); } finally { MDC.clear(); @@ -206,6 +215,7 @@ public abstract class AiravataTask extends AbstractTask { MDC.put("process", getProcessId()); MDC.put("gateway", getGatewayId()); MDC.put("task", getTaskId()); + publishTaskState(TaskState.CANCELED); onCancel(getTaskContext()); } finally { MDC.clear(); @@ -231,22 +241,21 @@ public abstract class AiravataTask extends AbstractTask { this.computeResourceDescription = getAppCatalog().getComputeResource().getComputeResource(getProcessModel() .getComputeResourceId()); - TaskContext.TaskContextBuilder taskContextBuilder = new TaskContext.TaskContextBuilder(getProcessId(), getGatewayId(), getTaskId()); - taskContextBuilder.setAppCatalog(getAppCatalog()); - taskContextBuilder.setExperimentCatalog(getExperimentCatalog()); - taskContextBuilder.setProcessModel(getProcessModel()); - taskContextBuilder.setStatusPublisher(getStatusPublisher()); - - taskContextBuilder.setGatewayResourceProfile(appCatalog.getGatewayProfile().getGatewayProfile(gatewayId)); - taskContextBuilder.setGatewayComputeResourcePreference( + TaskContext.TaskContextBuilder taskContextBuilder = new TaskContext.TaskContextBuilder(getProcessId(), getGatewayId(), getTaskId()) + .setAppCatalog(getAppCatalog()) + .setExperimentCatalog(getExperimentCatalog()) + .setProcessModel(getProcessModel()) + .setStatusPublisher(getStatusPublisher()) + .setGatewayResourceProfile(appCatalog.getGatewayProfile().getGatewayProfile(gatewayId)) + .setGatewayComputeResourcePreference( appCatalog.getGatewayProfile() - .getComputeResourcePreference(gatewayId, processModel.getComputeResourceId())); - taskContextBuilder.setGatewayStorageResourcePreference( + .getComputeResourcePreference(gatewayId, processModel.getComputeResourceId())) + .setGatewayStorageResourcePreference( appCatalog.getGatewayProfile() .getStoragePreference(gatewayId, processModel.getStorageResourceId())); this.taskContext = taskContextBuilder.build(); - logger.info("Task " + taskName + " intitialized"); + logger.info("Task " + taskName + " initialized"); } catch (Exception e) { logger.error("Error occurred while initializing the task " + getTaskId() + " of experiment " + getExperimentId(), e); throw new RuntimeException("Error occurred while initializing the task " + getTaskId() + " of experiment " + getExperimentId(), e); @@ -259,19 +268,25 @@ public abstract class AiravataTask extends AbstractTask { return appCatalog; } - protected void publishTaskState(TaskState ts) throws RegistryException { - - TaskStatus taskStatus = new TaskStatus(); - taskStatus.setState(ts); - taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); - experimentCatalog.add(ExpCatChildDataType.TASK_STATUS, taskStatus, getTaskId()); - TaskIdentifier identifier = new TaskIdentifier(getTaskId(), - getProcessId(), getExperimentId(), getGatewayId()); - TaskStatusChangeEvent taskStatusChangeEvent = new TaskStatusChangeEvent(ts, - identifier); - MessageContext msgCtx = new MessageContext(taskStatusChangeEvent, MessageType.TASK, AiravataUtils.getId - (MessageType.TASK.name()), getGatewayId()); - msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); + @SuppressWarnings("WeakerAccess") + protected void publishTaskState(TaskState ts) { + + try { + TaskStatus taskStatus = new TaskStatus(); + taskStatus.setState(ts); + taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + experimentCatalog.add(ExpCatChildDataType.TASK_STATUS, taskStatus, getTaskId()); + TaskIdentifier identifier = new TaskIdentifier(getTaskId(), + getProcessId(), getExperimentId(), getGatewayId()); + TaskStatusChangeEvent taskStatusChangeEvent = new TaskStatusChangeEvent(ts, + identifier); + MessageContext msgCtx = new MessageContext(taskStatusChangeEvent, MessageType.TASK, AiravataUtils.getId + (MessageType.TASK.name()), getGatewayId()); + msgCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); + statusPublisher.publish(msgCtx); + } catch (Exception e) { + logger.error("Failed to publish task status " + (ts != null ? ts.name(): "null") +" of task " + getTaskId()); + } } protected ComputeResourceDescription getComputeResourceDescription() { diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java index 6be1d36..0e6a3cc 100644 --- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java +++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java @@ -716,21 +716,11 @@ public class TaskContext { } } }else { - Collections.sort(jobSubmissionInterfaces, new Comparator<JobSubmissionInterface>() { - @Override - public int compare(JobSubmissionInterface jobSubmissionInterface, JobSubmissionInterface jobSubmissionInterface2) { - return jobSubmissionInterface.getPriorityOrder() - jobSubmissionInterface2.getPriorityOrder(); - } - }); + jobSubmissionInterfaces.sort(Comparator.comparingInt(JobSubmissionInterface::getPriorityOrder)); } } interfaces = orderedInterfaces.get(preferredJobSubmissionProtocol); - Collections.sort(interfaces, new Comparator<JobSubmissionInterface>() { - @Override - public int compare(JobSubmissionInterface jobSubmissionInterface, JobSubmissionInterface jobSubmissionInterface2) { - return jobSubmissionInterface.getPriorityOrder() - jobSubmissionInterface2.getPriorityOrder(); - } - }); + interfaces.sort(Comparator.comparingInt(JobSubmissionInterface::getPriorityOrder)); } else { throw new AppCatalogException("Compute resource should have at least one job submission interface defined..."); } @@ -740,6 +730,7 @@ public class TaskContext { } } + @SuppressWarnings("WeakerAccess") public TaskModel getCurrentTaskModel() { return getTaskMap().get(taskId); } @@ -763,6 +754,7 @@ public class TaskContext { private StoragePreference gatewayStorageResourcePreference; private ProcessModel processModel; + @SuppressWarnings("WeakerAccess") public TaskContextBuilder(String processId, String gatewayId, String taskId) throws Exception { if (notValid(processId) || notValid(gatewayId) || notValid(taskId)) { throwError("Process Id, Gateway Id and Task Id must be not null"); @@ -826,9 +818,9 @@ public class TaskContext { if (notValid(experimentCatalog)) { throwError("Invalid Experiment catalog"); } - //if (notValid(statusPublisher)) { - // throwError("Invalid Status Publisher"); - //} + if (notValid(statusPublisher)) { + throwError("Invalid Status Publisher"); + } TaskContext ctx = new TaskContext(processId, gatewayId, taskId); ctx.setAppCatalog(appCatalog); diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/env/EnvSetupTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/env/EnvSetupTask.java index 6eb1722..84adbcd 100644 --- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/env/EnvSetupTask.java +++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/env/EnvSetupTask.java @@ -6,8 +6,6 @@ import org.apache.airavata.helix.impl.task.TaskContext; import org.apache.airavata.helix.task.api.TaskHelper; import org.apache.airavata.helix.task.api.annotation.TaskDef; import org.apache.airavata.model.status.ProcessState; -import org.apache.airavata.model.status.TaskState; -import org.apache.airavata.registry.cpi.RegistryException; import org.apache.helix.task.TaskResult; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -20,9 +18,7 @@ public class EnvSetupTask extends AiravataTask { @Override public TaskResult onRun(TaskHelper taskHelper, TaskContext taskContext) { try { - saveAndPublishProcessStatus(ProcessState.CONFIGURING_WORKSPACE); - publishTaskState(TaskState.EXECUTING); AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor( getTaskContext().getGatewayId(), getTaskContext().getComputeResourceId(), @@ -32,17 +28,9 @@ public class EnvSetupTask extends AiravataTask { logger.info("Creating directory " + getTaskContext().getWorkingDir() + " on compute resource " + getTaskContext().getComputeResourceId()); adaptor.createDirectory(getTaskContext().getWorkingDir()); - publishTaskState(TaskState.COMPLETED); return onSuccess("Envi setup task successfully completed " + getTaskId()); } catch (Exception e) { - try { - publishTaskState(TaskState.FAILED); - } catch (RegistryException e1) { - logger.error("Task failed to publish task status", e1); - - // ignore silently - } return onFail("Failed to setup environment of task " + getTaskId(), true, e); } } diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java index 76b4cb3..3220064 100644 --- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java +++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java @@ -15,8 +15,10 @@ import org.apache.commons.io.FileUtils; import java.io.File; import java.io.IOException; +@SuppressWarnings("WeakerAccess") public abstract class DataStagingTask extends AiravataTask { + @SuppressWarnings("WeakerAccess") protected DataStagingTaskModel getDataStagingTaskModel() throws TaskOnFailException { try { Object subTaskModel = getTaskContext().getSubTaskModel(); @@ -30,6 +32,7 @@ public abstract class DataStagingTask extends AiravataTask { } } + @SuppressWarnings("WeakerAccess") protected StorageResourceDescription getStorageResource() throws TaskOnFailException { try { StorageResourceDescription storageResource = getTaskContext().getStorageResource(); @@ -42,6 +45,7 @@ public abstract class DataStagingTask extends AiravataTask { } } + @SuppressWarnings("WeakerAccess") protected StorageResourceAdaptor getStorageAdaptor(AdaptorSupport adaptorSupport) throws TaskOnFailException { try { StorageResourceAdaptor storageResourceAdaptor = adaptorSupport.fetchStorageAdaptor( @@ -61,6 +65,7 @@ public abstract class DataStagingTask extends AiravataTask { } } + @SuppressWarnings("WeakerAccess") protected AgentAdaptor getComputeResourceAdaptor(AdaptorSupport adaptorSupport) throws TaskOnFailException { try { return adaptorSupport.fetchAdaptor( @@ -75,6 +80,7 @@ public abstract class DataStagingTask extends AiravataTask { } } + @SuppressWarnings("WeakerAccess") protected String getLocalDataPath(String fileName) throws TaskOnFailException { String localDataPath = ServerSettings.getLocalDataLocation(); localDataPath = (localDataPath.endsWith(File.separator) ? localDataPath : localDataPath + File.separator); diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/InputDataStagingTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/InputDataStagingTask.java index de2aeac..f8d98cf 100644 --- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/InputDataStagingTask.java +++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/InputDataStagingTask.java @@ -7,7 +7,6 @@ import org.apache.airavata.helix.impl.task.TaskContext; import org.apache.airavata.helix.impl.task.TaskOnFailException; import org.apache.airavata.helix.task.api.TaskHelper; import org.apache.airavata.helix.task.api.annotation.TaskDef; -import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription; import org.apache.airavata.model.application.io.InputDataObjectType; import org.apache.airavata.model.status.ProcessState; import org.apache.airavata.model.task.DataStagingTaskModel; @@ -49,9 +48,6 @@ public class InputDataStagingTask extends DataStagingTask { throw new TaskOnFailException(message, true, null); } - // Fetch and validate storage resource - StorageResourceDescription storageResource = getStorageResource(); - // Fetch and validate source and destination URLS URI sourceURI; URI destinationURI; diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java index 7d657cb..88698c0 100644 --- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java +++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java @@ -20,7 +20,7 @@ import org.apache.log4j.Logger; import java.io.File; import java.net.URI; import java.net.URISyntaxException; -import java.util.Arrays; +import java.util.Collections; import java.util.List; @TaskDef(name = "Output Data Staging Task") @@ -65,6 +65,7 @@ public class OutputDataStagingTask extends DataStagingTask { sourceURI.getPath().length()); if (dataStagingTaskModel.getDestination().startsWith("dummy")) { + String inputPath = getTaskContext().getStorageFileSystemRootLocation(); inputPath = (inputPath.endsWith(File.separator) ? inputPath : inputPath + File.separator); String experimentDataDir = getProcessModel().getExperimentDataDir(); @@ -110,7 +111,7 @@ public class OutputDataStagingTask extends DataStagingTask { String sourceParentPath = (new File(sourceURI.getPath())).getParentFile().getPath(); logger.debug("Destination parent path " + destParentPath + ", source parent path " + sourceParentPath); - List<String> fileNames = null; + List<String> fileNames; try { fileNames = adaptor.getFileNameFromExtension(sourceFileName, sourceParentPath); @@ -133,11 +134,14 @@ public class OutputDataStagingTask extends DataStagingTask { } //Wildcard support is only enabled for output data staging + assert processOutput != null; processOutput.setName(sourceFileName); try { - getTaskContext().getExperimentCatalog().add(ExpCatChildDataType.EXPERIMENT_OUTPUT, Arrays.asList(processOutput), getExperimentId()); - getTaskContext().getExperimentCatalog().add(ExpCatChildDataType.PROCESS_OUTPUT, Arrays.asList(processOutput), getProcessId()); + getTaskContext().getExperimentCatalog().add(ExpCatChildDataType.EXPERIMENT_OUTPUT, + Collections.singletonList(processOutput), getExperimentId()); + getTaskContext().getExperimentCatalog().add(ExpCatChildDataType.PROCESS_OUTPUT, + Collections.singletonList(processOutput), getProcessId()); } catch (RegistryException e) { throw new TaskOnFailException("Failed to update experiment or process outputs for task " + getTaskId(), true, e); } @@ -145,11 +149,12 @@ public class OutputDataStagingTask extends DataStagingTask { logger.info("Transferring file " + sourceFileName); transferFile(sourceURI, destinationURI, sourceFileName, adaptor, storageResourceAdaptor); } + return onSuccess("Output data staging task " + getTaskId() + " successfully completed"); } else { // Downloading input file from the storage resource transferFile(sourceURI, destinationURI, sourceFileName, adaptor, storageResourceAdaptor); - return onSuccess("Input data staging task " + getTaskId() + " successfully completed"); + return onSuccess("Output data staging task " + getTaskId() + " successfully completed"); } } catch (TaskOnFailException e) { @@ -164,8 +169,6 @@ public class OutputDataStagingTask extends DataStagingTask { logger.error("Unknown error while executing output data staging task " + getTaskId(), e); return onFail("Unknown error while executing output data staging task " + getTaskId(), false, e); } - - return null; } private void transferFile(URI sourceURI, URI destinationURI, String fileName, AgentAdaptor adaptor, @@ -192,26 +195,6 @@ public class OutputDataStagingTask extends DataStagingTask { } } - public URI getDestinationURIFromDummy(String hostName, String inputPath, String fileName) throws URISyntaxException { - String experimentDataDir = getProcessModel().getExperimentDataDir(); - String filePath; - if(experimentDataDir != null && !experimentDataDir.isEmpty()) { - if(!experimentDataDir.endsWith(File.separator)){ - experimentDataDir += File.separator; - } - if (experimentDataDir.startsWith(File.separator)) { - filePath = experimentDataDir + fileName; - } else { - filePath = inputPath + experimentDataDir + fileName; - } - } else { - filePath = inputPath + getProcessId() + File.separator + fileName; - } - //FIXME - return new URI("file", getTaskContext().getStorageResourceLoginUserName(), hostName, 22, filePath, null, null); - - } - @Override public void onCancel(TaskContext taskContext) { diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/DefaultJobSubmissionTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/DefaultJobSubmissionTask.java index 82316f0..6d64273 100644 --- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/DefaultJobSubmissionTask.java +++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/DefaultJobSubmissionTask.java @@ -19,7 +19,7 @@ import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.util.ArrayList; -import java.util.Arrays; +import java.util.Collections; import java.util.List; @TaskDef(name = "Default Job Submission") @@ -27,7 +27,7 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask { private static final Logger logger = LogManager.getLogger(DefaultJobSubmissionTask.class); - public static final String DEFAULT_JOB_ID = "DEFAULT_JOB_ID"; + private static final String DEFAULT_JOB_ID = "DEFAULT_JOB_ID"; @Override public TaskResult onRun(TaskHelper taskHelper, TaskContext taskContext) { @@ -45,171 +45,137 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask { jobModel.setJobName(mapData.getJobName()); jobModel.setJobDescription("Sample description"); - if (mapData != null) { - //jobModel.setJobDescription(FileUtils.readFileToString(jobFile)); - AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor( - getTaskContext().getGatewayId(), - getTaskContext().getComputeResourceId(), - getTaskContext().getJobSubmissionProtocol().name(), - getTaskContext().getComputeResourceCredentialToken(), - getTaskContext().getComputeResourceLoginUserName()); - - JobSubmissionOutput submissionOutput = submitBatchJob(adaptor, mapData, mapData.getWorkingDirectory()); - - jobModel.setExitCode(submissionOutput.getExitCode()); - jobModel.setStdErr(submissionOutput.getStdErr()); - jobModel.setStdOut(submissionOutput.getStdOut()); - - String jobId = submissionOutput.getJobId(); - - if (submissionOutput.getExitCode() != 0 || submissionOutput.isJobSubmissionFailed()) { - jobModel.setJobId(DEFAULT_JOB_ID); - if (submissionOutput.isJobSubmissionFailed()) { - List<JobStatus> statusList = new ArrayList<>(); - statusList.add(new JobStatus(JobState.FAILED)); - statusList.get(0).setReason(submissionOutput.getFailureReason()); - jobModel.setJobStatuses(statusList); - saveJobModel(jobModel); - logger.error("expId: " + getExperimentId() + ", processid: " + getProcessId()+ ", taskId: " + - getTaskId() + " :- Job submission failed for job name " + jobModel.getJobName() - + ". Exit code : " + submissionOutput.getExitCode() + ", Submission failed : " - + submissionOutput.isJobSubmissionFailed()); + AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor( + getTaskContext().getGatewayId(), + getTaskContext().getComputeResourceId(), + getTaskContext().getJobSubmissionProtocol().name(), + getTaskContext().getComputeResourceCredentialToken(), + getTaskContext().getComputeResourceLoginUserName()); - ErrorModel errorModel = new ErrorModel(); - errorModel.setUserFriendlyMessage(submissionOutput.getFailureReason()); - errorModel.setActualErrorMessage(submissionOutput.getFailureReason()); - saveExperimentError(errorModel); - saveProcessError(errorModel); - saveTaskError(errorModel); - //taskStatus.setState(TaskState.FAILED); - //taskStatus.setReason("Job submission command didn't return a jobId"); - //taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); - //taskContext.setTaskStatus(taskStatus); - logger.error("Standard error message : " + submissionOutput.getStdErr()); - logger.error("Standard out message : " + submissionOutput.getStdOut()); - return onFail("Job submission command didn't return a jobId", false, null); + JobSubmissionOutput submissionOutput = submitBatchJob(adaptor, mapData, mapData.getWorkingDirectory()); + + jobModel.setJobDescription(submissionOutput.getDescription()); + jobModel.setExitCode(submissionOutput.getExitCode()); + jobModel.setStdErr(submissionOutput.getStdErr()); + jobModel.setStdOut(submissionOutput.getStdOut()); + + String jobId = submissionOutput.getJobId(); + + if (submissionOutput.getExitCode() != 0 || submissionOutput.isJobSubmissionFailed()) { + + jobModel.setJobId(DEFAULT_JOB_ID); + if (submissionOutput.isJobSubmissionFailed()) { + List<JobStatus> statusList = new ArrayList<>(); + statusList.add(new JobStatus(JobState.FAILED)); + statusList.get(0).setReason(submissionOutput.getFailureReason()); + jobModel.setJobStatuses(statusList); + saveJobModel(jobModel); + logger.error("expId: " + getExperimentId() + ", processid: " + getProcessId()+ ", taskId: " + + getTaskId() + " :- Job submission failed for job name " + jobModel.getJobName() + + ". Exit code : " + submissionOutput.getExitCode() + ", Submission failed : " + + submissionOutput.isJobSubmissionFailed()); + + logger.error("Standard error message : " + submissionOutput.getStdErr()); + logger.error("Standard out message : " + submissionOutput.getStdOut()); + return onFail("Job submission command didn't return a jobId. Reason " + submissionOutput.getFailureReason(), + false, null); + + } else { + String msg; + saveJobModel(jobModel); + ErrorModel errorModel = new ErrorModel(); + if (submissionOutput.getExitCode() != Integer.MIN_VALUE) { + msg = "expId:" + getExperimentId() + ", processId:" + getProcessId() + ", taskId: " + getTaskId() + + " return non zero exit code:" + submissionOutput.getExitCode() + " for JobName:" + jobModel.getJobName() + + ", with failure reason : " + submissionOutput.getFailureReason() + + " Hence changing job state to Failed." ; + errorModel.setActualErrorMessage(submissionOutput.getFailureReason()); } else { - String msg; - saveJobModel(jobModel); - ErrorModel errorModel = new ErrorModel(); - if (submissionOutput.getExitCode() != Integer.MIN_VALUE) { - msg = "expId:" + getExperimentId() + ", processId:" + getProcessId() + ", taskId: " + getTaskId() + - " return non zero exit code:" + submissionOutput.getExitCode() + " for JobName:" + jobModel.getJobName() + - ", with failure reason : " + submissionOutput.getFailureReason() - + " Hence changing job state to Failed." ; - errorModel.setActualErrorMessage(submissionOutput.getFailureReason()); - } else { - msg = "expId:" + getExperimentId() + ", processId:" + getProcessId() + ", taskId: " + getTaskId() + - " doesn't return valid job submission exit code for JobName:" + jobModel.getJobName() + - ", with failure reason : stdout ->" + submissionOutput.getStdOut() + - " stderr -> " + submissionOutput.getStdErr() + " Hence changing job state to Failed." ; - errorModel.setActualErrorMessage(msg); - } - logger.error(msg); - errorModel.setUserFriendlyMessage(msg); - saveExperimentError(errorModel); - saveProcessError(errorModel); - saveTaskError(errorModel); - //taskStatus.setState(TaskState.FAILED); - //taskStatus.setReason(msg); - //taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); - //taskContext.setTaskStatus(taskStatus); - return onFail(msg, false, null); + msg = "expId:" + getExperimentId() + ", processId:" + getProcessId() + ", taskId: " + getTaskId() + + " doesn't return valid job submission exit code for JobName:" + jobModel.getJobName() + + ", with failure reason : stdout ->" + submissionOutput.getStdOut() + + " stderr -> " + submissionOutput.getStdErr() + " Hence changing job state to Failed." ; + errorModel.setActualErrorMessage(msg); } + logger.error(msg); + return onFail(msg, false, null); - //TODO save task status?? - } else if (jobId != null && !jobId.isEmpty()) { - logger.info("Received job id " + jobId + " from compute resource"); - jobModel.setJobId(jobId); - saveJobModel(jobModel); + } + + } else if (jobId != null && !jobId.isEmpty()) { + + logger.info("Received job id " + jobId + " from compute resource"); + jobModel.setJobId(jobId); + saveJobModel(jobModel); + + JobStatus jobStatus = new JobStatus(); + jobStatus.setJobState(JobState.SUBMITTED); + jobStatus.setReason("Successfully Submitted to " + getComputeResourceDescription().getHostName()); + jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + jobModel.setJobStatuses(Collections.singletonList(jobStatus)); + saveAndPublishJobStatus(jobModel); - JobStatus jobStatus = new JobStatus(); - jobStatus.setJobState(JobState.SUBMITTED); - jobStatus.setReason("Successfully Submitted to " + getComputeResourceDescription().getHostName()); + if (verifyJobSubmissionByJobId(adaptor, jobId)) { + jobStatus.setJobState(JobState.QUEUED); + jobStatus.setReason("Verification step succeeded"); jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); - jobModel.setJobStatuses(Arrays.asList(jobStatus)); + jobModel.setJobStatuses(Collections.singletonList(jobStatus)); saveAndPublishJobStatus(jobModel); + createMonitoringNode(jobId); + } + + if (getComputeResourceDescription().isGatewayUsageReporting()){ + String loadCommand = getComputeResourceDescription().getGatewayUsageModuleLoadCommand(); + String usageExecutable = getComputeResourceDescription().getGatewayUsageExecutable(); + ExperimentModel experiment = (ExperimentModel)getExperimentCatalog().get(ExperimentCatalogModelType.EXPERIMENT, getExperimentId()); + String username = experiment.getUserName() + "@" + getTaskContext().getGatewayComputeResourcePreference().getUsageReportingGatewayId(); + RawCommandInfo rawCommandInfo = new RawCommandInfo(loadCommand + " && " + usageExecutable + " -gateway_user " + username + + " -submit_time \"`date '+%F %T %:z'`\" -jobid " + jobId ); + adaptor.executeCommand(rawCommandInfo.getRawCommand(), null); + } + + return onSuccess("Submitted job to compute resource"); + + } else { - if (verifyJobSubmissionByJobId(adaptor, jobId)) { + int verificationTryCount = 0; + while (verificationTryCount++ < 3) { + String verifyJobId = verifyJobSubmission(adaptor, jobModel.getJobName(), getTaskContext().getComputeResourceLoginUserName()); + if (verifyJobId != null && !verifyJobId.isEmpty()) { + // JobStatus either changed from SUBMITTED to QUEUED or directly to QUEUED + jobId = verifyJobId; + jobModel.setJobId(jobId); + saveJobModel(jobModel); + JobStatus jobStatus = new JobStatus(); jobStatus.setJobState(JobState.QUEUED); jobStatus.setReason("Verification step succeeded"); jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); - jobModel.setJobStatuses(Arrays.asList(jobStatus)); + jobModel.setJobStatuses(Collections.singletonList(jobStatus)); saveAndPublishJobStatus(jobModel); - createMonitoringNode(jobId); - } - - if (getComputeResourceDescription().isGatewayUsageReporting()){ - String loadCommand = getComputeResourceDescription().getGatewayUsageModuleLoadCommand(); - String usageExecutable = getComputeResourceDescription().getGatewayUsageExecutable(); - ExperimentModel experiment = (ExperimentModel)getExperimentCatalog().get(ExperimentCatalogModelType.EXPERIMENT, getExperimentId()); - String username = experiment.getUserName() + "@" + getTaskContext().getGatewayComputeResourcePreference().getUsageReportingGatewayId(); - RawCommandInfo rawCommandInfo = new RawCommandInfo(loadCommand + " && " + usageExecutable + " -gateway_user " + username + - " -submit_time \"`date '+%F %T %:z'`\" -jobid " + jobId ); - adaptor.executeCommand(rawCommandInfo.getRawCommand(), null); - } - //taskStatus = new TaskStatus(TaskState.COMPLETED); - //taskStatus.setReason("Submitted job to compute resource"); - //taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); - - return onSuccess("Submitted job to compute resource"); - } else { - int verificationTryCount = 0; - while (verificationTryCount++ < 3) { - String verifyJobId = verifyJobSubmission(adaptor, jobModel.getJobName(), getTaskContext().getComputeResourceLoginUserName()); - if (verifyJobId != null && !verifyJobId.isEmpty()) { - // JobStatus either changed from SUBMITTED to QUEUED or directly to QUEUED - jobId = verifyJobId; - jobModel.setJobId(jobId); - saveJobModel(jobModel); - JobStatus jobStatus = new JobStatus(); - jobStatus.setJobState(JobState.QUEUED); - jobStatus.setReason("Verification step succeeded"); - jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); - jobModel.setJobStatuses(Arrays.asList(jobStatus)); - saveAndPublishJobStatus(jobModel); - //taskStatus.setState(TaskState.COMPLETED); - //taskStatus.setReason("Submitted job to compute resource"); - //taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); - break; - } - logger.info("Verify step return invalid jobId, retry verification step in " + (verificationTryCount * 10) + " secs"); - Thread.sleep(verificationTryCount * 10000); + logger.info("Job id " + verifyJobId + " verification succeeded"); + break; } + logger.info("Verify step return invalid jobId, retry verification step in " + (verificationTryCount * 10) + " secs"); + Thread.sleep(verificationTryCount * 10000); } + } - if (jobId == null || jobId.isEmpty()) { - jobModel.setJobId(DEFAULT_JOB_ID); - saveJobModel(jobModel); - String msg = "expId:" + getExperimentId() + " Couldn't find " + - "remote jobId for JobName:" + jobModel.getJobName() + ", both submit and verify steps " + - "doesn't return a valid JobId. " + "Hence changing experiment state to Failed"; - logger.error(msg); - ErrorModel errorModel = new ErrorModel(); - errorModel.setUserFriendlyMessage(msg); - errorModel.setActualErrorMessage(msg); - saveExperimentError(errorModel); - saveProcessError(errorModel); - saveTaskError(errorModel); - //taskStatus.setState(TaskState.FAILED); - //taskStatus.setReason("Couldn't find job id in both submitted and verified steps"); - //taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); - return onFail("Couldn't find job id in both submitted and verified steps", false, null); - } else { - //GFacUtils.saveJobModel(processContext, jobModel); - } - - } else { - return onFail("Job data is null", true, null); - // taskStatus.setReason("JobFile is null"); - //taskStatus.setState(TaskState.FAILED); + if (jobId == null || jobId.isEmpty()) { + jobModel.setJobId(DEFAULT_JOB_ID); + saveJobModel(jobModel); + String msg = "expId:" + getExperimentId() + " Couldn't find " + + "remote jobId for JobName:" + jobModel.getJobName() + ", both submit and verify steps " + + "doesn't return a valid JobId. " + "Hence changing experiment state to Failed"; + logger.error(msg); + return onFail("Couldn't find job id in both submitted and verified steps. " + msg, false, null); + } else { + return onSuccess("Submitted job to compute resource after retry"); } + } catch (Exception e) { return onFail("Task failed due to unexpected issue", false, e); } - // TODO get rid of this - return onFail("Task moved to an unknown state", false, null); } private boolean verifyJobSubmissionByJobId(AgentAdaptor agentAdaptor, String jobID) throws Exception { diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/ForkJobSubmissionTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/ForkJobSubmissionTask.java index 06ce0ea..d9415ac 100644 --- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/ForkJobSubmissionTask.java +++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/ForkJobSubmissionTask.java @@ -12,12 +12,17 @@ import org.apache.airavata.model.job.JobModel; import org.apache.airavata.model.status.JobState; import org.apache.airavata.model.status.JobStatus; import org.apache.helix.task.TaskResult; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; -import java.util.Arrays; +import java.util.Collections; @TaskDef(name = "Fork Job Submission") +@SuppressWarnings("unused") public class ForkJobSubmissionTask extends JobSubmissionTask { + private static final Logger logger = LogManager.getLogger(ForkJobSubmissionTask.class); + @Override public TaskResult onRun(TaskHelper taskHelper, TaskContext taskContext) { @@ -31,45 +36,44 @@ public class ForkJobSubmissionTask extends JobSubmissionTask { jobModel.setTaskId(getTaskId()); jobModel.setJobName(mapData.getJobName()); - if (mapData != null) { - //jobModel.setJobDescription(FileUtils.readFileToString(jobFile)); - AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor( - getTaskContext().getGatewayId(), - getTaskContext().getComputeResourceId(), - getTaskContext().getJobSubmissionProtocol().name(), - getTaskContext().getComputeResourceCredentialToken(), - getTaskContext().getComputeResourceLoginUserName()); - - JobSubmissionOutput submissionOutput = submitBatchJob(adaptor, mapData, mapData.getWorkingDirectory()); - - jobModel.setExitCode(submissionOutput.getExitCode()); - jobModel.setStdErr(submissionOutput.getStdErr()); - jobModel.setStdOut(submissionOutput.getStdOut()); - - String jobId = submissionOutput.getJobId(); - - if (jobId != null && !jobId.isEmpty()) { - jobModel.setJobId(jobId); - saveJobModel(jobModel); - JobStatus jobStatus = new JobStatus(); - jobStatus.setJobState(JobState.SUBMITTED); - jobStatus.setReason("Successfully Submitted to " + getComputeResourceDescription().getHostName()); - jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); - jobModel.setJobStatuses(Arrays.asList(jobStatus)); - saveAndPublishJobStatus(jobModel); - - return null; - } else { - String msg = "expId:" + getExperimentId() + " Couldn't find remote jobId for JobName:" + - jobModel.getJobName() + ", both submit and verify steps doesn't return a valid JobId. " + - "Hence changing experiment state to Failed"; - } - + AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor( + getTaskContext().getGatewayId(), + getTaskContext().getComputeResourceId(), + getTaskContext().getJobSubmissionProtocol().name(), + getTaskContext().getComputeResourceCredentialToken(), + getTaskContext().getComputeResourceLoginUserName()); + + JobSubmissionOutput submissionOutput = submitBatchJob(adaptor, mapData, mapData.getWorkingDirectory()); + + jobModel.setJobDescription(submissionOutput.getDescription()); + jobModel.setExitCode(submissionOutput.getExitCode()); + jobModel.setStdErr(submissionOutput.getStdErr()); + jobModel.setStdOut(submissionOutput.getStdOut()); + + String jobId = submissionOutput.getJobId(); + + if (jobId != null && !jobId.isEmpty()) { + jobModel.setJobId(jobId); + saveJobModel(jobModel); + JobStatus jobStatus = new JobStatus(); + jobStatus.setJobState(JobState.SUBMITTED); + jobStatus.setReason("Successfully Submitted to " + getComputeResourceDescription().getHostName()); + jobStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + jobModel.setJobStatuses(Collections.singletonList(jobStatus)); + saveAndPublishJobStatus(jobModel); + + return onSuccess("Job submitted successfully"); + } else { + String msg = "expId:" + getExperimentId() + " Couldn't find remote jobId for JobName:" + + jobModel.getJobName() + ", both submit and verify steps doesn't return a valid JobId. " + + "Hence changing experiment state to Failed"; + + return onFail(msg, true, null); } - return null; } catch (Exception e) { - return null; + logger.error("Unknown error while submitting job", e); + return onFail("Unknown error while submitting job", true, e); } } diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java index 7bf5034..a204ee1 100644 --- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java +++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java @@ -52,23 +52,32 @@ public abstract class JobSubmissionTask extends AiravataTask { } } + @SuppressWarnings("WeakerAccess") public CuratorFramework getCuratorClient() { return curatorClient; } // TODO perform exception handling + @SuppressWarnings("WeakerAccess") protected void createMonitoringNode(String jobId) throws Exception { logger.info("Creating zookeeper paths for job monitoring for job id : " + jobId + ", process : " + getProcessId() + ", gateway : " + getGatewayId()); - this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/monitoring/" + jobId + "/lock", new byte[0]); - this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/monitoring/" + jobId + "/gateway", getGatewayId().getBytes()); - this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/monitoring/" + jobId + "/process", getProcessId().getBytes()); - this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/monitoring/" + jobId + "/task", getTaskId().getBytes()); - this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/monitoring/" + jobId + "/experiment", getExperimentId().getBytes()); - this.curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/monitoring/" + jobId + "/status", "pending".getBytes()); + getCuratorClient().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath( + "/monitoring/" + jobId + "/lock", new byte[0]); + getCuratorClient().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath( + "/monitoring/" + jobId + "/gateway", getGatewayId().getBytes()); + getCuratorClient().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath( + "/monitoring/" + jobId + "/process", getProcessId().getBytes()); + getCuratorClient().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath( + "/monitoring/" + jobId + "/task", getTaskId().getBytes()); + getCuratorClient().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath( + "/monitoring/" + jobId + "/experiment", getExperimentId().getBytes()); + getCuratorClient().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath( + "/monitoring/" + jobId + "/status", "pending".getBytes()); } ////////////////////// + @SuppressWarnings("WeakerAccess") protected JobSubmissionOutput submitBatchJob(AgentAdaptor agentAdaptor, GroovyMapData groovyMapData, String workingDirectory) throws Exception { JobManagerConfiguration jobManagerConfiguration = JobFactory.getJobManagerConfiguration(JobFactory.getResourceJobManager( getAppCatalog(), getTaskContext().getJobSubmissionProtocol(), getTaskContext().getPreferredJobSubmissionInterface())); @@ -94,6 +103,7 @@ public abstract class JobSubmissionTask extends AiravataTask { CommandOutput commandOutput = agentAdaptor.executeCommand(submitCommand.getRawCommand(), workingDirectory); JobSubmissionOutput jsoutput = new JobSubmissionOutput(); + jsoutput.setDescription(scriptAsString); jsoutput.setJobId(jobManagerConfiguration.getParser().parseJobSubmission(commandOutput.getStdOut())); if (jsoutput.getJobId() == null) { @@ -114,12 +124,14 @@ public abstract class JobSubmissionTask extends AiravataTask { return jsoutput; } + @SuppressWarnings("WeakerAccess") public File getLocalDataDir() { String outputPath = ServerSettings.getLocalDataLocation(); outputPath = (outputPath.endsWith(File.separator) ? outputPath : outputPath + File.separator); return new File(outputPath + getProcessId()); } + @SuppressWarnings("WeakerAccess") public JobStatus getJobStatus(AgentAdaptor agentAdaptor, String jobID) throws Exception { JobManagerConfiguration jobManagerConfiguration = JobFactory.getJobManagerConfiguration(JobFactory.getResourceJobManager( getAppCatalog(), getTaskContext().getJobSubmissionProtocol(), getTaskContext().getPreferredJobSubmissionInterface())); @@ -129,6 +141,7 @@ public abstract class JobSubmissionTask extends AiravataTask { } + @SuppressWarnings("WeakerAccess") public String getJobIdByJobName(AgentAdaptor agentAdaptor, String jobName, String userName) throws Exception { JobManagerConfiguration jobManagerConfiguration = JobFactory.getJobManagerConfiguration(JobFactory.getResourceJobManager( getAppCatalog(), getTaskContext().getJobSubmissionProtocol(), getTaskContext().getPreferredJobSubmissionInterface())); @@ -138,16 +151,22 @@ public abstract class JobSubmissionTask extends AiravataTask { return jobManagerConfiguration.getParser().parseJobId(jobName, commandOutput.getStdOut()); } + @SuppressWarnings("WeakerAccess") public void saveJobModel(JobModel jobModel) throws RegistryException { getExperimentCatalog().add(ExpCatChildDataType.JOB, jobModel, getProcessId()); } + @SuppressWarnings("WeakerAccess") public void saveAndPublishJobStatus(JobModel jobModel) throws Exception { try { // first we save job jobModel to the registry for sa and then save the job status. - JobStatus jobStatus = null; - if(jobModel.getJobStatuses() != null) + JobStatus jobStatus; + if (jobModel.getJobStatuses() != null && jobModel.getJobStatuses().size() > 0) { jobStatus = jobModel.getJobStatuses().get(0); + } else { + logger.error("Job statuses can not be empty"); + return; + } List<JobStatus> statuses = new ArrayList<>(); statuses.add(jobStatus); @@ -173,7 +192,4 @@ public abstract class JobSubmissionTask extends AiravataTask { throw new Exception("Error persisting job status " + e.getLocalizedMessage(), e); } } - - ///////////// required for groovy map - } diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java index 225f81d..e2af339 100644 --- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java +++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java @@ -41,15 +41,11 @@ import org.apache.log4j.Logger; import org.apache.zookeeper.data.Stat; import java.util.*; -import java.util.stream.Collectors; public class PostWorkflowManager { private static final Logger logger = LogManager.getLogger(PostWorkflowManager.class); - //private final String BOOTSTRAP_SERVERS = "localhost:9092"; - //private final String TOPIC = "parsed-data"; - private CuratorFramework curatorClient = null; private Publisher statusPublisher; @@ -66,7 +62,7 @@ public class PostWorkflowManager { props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JobStatusResultDeserializer.class.getName()); // Create the consumer using props. - final Consumer<String, JobStatusResult> consumer = new KafkaConsumer<String, JobStatusResult>(props); + final Consumer<String, JobStatusResult> consumer = new KafkaConsumer<>(props); // Subscribe to the topic. consumer.subscribe(Collections.singletonList(ServerSettings.getSetting("kafka.broker.topic"))); return consumer; @@ -74,32 +70,27 @@ public class PostWorkflowManager { private String getExperimentIdByJobId(String jobId) throws Exception { byte[] processBytes = this.curatorClient.getData().forPath("/monitoring/" + jobId + "/experiment"); - String process = new String(processBytes); - return process; + return new String(processBytes); } private String getTaskIdByJobId(String jobId) throws Exception { byte[] processBytes = this.curatorClient.getData().forPath("/monitoring/" + jobId + "/task"); - String process = new String(processBytes); - return process; + return new String(processBytes); } private String getProcessIdByJobId(String jobId) throws Exception { byte[] processBytes = this.curatorClient.getData().forPath("/monitoring/" + jobId + "/process"); - String process = new String(processBytes); - return process; + return new String(processBytes); } private String getGatewayByJobId(String jobId) throws Exception { byte[] gatewayBytes = this.curatorClient.getData().forPath("/monitoring/" + jobId + "/gateway"); - String gateway = new String(gatewayBytes); - return gateway; + return new String(gatewayBytes); } private String getStatusByJobId(String jobId) throws Exception { byte[] statusBytes = this.curatorClient.getData().forPath("/monitoring/" + jobId + "/status"); - String status = new String(statusBytes); - return status; + return new String(statusBytes); } private boolean hasMonitoringRegistered(String jobId) throws Exception { @@ -128,7 +119,7 @@ public class PostWorkflowManager { // TODO get cluster lock before that if ("cancelled".equals(status)) { - + // TODO to be implemented } else { saveAndPublishJobStatus(jobStatusResult.getJobId(), task, processId, experimentId, gateway, jobStatusResult.getState()); @@ -190,7 +181,7 @@ public class PostWorkflowManager { ServerSettings.getZookeeperConnection()); workflowManager.launchWorkflow(processId + "-POST-" + UUID.randomUUID().toString(), - allTasks.stream().map(t -> (AiravataTask) t).collect(Collectors.toList()), true, false); + new ArrayList<>(allTasks), true, false); } else if (jobStatusResult.getState() == JobState.CANCELED) { logger.info("Job " + jobStatusResult.getJobId() + " was externally cancelled"); @@ -224,7 +215,7 @@ public class PostWorkflowManager { } } - public void saveAndPublishJobStatus(String jobId, String taskId, String processId, String experimentId, String gateway, + private void saveAndPublishJobStatus(String jobId, String taskId, String processId, String experimentId, String gateway, JobState jobState) throws Exception { try { @@ -255,7 +246,7 @@ public class PostWorkflowManager { } } - public Publisher getStatusPublisher() throws AiravataException { + private Publisher getStatusPublisher() throws AiravataException { if (statusPublisher == null) { synchronized (RabbitMQPublisher.class) { if (statusPublisher == null) { diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java index 18a6627..383e0b0 100644 --- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java +++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java @@ -3,6 +3,7 @@ package org.apache.airavata.helix.impl.workflow; import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.common.utils.ThriftUtils; +import org.apache.airavata.helix.core.AbstractTask; import org.apache.airavata.helix.core.OutPort; import org.apache.airavata.helix.impl.task.AiravataTask; import org.apache.airavata.helix.impl.task.env.EnvSetupTask; @@ -36,6 +37,7 @@ public class PreWorkflowManager { private final Subscriber subscriber; + @SuppressWarnings("WeakerAccess") public PreWorkflowManager() throws AiravataException { List<String> routingKeys = new ArrayList<>(); routingKeys.add(ServerSettings.getRabbitmqProcessExchangeName()); @@ -94,7 +96,7 @@ public class PreWorkflowManager { ServerSettings.getSetting("post.workflow.manager.name"), ServerSettings.getZookeeperConnection()); String workflowName = workflowManager.launchWorkflow(processId + "-PRE-" + UUID.randomUUID().toString(), - allTasks.stream().map(t -> (AiravataTask) t).collect(Collectors.toList()), true, false); + new ArrayList<>(allTasks), true, false); return workflowName; } diff --git a/modules/airavata-helix/helix-spectator/src/main/resources/airavata-server.properties b/modules/airavata-helix/helix-spectator/src/main/resources/airavata-server.properties index 19b3b3d..e412896 100644 --- a/modules/airavata-helix/helix-spectator/src/main/resources/airavata-server.properties +++ b/modules/airavata-helix/helix-spectator/src/main/resources/airavata-server.properties @@ -45,18 +45,6 @@ jpa.cache.size=-1 #jpa.connection.properties=MaxActive=10,MaxIdle=5,MinIdle=2,MaxWait=60000,testWhileIdle=true,testOnBorrow=true enable.sharing=true -# Properties for default user mode -default.registry.user=default-admin -default.registry.password=123456 -default.registry.password.hash.method=SHA -default.registry.gateway=default -super.tenant.gatewayId=default - -# Properties for cluster status monitoring -# cluster status monitoring job repeat time in seconds -cluster.status.monitoring.enable=false -cluster.status.monitoring.repeat.time=18000 - ########################################################################### # Application Catalog DB Configuration ########################################################################### @@ -84,19 +72,6 @@ replicacatalog.jdbc.password=eroma123456 replicacatalog.validationQuery=SELECT 1 from CONFIGURATION ########################################################################### -# Workflow Catalog DB Configuration -########################################################################### -#for derby [AiravataJPARegistry] -#workflowcatalog.jdbc.driver=org.apache.derby.jdbc.ClientDriver -#workflowcatalog.jdbc.url=jdbc:derby://localhost:1527/workflow_catalog;create=true;user=airavata;password=airavata -# MariaDB database configuration -workflowcatalog.jdbc.driver=org.mariadb.jdbc.Driver -workflowcatalog.jdbc.url=jdbc:mariadb://149.165.168.248:3306/replica_catalog -workflowcatalog.jdbc.user=eroma -workflowcatalog.jdbc.password=eroma123456 -workflowcatalog.validationQuery=SELECT 1 from CONFIGURATION - -########################################################################### # Sharing Catalog DB Configuration ########################################################################### #for derby [AiravataJPARegistry] @@ -117,21 +92,6 @@ sharing.registry.server.host=192.168.99.102 sharing.registry.server.port=7878 ########################################################################### -# User Profile MongoDB Configuration -########################################################################### -userprofile.mongodb.host=localhost -userprofile.mongodb.port=27017 - - -########################################################################### -# Server module Configuration -########################################################################### -#credential store server should be started before API server -#This is obsolete property with new script files. -#servers=credentialstore,apiserver,orchestrator - - -########################################################################### # API Server Configurations ########################################################################### apiserver=org.apache.airavata.api.server.AiravataAPIServer @@ -141,21 +101,6 @@ apiserver.port=8930 apiserver.min.threads=50 ########################################################################### -# Orchestrator Server Configurations -########################################################################### -orchestrator=org.apache.airavata.orchestrator.server.OrchestratorServer -orchestrator.server.name=orchestrator-node0 -orchestrator.server.host=192.168.99.102 -orchestrator.server.port=8940 -orchestrator.server.min.threads=50 -job.validators=org.apache.airavata.orchestrator.core.validator.impl.BatchQueueValidator,org.apache.airavata.orchestrator.core.validator.impl.ExperimentStatusValidator -submitter.interval=10000 -threadpool.size=10 -start.submitter=true -embedded.mode=true -enable.validation=true - -########################################################################### # Registry Server Configurations ########################################################################### regserver=org.apache.airavata.registry.api.service.RegistryAPIServer @@ -164,28 +109,6 @@ regserver.server.host=192.168.99.102 regserver.server.port=8970 regserver.server.min.threads=50 -########################################################################### -# GFac Server Configurations -########################################################################### -gfac=org.apache.airavata.gfac.server.GfacServer -gfac.server.name=gfac-node0 -gfac.server.host=10.0.2.15 -gfac.server.port=8950 -gfac.thread.pool.size=50 -host.scheduler=org.apache.airavata.gfac.impl.DefaultHostScheduler - - - -########################################################################### -# Airavata Workflow Interpreter Configurations -########################################################################### -workflowserver=org.apache.airavata.api.server.WorkflowServer -enactment.thread.pool.size=10 - -#to define custom workflow parser user following property -#workflow.parser=org.apache.airavata.workflow.core.parser.AiravataWorkflowBuilder - - ########################################################################### # Job Scheduler can send informative email messages to you about the status of your job. @@ -269,6 +192,8 @@ kafka.broker.consumer.group=MonitoringConsumer helix.cluster.name=AiravataDemoCluster pre.workflow.manager.name=prewm post.workflow.manager.name=postwm +helix.controller.name=helixcontroller +helix.participant.name=helixparticipant ########################################################################### # AMQP Notification Configuration diff --git a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java index a7e5a64..029da29 100644 --- a/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java +++ b/modules/airavata-helix/task-core/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java @@ -1,5 +1,7 @@ package org.apache.airavata.helix.core.participant; +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.helix.core.support.TaskHelperImpl; import org.apache.airavata.helix.core.AbstractTask; import org.apache.airavata.helix.core.util.PropertyResolver; @@ -44,20 +46,13 @@ public class HelixParticipant <T extends AbstractTask> implements Runnable { private PropertyResolver propertyResolver; private Class<T> taskClass; - public HelixParticipant(String propertyFile, Class<T> taskClass, String taskTypeName, boolean readPropertyFromFile) throws IOException { + public HelixParticipant(Class<T> taskClass, String taskTypeName) throws ApplicationSettingsException { logger.info("Initializing Participant Node"); - this.propertyResolver = new PropertyResolver(); - if (readPropertyFromFile) { - propertyResolver.loadFromFile(new File(propertyFile)); - } else { - propertyResolver.loadInputStream(this.getClass().getClassLoader().getResourceAsStream(propertyFile)); - } - - this.zkAddress = propertyResolver.get("zookeeper.connection.url"); - this.clusterName = propertyResolver.get("helix.cluster.name"); - this.participantName = propertyResolver.get("participant.name"); + this.zkAddress = ServerSettings.getZookeeperConnection(); + this.clusterName = ServerSettings.getSetting("helix.cluster.name"); + this.participantName = ServerSettings.getSetting("helix.participant.name"); this.taskTypeName = taskTypeName; this.taskClass = taskClass; -- To stop receiving notification emails like this one, please contact dimuthu...@apache.org.