This is an automated email from the ASF dual-hosted git repository. dimuthuupe pushed a commit to branch helix-integration in repository https://gitbox.apache.org/repos/asf/airavata.git
commit ca455645a09da0c8184a741c7ee5cb3853338d70 Author: dimuthu <dimuthu.upeks...@gmail.com> AuthorDate: Sun Mar 4 20:48:23 2018 -0500 Configuring pre workflow manager to read from rabbitmq launch queue --- .../airavata/helix/workflow/WorkflowManager.java | 22 ++++-- modules/helix-spectator/pom.xml | 5 ++ .../submission/task/DefaultJobSubmissionTask.java | 2 +- .../helix/impl/workflow/PostWorkflowManager.java | 18 ++--- .../helix/impl/workflow/PreWorkflowManager.java | 92 ++++++++++++++++++---- 5 files changed, 102 insertions(+), 37 deletions(-) diff --git a/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/WorkflowManager.java b/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/WorkflowManager.java index 9ecafb9..e3d07b7 100644 --- a/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/WorkflowManager.java +++ b/modules/airavata-helix/workflow-impl/src/main/java/org/apache/airavata/helix/workflow/WorkflowManager.java @@ -2,13 +2,14 @@ package org.apache.airavata.helix.workflow; import org.apache.airavata.helix.core.AbstractTask; import org.apache.airavata.helix.core.OutPort; -import org.apache.airavata.helix.core.util.*; import org.apache.airavata.helix.core.util.TaskUtil; import org.apache.airavata.helix.task.api.annotation.TaskDef; import org.apache.helix.HelixManager; import org.apache.helix.HelixManagerFactory; import org.apache.helix.InstanceType; import org.apache.helix.task.*; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import java.util.ArrayList; import java.util.List; @@ -22,6 +23,8 @@ import java.util.Map; */ public class WorkflowManager { + private static final Logger logger = LogManager.getLogger(WorkflowManager.class); + private static final String WORKFLOW_PREFIX = "Workflow_of_process_"; private TaskDriver taskDriver; @@ -43,9 +46,12 @@ public class WorkflowManager { taskDriver = new TaskDriver(helixManager); } - public void launchWorkflow(String processId, List<AbstractTask> tasks, boolean globalParticipant) throws Exception { + public String launchWorkflow(String processId, List<AbstractTask> tasks, boolean globalParticipant, boolean monitor) throws Exception { + + String workflowName = WORKFLOW_PREFIX + processId; + logger.info("Launching workflow " + workflowName + " for process " + processId); - Workflow.Builder workflowBuilder = new Workflow.Builder(WORKFLOW_PREFIX + processId).setExpiry(0); + Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName).setExpiry(0); for (int i = 0; i < tasks.size(); i++) { AbstractTask data = tasks.get(i); @@ -86,9 +92,13 @@ public class WorkflowManager { //TODO : Do we need to monitor workflow status? If so how do we do it in a scalable manner? For example, // if the hfac that monitors a particular workflow, got killed due to some reason, who is taking the responsibility - TaskState taskState = taskDriver.pollForWorkflowState(workflow.getName(), - TaskState.COMPLETED, TaskState.FAILED, TaskState.STOPPED, TaskState.ABORTED); - System.out.println("Workflow finished with state " + taskState.name()); + if (monitor) { + TaskState taskState = taskDriver.pollForWorkflowState(workflow.getName(), + TaskState.COMPLETED, TaskState.FAILED, TaskState.STOPPED, TaskState.ABORTED); + logger.info("Workflow " + workflowName + " for process " + processId + " finished with state " + taskState.name()); + + } + return workflowName; } } \ No newline at end of file diff --git a/modules/helix-spectator/pom.xml b/modules/helix-spectator/pom.xml index 213f747..326d7ef 100644 --- a/modules/helix-spectator/pom.xml +++ b/modules/helix-spectator/pom.xml @@ -60,5 +60,10 @@ <artifactId>job-monitor</artifactId> <version>0.17-SNAPSHOT</version> </dependency> + <dependency> + <groupId>org.apache.airavata</groupId> + <artifactId>airavata-messaging-core</artifactId> + <version>0.17-SNAPSHOT</version> + </dependency> </dependencies> </project> \ No newline at end of file diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java index a60a955..31b6f30 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/task/DefaultJobSubmissionTask.java @@ -200,7 +200,7 @@ public class DefaultJobSubmissionTask extends JobSubmissionTask { //taskStatus.setReason("Couldn't find job id in both submitted and verified steps"); //taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); return onFail("Couldn't find job id in both submitted and verified steps", false, null); - }else { + } else { //GFacUtils.saveJobModel(processContext, jobModel); } diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java index 25f8ec5..383fe37 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java @@ -103,6 +103,9 @@ public class PostWorkflowManager { String processId = getProcessIdByJobId(jobStatusResult.getJobId()); String status = getStatusByJobId(jobStatusResult.getJobId()); + logger.info("Starting the post workflow for job id : " + jobStatusResult.getJobId() + " with process id " + + processId + ", gateway " + gateway + " and status " + status); + // TODO get cluster lock before that if ("cancelled".equals(status)) { @@ -151,8 +154,8 @@ public class PostWorkflowManager { WorkflowManager workflowManager = new WorkflowManager("AiravataDemoCluster", "wm-23", ServerSettings.getZookeeperConnection()); - workflowManager.launchWorkflow(UUID.randomUUID().toString(), - allTasks.stream().map(t -> (AiravataTask) t).collect(Collectors.toList()), true); + workflowManager.launchWorkflow(processId + "-POST-" + UUID.randomUUID().toString(), + allTasks.stream().map(t -> (AiravataTask) t).collect(Collectors.toList()), true, false); } else if (jobStatusResult.getState() == JobState.CANCELED) { logger.info("Job " + jobStatusResult.getJobId() + " was externally cancelled"); @@ -176,25 +179,14 @@ public class PostWorkflowManager { private void runConsumer() throws InterruptedException { final Consumer<String, JobStatusResult> consumer = createConsumer(); - final int giveUp = 100; int noRecordsCount = 0; - while (true) { final ConsumerRecords<String, JobStatusResult> consumerRecords = consumer.poll(1000); - - /*if (consumerRecords.count() == 0) { - noRecordsCount++; - if (noRecordsCount > giveUp) break; - else continue; - }*/ - consumerRecords.forEach(record -> { process(record.value()); }); consumer.commitAsync(); } - //consumer.close(); - //System.out.println("DONE"); } public static void main(String[] args) throws Exception { diff --git a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java index 9814b01..3030375 100644 --- a/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java +++ b/modules/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java @@ -1,21 +1,29 @@ package org.apache.airavata.helix.impl.workflow; -import org.apache.airavata.helix.core.AbstractTask; +import org.apache.airavata.common.exception.AiravataException; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.common.utils.ThriftUtils; import org.apache.airavata.helix.core.OutPort; import org.apache.airavata.helix.impl.task.AiravataTask; import org.apache.airavata.helix.impl.task.EnvSetupTask; import org.apache.airavata.helix.impl.task.InputDataStagingTask; -import org.apache.airavata.helix.impl.task.OutputDataStagingTask; import org.apache.airavata.helix.impl.task.submission.task.DefaultJobSubmissionTask; import org.apache.airavata.helix.workflow.WorkflowManager; +import org.apache.airavata.messaging.core.*; import org.apache.airavata.model.experiment.ExperimentModel; +import org.apache.airavata.model.messaging.event.MessageType; +import org.apache.airavata.model.messaging.event.ProcessSubmitEvent; import org.apache.airavata.model.process.ProcessModel; import org.apache.airavata.model.task.TaskModel; import org.apache.airavata.model.task.TaskTypes; import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory; -import org.apache.airavata.registry.cpi.AppCatalog; import org.apache.airavata.registry.cpi.ExperimentCatalog; import org.apache.airavata.registry.cpi.ExperimentCatalogModelType; +import org.apache.airavata.registry.cpi.RegistryException; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.thrift.TBase; +import org.apache.thrift.TException; import java.util.ArrayList; import java.util.List; @@ -25,11 +33,28 @@ import java.util.stream.Collectors; public class PreWorkflowManager { + private static final Logger logger = LogManager.getLogger(PreWorkflowManager.class); + + private final Subscriber subscriber; + + public PreWorkflowManager() throws AiravataException { + List<String> routingKeys = new ArrayList<>(); + routingKeys.add(ServerSettings.getRabbitmqProcessExchangeName()); + this.subscriber = MessagingFactory.getSubscriber(new ProcessLaunchMessageHandler(), routingKeys, Type.PROCESS_LAUNCH); + } + public static void main(String[] args) throws Exception { - String processId = "PROCESS_5b252ad9-d630-4cf9-80e3-0c30c55d1001"; - AppCatalog appCatalog = RegistryFactory.getAppCatalog(); - ExperimentCatalog experimentCatalog = RegistryFactory.getDefaultExpCatalog(); + PreWorkflowManager preWorkflowManager = new PreWorkflowManager(); + + //String processId = "PROCESS_5b252ad9-d630-4cf9-80e3-0c30c55d1001"; + //AppCatalog appCatalog = RegistryFactory.getAppCatalog(); + + } + + private String createAndLaunchPreWorkflow(String processId, String gateway) throws Exception { + + ExperimentCatalog experimentCatalog = RegistryFactory.getExperimentCatalog(gateway); ProcessModel processModel = (ProcessModel) experimentCatalog.get(ExperimentCatalogModelType.PROCESS, processId); ExperimentModel experimentModel = (ExperimentModel) experimentCatalog.get(ExperimentCatalogModelType.EXPERIMENT, processModel.getExperimentId()); @@ -74,16 +99,49 @@ public class PreWorkflowManager { } } -/* DefaultJobSubmissionTask defaultJobSubmissionTask = new DefaultJobSubmissionTask(); - defaultJobSubmissionTask.setGatewayId("default"); - defaultJobSubmissionTask.setExperimentId("Clone_of_Mothur-Test1_0c9f627e-2c32-403e-a28a-2a8b10c21c1a"); - defaultJobSubmissionTask.setProcessId("PROCESS_438a87cc-2dec-4edc-bfeb-31128df91bb6"); - defaultJobSubmissionTask.setTaskId("TASK_612844a4-aedb-41a5-824f-9b20c76867f7"); - - List<AbstractTask> tasks = new ArrayList<>(); - tasks.add(defaultJobSubmissionTask); -*/ - WorkflowManager workflowManager = new WorkflowManager("AiravataDemoCluster", "wm-22", "localhost:2199"); - workflowManager.launchWorkflow(UUID.randomUUID().toString(), allTasks.stream().map(t -> (AiravataTask)t).collect(Collectors.toList()), true); + WorkflowManager workflowManager = new WorkflowManager("AiravataDemoCluster", "wm-22", + ServerSettings.getZookeeperConnection()); + String workflowName = workflowManager.launchWorkflow(processId + "-PRE-" + UUID.randomUUID().toString(), + allTasks.stream().map(t -> (AiravataTask) t).collect(Collectors.toList()), true, false); + return workflowName; + } + + private class ProcessLaunchMessageHandler implements MessageHandler { + + @Override + public void onMessage(MessageContext messageContext) { + logger.info(" Message Received with message id " + messageContext.getMessageId() + " and with message type: " + messageContext.getType()); + + if (messageContext.getType().equals(MessageType.LAUNCHPROCESS)) { + ProcessSubmitEvent event = new ProcessSubmitEvent(); + TBase messageEvent = messageContext.getEvent(); + + try { + byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent); + ThriftUtils.createThriftFromBytes(bytes, event); + } catch (TException e) { + logger.error("Failed to fetch process submit event", e); + subscriber.sendAck(messageContext.getDeliveryTag()); + } + + String processId = event.getProcessId(); + String gateway = event.getGatewayId(); + + logger.info("Received process launch message for process " + processId + " in gateway " + gateway); + + try { + logger.info("Launching the pre workflow for process " + processId + " in gateway " + gateway ); + String workflowName = createAndLaunchPreWorkflow(processId, gateway); + logger.info("Completed launching the pre workflow " + workflowName + " for process " + processId + " in gateway " + gateway ); + subscriber.sendAck(messageContext.getDeliveryTag()); + } catch (Exception e) { + logger.error("Failed to launch the pre workflow for process " + processId + " in gateway " + gateway, e); + subscriber.sendAck(messageContext.getDeliveryTag()); + } + } else { + logger.warn("Unknown message type"); + subscriber.sendAck(messageContext.getDeliveryTag()); + } + } } } -- To stop receiving notification emails like this one, please contact dimuthu...@apache.org.