[FLINK-1201] [gelly] joinWithEdges implemented and tested
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e0c10ec8 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e0c10ec8 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e0c10ec8 Branch: refs/heads/master Commit: e0c10ec8f8aff10eaa186d04dda252e105b4eb0b Parents: b2c89cc Author: andralungu <lungu.an...@gmail.com> Authored: Thu Jan 8 18:16:31 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Wed Feb 11 10:46:14 2015 +0100 ---------------------------------------------------------------------- .../main/java/org/apache/flink/graph/Graph.java | 123 ++++ .../apache/flink/graph/test/TestGraphUtils.java | 90 +++ .../flink/graph/test/TestJoinWithEdges.java | 584 +++++++++++++++++++ 3 files changed, 797 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e0c10ec8/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index 1cd5c90..71a701b 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -222,6 +222,129 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab } /** + * Method that joins the edge DataSet with an input DataSet on a composite key of both source and target + * and applies a UDF on the resulted values. + * @param inputDataSet + * @param mapper - the UDF applied + * @param <T> + * @return - a new graph where the edge values have been updated. + */ + public <T> Graph<K, VV, EV> joinWithEdges(DataSet<Tuple3<K, K, T>> inputDataSet, + final MapFunction<Tuple2<EV, T>, EV> mapper) { + + DataSet<Edge<K, EV>> resultedEdges = this.getEdges() + .coGroup(inputDataSet).where(0,1).equalTo(0,1) + .with(new ApplyCoGroupToEdgeValues<K, EV, T>(mapper)); + + return Graph.create(this.getVertices(), resultedEdges, this.getContext()); + } + + private static final class ApplyCoGroupToEdgeValues<K extends Comparable<K> & Serializable, + EV extends Serializable, T> + implements CoGroupFunction<Edge<K, EV>, Tuple3<K, K, T>, Edge<K, EV>> { + + private MapFunction<Tuple2<EV, T>, EV> mapper; + + public ApplyCoGroupToEdgeValues(MapFunction<Tuple2<EV, T>, EV> mapper) { + this.mapper = mapper; + } + + @Override + public void coGroup(Iterable<Edge<K, EV>> iterableDS1, + Iterable<Tuple3<K, K, T>> iterableDS2, + Collector<Edge<K, EV>> collector) throws Exception { + + Iterator<Edge<K, EV>> iteratorDS1 = iterableDS1.iterator(); + Iterator<Tuple3<K, K, T>> iteratorDS2 = iterableDS2.iterator(); + + if(iteratorDS2.hasNext() && iteratorDS1.hasNext()) { + Tuple3<K, K, T> iteratorDS2Next = iteratorDS2.next(); + + collector.collect(new Edge<K, EV>(iteratorDS2Next.f0, iteratorDS2Next.f1, mapper + .map(new Tuple2<EV, T>(iteratorDS1.next().f2, iteratorDS2Next.f2)))); + + } else if(iteratorDS1.hasNext()) { + collector.collect(iteratorDS1.next()); + } + } + } + + /** + * Method that joins the edge DataSet with an input DataSet on the source key of the edges and the first attribute + * of the input DataSet and applies a UDF on the resulted values. + * Should the inputDataSet contain the same key more than once, only the first value will be considered. + * @param inputDataSet + * @param mapper - the UDF applied + * @param <T> + * @return - a new graph where the edge values have been updated. + */ + public <T> Graph<K, VV, EV> joinWithEdgesOnSource(DataSet<Tuple2<K, T>> inputDataSet, + final MapFunction<Tuple2<EV, T>, EV> mapper) { + + DataSet<Edge<K, EV>> resultedEdges = this.getEdges() + .coGroup(inputDataSet).where(0).equalTo(0) + .with(new ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>(mapper)); + + return Graph.create(this.getVertices(), resultedEdges, this.getContext()); + } + + private static final class ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K extends Comparable<K> & Serializable, + EV extends Serializable, T> + implements CoGroupFunction<Edge<K, EV>, Tuple2<K, T>, Edge<K, EV>> { + + private MapFunction<Tuple2<EV, T>, EV> mapper; + + public ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget(MapFunction<Tuple2<EV, T>, EV> mapper) { + this.mapper = mapper; + } + + + @Override + public void coGroup(Iterable<Edge<K, EV>> iterableDS1, + Iterable<Tuple2<K, T>> iterableDS2, + Collector<Edge<K, EV>> collector) throws Exception { + + Iterator<Edge<K, EV>> iteratorDS1 = iterableDS1.iterator(); + Iterator<Tuple2<K, T>> iteratorDS2 = iterableDS2.iterator(); + + if(iteratorDS2.hasNext()) { + Tuple2<K, T> iteratorDS2Next = iteratorDS2.next(); + + while(iteratorDS1.hasNext()) { + Edge<K, EV> iteratorDS1Next = iteratorDS1.next(); + + collector.collect(new Edge<K, EV>(iteratorDS1Next.f0, iteratorDS1Next.f1, mapper + .map(new Tuple2<EV, T>(iteratorDS1Next.f2, iteratorDS2Next.f1)))); + } + + } else { + while(iteratorDS1.hasNext()) { + collector.collect(iteratorDS1.next()); + } + } + } + } + + /** + * Method that joins the edge DataSet with an input DataSet on the target key of the edges and the first attribute + * of the input DataSet and applies a UDF on the resulted values. + * Should the inputDataSet contain the same key more than once, only the first value will be considered. + * @param inputDataSet + * @param mapper - the UDF applied + * @param <T> + * @return - a new graph where the edge values have been updated. + */ + public <T> Graph<K, VV, EV> joinWithEdgesOnTarget(DataSet<Tuple2<K, T>> inputDataSet, + final MapFunction<Tuple2<EV, T>, EV> mapper) { + + DataSet<Edge<K, EV>> resultedEdges = this.getEdges() + .coGroup(inputDataSet).where(1).equalTo(0) + .with(new ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>(mapper)); + + return Graph.create(this.getVertices(), resultedEdges, this.getContext()); + } + + /** * Apply value-based filtering functions to the graph * and return a sub-graph that satisfies the predicates * for both vertex values and edge values. http://git-wip-us.apache.org/repos/asf/flink/blob/e0c10ec8/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java index 9816619..d5062c5 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java @@ -7,6 +7,7 @@ import java.util.List; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; public class TestGraphUtils { @@ -62,6 +63,48 @@ public class TestGraphUtils { return env.fromCollection(tuples); } + public static final DataSet<Tuple2<Long, Long>> getLongLongTuple2SourceData( + ExecutionEnvironment env) { + List<Tuple2<Long, Long>> tuples = new ArrayList<Tuple2<Long, Long>>(); + tuples.add(new Tuple2<Long, Long>(1L, 10L)); + tuples.add(new Tuple2<Long, Long>(1L, 20L)); + tuples.add(new Tuple2<Long, Long>(2L, 30L)); + tuples.add(new Tuple2<Long, Long>(3L, 40L)); + tuples.add(new Tuple2<Long, Long>(3L, 50L)); + tuples.add(new Tuple2<Long, Long>(4L, 60L)); + tuples.add(new Tuple2<Long, Long>(6L, 70L)); + + return env.fromCollection(tuples); + } + + public static final DataSet<Tuple2<Long, Long>> getLongLongTuple2TargetData( + ExecutionEnvironment env) { + List<Tuple2<Long, Long>> tuples = new ArrayList<Tuple2<Long, Long>>(); + tuples.add(new Tuple2<Long, Long>(2L, 10L)); + tuples.add(new Tuple2<Long, Long>(3L, 20L)); + tuples.add(new Tuple2<Long, Long>(3L, 30L)); + tuples.add(new Tuple2<Long, Long>(4L, 40L)); + tuples.add(new Tuple2<Long, Long>(6L, 50L)); + tuples.add(new Tuple2<Long, Long>(6L, 60L)); + tuples.add(new Tuple2<Long, Long>(1L, 70L)); + + return env.fromCollection(tuples); + } + + public static final DataSet<Tuple3<Long, Long, Long>> getLongLongLongTuple3Data( + ExecutionEnvironment env) { + List<Tuple3<Long, Long, Long>> tuples = new ArrayList<>(); + tuples.add(new Tuple3<Long, Long, Long>(1L, 2L, 12L)); + tuples.add(new Tuple3<Long, Long, Long>(1L, 3L, 13L)); + tuples.add(new Tuple3<Long, Long, Long>(2L, 3L, 23L)); + tuples.add(new Tuple3<Long, Long, Long>(3L, 4L, 34L)); + tuples.add(new Tuple3<Long, Long, Long>(3L, 6L, 36L)); + tuples.add(new Tuple3<Long, Long, Long>(4L, 6L, 46L)); + tuples.add(new Tuple3<Long, Long, Long>(6L, 1L, 61L)); + + return env.fromCollection(tuples); + } + public static final DataSet<Tuple2<Long, DummyCustomParameterizedType<Float>>> getLongCustomTuple2Data( ExecutionEnvironment env) { List<Tuple2<Long, DummyCustomParameterizedType<Float>>> tuples = new ArrayList<Tuple2<Long, @@ -77,6 +120,53 @@ public class TestGraphUtils { return env.fromCollection(tuples); } + public static final DataSet<Tuple2<Long, DummyCustomParameterizedType<Float>>> getLongCustomTuple2SourceData( + ExecutionEnvironment env) { + List<Tuple2<Long, DummyCustomParameterizedType<Float>>> tuples = new ArrayList<Tuple2<Long, + DummyCustomParameterizedType<Float>>>(); + tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(1L, + new DummyCustomParameterizedType<Float>(10, 10f))); + tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(1L, + new DummyCustomParameterizedType<Float>(20, 20f))); + tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(2L, + new DummyCustomParameterizedType<Float>(30, 30f))); + tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(3L, + new DummyCustomParameterizedType<Float>(40, 40f))); + + return env.fromCollection(tuples); + } + + public static final DataSet<Tuple2<Long, DummyCustomParameterizedType<Float>>> getLongCustomTuple2TargetData( + ExecutionEnvironment env) { + List<Tuple2<Long, DummyCustomParameterizedType<Float>>> tuples = new ArrayList<Tuple2<Long, + DummyCustomParameterizedType<Float>>>(); + tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(2L, + new DummyCustomParameterizedType<Float>(10, 10f))); + tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(3L, + new DummyCustomParameterizedType<Float>(20, 20f))); + tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(3L, + new DummyCustomParameterizedType<Float>(30, 30f))); + tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(4L, + new DummyCustomParameterizedType<Float>(40, 40f))); + + return env.fromCollection(tuples); + } + + public static final DataSet<Tuple3<Long, Long, DummyCustomParameterizedType<Float>>> getLongLongCustomTuple3Data( + ExecutionEnvironment env) { + List<Tuple3<Long, Long, DummyCustomParameterizedType<Float>>> tuples = new ArrayList<>(); + tuples.add(new Tuple3<Long, Long, DummyCustomParameterizedType<Float>>(1L, 2L, + new DummyCustomParameterizedType<Float>(10, 10f))); + tuples.add(new Tuple3<Long, Long, DummyCustomParameterizedType<Float>>(1L, 3L, + new DummyCustomParameterizedType<Float>(20, 20f))); + tuples.add(new Tuple3<Long, Long, DummyCustomParameterizedType<Float>>(2L, 3L, + new DummyCustomParameterizedType<Float>(30, 30f))); + tuples.add(new Tuple3<Long, Long, DummyCustomParameterizedType<Float>>(3L, 4L, + new DummyCustomParameterizedType<Float>(40, 40f))); + + return env.fromCollection(tuples); + } + /** * A graph with invalid vertex ids */ http://git-wip-us.apache.org/repos/asf/flink/blob/e0c10ec8/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java new file mode 100644 index 0000000..711cd61 --- /dev/null +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java @@ -0,0 +1,584 @@ +package flink.graphs; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.test.util.JavaProgramTestBase; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.Collection; +import java.util.LinkedList; + +@RunWith(Parameterized.class) +public class TestJoinWithEdges extends JavaProgramTestBase { + + private static int NUM_PROGRAMS = 15; + + private int curProgId = config.getInteger("ProgramId", -1); + private String resultPath; + private String expectedResult; + + public TestJoinWithEdges(Configuration config) { + super(config); + } + + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } + + @Override + protected void testProgram() throws Exception { + expectedResult = GraphProgs.runProgram(curProgId, resultPath); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(expectedResult, resultPath); + } + + @Parameterized.Parameters + public static Collection<Object[]> getConfigurations() throws IOException { + + LinkedList<Configuration> tConfigs = new LinkedList<Configuration>(); + + for(int i=1; i <= NUM_PROGRAMS; i++) { + Configuration config = new Configuration(); + config.setInteger("ProgramId", i); + tConfigs.add(config); + } + + return toParameterList(tConfigs); + } + + private static class GraphProgs { + + @SuppressWarnings("serial") + public static String runProgram(int progId, String resultPath) throws Exception { + + switch (progId) { + case 1: { + /* + * Test joinWithEdges with the input DataSet parameter identical + * to the edge DataSet + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdges(graph.getEdges() + .map(new MapFunction<Edge<Long, Long>, Tuple3<Long, Long, Long>>() { + @Override + public Tuple3<Long, Long, Long> map(Edge<Long, Long> edge) throws Exception { + return new Tuple3<Long, Long, Long>(edge.getSource(), + edge.getTarget(), edge.getValue()); + } + }), + new MapFunction<Tuple2<Long, Long>, Long>() { + + @Override + public Long map(Tuple2<Long, Long> tuple) throws Exception { + return tuple.f0 + tuple.f1; + } + }); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + return "1,2,24\n" + + "1,3,26\n" + + "2,3,46\n" + + "3,4,68\n" + + "3,5,70\n" + + "4,5,90\n" + + "5,1,102\n"; + } + case 2: { + /* + * Test joinWithEdges with the input DataSet passed as a parameter containing + * less elements than the edge DataSet, but of the same type + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdges(graph.getEdges().first(3) + .map(new MapFunction<Edge<Long, Long>, Tuple3<Long, Long, Long>>() { + @Override + public Tuple3<Long, Long, Long> map(Edge<Long, Long> edge) throws Exception { + return new Tuple3<Long, Long, Long>(edge.getSource(), + edge.getTarget(), edge.getValue()); + } + }), + new MapFunction<Tuple2<Long, Long>, Long>() { + + @Override + public Long map(Tuple2<Long, Long> tuple) throws Exception { + return tuple.f0 + tuple.f1; + } + }); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + return "1,2,24\n" + + "1,3,26\n" + + "2,3,46\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } + case 3: { + /* + * Test joinWithEdges with the input DataSet passed as a parameter containing + * less elements than the edge DataSet and of a different type(Boolean) + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdges(graph.getEdges().first(3) + .map(new MapFunction<Edge<Long, Long>, Tuple3<Long, Long, Boolean>>() { + @Override + public Tuple3<Long, Long, Boolean> map(Edge<Long, Long> edge) throws Exception { + return new Tuple3<Long, Long, Boolean>(edge.getSource(), + edge.getTarget(), true); + } + }), + new MapFunction<Tuple2<Long, Boolean>, Long>() { + + @Override + public Long map(Tuple2<Long, Boolean> tuple) throws Exception { + if(tuple.f1) { + return tuple.f0 * 2; + } + else { + return tuple.f0; + } + } + }); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + return "1,2,24\n" + + "1,3,26\n" + + "2,3,46\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } + case 4: { + /* + * Test joinWithEdges with the input DataSet containing different keys than the edge DataSet + * - the iterator becomes empty. + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdges(TestGraphUtils.getLongLongLongTuple3Data(env), + new MapFunction<Tuple2<Long, Long>, Long>() { + + @Override + public Long map(Tuple2<Long, Long> tuple) throws Exception { + return tuple.f1 * 2; + } + }); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + return "1,2,24\n" + + "1,3,26\n" + + "2,3,46\n" + + "3,4,68\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } + case 5: { + /* + * Test joinWithEdges with a DataSet containing custom parametrised type input values + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdges(TestGraphUtils.getLongLongCustomTuple3Data(env), + new MapFunction<Tuple2<Long, TestGraphUtils.DummyCustomParameterizedType<Float>>, Long>() { + public Long map(Tuple2<Long, TestGraphUtils.DummyCustomParameterizedType<Float>> tuple) throws Exception { + return (long) tuple.f1.getIntField(); + } + }); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + return "1,2,10\n" + + "1,3,20\n" + + "2,3,30\n" + + "3,4,40\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } + case 6: { + /* + * Test joinWithEdgesOnSource with the input DataSet parameter identical + * to the edge DataSet + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(graph.getEdges() + .map(new MapFunction<Edge<Long, Long>, Tuple2<Long, Long>>() { + @Override + public Tuple2<Long, Long> map(Edge<Long, Long> edge) throws Exception { + return new Tuple2<Long, Long>(edge.getSource(), edge.getValue()); + } + }), + new MapFunction<Tuple2<Long, Long>, Long>() { + + @Override + public Long map(Tuple2<Long, Long> tuple) throws Exception { + return tuple.f0 + tuple.f1; + } + }); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + return "1,2,24\n" + + "1,3,25\n" + + "2,3,46\n" + + "3,4,68\n" + + "3,5,69\n" + + "4,5,90\n" + + "5,1,102\n"; + } + case 7: { + /* + * Test joinWithEdgesOnSource with the input DataSet passed as a parameter containing + * less elements than the edge DataSet, but of the same type + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(graph.getEdges().first(3) + .map(new MapFunction<Edge<Long, Long>, Tuple2<Long, Long>>() { + @Override + public Tuple2<Long, Long> map(Edge<Long, Long> edge) throws Exception { + return new Tuple2<Long, Long>(edge.getSource(), edge.getValue()); + } + }), + new MapFunction<Tuple2<Long, Long>, Long>() { + + @Override + public Long map(Tuple2<Long, Long> tuple) throws Exception { + return tuple.f0 + tuple.f1; + } + }); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + return "1,2,24\n" + + "1,3,25\n" + + "2,3,46\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } + case 8: { + /* + * Test joinWithEdgesOnSource with the input DataSet passed as a parameter containing + * less elements than the edge DataSet and of a different type(Boolean) + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(graph.getEdges().first(3) + .map(new MapFunction<Edge<Long, Long>, Tuple2<Long, Boolean>>() { + @Override + public Tuple2<Long, Boolean> map(Edge<Long, Long> edge) throws Exception { + return new Tuple2<Long, Boolean>(edge.getSource(), true); + } + }), + new MapFunction<Tuple2<Long, Boolean>, Long>() { + + @Override + public Long map(Tuple2<Long, Boolean> tuple) throws Exception { + if (tuple.f1) { + return tuple.f0 * 2; + } else { + return tuple.f0; + } + } + }); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + return "1,2,24\n" + + "1,3,26\n" + + "2,3,46\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } + case 9: { + /* + * Test joinWithEdgesOnSource with the input DataSet containing different keys than the edge DataSet + * - the iterator becomes empty. + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(TestGraphUtils.getLongLongTuple2SourceData(env), + new MapFunction<Tuple2<Long, Long>, Long>() { + + @Override + public Long map(Tuple2<Long, Long> tuple) throws Exception { + return tuple.f1 * 2; + } + }); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + return "1,2,20\n" + + "1,3,20\n" + + "2,3,60\n" + + "3,4,80\n" + + "3,5,80\n" + + "4,5,120\n" + + "5,1,51\n"; + } + case 10: { + /* + * Test joinWithEdgesOnSource with a DataSet containing custom parametrised type input values + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(TestGraphUtils.getLongCustomTuple2SourceData(env), + new MapFunction<Tuple2<Long, TestGraphUtils.DummyCustomParameterizedType<Float>>, Long>() { + public Long map(Tuple2<Long, TestGraphUtils.DummyCustomParameterizedType<Float>> tuple) throws Exception { + return (long) tuple.f1.getIntField(); + } + }); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + return "1,2,10\n" + + "1,3,10\n" + + "2,3,30\n" + + "3,4,40\n" + + "3,5,40\n" + + "4,5,45\n" + + "5,1,51\n"; + } + case 11: { + /* + * Test joinWithEdgesOnTarget with the input DataSet parameter identical + * to the edge DataSet + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(graph.getEdges() + .map(new MapFunction<Edge<Long, Long>, Tuple2<Long, Long>>() { + @Override + public Tuple2<Long, Long> map(Edge<Long, Long> edge) throws Exception { + return new Tuple2<Long, Long>(edge.getTarget(), edge.getValue()); + } + }), + new MapFunction<Tuple2<Long, Long>, Long>() { + + @Override + public Long map(Tuple2<Long, Long> tuple) throws Exception { + return tuple.f0 + tuple.f1; + } + }); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + return "1,2,24\n" + + "1,3,26\n" + + "2,3,36\n" + + "3,4,68\n" + + "3,5,70\n" + + "4,5,80\n" + + "5,1,102\n"; + } + case 12: { + /* + * Test joinWithEdgesOnTarget with the input DataSet passed as a parameter containing + * less elements than the edge DataSet, but of the same type + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(graph.getEdges().first(3) + .map(new MapFunction<Edge<Long, Long>, Tuple2<Long, Long>>() { + @Override + public Tuple2<Long, Long> map(Edge<Long, Long> edge) throws Exception { + return new Tuple2<Long, Long>(edge.getTarget(), edge.getValue()); + } + }), + new MapFunction<Tuple2<Long, Long>, Long>() { + + @Override + public Long map(Tuple2<Long, Long> tuple) throws Exception { + return tuple.f0 + tuple.f1; + } + }); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + return "1,2,24\n" + + "1,3,26\n" + + "2,3,36\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } + case 13: { + /* + * Test joinWithEdgesOnTarget with the input DataSet passed as a parameter containing + * less elements than the edge DataSet and of a different type(Boolean) + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(graph.getEdges().first(3) + .map(new MapFunction<Edge<Long, Long>, Tuple2<Long, Boolean>>() { + @Override + public Tuple2<Long, Boolean> map(Edge<Long, Long> edge) throws Exception { + return new Tuple2<Long, Boolean>(edge.getTarget(), true); + } + }), + new MapFunction<Tuple2<Long, Boolean>, Long>() { + + @Override + public Long map(Tuple2<Long, Boolean> tuple) throws Exception { + if (tuple.f1) { + return tuple.f0 * 2; + } else { + return tuple.f0; + } + } + }); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + return "1,2,24\n" + + "1,3,26\n" + + "2,3,46\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } + case 14: { + /* + * Test joinWithEdgesOnTarget with the input DataSet containing different keys than the edge DataSet + * - the iterator becomes empty. + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(TestGraphUtils.getLongLongTuple2TargetData(env), + new MapFunction<Tuple2<Long, Long>, Long>() { + + @Override + public Long map(Tuple2<Long, Long> tuple) throws Exception { + return tuple.f1 * 2; + } + }); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + return "1,2,20\n" + + "1,3,40\n" + + "2,3,40\n" + + "3,4,80\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,140\n"; + } + case 15: { + /* + * Test joinWithEdgesOnTarget with a DataSet containing custom parametrised type input values + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(TestGraphUtils.getLongCustomTuple2TargetData(env), + new MapFunction<Tuple2<Long, TestGraphUtils.DummyCustomParameterizedType<Float>>, Long>() { + public Long map(Tuple2<Long, TestGraphUtils.DummyCustomParameterizedType<Float>> tuple) throws Exception { + return (long) tuple.f1.getIntField(); + } + }); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + return "1,2,10\n" + + "1,3,20\n" + + "2,3,20\n" + + "3,4,40\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } + default: + throw new IllegalArgumentException("Invalid program id"); + } + } + } +} +