[FLINK-1201] [gelly] Expose the full Vertex and Edge object in filter functions

Expose the full Vertex and Edge object in filter functions to allow filtering
by key value:

- subgraph()
- filterOnVertices()
- filterOnEdges()

fixes #56


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/40457c29
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/40457c29
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/40457c29

Branch: refs/heads/master
Commit: 40457c29a20ab7f93542900d17e034774db20a12
Parents: d883c3a
Author: Carsten Brandt <m...@cebe.cc>
Authored: Mon Jan 12 20:59:13 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 11 10:46:14 2015 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/graph/Graph.java | 46 ++----------
 .../flink/graph/test/TestGraphOperations.java   | 77 +++++++++++++-------
 2 files changed, 56 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/40457c29/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 51b8c30..1990f26 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -346,10 +346,9 @@ public class Graph<K extends Comparable<K> & Serializable, 
VV extends Serializab
      * @param edgeFilter
      * @return
      */
-    public Graph<K, VV, EV> subgraph(FilterFunction<VV> vertexFilter, 
FilterFunction<EV> edgeFilter) {
+    public Graph<K, VV, EV> subgraph(FilterFunction<Vertex<K, VV>> 
vertexFilter, FilterFunction<Edge<K, EV>> edgeFilter) {
 
-        DataSet<Vertex<K, VV>> filteredVertices = this.vertices.filter(
-                       new ApplyVertexFilter<K, VV>(vertexFilter));
+        DataSet<Vertex<K, VV>> filteredVertices = 
this.vertices.filter(vertexFilter);
 
         DataSet<Edge<K, EV>> remainingEdges = this.edges.join(filteredVertices)
                        .where(0).equalTo(0)
@@ -357,8 +356,7 @@ public class Graph<K extends Comparable<K> & Serializable, 
VV extends Serializab
                        .join(filteredVertices).where(1).equalTo(0)
                        .with(new ProjectEdge<K, VV, EV>());
 
-        DataSet<Edge<K, EV>> filteredEdges = remainingEdges.filter(
-                       new ApplyEdgeFilter<K, EV>(edgeFilter));
+        DataSet<Edge<K, EV>> filteredEdges = remainingEdges.filter(edgeFilter);
 
         return new Graph<K, VV, EV>(filteredVertices, filteredEdges, 
this.context);
     }
@@ -370,10 +368,9 @@ public class Graph<K extends Comparable<K> & Serializable, 
VV extends Serializab
         * @param vertexFilter
         * @return
         */
-       public Graph<K, VV, EV> filterOnVertices(FilterFunction<VV> 
vertexFilter) {
+       public Graph<K, VV, EV> filterOnVertices(FilterFunction<Vertex<K, VV>> 
vertexFilter) {
 
-               DataSet<Vertex<K, VV>> filteredVertices = this.vertices.filter(
-                               new ApplyVertexFilter<K, VV>(vertexFilter));
+               DataSet<Vertex<K, VV>> filteredVertices = 
this.vertices.filter(vertexFilter);
 
                DataSet<Edge<K, EV>> remainingEdges = 
this.edges.join(filteredVertices)
                                .where(0).equalTo(0)
@@ -391,9 +388,8 @@ public class Graph<K extends Comparable<K> & Serializable, 
VV extends Serializab
         * @param edgeFilter
         * @return
         */
-       public Graph<K, VV, EV> filterOnEdges(FilterFunction<EV> edgeFilter) {
-               DataSet<Edge<K, EV>> filteredEdges = this.edges.filter(
-                               new ApplyEdgeFilter<K, EV>(edgeFilter));
+       public Graph<K, VV, EV> filterOnEdges(FilterFunction<Edge<K, EV>> 
edgeFilter) {
+               DataSet<Edge<K, EV>> filteredEdges = 
this.edges.filter(edgeFilter);
 
                return new Graph<K, VV, EV>(this.vertices, filteredEdges, 
this.context);
        }
@@ -408,34 +404,6 @@ public class Graph<K extends Comparable<K> & Serializable, 
VV extends Serializab
                }
     }
     
-    private static final class ApplyVertexFilter<K extends Comparable<K> & 
Serializable, 
-       VV extends Serializable> implements FilterFunction<Vertex<K, VV>> {
-
-       private FilterFunction<VV> innerFilter;
-       
-       public ApplyVertexFilter(FilterFunction<VV> theFilter) {
-               this.innerFilter = theFilter;
-       }
-
-               public boolean filter(Vertex<K, VV> value) throws Exception {
-                       return innerFilter.filter(value.f1);
-               }
-       
-    }
-
-    private static final class ApplyEdgeFilter<K extends Comparable<K> & 
Serializable, 
-               EV extends Serializable> implements FilterFunction<Edge<K, EV>> 
{
-
-       private FilterFunction<EV> innerFilter;
-       
-       public ApplyEdgeFilter(FilterFunction<EV> theFilter) {
-               this.innerFilter = theFilter;
-       }       
-        public boolean filter(Edge<K, EV> value) throws Exception {
-            return innerFilter.filter(value.f2);
-        }
-    }
-
     /**
      * Return the out-degree of all vertices in the graph
      * @return A DataSet of Tuple2<vertexId, outDegree>

http://git-wip-us.apache.org/repos/asf/flink/blob/40457c29/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphOperations.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphOperations.java
 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphOperations.java
index 7dec548..cb285c0 100644
--- 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphOperations.java
+++ 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphOperations.java
@@ -18,7 +18,7 @@ import org.junit.runners.Parameterized.Parameters;
 @RunWith(Parameterized.class)
 public class TestGraphOperations extends JavaProgramTestBase {
 
-       private static int NUM_PROGRAMS = 9;
+       private static int NUM_PROGRAMS = 10;
 
        private int curProgId = config.getInteger("ProgramId", -1);
        private String resultPath;
@@ -109,14 +109,14 @@ public class TestGraphOperations extends 
JavaProgramTestBase {
 
                                        Graph<Long, Long, Long> graph = 
Graph.create(TestGraphUtils.getLongLongVertexData(env),
                                                        
TestGraphUtils.getLongLongEdgeData(env), env);
-                                       graph.subgraph(new 
FilterFunction<Long>() {
-                                                                          
public boolean filter(Long value) throws Exception {
-                                                                               
   return (value > 2);
+                                       graph.subgraph(new 
FilterFunction<Vertex<Long, Long>>() {
+                                                                          
public boolean filter(Vertex<Long, Long> vertex) throws Exception {
+                                                                               
   return (vertex.getValue() > 2);
                                                                           }
                                                                   },
-                                                       new 
FilterFunction<Long>() {
-                                                               public boolean 
filter(Long value) throws Exception {
-                                                                       return 
(value > 34);
+                                                       new 
FilterFunction<Edge<Long, Long>>() {
+                                                               public boolean 
filter(Edge<Long, Long> edge) throws Exception {
+                                                                       return 
(edge.getValue() > 34);
                                                                }
                                                        
}).getEdges().writeAsCsv(resultPath);
 
@@ -126,6 +126,44 @@ public class TestGraphOperations extends 
JavaProgramTestBase {
                                }
                                case 4: {
                                /*
+                                * Test filterOnVertices:
+                                */
+                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+                                       Graph<Long, Long, Long> graph = 
Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                                                       
TestGraphUtils.getLongLongEdgeData(env), env);
+                                       graph.filterOnVertices(new 
FilterFunction<Vertex<Long, Long>>() {
+                                               public boolean 
filter(Vertex<Long, Long> vertex) throws Exception {
+                                                       return 
(vertex.getValue() > 2);
+                                               }
+                                       }).getEdges().writeAsCsv(resultPath);
+
+                                       env.execute();
+                                       return  "3,4,34\n" +
+                                                       "3,5,35\n" +
+                                                       "4,5,45\n";
+                               }
+                               case 5: {
+                               /*
+                                * Test filterOnEdges:
+                                */
+                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+                                       Graph<Long, Long, Long> graph = 
Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                                                       
TestGraphUtils.getLongLongEdgeData(env), env);
+                                       graph.filterOnEdges(new 
FilterFunction<Edge<Long, Long>>() {
+                                               public boolean 
filter(Edge<Long, Long> edge) throws Exception {
+                                                       return (edge.getValue() 
> 34);
+                                               }
+                                       }).getEdges().writeAsCsv(resultPath);
+
+                                       env.execute();
+                                       return "3,5,35\n" +
+                                                       "4,5,45\n" +
+                                                       "5,1,51\n";
+                               }
+                               case 6: {
+                               /*
                                 * Test numberOfVertices()
                                 */
                                        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
@@ -137,7 +175,7 @@ public class TestGraphOperations extends 
JavaProgramTestBase {
                                        env.execute();
                                        return "5";
                                }
-                               case 5: {
+                               case 7: {
                                /*
                                 * Test numberOfEdges()
                                 */
@@ -150,7 +188,7 @@ public class TestGraphOperations extends 
JavaProgramTestBase {
                                        env.execute();
                                        return "7";
                                }
-                               case 6: {
+                               case 8: {
                                /*
                                 * Test getVertexIds()
                                 */
@@ -163,7 +201,7 @@ public class TestGraphOperations extends 
JavaProgramTestBase {
                                        env.execute();
                                        return "1\n2\n3\n4\n5\n";
                                }
-                               case 7: {
+                               case 9: {
                                /*
                                 * Test getEdgeIds()
                                 */
@@ -179,7 +217,7 @@ public class TestGraphOperations extends 
JavaProgramTestBase {
                                                        "3,5\n" + "4,5\n" +
                                                        "5,1\n";
                                }
-                               case 8: {
+                               case 10: {
                                /*
                                 * Test union()
                                 */
@@ -210,23 +248,6 @@ public class TestGraphOperations extends 
JavaProgramTestBase {
                                                        "5,1,51\n" +
                                                        "6,1,61\n";
                                }
-                               case 9: {
-                               /*
-                                * Test getDegrees() with disconnected data
-                                */
-                                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-                                       Graph<Long, NullValue, Long> graph = 
-                                                       
Graph.create(TestGraphUtils.getDisconnectedLongLongEdgeData(env), env);
-
-                                       
graph.outDegrees().writeAsCsv(resultPath);
-                                       env.execute();
-                                       return "1,2\n" +
-                                                       "2,1\n" +
-                                                       "3,0\n" +
-                                                       "4,1\n" +
-                                                       "5,0\n";
-                                       }
                                default:
                                        throw new 
IllegalArgumentException("Invalid program id");
                        }

Reply via email to