Repository: beam
Updated Branches:
  refs/heads/gearpump-runner 4c445dd0b -> 1ed16f11a


[BEAM-1180] Implement GearpumpPipelineResult


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/21554764
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/21554764
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/21554764

Branch: refs/heads/gearpump-runner
Commit: 21554764056c45ea18be1e844b4ca1bfb71e544a
Parents: 4c445dd
Author: manuzhang <owenzhang1...@gmail.com>
Authored: Tue Dec 20 10:39:56 2016 +0800
Committer: manuzhang <owenzhang1...@gmail.com>
Committed: Wed Jan 4 12:59:08 2017 +0800

----------------------------------------------------------------------
 runners/gearpump/pom.xml                        |  1 +
 .../gearpump/GearpumpPipelineResult.java        | 59 ++++++++++++++++++--
 .../beam/runners/gearpump/GearpumpRunner.java   |  4 +-
 .../runners/gearpump/TestGearpumpRunner.java    |  4 ++
 .../translators/GroupByKeyTranslator.java       |  1 -
 5 files changed, 62 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/21554764/runners/gearpump/pom.xml
----------------------------------------------------------------------
diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml
index bb35ad7..777ad34 100644
--- a/runners/gearpump/pom.xml
+++ b/runners/gearpump/pom.xml
@@ -69,6 +69,7 @@
                   <dependenciesToScan>
                     <dependency>org.apache.beam:beam-sdks-java-core</dependency>
                   </dependenciesToScan>
+                  <argLine>-noverify</argLine>
                   <excludes>
                     <!-- side input is not supported in Gearpump -->
                     <exclude>

http://git-wip-us.apache.org/repos/asf/beam/blob/21554764/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
index ed1201d..9c8f7b3 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.gearpump;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.beam.sdk.AggregatorRetrievalException;
 import org.apache.beam.sdk.AggregatorValues;
@@ -26,31 +27,62 @@ import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.transforms.Aggregator;
 
+import org.apache.gearpump.cluster.MasterToAppMaster;
+import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterData;
+import org.apache.gearpump.cluster.client.ClientContext;
 import org.joda.time.Duration;
 
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
 
 /**
  * Result of executing a {@link Pipeline} with Gearpump.
  */
 public class GearpumpPipelineResult implements PipelineResult {
+
+  private final ClientContext client;
+  private final int appId;
+  private final Duration defaultWaitDuration = Duration.standardSeconds(60);
+  private final Duration defaultWaitInterval = Duration.standardSeconds(10);
+
+  public GearpumpPipelineResult(ClientContext client, int appId) {
+    this.client = client;
+    this.appId = appId;
+  }
+
   @Override
   public State getState() {
-    return null;
+    return getGearpumpState();
   }
 
   @Override
   public State cancel() throws IOException {
-    return null;
+    client.shutdown(appId);
+    return State.CANCELLED;
   }
 
   @Override
   public State waitUntilFinish(Duration duration) {
-    return null;
+    long start = System.currentTimeMillis();
+    do {
+      try {
+        Thread.sleep(defaultWaitInterval.getMillis());
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    } while (State.RUNNING == getGearpumpState()
+        && (System.currentTimeMillis() - start) < duration.getMillis());
+
+    if (State.RUNNING == getGearpumpState()) {
+      return State.DONE;
+    } else {
+      return State.FAILED;
+    }
   }
 
   @Override
   public State waitUntilFinish() {
-    return null;
+    return waitUntilFinish(defaultWaitDuration);
   }
 
   @Override
@@ -66,4 +98,23 @@ public class GearpumpPipelineResult implements PipelineResult {
     return null;
   }
 
+  private State getGearpumpState() {
+    String status = null;
+    List<AppMasterData> apps =
+        JavaConverters.<AppMasterData>seqAsJavaListConverter(
+            (Seq<AppMasterData>) client.listApps().appMasters()).asJava();
+    for (AppMasterData app: apps) {
+      if (app.appId() == appId) {
+        status = app.status();
+      }
+    }
+    if (null == status || status.equals(MasterToAppMaster.AppMasterNonExist())) {
+      return State.UNKNOWN;
+    } else if (status.equals(MasterToAppMaster.AppMasterActive())) {
+      return State.RUNNING;
+    } else {
+      return State.STOPPED;
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/21554764/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
index 4083922..9c44da3 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
@@ -107,9 +107,9 @@ public class GearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
     TranslationContext translationContext = new TranslationContext(streamApp, options);
     GearpumpPipelineTranslator translator = new GearpumpPipelineTranslator(translationContext);
     translator.translate(pipeline);
-    streamApp.submit();
+    int appId = streamApp.submit();
 
-    return null;
+    return new GearpumpPipelineResult(clientContext, appId);
   }
 
   private ClientContext getClientContext(GearpumpPipelineOptions options, Config config) {

http://git-wip-us.apache.org/repos/asf/beam/blob/21554764/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
index 89d31a6..ee31fb5 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
@@ -19,6 +19,7 @@
 package org.apache.beam.runners.gearpump;
 
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.runners.PipelineRunner;
@@ -52,7 +53,10 @@ public class TestGearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
   @Override
   public GearpumpPipelineResult run(Pipeline pipeline) {
     GearpumpPipelineResult result = delegate.run(pipeline);
+    PipelineResult.State state = result.waitUntilFinish();
     cluster.stop();
+    assert(state == PipelineResult.State.DONE);
+
     return result;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/21554764/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
index d64f1bf..989957f 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
@@ -134,7 +134,6 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
   private static class ValueToIterable<K, V>
       implements MapFunction<WindowedValue<KV<K, V>>, WindowedValue<KV<K, Iterable<V>>>> {
 
-
     @Override
     public WindowedValue<KV<K, Iterable<V>>> apply(WindowedValue<KV<K, V>> wv) {
       Iterable<V> values = Lists.newArrayList(wv.getValue().getValue());

Reply via email to