[FLINK-1201] [gelly] Expose the full Vertex and Edge objects in filter functions
Expose the full Vertex and Edge object in filter functions to allow filtering by key value: - subgraph() - filterOnVertices() - filterOnEdges() fixes #56 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/40457c29 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/40457c29 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/40457c29 Branch: refs/heads/master Commit: 40457c29a20ab7f93542900d17e034774db20a12 Parents: d883c3a Author: Carsten Brandt <m...@cebe.cc> Authored: Mon Jan 12 20:59:13 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Wed Feb 11 10:46:14 2015 +0100 ---------------------------------------------------------------------- .../main/java/org/apache/flink/graph/Graph.java | 46 ++---------- .../flink/graph/test/TestGraphOperations.java | 77 +++++++++++++------- 2 files changed, 56 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/40457c29/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index 51b8c30..1990f26 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -346,10 +346,9 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab * @param edgeFilter * @return */ - public Graph<K, VV, EV> subgraph(FilterFunction<VV> vertexFilter, FilterFunction<EV> edgeFilter) { + public Graph<K, VV, EV> subgraph(FilterFunction<Vertex<K, VV>> vertexFilter, FilterFunction<Edge<K, EV>> edgeFilter) { - DataSet<Vertex<K, VV>> filteredVertices = this.vertices.filter( - new 
ApplyVertexFilter<K, VV>(vertexFilter)); + DataSet<Vertex<K, VV>> filteredVertices = this.vertices.filter(vertexFilter); DataSet<Edge<K, EV>> remainingEdges = this.edges.join(filteredVertices) .where(0).equalTo(0) @@ -357,8 +356,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab .join(filteredVertices).where(1).equalTo(0) .with(new ProjectEdge<K, VV, EV>()); - DataSet<Edge<K, EV>> filteredEdges = remainingEdges.filter( - new ApplyEdgeFilter<K, EV>(edgeFilter)); + DataSet<Edge<K, EV>> filteredEdges = remainingEdges.filter(edgeFilter); return new Graph<K, VV, EV>(filteredVertices, filteredEdges, this.context); } @@ -370,10 +368,9 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab * @param vertexFilter * @return */ - public Graph<K, VV, EV> filterOnVertices(FilterFunction<VV> vertexFilter) { + public Graph<K, VV, EV> filterOnVertices(FilterFunction<Vertex<K, VV>> vertexFilter) { - DataSet<Vertex<K, VV>> filteredVertices = this.vertices.filter( - new ApplyVertexFilter<K, VV>(vertexFilter)); + DataSet<Vertex<K, VV>> filteredVertices = this.vertices.filter(vertexFilter); DataSet<Edge<K, EV>> remainingEdges = this.edges.join(filteredVertices) .where(0).equalTo(0) @@ -391,9 +388,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab * @param edgeFilter * @return */ - public Graph<K, VV, EV> filterOnEdges(FilterFunction<EV> edgeFilter) { - DataSet<Edge<K, EV>> filteredEdges = this.edges.filter( - new ApplyEdgeFilter<K, EV>(edgeFilter)); + public Graph<K, VV, EV> filterOnEdges(FilterFunction<Edge<K, EV>> edgeFilter) { + DataSet<Edge<K, EV>> filteredEdges = this.edges.filter(edgeFilter); return new Graph<K, VV, EV>(this.vertices, filteredEdges, this.context); } @@ -408,34 +404,6 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab } } - private static final class ApplyVertexFilter<K extends Comparable<K> & Serializable, - VV extends Serializable> 
implements FilterFunction<Vertex<K, VV>> { - - private FilterFunction<VV> innerFilter; - - public ApplyVertexFilter(FilterFunction<VV> theFilter) { - this.innerFilter = theFilter; - } - - public boolean filter(Vertex<K, VV> value) throws Exception { - return innerFilter.filter(value.f1); - } - - } - - private static final class ApplyEdgeFilter<K extends Comparable<K> & Serializable, - EV extends Serializable> implements FilterFunction<Edge<K, EV>> { - - private FilterFunction<EV> innerFilter; - - public ApplyEdgeFilter(FilterFunction<EV> theFilter) { - this.innerFilter = theFilter; - } - public boolean filter(Edge<K, EV> value) throws Exception { - return innerFilter.filter(value.f2); - } - } - /** * Return the out-degree of all vertices in the graph * @return A DataSet of Tuple2<vertexId, outDegree> http://git-wip-us.apache.org/repos/asf/flink/blob/40457c29/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphOperations.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphOperations.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphOperations.java index 7dec548..cb285c0 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphOperations.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphOperations.java @@ -18,7 +18,7 @@ import org.junit.runners.Parameterized.Parameters; @RunWith(Parameterized.class) public class TestGraphOperations extends JavaProgramTestBase { - private static int NUM_PROGRAMS = 9; + private static int NUM_PROGRAMS = 10; private int curProgId = config.getInteger("ProgramId", -1); private String resultPath; @@ -109,14 +109,14 @@ public class TestGraphOperations extends JavaProgramTestBase { Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), 
TestGraphUtils.getLongLongEdgeData(env), env); - graph.subgraph(new FilterFunction<Long>() { - public boolean filter(Long value) throws Exception { - return (value > 2); + graph.subgraph(new FilterFunction<Vertex<Long, Long>>() { + public boolean filter(Vertex<Long, Long> vertex) throws Exception { + return (vertex.getValue() > 2); } }, - new FilterFunction<Long>() { - public boolean filter(Long value) throws Exception { - return (value > 34); + new FilterFunction<Edge<Long, Long>>() { + public boolean filter(Edge<Long, Long> edge) throws Exception { + return (edge.getValue() > 34); } }).getEdges().writeAsCsv(resultPath); @@ -126,6 +126,44 @@ public class TestGraphOperations extends JavaProgramTestBase { } case 4: { /* + * Test filterOnVertices: + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + graph.filterOnVertices(new FilterFunction<Vertex<Long, Long>>() { + public boolean filter(Vertex<Long, Long> vertex) throws Exception { + return (vertex.getValue() > 2); + } + }).getEdges().writeAsCsv(resultPath); + + env.execute(); + return "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n"; + } + case 5: { + /* + * Test filterOnEdges: + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + graph.filterOnEdges(new FilterFunction<Edge<Long, Long>>() { + public boolean filter(Edge<Long, Long> edge) throws Exception { + return (edge.getValue() > 34); + } + }).getEdges().writeAsCsv(resultPath); + + env.execute(); + return "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } + case 6: { + /* * Test numberOfVertices() */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); @@ -137,7 +175,7 @@ public class 
TestGraphOperations extends JavaProgramTestBase { env.execute(); return "5"; } - case 5: { + case 7: { /* * Test numberOfEdges() */ @@ -150,7 +188,7 @@ public class TestGraphOperations extends JavaProgramTestBase { env.execute(); return "7"; } - case 6: { + case 8: { /* * Test getVertexIds() */ @@ -163,7 +201,7 @@ public class TestGraphOperations extends JavaProgramTestBase { env.execute(); return "1\n2\n3\n4\n5\n"; } - case 7: { + case 9: { /* * Test getEdgeIds() */ @@ -179,7 +217,7 @@ public class TestGraphOperations extends JavaProgramTestBase { "3,5\n" + "4,5\n" + "5,1\n"; } - case 8: { + case 10: { /* * Test union() */ @@ -210,23 +248,6 @@ public class TestGraphOperations extends JavaProgramTestBase { "5,1,51\n" + "6,1,61\n"; } - case 9: { - /* - * Test getDegrees() with disconnected data - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, NullValue, Long> graph = - Graph.create(TestGraphUtils.getDisconnectedLongLongEdgeData(env), env); - - graph.outDegrees().writeAsCsv(resultPath); - env.execute(); - return "1,2\n" + - "2,1\n" + - "3,0\n" + - "4,1\n" + - "5,0\n"; - } default: throw new IllegalArgumentException("Invalid program id"); }