[FLINK-1201] [gelly] joinWithEdges implemented and tested

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e0c10ec8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e0c10ec8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e0c10ec8

Branch: refs/heads/master
Commit: e0c10ec8f8aff10eaa186d04dda252e105b4eb0b
Parents: b2c89cc
Author: andralungu <lungu.an...@gmail.com>
Authored: Thu Jan 8 18:16:31 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 11 10:46:14 2015 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/graph/Graph.java | 123 ++++
 .../apache/flink/graph/test/TestGraphUtils.java |  90 +++
 .../flink/graph/test/TestJoinWithEdges.java     | 584 +++++++++++++++++++
 3 files changed, 797 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e0c10ec8/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 1cd5c90..71a701b 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -222,6 +222,129 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
        }
 
        /**
+        * Method that joins the edge DataSet with an input DataSet on a 
composite key of both source and target
+        * and applies a UDF on the resulted values.
+        * @param inputDataSet
+        * @param mapper - the UDF applied
+        * @param <T>
+        * @return - a new graph where the edge values have been updated.
+        */
+       public <T> Graph<K, VV, EV> joinWithEdges(DataSet<Tuple3<K, K, T>> 
inputDataSet,
+                                                                               
          final MapFunction<Tuple2<EV, T>, EV> mapper) {
+
+               DataSet<Edge<K, EV>> resultedEdges = this.getEdges()
+                               .coGroup(inputDataSet).where(0,1).equalTo(0,1)
+                               .with(new ApplyCoGroupToEdgeValues<K, EV, 
T>(mapper));
+
+               return Graph.create(this.getVertices(), resultedEdges, 
this.getContext());
+       }
+
+       /**
+        * CoGroup function used by joinWithEdges. Each group holds the edges and the
+        * input tuples that share the same (source id, target id) key.
+        * If the group has at least one input tuple, the value of every edge in the
+        * group is replaced by the mapper result computed from the edge value and the
+        * value of the first input tuple; further input tuples are ignored.
+        * If the group has no input tuple, every edge is forwarded unchanged.
+        * Groups without edges emit nothing.
+        */
+       private static final class ApplyCoGroupToEdgeValues<K extends Comparable<K> & Serializable,
+                       EV extends Serializable, T>
+                       implements CoGroupFunction<Edge<K, EV>, Tuple3<K, K, T>, Edge<K, EV>> {
+
+               private final MapFunction<Tuple2<EV, T>, EV> mapper;
+
+               public ApplyCoGroupToEdgeValues(MapFunction<Tuple2<EV, T>, EV> mapper) {
+                       this.mapper = mapper;
+               }
+
+               @Override
+               public void coGroup(Iterable<Edge<K, EV>> iterableDS1,
+                               Iterable<Tuple3<K, K, T>> iterableDS2,
+                               Collector<Edge<K, EV>> collector) throws Exception {
+
+                       Iterator<Edge<K, EV>> iteratorDS1 = iterableDS1.iterator();
+                       Iterator<Tuple3<K, K, T>> iteratorDS2 = iterableDS2.iterator();
+
+                       if (iteratorDS2.hasNext()) {
+                               // only the first input tuple of the group is considered
+                               Tuple3<K, K, T> iteratorDS2Next = iteratorDS2.next();
+
+                               // loop over ALL edges of the group so that parallel edges
+                               // (same source and target) are not dropped
+                               while (iteratorDS1.hasNext()) {
+                                       Edge<K, EV> iteratorDS1Next = iteratorDS1.next();
+
+                                       collector.collect(new Edge<K, EV>(iteratorDS1Next.f0, iteratorDS1Next.f1, mapper
+                                                       .map(new Tuple2<EV, T>(iteratorDS1Next.f2, iteratorDS2Next.f2))));
+                               }
+
+                       } else {
+                               // no matching input: keep every edge of the group as it is
+                               while (iteratorDS1.hasNext()) {
+                                       collector.collect(iteratorDS1.next());
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Method that joins the edge DataSet with an input DataSet on the 
source key of the edges and the first attribute
+        * of the input DataSet and applies a UDF on the resulted values.
+        * Should the inputDataSet contain the same key more than once, only 
the first value will be considered.
+        * @param inputDataSet
+        * @param mapper - the UDF applied
+        * @param <T>
+        * @return - a new graph where the edge values have been updated.
+        */
+       public <T> Graph<K, VV, EV> joinWithEdgesOnSource(DataSet<Tuple2<K, T>> 
inputDataSet,
+                                                                               
                 final MapFunction<Tuple2<EV, T>, EV> mapper) {
+
+               DataSet<Edge<K, EV>> resultedEdges = this.getEdges()
+                               .coGroup(inputDataSet).where(0).equalTo(0)
+                               .with(new 
ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>(mapper));
+
+               return Graph.create(this.getVertices(), resultedEdges, 
this.getContext());
+       }
+
+       /**
+        * CoGroup function shared by joinWithEdgesOnSource and joinWithEdgesOnTarget.
+        * Each group holds the edges and the input tuples that share one key.
+        * When the group contains input tuples, every edge value is replaced by the
+        * mapper result for (edge value, value of the first input tuple); otherwise
+        * the edges are forwarded unchanged.
+        */
+       private static final class ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K extends Comparable<K> & Serializable,
+                       EV extends Serializable, T>
+                       implements CoGroupFunction<Edge<K, EV>, Tuple2<K, T>, Edge<K, EV>> {
+
+               private final MapFunction<Tuple2<EV, T>, EV> mapper;
+
+               public ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget(MapFunction<Tuple2<EV, T>, EV> mapper) {
+                       this.mapper = mapper;
+               }
+
+               @Override
+               public void coGroup(Iterable<Edge<K, EV>> iterableDS1,
+                               Iterable<Tuple2<K, T>> iterableDS2,
+                               Collector<Edge<K, EV>> collector) throws Exception {
+
+                       final Iterator<Edge<K, EV>> edges = iterableDS1.iterator();
+                       final Iterator<Tuple2<K, T>> inputs = iterableDS2.iterator();
+
+                       if (!inputs.hasNext()) {
+                               // nothing to join with: pass the edges through untouched
+                               while (edges.hasNext()) {
+                                       collector.collect(edges.next());
+                               }
+                               return;
+                       }
+
+                       // only the first input tuple of the group is used
+                       final Tuple2<K, T> firstInput = inputs.next();
+
+                       while (edges.hasNext()) {
+                               final Edge<K, EV> edge = edges.next();
+                               final EV updatedValue = mapper.map(new Tuple2<EV, T>(edge.f2, firstInput.f1));
+
+                               collector.collect(new Edge<K, EV>(edge.f0, edge.f1, updatedValue));
+                       }
+               }
+       }
+
+       /**
+        * Method that joins the edge DataSet with an input DataSet on the 
target key of the edges and the first attribute
+        * of the input DataSet and applies a UDF on the resulted values.
+        * Should the inputDataSet contain the same key more than once, only 
the first value will be considered.
+        * @param inputDataSet
+        * @param mapper - the UDF applied
+        * @param <T>
+        * @return - a new graph where the edge values have been updated.
+        */
+       public <T> Graph<K, VV, EV> joinWithEdgesOnTarget(DataSet<Tuple2<K, T>> 
inputDataSet,
+                                                                               
                          final MapFunction<Tuple2<EV, T>, EV> mapper) {
+
+               DataSet<Edge<K, EV>> resultedEdges = this.getEdges()
+                               .coGroup(inputDataSet).where(1).equalTo(0)
+                               .with(new 
ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>(mapper));
+
+               return Graph.create(this.getVertices(), resultedEdges, 
this.getContext());
+       }
+
+       /**
      * Apply value-based filtering functions to the graph 
      * and return a sub-graph that satisfies the predicates
      * for both vertex values and edge values.

http://git-wip-us.apache.org/repos/asf/flink/blob/e0c10ec8/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java
 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java
index 9816619..d5062c5 100644
--- 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java
+++ 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java
@@ -7,6 +7,7 @@ import java.util.List;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
 
 public class TestGraphUtils {
 
@@ -62,6 +63,48 @@ public class TestGraphUtils {
                return env.fromCollection(tuples);
        }
 
+       /**
+        * Builds a DataSet of (Long, Long) tuples from a fixed table of seven rows.
+        * Keys 1 and 3 each appear twice.
+        */
+       public static final DataSet<Tuple2<Long, Long>> getLongLongTuple2SourceData(
+                       ExecutionEnvironment env) {
+               // each row is {key, value}
+               final long[][] rows = {
+                               {1L, 10L}, {1L, 20L}, {2L, 30L}, {3L, 40L}, {3L, 50L}, {4L, 60L}, {6L, 70L}
+               };
+
+               List<Tuple2<Long, Long>> tuples = new ArrayList<Tuple2<Long, Long>>();
+               for (long[] row : rows) {
+                       tuples.add(new Tuple2<Long, Long>(row[0], row[1]));
+               }
+
+               return env.fromCollection(tuples);
+       }
+
+       /**
+        * Builds a DataSet of (Long, Long) tuples from a fixed table of seven rows.
+        * Keys 3 and 6 each appear twice.
+        */
+       public static final DataSet<Tuple2<Long, Long>> getLongLongTuple2TargetData(
+                       ExecutionEnvironment env) {
+               // each row is {key, value}
+               final long[][] rows = {
+                               {2L, 10L}, {3L, 20L}, {3L, 30L}, {4L, 40L}, {6L, 50L}, {6L, 60L}, {1L, 70L}
+               };
+
+               List<Tuple2<Long, Long>> tuples = new ArrayList<Tuple2<Long, Long>>();
+               for (long[] row : rows) {
+                       tuples.add(new Tuple2<Long, Long>(row[0], row[1]));
+               }
+
+               return env.fromCollection(tuples);
+       }
+
+       /**
+        * Builds a DataSet of (Long, Long, Long) tuples from a fixed table of seven rows.
+        */
+       public static final DataSet<Tuple3<Long, Long, Long>> getLongLongLongTuple3Data(
+                       ExecutionEnvironment env) {
+               // each row is {first key, second key, value}
+               final long[][] rows = {
+                               {1L, 2L, 12L}, {1L, 3L, 13L}, {2L, 3L, 23L}, {3L, 4L, 34L},
+                               {3L, 6L, 36L}, {4L, 6L, 46L}, {6L, 1L, 61L}
+               };
+
+               List<Tuple3<Long, Long, Long>> tuples = new ArrayList<>();
+               for (long[] row : rows) {
+                       tuples.add(new Tuple3<Long, Long, Long>(row[0], row[1], row[2]));
+               }
+
+               return env.fromCollection(tuples);
+       }
+
        public static final DataSet<Tuple2<Long, 
DummyCustomParameterizedType<Float>>> getLongCustomTuple2Data(
                        ExecutionEnvironment env) {
                List<Tuple2<Long, DummyCustomParameterizedType<Float>>> tuples 
= new ArrayList<Tuple2<Long,
@@ -77,6 +120,53 @@ public class TestGraphUtils {
                return env.fromCollection(tuples);
        }
 
+       /**
+        * Builds a DataSet of (Long, DummyCustomParameterizedType&lt;Float&gt;) tuples
+        * from four fixed rows. Key 1 appears twice.
+        */
+       public static final DataSet<Tuple2<Long, DummyCustomParameterizedType<Float>>> getLongCustomTuple2SourceData(
+                       ExecutionEnvironment env) {
+               // row i is (keys[i], DummyCustomParameterizedType(intFields[i], floatFields[i]))
+               final long[] keys = {1L, 1L, 2L, 3L};
+               final int[] intFields = {10, 20, 30, 40};
+               final float[] floatFields = {10f, 20f, 30f, 40f};
+
+               List<Tuple2<Long, DummyCustomParameterizedType<Float>>> tuples =
+                               new ArrayList<Tuple2<Long, DummyCustomParameterizedType<Float>>>();
+               for (int i = 0; i < keys.length; i++) {
+                       tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(keys[i],
+                                       new DummyCustomParameterizedType<Float>(intFields[i], floatFields[i])));
+               }
+
+               return env.fromCollection(tuples);
+       }
+
+       /**
+        * Builds a DataSet of (Long, DummyCustomParameterizedType&lt;Float&gt;) tuples
+        * from four fixed rows. Key 3 appears twice.
+        */
+       public static final DataSet<Tuple2<Long, DummyCustomParameterizedType<Float>>> getLongCustomTuple2TargetData(
+                       ExecutionEnvironment env) {
+               // row i is (keys[i], DummyCustomParameterizedType(intFields[i], floatFields[i]))
+               final long[] keys = {2L, 3L, 3L, 4L};
+               final int[] intFields = {10, 20, 30, 40};
+               final float[] floatFields = {10f, 20f, 30f, 40f};
+
+               List<Tuple2<Long, DummyCustomParameterizedType<Float>>> tuples =
+                               new ArrayList<Tuple2<Long, DummyCustomParameterizedType<Float>>>();
+               for (int i = 0; i < keys.length; i++) {
+                       tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(keys[i],
+                                       new DummyCustomParameterizedType<Float>(intFields[i], floatFields[i])));
+               }
+
+               return env.fromCollection(tuples);
+       }
+
+       /**
+        * Builds a DataSet of (Long, Long, DummyCustomParameterizedType&lt;Float&gt;)
+        * tuples from four fixed rows.
+        */
+       public static final DataSet<Tuple3<Long, Long, DummyCustomParameterizedType<Float>>> getLongLongCustomTuple3Data(
+                       ExecutionEnvironment env) {
+               // row i is (firstKeys[i], secondKeys[i], DummyCustomParameterizedType(intFields[i], floatFields[i]))
+               final long[] firstKeys = {1L, 1L, 2L, 3L};
+               final long[] secondKeys = {2L, 3L, 3L, 4L};
+               final int[] intFields = {10, 20, 30, 40};
+               final float[] floatFields = {10f, 20f, 30f, 40f};
+
+               List<Tuple3<Long, Long, DummyCustomParameterizedType<Float>>> tuples = new ArrayList<>();
+               for (int i = 0; i < firstKeys.length; i++) {
+                       tuples.add(new Tuple3<Long, Long, DummyCustomParameterizedType<Float>>(firstKeys[i], secondKeys[i],
+                                       new DummyCustomParameterizedType<Float>(intFields[i], floatFields[i])));
+               }
+
+               return env.fromCollection(tuples);
+       }
+
        /**
         * A graph with invalid vertex ids
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/e0c10ec8/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java
 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java
new file mode 100644
index 0000000..711cd61
--- /dev/null
+++ 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java
@@ -0,0 +1,584 @@
+package flink.graphs;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedList;
+
+@RunWith(Parameterized.class)
+public class TestJoinWithEdges extends JavaProgramTestBase {
+
+    private static int NUM_PROGRAMS = 15;
+
+    private int curProgId = config.getInteger("ProgramId", -1);
+    private String resultPath;
+    private String expectedResult;
+
+    /**
+     * Creates one parameterized test instance.
+     *
+     * @param config the configuration for this run; it is handed to the base class
+     *               constructor, and the "ProgramId" entry is read from it by the
+     *               curProgId field initializer
+     */
+    public TestJoinWithEdges(Configuration config) {
+        super(config);
+    }
+
+    /**
+     * Stores the path returned by getTempDirPath("result") in resultPath, so that
+     * testProgram and postSubmit can use it.
+     */
+    @Override
+    protected void preSubmit() throws Exception {
+        resultPath = getTempDirPath("result");
+    }
+
+    /**
+     * Runs the program selected by curProgId through GraphProgs.runProgram, passing
+     * resultPath as its output location, and keeps the returned string as the
+     * expected result.
+     */
+    @Override
+    protected void testProgram() throws Exception {
+        expectedResult = GraphProgs.runProgram(curProgId, resultPath);
+    }
+
+    /**
+     * Hands expectedResult and resultPath to compareResultsByLinesInMemory.
+     * NOTE(review): presumably this compares line by line, ignoring line order -
+     * confirm against the base class.
+     */
+    @Override
+    protected void postSubmit() throws Exception {
+        compareResultsByLinesInMemory(expectedResult, resultPath);
+    }
+
+    @Parameterized.Parameters
+    public static Collection<Object[]> getConfigurations() throws IOException {
+
+        LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+
+        for(int i=1; i <= NUM_PROGRAMS; i++) {
+            Configuration config = new Configuration();
+            config.setInteger("ProgramId", i);
+            tConfigs.add(config);
+        }
+
+        return toParameterList(tConfigs);
+    }
+
+    private static class GraphProgs {
+
+        @SuppressWarnings("serial")
+        public static String runProgram(int progId, String resultPath) throws 
Exception {
+
+            switch (progId) {
+                case 1: {
+                               /*
+                                * Test joinWithEdges with the input DataSet 
parameter identical
+                                * to the edge DataSet
+                                */
+                    final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+                    Graph<Long, Long, Long> graph = 
Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                            TestGraphUtils.getLongLongEdgeData(env), env);
+
+                    Graph<Long, Long, Long> result = 
graph.joinWithEdges(graph.getEdges()
+                                    .map(new MapFunction<Edge<Long, Long>, 
Tuple3<Long, Long, Long>>() {
+                                        @Override
+                                        public Tuple3<Long, Long, Long> 
map(Edge<Long, Long> edge) throws Exception {
+                                            return new Tuple3<Long, Long, 
Long>(edge.getSource(),
+                                                    edge.getTarget(), 
edge.getValue());
+                                        }
+                                    }),
+                            new MapFunction<Tuple2<Long, Long>, Long>() {
+
+                                @Override
+                                public Long map(Tuple2<Long, Long> tuple) 
throws Exception {
+                                    return tuple.f0 + tuple.f1;
+                                }
+                            });
+
+                    result.getEdges().writeAsCsv(resultPath);
+                    env.execute();
+
+                    return "1,2,24\n" +
+                            "1,3,26\n" +
+                            "2,3,46\n" +
+                            "3,4,68\n" +
+                            "3,5,70\n" +
+                            "4,5,90\n" +
+                            "5,1,102\n";
+                }
+                case 2: {
+                /*
+                                * Test joinWithEdges with the input DataSet passed as a parameter containing
+                                * fewer elements than the edge DataSet, but of the same type
+                                */
+                    final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+                    Graph<Long, Long, Long> graph = 
Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                            TestGraphUtils.getLongLongEdgeData(env), env);
+
+                    Graph<Long, Long, Long> result = 
graph.joinWithEdges(graph.getEdges().first(3)
+                                    .map(new MapFunction<Edge<Long, Long>, 
Tuple3<Long, Long, Long>>() {
+                                        @Override
+                                        public Tuple3<Long, Long, Long> 
map(Edge<Long, Long> edge) throws Exception {
+                                            return new Tuple3<Long, Long, 
Long>(edge.getSource(),
+                                                    edge.getTarget(), 
edge.getValue());
+                                        }
+                                    }),
+                            new MapFunction<Tuple2<Long, Long>, Long>() {
+
+                                @Override
+                                public Long map(Tuple2<Long, Long> tuple) 
throws Exception {
+                                    return tuple.f0 + tuple.f1;
+                                }
+                            });
+
+                    result.getEdges().writeAsCsv(resultPath);
+                    env.execute();
+
+                    return "1,2,24\n" +
+                            "1,3,26\n" +
+                            "2,3,46\n" +
+                            "3,4,34\n" +
+                            "3,5,35\n" +
+                            "4,5,45\n" +
+                            "5,1,51\n";
+                }
+                case 3: {
+                /*
+                                * Test joinWithEdges with the input DataSet passed as a parameter containing
+                                * fewer elements than the edge DataSet and of a different type (Boolean)
+                                */
+                    final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+                    Graph<Long, Long, Long> graph = 
Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                            TestGraphUtils.getLongLongEdgeData(env), env);
+
+                    Graph<Long, Long, Long> result = 
graph.joinWithEdges(graph.getEdges().first(3)
+                                    .map(new MapFunction<Edge<Long, Long>, 
Tuple3<Long, Long, Boolean>>() {
+                                        @Override
+                                        public Tuple3<Long, Long, Boolean> 
map(Edge<Long, Long> edge) throws Exception {
+                                            return new Tuple3<Long, Long, 
Boolean>(edge.getSource(),
+                                                    edge.getTarget(), true);
+                                        }
+                                    }),
+                            new MapFunction<Tuple2<Long, Boolean>, Long>() {
+
+                                @Override
+                                public Long map(Tuple2<Long, Boolean> tuple) 
throws Exception {
+                                    if(tuple.f1) {
+                                        return tuple.f0 * 2;
+                                    }
+                                    else {
+                                        return tuple.f0;
+                                    }
+                                }
+                            });
+
+                    result.getEdges().writeAsCsv(resultPath);
+                    env.execute();
+
+                    return "1,2,24\n" +
+                            "1,3,26\n" +
+                            "2,3,46\n" +
+                            "3,4,34\n" +
+                            "3,5,35\n" +
+                            "4,5,45\n" +
+                            "5,1,51\n";
+                }
+                case 4: {
+                /*
+                                * Test joinWithEdges with the input DataSet 
containing different keys than the edge DataSet
+                                * - the iterator becomes empty.
+                                */
+                    final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+                    Graph<Long, Long, Long> graph = 
Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                            TestGraphUtils.getLongLongEdgeData(env), env);
+
+                    Graph<Long, Long, Long> result = 
graph.joinWithEdges(TestGraphUtils.getLongLongLongTuple3Data(env),
+                            new MapFunction<Tuple2<Long, Long>, Long>() {
+
+                                @Override
+                                public Long map(Tuple2<Long, Long> tuple) 
throws Exception {
+                                    return tuple.f1 * 2;
+                                }
+                            });
+
+                    result.getEdges().writeAsCsv(resultPath);
+                    env.execute();
+
+                    return "1,2,24\n" +
+                            "1,3,26\n" +
+                            "2,3,46\n" +
+                            "3,4,68\n" +
+                            "3,5,35\n" +
+                            "4,5,45\n" +
+                            "5,1,51\n";
+                }
+                case 5: {
+                /*
+                    * Test joinWithEdges with a DataSet containing custom 
parametrised type input values
+                        */
+                    final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+                    Graph<Long, Long, Long> graph = 
Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                            TestGraphUtils.getLongLongEdgeData(env), env);
+
+                    Graph<Long, Long, Long> result = 
graph.joinWithEdges(TestGraphUtils.getLongLongCustomTuple3Data(env),
+                            new MapFunction<Tuple2<Long, 
TestGraphUtils.DummyCustomParameterizedType<Float>>, Long>() {
+                                public Long map(Tuple2<Long, 
TestGraphUtils.DummyCustomParameterizedType<Float>> tuple) throws Exception {
+                                    return (long) tuple.f1.getIntField();
+                                }
+                            });
+
+                    result.getEdges().writeAsCsv(resultPath);
+                    env.execute();
+
+                    return "1,2,10\n" +
+                            "1,3,20\n" +
+                            "2,3,30\n" +
+                            "3,4,40\n" +
+                            "3,5,35\n" +
+                            "4,5,45\n" +
+                            "5,1,51\n";
+                }
+                case 6: {
+                /*
+                                * Test joinWithEdgesOnSource with the input 
DataSet parameter identical
+                                * to the edge DataSet
+                                */
+                    final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+                    Graph<Long, Long, Long> graph = 
Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                            TestGraphUtils.getLongLongEdgeData(env), env);
+
+                    Graph<Long, Long, Long> result = 
graph.joinWithEdgesOnSource(graph.getEdges()
+                                    .map(new MapFunction<Edge<Long, Long>, 
Tuple2<Long, Long>>() {
+                                        @Override
+                                        public Tuple2<Long, Long> 
map(Edge<Long, Long> edge) throws Exception {
+                                            return new Tuple2<Long, 
Long>(edge.getSource(), edge.getValue());
+                                        }
+                                    }),
+                            new MapFunction<Tuple2<Long, Long>, Long>() {
+
+                                @Override
+                                public Long map(Tuple2<Long, Long> tuple) 
throws Exception {
+                                    return tuple.f0 + tuple.f1;
+                                }
+                            });
+
+                    result.getEdges().writeAsCsv(resultPath);
+                    env.execute();
+
+                    return "1,2,24\n" +
+                            "1,3,25\n" +
+                            "2,3,46\n" +
+                            "3,4,68\n" +
+                            "3,5,69\n" +
+                            "4,5,90\n" +
+                            "5,1,102\n";
+                }
+                case 7: {
+                /*
+                                * Test joinWithEdgesOnSource with the input DataSet passed as a parameter containing
+                                * fewer elements than the edge DataSet, but of the same type
+                                */
+                    final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+                    Graph<Long, Long, Long> graph = 
Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                            TestGraphUtils.getLongLongEdgeData(env), env);
+
+                    Graph<Long, Long, Long> result = 
graph.joinWithEdgesOnSource(graph.getEdges().first(3)
+                                    .map(new MapFunction<Edge<Long, Long>, 
Tuple2<Long, Long>>() {
+                                        @Override
+                                        public Tuple2<Long, Long> 
map(Edge<Long, Long> edge) throws Exception {
+                                            return new Tuple2<Long, 
Long>(edge.getSource(), edge.getValue());
+                                        }
+                                    }),
+                            new MapFunction<Tuple2<Long, Long>, Long>() {
+
+                                @Override
+                                public Long map(Tuple2<Long, Long> tuple) 
throws Exception {
+                                    return tuple.f0 + tuple.f1;
+                                }
+                            });
+
+                    result.getEdges().writeAsCsv(resultPath);
+                    env.execute();
+
+                    return "1,2,24\n" +
+                            "1,3,25\n" +
+                            "2,3,46\n" +
+                            "3,4,34\n" +
+                            "3,5,35\n" +
+                            "4,5,45\n" +
+                            "5,1,51\n";
+                }
+                case 8: {
+                /*
+                                * Test joinWithEdgesOnSource with the input DataSet passed as a parameter containing
+                                * fewer elements than the edge DataSet and of a different type (Boolean)
+                                */
+                    final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+                    Graph<Long, Long, Long> graph = 
Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                            TestGraphUtils.getLongLongEdgeData(env), env);
+
+                    Graph<Long, Long, Long> result = 
graph.joinWithEdgesOnSource(graph.getEdges().first(3)
+                                    .map(new MapFunction<Edge<Long, Long>, 
Tuple2<Long, Boolean>>() {
+                                        @Override
+                                        public Tuple2<Long, Boolean> 
map(Edge<Long, Long> edge) throws Exception {
+                                            return new Tuple2<Long, 
Boolean>(edge.getSource(), true);
+                                        }
+                                    }),
+                            new MapFunction<Tuple2<Long, Boolean>, Long>() {
+
+                                @Override
+                                public Long map(Tuple2<Long, Boolean> tuple) 
throws Exception {
+                                    if (tuple.f1) {
+                                        return tuple.f0 * 2;
+                                    } else {
+                                        return tuple.f0;
+                                    }
+                                }
+                            });
+
+                    result.getEdges().writeAsCsv(resultPath);
+                    env.execute();
+
+                    return "1,2,24\n" +
+                            "1,3,26\n" +
+                            "2,3,46\n" +
+                            "3,4,34\n" +
+                            "3,5,35\n" +
+                            "4,5,45\n" +
+                            "5,1,51\n";
+                }
+                case 9: {
+                    /*
+                     * Test joinWithEdgesOnSource with an input DataSet whose keys differ
+                     * from those of the edge DataSet - the iterator becomes empty.
+                     */
+                    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                    Graph<Long, Long, Long> graph = Graph.create(
+                            TestGraphUtils.getLongLongVertexData(env),
+                            TestGraphUtils.getLongLongEdgeData(env),
+                            env);
+
+                    // Returns twice the second field of the joined tuple.
+                    MapFunction<Tuple2<Long, Long>, Long> doubleSecondField =
+                            new MapFunction<Tuple2<Long, Long>, Long>() {
+                                @Override
+                                public Long map(Tuple2<Long, Long> pair) throws Exception {
+                                    return pair.f1 * 2;
+                                }
+                            };
+
+                    Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(
+                            TestGraphUtils.getLongLongTuple2SourceData(env),
+                            doubleSecondField);
+
+                    result.getEdges().writeAsCsv(resultPath);
+                    env.execute();
+
+                    return "1,2,20\n"
+                            + "1,3,20\n"
+                            + "2,3,60\n"
+                            + "3,4,80\n"
+                            + "3,5,80\n"
+                            + "4,5,120\n"
+                            + "5,1,51\n";
+                }
+                case 10: {
+                    /*
+                     * Test joinWithEdgesOnSource with a DataSet containing custom
+                     * parametrised type input values.
+                     */
+                    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                    Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                            TestGraphUtils.getLongLongEdgeData(env), env);
+
+                    Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(TestGraphUtils.getLongCustomTuple2SourceData(env),
+                            new MapFunction<Tuple2<Long, TestGraphUtils.DummyCustomParameterizedType<Float>>, Long>() {
+
+                                // @Override makes the compiler verify that this really implements
+                                // MapFunction#map, as the other cases in this switch already do.
+                                @Override
+                                public Long map(Tuple2<Long, TestGraphUtils.DummyCustomParameterizedType<Float>> tuple) throws Exception {
+                                    // The new edge value is the int field of the custom type, widened to long.
+                                    return (long) tuple.f1.getIntField();
+                                }
+                            });
+
+                    result.getEdges().writeAsCsv(resultPath);
+                    env.execute();
+
+                    return "1,2,10\n" +
+                            "1,3,10\n" +
+                            "2,3,30\n" +
+                            "3,4,40\n" +
+                            "3,5,40\n" +
+                            "4,5,45\n" +
+                            "5,1,51\n";
+                }
+                case 11: {
+                    /*
+                     * Test joinWithEdgesOnTarget with an input DataSet that is derived
+                     * from the complete edge DataSet, i.e. identical to it.
+                     */
+                    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                    Graph<Long, Long, Long> graph = Graph.create(
+                            TestGraphUtils.getLongLongVertexData(env),
+                            TestGraphUtils.getLongLongEdgeData(env),
+                            env);
+
+                    // Projects an edge onto (target id, edge value).
+                    MapFunction<Edge<Long, Long>, Tuple2<Long, Long>> targetWithValue =
+                            new MapFunction<Edge<Long, Long>, Tuple2<Long, Long>>() {
+                                @Override
+                                public Tuple2<Long, Long> map(Edge<Long, Long> e) throws Exception {
+                                    return new Tuple2<Long, Long>(e.getTarget(), e.getValue());
+                                }
+                            };
+
+                    // Adds up both fields of the joined tuple.
+                    MapFunction<Tuple2<Long, Long>, Long> sumOfFields =
+                            new MapFunction<Tuple2<Long, Long>, Long>() {
+                                @Override
+                                public Long map(Tuple2<Long, Long> pair) throws Exception {
+                                    return pair.f0 + pair.f1;
+                                }
+                            };
+
+                    Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(
+                            graph.getEdges().map(targetWithValue),
+                            sumOfFields);
+
+                    result.getEdges().writeAsCsv(resultPath);
+                    env.execute();
+
+                    return "1,2,24\n"
+                            + "1,3,26\n"
+                            + "2,3,36\n"
+                            + "3,4,68\n"
+                            + "3,5,70\n"
+                            + "4,5,80\n"
+                            + "5,1,102\n";
+                }
+                case 12: {
+                    /*
+                     * Test joinWithEdgesOnTarget with the input DataSet passed as a
+                     * parameter containing fewer elements than the edge DataSet, but of
+                     * the same type.
+                     */
+                    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                    Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                            TestGraphUtils.getLongLongEdgeData(env), env);
+
+                    // Only the first three edges are turned into (target id, edge value) input tuples.
+                    Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(graph.getEdges().first(3)
+                                    .map(new MapFunction<Edge<Long, Long>, Tuple2<Long, Long>>() {
+                                        @Override
+                                        public Tuple2<Long, Long> map(Edge<Long, Long> edge) throws Exception {
+                                            return new Tuple2<Long, Long>(edge.getTarget(), edge.getValue());
+                                        }
+                                    }),
+                            new MapFunction<Tuple2<Long, Long>, Long>() {
+
+                                @Override
+                                public Long map(Tuple2<Long, Long> tuple) throws Exception {
+                                    return tuple.f0 + tuple.f1;
+                                }
+                            });
+
+                    // Register the sink and run the job exactly once, like every other case.
+                    result.getEdges().writeAsCsv(resultPath);
+                    env.execute();
+
+                    return "1,2,24\n" +
+                            "1,3,26\n" +
+                            "2,3,36\n" +
+                            "3,4,34\n" +
+                            "3,5,35\n" +
+                            "4,5,45\n" +
+                            "5,1,51\n";
+                }
+                case 13: {
+                    /*
+                     * Test joinWithEdgesOnTarget with an input DataSet that holds fewer
+                     * elements than the edge DataSet and whose values are of a different
+                     * type (Boolean).
+                     */
+                    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                    Graph<Long, Long, Long> graph = Graph.create(
+                            TestGraphUtils.getLongLongVertexData(env),
+                            TestGraphUtils.getLongLongEdgeData(env),
+                            env);
+
+                    // Pairs the target id of an edge with a constant 'true' flag.
+                    MapFunction<Edge<Long, Long>, Tuple2<Long, Boolean>> flagTarget =
+                            new MapFunction<Edge<Long, Long>, Tuple2<Long, Boolean>>() {
+                                @Override
+                                public Tuple2<Long, Boolean> map(Edge<Long, Long> e) throws Exception {
+                                    return new Tuple2<Long, Boolean>(e.getTarget(), true);
+                                }
+                            };
+
+                    // Doubles the first field when the flag is set, otherwise returns it unchanged.
+                    MapFunction<Tuple2<Long, Boolean>, Long> doubleWhenFlagged =
+                            new MapFunction<Tuple2<Long, Boolean>, Long>() {
+                                @Override
+                                public Long map(Tuple2<Long, Boolean> pair) throws Exception {
+                                    if (!pair.f1) {
+                                        return pair.f0;
+                                    }
+                                    return pair.f0 * 2;
+                                }
+                            };
+
+                    // Only the first three edges contribute to the input DataSet.
+                    Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(
+                            graph.getEdges().first(3).map(flagTarget),
+                            doubleWhenFlagged);
+
+                    result.getEdges().writeAsCsv(resultPath);
+                    env.execute();
+
+                    return "1,2,24\n"
+                            + "1,3,26\n"
+                            + "2,3,46\n"
+                            + "3,4,34\n"
+                            + "3,5,35\n"
+                            + "4,5,45\n"
+                            + "5,1,51\n";
+                }
+                case 14: {
+                    /*
+                     * Test joinWithEdgesOnTarget with an input DataSet whose keys differ
+                     * from those of the edge DataSet - the iterator becomes empty.
+                     */
+                    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                    Graph<Long, Long, Long> graph = Graph.create(
+                            TestGraphUtils.getLongLongVertexData(env),
+                            TestGraphUtils.getLongLongEdgeData(env),
+                            env);
+
+                    // Returns twice the second field of the joined tuple.
+                    MapFunction<Tuple2<Long, Long>, Long> doubleSecondField =
+                            new MapFunction<Tuple2<Long, Long>, Long>() {
+                                @Override
+                                public Long map(Tuple2<Long, Long> pair) throws Exception {
+                                    return pair.f1 * 2;
+                                }
+                            };
+
+                    Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(
+                            TestGraphUtils.getLongLongTuple2TargetData(env),
+                            doubleSecondField);
+
+                    result.getEdges().writeAsCsv(resultPath);
+                    env.execute();
+
+                    return "1,2,20\n"
+                            + "1,3,40\n"
+                            + "2,3,40\n"
+                            + "3,4,80\n"
+                            + "3,5,35\n"
+                            + "4,5,45\n"
+                            + "5,1,140\n";
+                }
+                case 15: {
+                    /*
+                     * Test joinWithEdgesOnTarget with a DataSet containing custom
+                     * parametrised type input values.
+                     */
+                    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                    Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                            TestGraphUtils.getLongLongEdgeData(env), env);
+
+                    Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(TestGraphUtils.getLongCustomTuple2TargetData(env),
+                            new MapFunction<Tuple2<Long, TestGraphUtils.DummyCustomParameterizedType<Float>>, Long>() {
+
+                                // @Override makes the compiler verify that this really implements
+                                // MapFunction#map, as the other cases in this switch already do.
+                                @Override
+                                public Long map(Tuple2<Long, TestGraphUtils.DummyCustomParameterizedType<Float>> tuple) throws Exception {
+                                    // The new edge value is the int field of the custom type, widened to long.
+                                    return (long) tuple.f1.getIntField();
+                                }
+                            });
+
+                    result.getEdges().writeAsCsv(resultPath);
+                    env.execute();
+
+                    return "1,2,10\n" +
+                            "1,3,20\n" +
+                            "2,3,20\n" +
+                            "3,4,40\n" +
+                            "3,5,35\n" +
+                            "4,5,45\n" +
+                            "5,1,51\n";
+                }
+                default:
+                    throw new IllegalArgumentException("Invalid program id");
+            }
+        }
+    }
+}
+

Reply via email to