iemejia commented on a change in pull request #12731:
URL: https://github.com/apache/beam/pull/12731#discussion_r487696110



##########
File path: 
runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2PipelineResult.java
##########
@@ -17,15 +17,21 @@
  */
 package org.apache.beam.runners.twister2;
 
+import edu.iu.dsc.tws.api.scheduler.Twister2JobState;
 import java.io.IOException;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.metrics.MetricResults;
 import org.joda.time.Duration;
+import org.mortbay.log.Log;

Review comment:
       Is there a particular reason to use a different logger here? Most of
   the Beam code base logs through slf4j, so it would be good to follow that
   convention in the Twister2 runner as well. Note that this can be done in a
   separate PR.

##########
File path: 
sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
##########
@@ -281,6 +281,17 @@
         </dependency>
       </dependencies>
     </profile>
+    <profile>
+      <id>twister2-runner</id>

Review comment:
       :+1: 

##########
File path: 
runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2Runner.java
##########
@@ -119,19 +129,27 @@ public PipelineResult run(Pipeline pipeline) {
             .addComputeResource(options.getWorkerCPUs(), options.getRamMegaBytes(), workers)
             .setConfig(jobConfig)
             .build();
-    Twister2JobState jobState = Twister2Submitter.submitJob(twister2Job, config);
 
-    Twister2PipelineResult result = new Twister2PipelineResult();
-    // TODO: Need to fix the check for "RUNNING" once fix for this is done on Twister2 end.
-    if (jobState.getJobstate() == DriverJobState.FAILED
-        || jobState.getJobstate() == DriverJobState.RUNNING) {
-      throw new RuntimeException("Pipeline execution failed", jobState.getCause());
+    Twister2JobState jobState;
+    if (isLocalMode(options)) {
+      jobState = LocalSubmitter.submitJob(twister2Job, config);
     } else {
-      result.setState(PipelineResult.State.DONE);
+      jobState = Twister2Submitter.submitJob(twister2Job, config);
     }
+
+    Twister2PipelineResult result = new Twister2PipelineResult(jobState);
     return result;
   }
 
+  /** Check if the Runner is set to use Twister local mode or pointing to a deployment. */
+  private boolean isLocalMode(Twister2PipelineOptions options) {
+    if (options.getTwister2Home() == null || "".equals(options.getTwister2Home())) {

Review comment:
       With Guava (use Beam's vendored copy if you go this way) this check can
   be simpler, although that is not blocking for this PR:
   `return Strings.isNullOrEmpty(options.getTwister2Home());`
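
   To illustrate the suggestion, here is a minimal, dependency-free sketch. The
   class `LocalModeCheck` and both method names are hypothetical. The local
   `isNullOrEmpty` helper is only a stand-in that mirrors the documented
   behaviour of Guava's `Strings.isNullOrEmpty`, so the snippet runs without
   Guava on the classpath. The diff does not show the body of the `if`, so the
   sketch assumes it returns `true`.

```java
public final class LocalModeCheck {
  private LocalModeCheck() {}

  // Stand-in for Guava's Strings.isNullOrEmpty (assumption: in the runner you
  // would import the vendored Guava Strings class instead of defining this).
  public static boolean isNullOrEmpty(String s) {
    return s == null || s.isEmpty();
  }

  // The check as written in the diff, taking the option value directly.
  // Assumption: the elided branch body returns true.
  public static boolean isLocalModeVerbose(String twister2Home) {
    if (twister2Home == null || "".equals(twister2Home)) {
      return true;
    }
    return false;
  }

  // The simplified one-liner suggested in the review.
  public static boolean isLocalMode(String twister2Home) {
    return isNullOrEmpty(twister2Home);
  }

  public static void main(String[] args) {
    // "/opt/twister2" is an arbitrary example path, not a real default.
    String[] samples = {null, "", "/opt/twister2"};
    for (String home : samples) {
      System.out.println(
          home + " -> verbose=" + isLocalModeVerbose(home) + ", simple=" + isLocalMode(home));
    }
  }
}
```

   Both variants agree for a null, empty, and non-empty home, so the change
   would be purely cosmetic under the assumption above.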



