[FLINK-1201] [gelly] joinWithVertices implemented and tested
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3f600098 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3f600098 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3f600098 Branch: refs/heads/master Commit: 3f6000986f7c64a1f4b3cafff36639d3564330a1 Parents: 99544bd Author: andralungu <lungu.an...@gmail.com> Authored: Tue Dec 23 22:35:40 2014 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Wed Feb 11 10:46:13 2015 +0100 ---------------------------------------------------------------------- .../main/java/org/apache/flink/graph/Graph.java | 46 +++- .../apache/flink/graph/test/TestGraphUtils.java | 28 +++ .../flink/graph/test/TestJoinWithVertices.java | 228 +++++++++++++++++++ 3 files changed, 301 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3f600098/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index 85717d8..948256e 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -179,7 +179,51 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab } } - /** + /** + * Method that joins the vertex DataSet with an input DataSet and applies a UDF on the resulting values. + * @param inputDataSet + * @param mapper - the UDF applied + * @return - a new graph where the vertex values have been updated. 
+ */ + public <T> Graph<K, VV, EV> joinWithVertices(DataSet<Tuple2<K, T>> inputDataSet, + final MapFunction<Tuple2<VV, T>, VV> mapper) { + + DataSet<Vertex<K, VV>> resultedVertices = this.getVertices() + .coGroup(inputDataSet).where(0).equalTo(0) + .with(new ApplyCoGroupToVertexValues<K, VV, T>(mapper)); + + return Graph.create(resultedVertices, this.getEdges(), this.getContext()); + } + + private static final class ApplyCoGroupToVertexValues<K extends Comparable<K> & Serializable, + VV extends Serializable, T> + implements CoGroupFunction<Vertex<K, VV>, Tuple2<K, T>, Vertex<K, VV>> { + + private MapFunction<Tuple2<VV, T>, VV> mapper; + + public ApplyCoGroupToVertexValues(MapFunction<Tuple2<VV, T>, VV> mapper) { + this.mapper = mapper; + } + + @Override + public void coGroup(Iterable<Vertex<K, VV>> iterableDS1, Iterable<Tuple2<K, T>> iterableDS2, + Collector<Vertex<K, VV>> collector) throws Exception { + + Iterator<Vertex<K, VV>> iteratorDS1 = iterableDS1.iterator(); + Iterator<Tuple2<K, T>> iteratorDS2 = iterableDS2.iterator(); + + if(iteratorDS2.hasNext() && iteratorDS1.hasNext()) { + Tuple2<K, T> iteratorDS2Next = iteratorDS2.next(); + + collector.collect(new Vertex<K, VV>(iteratorDS2Next.f0, mapper + .map(new Tuple2<VV, T>(iteratorDS1.next().f1, iteratorDS2Next.f1)))); + } else if(iteratorDS1.hasNext()) { + collector.collect(iteratorDS1.next()); + } + } + } + + /** * Apply value-based filtering functions to the graph * and return a sub-graph that satisfies the predicates * for both vertex values and edge values. 
http://git-wip-us.apache.org/repos/asf/flink/blob/3f600098/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java index ac2adc6..11e43e7 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java @@ -6,6 +6,7 @@ import java.util.List; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; public class TestGraphUtils { @@ -35,6 +36,33 @@ public class TestGraphUtils { return env.fromCollection(edges); } + public static final DataSet<Tuple2<Long, Long>> getLongLongTuple2Data( + ExecutionEnvironment env) { + List<Tuple2<Long, Long>> tuples = new ArrayList<Tuple2<Long, Long>>(); + tuples.add(new Tuple2<Long, Long>(1L, 10L)); + tuples.add(new Tuple2<Long, Long>(2L, 20L)); + tuples.add(new Tuple2<Long, Long>(3L, 30L)); + tuples.add(new Tuple2<Long, Long>(4L, 40L)); + tuples.add(new Tuple2<Long, Long>(6L, 60L)); + + return env.fromCollection(tuples); + } + + public static final DataSet<Tuple2<Long, DummyCustomParameterizedType<Float>>> getLongCustomTuple2Data( + ExecutionEnvironment env) { + List<Tuple2<Long, DummyCustomParameterizedType<Float>>> tuples = new ArrayList<Tuple2<Long, + DummyCustomParameterizedType<Float>>>(); + tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(1L, + new DummyCustomParameterizedType<Float>(10, 10f))); + tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(2L, + new DummyCustomParameterizedType<Float>(20, 20f))); + tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(3L, + new 
DummyCustomParameterizedType<Float>(30, 30f))); + tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(4L, + new DummyCustomParameterizedType<Float>(40, 40f))); + return env.fromCollection(tuples); + } + /** * A graph with invalid vertex ids */ http://git-wip-us.apache.org/repos/asf/flink/blob/3f600098/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithVertices.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithVertices.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithVertices.java new file mode 100644 index 0000000..d2b7ba6 --- /dev/null +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithVertices.java @@ -0,0 +1,228 @@ +package flink.graphs; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.test.util.JavaProgramTestBase; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import flink.graphs.TestGraphUtils.DummyCustomParameterizedType; + +import java.io.IOException; +import java.util.Collection; +import java.util.LinkedList; + +@RunWith(Parameterized.class) +public class TestJoinWithVertices extends JavaProgramTestBase { + + private static int NUM_PROGRAMS = 5; + + private int curProgId = config.getInteger("ProgramId", -1); + private String resultPath; + private String expectedResult; + + public TestJoinWithVertices(Configuration config) { + super(config); + } + + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } + + @Override + protected void testProgram() throws Exception { + expectedResult = GraphProgs.runProgram(curProgId, resultPath); + } + + @Override + protected 
void postSubmit() throws Exception { + compareResultsByLinesInMemory(expectedResult, resultPath); + } + + @Parameterized.Parameters + public static Collection<Object[]> getConfigurations() throws IOException { + + LinkedList<Configuration> tConfigs = new LinkedList<Configuration>(); + + for(int i=1; i <= NUM_PROGRAMS; i++) { + Configuration config = new Configuration(); + config.setInteger("ProgramId", i); + tConfigs.add(config); + } + + return toParameterList(tConfigs); + } + + private static class GraphProgs { + + @SuppressWarnings("serial") + public static String runProgram(int progId, String resultPath) throws Exception { + + switch (progId) { + case 1: { + /* + * Test joinWithVertices with the input DataSet parameter identical + * to the vertex DataSet + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithVertices(graph.getVertices() + .map(new MapFunction<Vertex<Long, Long>, Tuple2<Long, Long>>() { + @Override + public Tuple2<Long, Long> map(Vertex<Long, Long> vertex) throws Exception { + return new Tuple2<Long, Long>(vertex.getId(), vertex.getValue()); + } + }), + new MapFunction<Tuple2<Long, Long>, Long>() { + + @Override + public Long map(Tuple2<Long, Long> tuple) throws Exception { + return tuple.f0 + tuple.f1; + } + }); + + result.getVertices().writeAsCsv(resultPath); + env.execute(); + + return "1,2\n" + + "2,4\n" + + "3,6\n" + + "4,8\n" + + "5,10\n"; + } + case 2: { + /* + * Test joinWithVertices with the input DataSet passed as a parameter containing + * fewer elements than the vertex DataSet, but of the same type + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), + 
TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithVertices(graph.getVertices().first(3) + .map(new MapFunction<Vertex<Long, Long>, Tuple2<Long, Long>>() { + @Override + public Tuple2<Long, Long> map(Vertex<Long, Long> vertex) throws Exception { + return new Tuple2<Long, Long>(vertex.getId(), vertex.getValue()); + } + }), + new MapFunction<Tuple2<Long, Long>, Long>() { + + @Override + public Long map(Tuple2<Long, Long> tuple) throws Exception { + return tuple.f0 + tuple.f1; + } + }); + + result.getVertices().writeAsCsv(resultPath); + env.execute(); + + return "1,2\n" + + "2,4\n" + + "3,6\n" + + "4,4\n" + + "5,5\n"; + } + case 3: { + /* + * Test joinWithVertices with the input DataSet passed as a parameter containing + * fewer elements than the vertex DataSet and of a different type (Boolean) + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithVertices(graph.getVertices().first(3) + .map(new MapFunction<Vertex<Long, Long>, Tuple2<Long, Boolean>>() { + @Override + public Tuple2<Long, Boolean> map(Vertex<Long, Long> vertex) throws Exception { + return new Tuple2<Long, Boolean>(vertex.getId(), true); + } + }), + new MapFunction<Tuple2<Long, Boolean>, Long>() { + + @Override + public Long map(Tuple2<Long, Boolean> tuple) throws Exception { + if(tuple.f1) { + return tuple.f0 * 2; + } + else { + return tuple.f0; + } + } + }); + + result.getVertices().writeAsCsv(resultPath); + env.execute(); + + return "1,2\n" + + "2,4\n" + + "3,6\n" + + "4,4\n" + + "5,5\n"; + } + case 4: { + /* + * Test joinWithVertices with an input DataSet containing different keys than the vertex DataSet + * - the iterator becomes empty. 
+ */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithVertices(TestGraphUtils.getLongLongTuple2Data(env), + new MapFunction<Tuple2<Long, Long>, Long>() { + public Long map(Tuple2<Long, Long> tuple) throws Exception { + return tuple.f1; + } + }); + + result.getVertices().writeAsCsv(resultPath); + env.execute(); + + return "1,10\n" + + "2,20\n" + + "3,30\n" + + "4,40\n" + + "5,5\n"; + } + case 5: { + /* + * Test joinWithVertices with a DataSet containing custom parametrised type input values + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithVertices(TestGraphUtils.getLongCustomTuple2Data(env), + new MapFunction<Tuple2<Long, DummyCustomParameterizedType<Float>>, Long>() { + public Long map(Tuple2<Long, DummyCustomParameterizedType<Float>> tuple) throws Exception { + return (long) tuple.f1.getIntField(); + } + }); + + result.getVertices().writeAsCsv(resultPath); + env.execute(); + + return "1,10\n" + + "2,20\n" + + "3,30\n" + + "4,40\n" + + "5,5\n"; + } + default: + throw new IllegalArgumentException("Invalid program id"); + } + } + } +}