[FLINK-1201] [gelly] correct graph execution environment in InvalidVertexIdsValidator


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/99544bd9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/99544bd9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/99544bd9

Branch: refs/heads/master
Commit: 99544bd9000eebe468a721b0bcf06d3e387fdacf
Parents: d9b46c6
Author: vasia <vasilikikala...@gmail.com>
Authored: Mon Dec 22 17:32:57 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 11 10:46:13 2015 +0100

----------------------------------------------------------------------
 .../flink-gelly/src/main/java/org/apache/flink/graph/Graph.java  | 4 ++++
 .../apache/flink/graph/validation/InvalidVertexIdsValidator.java | 3 +--
 2 files changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/99544bd9/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index bc91b99..85717d8 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -92,6 +92,10 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
                Graph.edgeValueType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(2);
        }
 
+       public ExecutionEnvironment getContext() {
+               return this.context;
+       }
+
        /**
         * Function that checks whether a graph's ids are valid
         * @return

http://git-wip-us.apache.org/repos/asf/flink/blob/99544bd9/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
index 1b68abe..6b7a619 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
@@ -4,7 +4,6 @@ import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.util.Collector;
 
@@ -31,7 +30,7 @@ public class InvalidVertexIdsValidator<K extends Comparable<K> & Serializable, V
         DataSet<K> invalidIds = graph.getVertices().coGroup(edgeIds).where(0).equalTo(0)
                 .with(new GroupInvalidIds<K, VV>()).first(1);
 
-        return GraphUtils.count(invalidIds.map(new KToTupleMap<K>()), ExecutionEnvironment.getExecutionEnvironment())
+        return GraphUtils.count(invalidIds.map(new KToTupleMap<K>()), graph.getContext())
                 .map(new InvalidIdsMap());
     }
 

Reply via email to