[FLINK-1201] [gelly] correct graph execution environment in InvalidVertexIdsValidator
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/99544bd9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/99544bd9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/99544bd9

Branch: refs/heads/master
Commit: 99544bd9000eebe468a721b0bcf06d3e387fdacf
Parents: d9b46c6
Author: vasia <vasilikikala...@gmail.com>
Authored: Mon Dec 22 17:32:57 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 11 10:46:13 2015 +0100

----------------------------------------------------------------------
 .../flink-gelly/src/main/java/org/apache/flink/graph/Graph.java | 4 ++++
 .../apache/flink/graph/validation/InvalidVertexIdsValidator.java | 3 +--
 2 files changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/99544bd9/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index bc91b99..85717d8 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -92,6 +92,10 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 Graph.edgeValueType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(2);
 }

+ public ExecutionEnvironment getContext() {
+ return this.context;
+ }
+
 /**
 * Function that checks whether a graph's ids are valid
 * @return

http://git-wip-us.apache.org/repos/asf/flink/blob/99544bd9/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
index 1b68abe..6b7a619 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
@@ -4,7 +4,6 @@ import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.util.Collector;

@@ -31,7 +30,7 @@ public class InvalidVertexIdsValidator<K extends Comparable<K> & Serializable, V
 DataSet<K> invalidIds = graph.getVertices().coGroup(edgeIds).where(0).equalTo(0)
 .with(new GroupInvalidIds<K, VV>()).first(1);

- return GraphUtils.count(invalidIds.map(new KToTupleMap<K>()), ExecutionEnvironment.getExecutionEnvironment())
+ return GraphUtils.count(invalidIds.map(new KToTupleMap<K>()), graph.getContext())
 .map(new InvalidIdsMap());
 }