iemejia commented on a change in pull request #12731: URL: https://github.com/apache/beam/pull/12731#discussion_r487696110
########## File path: runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2PipelineResult.java ########## @@ -17,15 +17,21 @@ */ package org.apache.beam.runners.twister2; +import edu.iu.dsc.tws.api.scheduler.Twister2JobState; import java.io.IOException; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.metrics.MetricResults; import org.joda.time.Duration; +import org.mortbay.log.Log; Review comment: Is there any particular reason to use a different logger kind here? We use slf4j in most of the Beam code base for logging so maybe good to follow this on the Twister2 runner. Notice that this can be done in a separate PR. ########## File path: sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml ########## @@ -281,6 +281,17 @@ </dependency> </dependencies> </profile> + <profile> + <id>twister2-runner</id> Review comment: :+1: ########## File path: runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2Runner.java ########## @@ -119,19 +129,27 @@ public PipelineResult run(Pipeline pipeline) { .addComputeResource(options.getWorkerCPUs(), options.getRamMegaBytes(), workers) .setConfig(jobConfig) .build(); - Twister2JobState jobState = Twister2Submitter.submitJob(twister2Job, config); - Twister2PipelineResult result = new Twister2PipelineResult(); - // TODO: Need to fix the check for "RUNNING" once fix for this is done on Twister2 end. - if (jobState.getJobstate() == DriverJobState.FAILED - || jobState.getJobstate() == DriverJobState.RUNNING) { - throw new RuntimeException("Pipeline execution failed", jobState.getCause()); + Twister2JobState jobState; + if (isLocalMode(options)) { + jobState = LocalSubmitter.submitJob(twister2Job, config); } else { - result.setState(PipelineResult.State.DONE); + jobState = Twister2Submitter.submitJob(twister2Job, config); } + + Twister2PipelineResult result = new Twister2PipelineResult(jobState); return result; } + /** Check if the Runner is set to use Twister local mode or pointing to a deployment. */ + private boolean isLocalMode(Twister2PipelineOptions options) { + if (options.getTwister2Home() == null || "".equals(options.getTwister2Home())) { Review comment: with guava (use vendor if so) this can be simpler, but that's not blocking at all for this PR `return Strings.isNullOrEmpty(options.getTwister2Home());` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org