http://git-wip-us.apache.org/repos/asf/flink/blob/e6b9cecd/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapVertices.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapVertices.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapVertices.java index 31bd48b..c4d44b0 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapVertices.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapVertices.java @@ -18,219 +18,216 @@ package org.apache.flink.graph.test; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.Collection; -import java.util.LinkedList; - import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.flink.configuration.Configuration; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType; import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType; -import org.apache.flink.test.util.JavaProgramTestBase; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; @RunWith(Parameterized.class) -public class TestMapVertices extends JavaProgramTestBase { +public class TestMapVertices extends MultipleProgramsTestBase { - private static int NUM_PROGRAMS = 5; - - private int curProgId = config.getInteger("ProgramId", -1); - private String resultPath; - private String expectedResult; - - public TestMapVertices(Configuration config) { - super(config); - } - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); + public TestMapVertices(MultipleProgramsTestBase.ExecutionMode mode){ + super(mode); } - @Override - protected void testProgram() throws Exception { - expectedResult = GraphProgs.runProgram(curProgId, resultPath); + private String resultPath; + private String expectedResult; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); } - - @Override - protected void postSubmit() throws Exception { + + @After + public void after() throws Exception{ compareResultsByLinesInMemory(expectedResult, resultPath); } - - @Parameters - public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException { - LinkedList<Configuration> tConfigs = new LinkedList<Configuration>(); + @Test + public void testWithSameValue() throws Exception { + /* + * Test mapVertices() keeping the same value type + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Vertex<Long, Long>> mappedVertices = graph.mapVertices(new AddOneMapper()).getVertices(); + + mappedVertices.writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,2\n" + + "2,3\n" + + "3,4\n" + + "4,5\n" + + "5,6\n"; + } + + @Test + public void testWithStringValue() throws Exception { + /* + * Test mapVertices() and change the value type to String + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Vertex<Long, String>> mappedVertices = graph.mapVertices(new ToStringMapper()).getVertices(); + + mappedVertices.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,one\n" + + "2,two\n" + + "3,three\n" + + "4,four\n" + + "5,five\n"; + } - for(int i=1; i <= NUM_PROGRAMS; i++) { - Configuration config = new Configuration(); - config.setInteger("ProgramId", i); - tConfigs.add(config); - } + @Test + public void testWithtuple1Value() throws Exception { + /* + * Test mapVertices() and change the value type to a Tuple1 + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Vertex<Long, Tuple1<Long>>> mappedVertices = graph.mapVertices(new ToTuple1Mapper()).getVertices(); + + mappedVertices.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,(1)\n" + + "2,(2)\n" + + "3,(3)\n" + + "4,(4)\n" + + "5,(5)\n"; + } + + @Test + public void testWithCustomType() throws Exception { + /* + * Test mapVertices() and change the value type to a custom type + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - return toParameterList(tConfigs); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Vertex<Long, DummyCustomType>> mappedVertices = graph.mapVertices(new ToCustomTypeMapper()).getVertices(); + + mappedVertices.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,(T,1)\n" + + "2,(T,2)\n" + + "3,(T,3)\n" + + "4,(T,4)\n" + + "5,(T,5)\n"; } + + @Test + public void testWithCustomParametrizedType() throws Exception { + /* + * Test mapVertices() and change the value type to a parameterized custom type + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Vertex<Long, DummyCustomParameterizedType<Double>>> mappedVertices = graph.mapVertices( + new ToCustomParametrizedTypeMapper()).getVertices(); + + mappedVertices.writeAsCsv(resultPath); + env.execute(); - private static class GraphProgs { - - @SuppressWarnings("serial") - public static String runProgram(int progId, String resultPath) throws Exception { - - switch(progId) { - case 1: { - /* - * Test mapVertices() keeping the same value type - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Vertex<Long, Long>> mappedVertices = graph.mapVertices(new MapFunction<Vertex<Long, Long>, Long>() { - public Long map(Vertex<Long, Long> value) throws Exception { - return value.getValue()+1; - } - }).getVertices(); - - mappedVertices.writeAsCsv(resultPath); - env.execute(); - return "1,2\n" + - "2,3\n" + - "3,4\n" + - "4,5\n" + - "5,6\n"; + expectedResult = "1,(1.0,1)\n" + + "2,(2.0,2)\n" + + "3,(3.0,3)\n" + + "4,(4.0,4)\n" + + "5,(5.0,5)\n"; + } + + @SuppressWarnings("serial") + private static final class AddOneMapper implements MapFunction<Vertex<Long, Long>, Long> { + public Long map(Vertex<Long, Long> value) throws Exception { + return value.getValue()+1; + } + } + + @SuppressWarnings("serial") + private static final class ToStringMapper implements MapFunction<Vertex<Long, Long>, String> { + public String map(Vertex<Long, Long> vertex) throws Exception { + String stringValue; + if (vertex.getValue() == 1) { + stringValue = "one"; } - case 2: { - /* - * Test mapVertices() and change the value type to String - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Vertex<Long, String>> mappedVertices = graph.mapVertices(new MapFunction<Vertex<Long, Long>, String>() { - public String map(Vertex<Long, Long> vertex) throws Exception { - String stringValue; - if (vertex.getValue() == 1) { - stringValue = "one"; - } - else if (vertex.getValue() == 2) { - stringValue = "two"; - } - else if (vertex.getValue() == 3) { - stringValue = "three"; - } - else if (vertex.getValue() == 4) { - stringValue = "four"; - } - else if (vertex.getValue() == 5) { - stringValue = "five"; - } - else { - stringValue = ""; - } - return stringValue; - } - }).getVertices(); - - mappedVertices.writeAsCsv(resultPath); - env.execute(); - return "1,one\n" + - "2,two\n" + - "3,three\n" + - "4,four\n" + - "5,five\n"; + else if (vertex.getValue() == 2) { + stringValue = "two"; } - case 3: { - /* - * Test mapVertices() and change the value type to a Tuple1 - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Vertex<Long, Tuple1<Long>>> mappedVertices = graph.mapVertices(new MapFunction<Vertex<Long, Long>, Tuple1<Long>>() { - public Tuple1<Long> map(Vertex<Long, Long> vertex) throws Exception { - Tuple1<Long> tupleValue = new Tuple1<Long>(); - tupleValue.setFields(vertex.getValue()); - return tupleValue; - } - }).getVertices(); - - mappedVertices.writeAsCsv(resultPath); - env.execute(); - return "1,(1)\n" + - "2,(2)\n" + - "3,(3)\n" + - "4,(4)\n" + - "5,(5)\n"; + else if (vertex.getValue() == 3) { + stringValue = "three"; } - case 4: { - /* - * Test mapVertices() and change the value type to a custom type - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Vertex<Long, DummyCustomType>> mappedVertices = graph.mapVertices(new MapFunction<Vertex<Long, Long>, DummyCustomType>() { - public DummyCustomType map(Vertex<Long, Long> vertex) throws Exception { - DummyCustomType dummyValue = new DummyCustomType(); - dummyValue.setIntField(vertex.getValue().intValue()); - return dummyValue; - } - }).getVertices(); - - mappedVertices.writeAsCsv(resultPath); - env.execute(); - return "1,(T,1)\n" + - "2,(T,2)\n" + - "3,(T,3)\n" + - "4,(T,4)\n" + - "5,(T,5)\n"; + else if (vertex.getValue() == 4) { + stringValue = "four"; } - case 5: { - /* - * Test mapVertices() and change the value type to a parameterized custom type - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Vertex<Long, DummyCustomParameterizedType<Double>>> mappedVertices = graph.mapVertices( - new MapFunction<Vertex<Long, Long>, DummyCustomParameterizedType<Double>>() { - public DummyCustomParameterizedType<Double> map(Vertex<Long, Long> vertex) throws Exception { - DummyCustomParameterizedType<Double> dummyValue = new DummyCustomParameterizedType<Double>(); - dummyValue.setIntField(vertex.getValue().intValue()); - dummyValue.setTField(new Double(vertex.getValue())); - return dummyValue; - } - }).getVertices(); - - mappedVertices.writeAsCsv(resultPath); - env.execute(); - return "1,(1.0,1)\n" + - "2,(2.0,2)\n" + - "3,(3.0,3)\n" + - "4,(4.0,4)\n" + - "5,(5.0,5)\n"; + else if (vertex.getValue() == 5) { + stringValue = "five"; } - default: - throw new IllegalArgumentException("Invalid program id"); + else { + stringValue = ""; } + return stringValue; + } + } + + @SuppressWarnings("serial") + private static final class ToTuple1Mapper implements MapFunction<Vertex<Long, Long>, Tuple1<Long>> { + public Tuple1<Long> map(Vertex<Long, Long> vertex) throws Exception { + Tuple1<Long> tupleValue = new Tuple1<Long>(); + tupleValue.setFields(vertex.getValue()); + return tupleValue; } } + @SuppressWarnings("serial") + private static final class ToCustomTypeMapper implements MapFunction<Vertex<Long, Long>, DummyCustomType> { + public DummyCustomType map(Vertex<Long, Long> vertex) throws Exception { + DummyCustomType dummyValue = new DummyCustomType(); + dummyValue.setIntField(vertex.getValue().intValue()); + return dummyValue; + } + } + + @SuppressWarnings("serial") + private static final class ToCustomParametrizedTypeMapper implements MapFunction<Vertex<Long, Long>, + DummyCustomParameterizedType<Double>> { + + public DummyCustomParameterizedType<Double> map(Vertex<Long, Long> vertex) throws Exception { + DummyCustomParameterizedType<Double> dummyValue = new DummyCustomParameterizedType<Double>(); + dummyValue.setIntField(vertex.getValue().intValue()); + dummyValue.setTField(new Double(vertex.getValue())); + return dummyValue; + } + } }
http://git-wip-us.apache.org/repos/asf/flink/blob/e6b9cecd/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnEdgesMethods.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnEdgesMethods.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnEdgesMethods.java index 89ef7c1..7a02ffe 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnEdgesMethods.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnEdgesMethods.java @@ -18,297 +18,300 @@ package org.apache.flink.graph.test; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.Collection; -import java.util.LinkedList; - import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.Configuration; import org.apache.flink.graph.Edge; import org.apache.flink.graph.EdgeDirection; import org.apache.flink.graph.EdgesFunction; import org.apache.flink.graph.EdgesFunctionWithVertexValue; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; -import org.apache.flink.test.util.JavaProgramTestBase; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; @RunWith(Parameterized.class) -public class TestReduceOnEdgesMethods extends JavaProgramTestBase { +public class TestReduceOnEdgesMethods extends MultipleProgramsTestBase { - private static int NUM_PROGRAMS = 6; - - private int curProgId = config.getInteger("ProgramId", -1); - private String resultPath; - private String expectedResult; - - public TestReduceOnEdgesMethods(Configuration config) { - super(config); - } - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); + public TestReduceOnEdgesMethods(MultipleProgramsTestBase.ExecutionMode mode){ + super(mode); } - @Override - protected void testProgram() throws Exception { - expectedResult = GraphProgs.runProgram(curProgId, resultPath); + private String resultPath; + private String expectedResult; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); } - - @Override - protected void postSubmit() throws Exception { + + @After + public void after() throws Exception{ compareResultsByLinesInMemory(expectedResult, resultPath); } + + @Test + public void testLowestWeightOutNeighbor() throws Exception { + /* + * Get the lowest-weight out-neighbor + * for each vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = + graph.reduceOnEdges(new SelectMinWeightNeighbor(), EdgeDirection.OUT); + verticesWithLowestOutNeighbor.writeAsCsv(resultPath); + env.execute(); - @Parameters - public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException { + expectedResult = "1,2\n" + + "2,3\n" + + "3,4\n" + + "4,5\n" + + "5,1\n"; + } - LinkedList<Configuration> tConfigs = new LinkedList<Configuration>(); + @Test + public void testLowestWeightInNeighbor() throws Exception { + /* + * Get the lowest-weight in-neighbor + * for each vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); - for(int i=1; i <= NUM_PROGRAMS; i++) { - Configuration config = new Configuration(); - config.setInteger("ProgramId", i); - tConfigs.add(config); - } - - return toParameterList(tConfigs); + DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = + graph.reduceOnEdges(new SelectMinWeightInNeighbor(), EdgeDirection.IN); + verticesWithLowestOutNeighbor.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,5\n" + + "2,1\n" + + "3,1\n" + + "4,3\n" + + "5,3\n"; } - - private static class GraphProgs { - - @SuppressWarnings("serial") - public static String runProgram(int progId, String resultPath) throws Exception { + + @Test + public void testMaxWeightEdge() throws Exception { + /* + * Get the maximum weight among all edges + * of a vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Tuple2<Long, Long>> verticesWithMaxEdgeWeight = + graph.reduceOnEdges(new SelectMaxWeightNeighbor(), EdgeDirection.ALL); + verticesWithMaxEdgeWeight.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,51\n" + + "2,23\n" + + "3,35\n" + + "4,45\n" + + "5,51\n"; + } + + @Test + public void testLowestWeightOutNeighborNoValue() throws Exception { + /* + * Get the lowest-weight out-neighbor + * for each vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = + graph.reduceOnEdges(new SelectMinWeightNeighborNoValue(), EdgeDirection.OUT); + verticesWithLowestOutNeighbor.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2\n" + + "2,3\n" + + "3,4\n" + + "4,5\n" + + "5,1\n"; + } + + @Test + public void testLowestWeightInNeighborNoValue() throws Exception { + /* + * Get the lowest-weight in-neighbor + * for each vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = + graph.reduceOnEdges(new SelectMinWeightInNeighborNoValue(), EdgeDirection.IN); + verticesWithLowestOutNeighbor.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,5\n" + + "2,1\n" + + "3,1\n" + + "4,3\n" + + "5,3\n"; + } + + @Test + public void testMaxWeightAllNeighbors() throws Exception { + /* + * Get the maximum weight among all edges + * of a vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Tuple2<Long, Long>> verticesWithMaxEdgeWeight = + graph.reduceOnEdges(new SelectMaxWeightNeighborNoValue(), EdgeDirection.ALL); + verticesWithMaxEdgeWeight.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,51\n" + + "2,23\n" + + "3,35\n" + + "4,45\n" + + "5,51\n"; + } + + @SuppressWarnings("serial") + private static final class SelectMinWeightNeighbor implements EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> { + + public Tuple2<Long, Long> iterateEdges( + Vertex<Long, Long> v, + Iterable<Edge<Long, Long>> edges) { - switch(progId) { - case 1: { - /* - * Get the lowest-weight out-neighbor - * for each vertex - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = - graph.reduceOnEdges(new EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>>() { - - public Tuple2<Long, Long> iterateEdges( - Vertex<Long, Long> v, - Iterable<Edge<Long, Long>> edges) { - - long weight = Long.MAX_VALUE; - long minNeighorId = 0; - - for (Edge<Long, Long> edge: edges) { - if (edge.getValue() < weight) { - weight = edge.getValue(); - minNeighorId = edge.getTarget(); - } - } - return new Tuple2<Long, Long>(v.getId(), minNeighorId); - } - }, EdgeDirection.OUT); - verticesWithLowestOutNeighbor.writeAsCsv(resultPath); - env.execute(); - return "1,2\n" + - "2,3\n" + - "3,4\n" + - "4,5\n" + - "5,1\n"; - } - case 2: { - /* - * Get the lowest-weight in-neighbor - * for each vertex - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = - graph.reduceOnEdges(new EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>>() { - - public Tuple2<Long, Long> iterateEdges( - Vertex<Long, Long> v, - Iterable<Edge<Long, Long>> edges) { - - long weight = Long.MAX_VALUE; - long minNeighorId = 0; - - for (Edge<Long, Long> edge: edges) { - if (edge.getValue() < weight) { - weight = edge.getValue(); - minNeighorId = edge.getSource(); - } - } - return new Tuple2<Long, Long>(v.getId(), minNeighorId); - } - }, EdgeDirection.IN); - verticesWithLowestOutNeighbor.writeAsCsv(resultPath); - env.execute(); - return "1,5\n" + - "2,1\n" + - "3,1\n" + - "4,3\n" + - "5,3\n"; + long weight = Long.MAX_VALUE; + long minNeighorId = 0; + + for (Edge<Long, Long> edge: edges) { + if (edge.getValue() < weight) { + weight = edge.getValue(); + minNeighorId = edge.getTarget(); + } } - case 3: { - /* - * Get the maximum weight among all edges - * of a vertex - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Tuple2<Long, Long>> verticesWithMaxEdgeWeight = - graph.reduceOnEdges(new EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>>() { - - public Tuple2<Long, Long> iterateEdges(Vertex<Long, Long> v, - Iterable<Edge<Long, Long>> edges) { - - long weight = Long.MIN_VALUE; - - for (Edge<Long, Long> edge: edges) { - if (edge.getValue() > weight) { - weight = edge.getValue(); - } - } - return new Tuple2<Long, Long>(v.getId(), weight); - } - }, EdgeDirection.ALL); - verticesWithMaxEdgeWeight.writeAsCsv(resultPath); - env.execute(); - return "1,51\n" + - "2,23\n" + - "3,35\n" + - "4,45\n" + - "5,51\n"; + return new Tuple2<Long, Long>(v.getId(), minNeighorId); + } + } + + @SuppressWarnings("serial") + private static final class SelectMaxWeightNeighbor implements EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> { + + public Tuple2<Long, Long> iterateEdges(Vertex<Long, Long> v, + Iterable<Edge<Long, Long>> edges) { + + long weight = Long.MIN_VALUE; + + for (Edge<Long, Long> edge: edges) { + if (edge.getValue() > weight) { + weight = edge.getValue(); + } } - case 4: { - /* - * Get the lowest-weight out-neighbor - * for each vertex - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = - graph.reduceOnEdges(new EdgesFunction<Long, Long, Tuple2<Long, Long>>() { - - public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) { - - long weight = Long.MAX_VALUE; - long minNeighorId = 0; - long vertexId = -1; - long i=0; - - for (Tuple2<Long, Edge<Long, Long>> edge: edges) { - if (edge.f1.getValue() < weight) { - weight = edge.f1.getValue(); - minNeighorId = edge.f1.getTarget(); - } - if (i==0) { - vertexId = edge.f0; - } i++; - } - return new Tuple2<Long, Long>(vertexId, minNeighorId); - } - }, EdgeDirection.OUT); - verticesWithLowestOutNeighbor.writeAsCsv(resultPath); - env.execute(); - return "1,2\n" + - "2,3\n" + - "3,4\n" + - "4,5\n" + - "5,1\n"; + return new Tuple2<Long, Long>(v.getId(), weight); + } + } + + @SuppressWarnings("serial") + private static final class SelectMinWeightNeighborNoValue implements EdgesFunction<Long, Long, Tuple2<Long, Long>> { + + public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) { + + long weight = Long.MAX_VALUE; + long minNeighorId = 0; + long vertexId = -1; + long i=0; + + for (Tuple2<Long, Edge<Long, Long>> edge: edges) { + if (edge.f1.getValue() < weight) { + weight = edge.f1.getValue(); + minNeighorId = edge.f1.getTarget(); + } + if (i==0) { + vertexId = edge.f0; + } i++; } - case 5: { - /* - * Get the lowest-weight in-neighbor - * for each vertex - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = - graph.reduceOnEdges(new EdgesFunction<Long, Long, Tuple2<Long, Long>>() { - - public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) { - - long weight = Long.MAX_VALUE; - long minNeighorId = 0; - long vertexId = -1; - long i=0; - - for (Tuple2<Long, Edge<Long, Long>> edge: edges) { - if (edge.f1.getValue() < weight) { - weight = edge.f1.getValue(); - minNeighorId = edge.f1.getSource(); - } - if (i==0) { - vertexId = edge.f0; - } i++; - } - return new Tuple2<Long, Long>(vertexId, minNeighorId); - } - }, EdgeDirection.IN); - verticesWithLowestOutNeighbor.writeAsCsv(resultPath); - env.execute(); - return "1,5\n" + - "2,1\n" + - "3,1\n" + - "4,3\n" + - "5,3\n"; + return new Tuple2<Long, Long>(vertexId, minNeighorId); + } + } + + @SuppressWarnings("serial") + private static final class SelectMaxWeightNeighborNoValue implements EdgesFunction<Long, Long, Tuple2<Long, Long>> { + + public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) { + + long weight = Long.MIN_VALUE; + long vertexId = -1; + long i=0; + + for (Tuple2<Long, Edge<Long, Long>> edge: edges) { + if (edge.f1.getValue() > weight) { + weight = edge.f1.getValue(); + } + if (i==0) { + vertexId = edge.f0; + } i++; } - case 6: { - /* - * Get the maximum weight among all edges - * of a vertex - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Tuple2<Long, Long>> verticesWithMaxEdgeWeight = - graph.reduceOnEdges(new EdgesFunction<Long, Long, Tuple2<Long, Long>>() { - - public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) { - - long weight = Long.MIN_VALUE; - long vertexId = -1; - long i=0; - - for (Tuple2<Long, Edge<Long, Long>> edge: edges) { - if (edge.f1.getValue() > weight) { - weight = edge.f1.getValue(); - } - if (i==0) { - vertexId = edge.f0; - } i++; - } - return new Tuple2<Long, Long>(vertexId, weight); - } - }, EdgeDirection.ALL); - verticesWithMaxEdgeWeight.writeAsCsv(resultPath); - env.execute(); - return "1,51\n" + - "2,23\n" + - "3,35\n" + - "4,45\n" + - "5,51\n"; + return new Tuple2<Long, Long>(vertexId, weight); + } + } + + @SuppressWarnings("serial") + private static final class SelectMinWeightInNeighbor implements EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> { + + public Tuple2<Long, Long> iterateEdges( + Vertex<Long, Long> v, + Iterable<Edge<Long, Long>> edges) { + + long weight = Long.MAX_VALUE; + long minNeighorId = 0; + + for (Edge<Long, Long> edge: edges) { + if (edge.getValue() < weight) { + weight = edge.getValue(); + minNeighorId = edge.getSource(); + } } - default: - throw new IllegalArgumentException("Invalid program id"); + return new Tuple2<Long, Long>(v.getId(), minNeighorId); + } + } + + @SuppressWarnings("serial") + private static final class SelectMinWeightInNeighborNoValue implements EdgesFunction<Long, Long, Tuple2<Long, Long>> { + + public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) { + + long weight = Long.MAX_VALUE; + long minNeighorId = 0; + long vertexId = -1; + long i=0; + + for (Tuple2<Long, Edge<Long, Long>> edge: edges) { + if (edge.f1.getValue() < weight) { + weight = edge.f1.getValue(); + minNeighorId = edge.f1.getSource(); + } + if (i==0) { + vertexId = edge.f0; + } i++; } + return new Tuple2<Long, Long>(vertexId, minNeighorId); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/e6b9cecd/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnNeighborMethods.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnNeighborMethods.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnNeighborMethods.java index 2624960..e64eacf 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnNeighborMethods.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnNeighborMethods.java @@ -18,273 +18,286 @@ package org.apache.flink.graph.test; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.Collection; import java.util.Iterator; -import java.util.LinkedList; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.configuration.Configuration; import org.apache.flink.graph.Edge; import org.apache.flink.graph.EdgeDirection; import org.apache.flink.graph.Graph; import org.apache.flink.graph.NeighborsFunction; import org.apache.flink.graph.NeighborsFunctionWithVertexValue; import org.apache.flink.graph.Vertex; -import org.apache.flink.test.util.JavaProgramTestBase; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; @RunWith(Parameterized.class) -public class TestReduceOnNeighborMethods extends JavaProgramTestBase { +public class TestReduceOnNeighborMethods extends MultipleProgramsTestBase { - private static int NUM_PROGRAMS = 6; - - private int curProgId = config.getInteger("ProgramId", -1); - private String resultPath; - private String expectedResult; - - public TestReduceOnNeighborMethods(Configuration config) { - super(config); - } - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); + public TestReduceOnNeighborMethods(MultipleProgramsTestBase.ExecutionMode mode){ + super(mode); } - @Override - protected void testProgram() throws Exception { - expectedResult = GraphProgs.runProgram(curProgId, resultPath); + private String resultPath; + private String expectedResult; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); } - - @Override - protected void postSubmit() throws Exception { + + @After + public void after() throws Exception{ compareResultsByLinesInMemory(expectedResult, resultPath); } - - @Parameters - public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException { - LinkedList<Configuration> tConfigs = new LinkedList<Configuration>(); + @Test + public void testSumOfOutNeighbors() throws Exception { + /* + * Get the sum of out-neighbor values + * for each vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); - for(int i=1; i <= NUM_PROGRAMS; i++) { - Configuration config = new Configuration(); - config.setInteger("ProgramId", i); - tConfigs.add(config); - } - - return toParameterList(tConfigs); + DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = + graph.reduceOnNeighbors(new SumOutNeighbors(), EdgeDirection.OUT); + + verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,5\n" + + "2,3\n" + + "3,9\n" + + "4,5\n" + + "5,1\n"; + } + + @Test + public void testSumOfInNeighbors() throws Exception { + /* + * Get the sum of in-neighbor values + * times the edge weights for each vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Tuple2<Long, Long>> verticesWithSum = + graph.reduceOnNeighbors(new SumInNeighbors(), EdgeDirection.IN); + + verticesWithSum.writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,255\n" + + "2,12\n" + + "3,59\n" + + "4,102\n" + + "5,285\n"; } + + @Test + public void testSumOfOAllNeighbors() throws Exception { + /* + * Get the sum of all neighbor values + * including own vertex value + * for each vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = + graph.reduceOnNeighbors(new SumAllNeighbors(), EdgeDirection.ALL); + + verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,11\n" + + "2,6\n" + + "3,15\n" + + "4,12\n" + + "5,13\n"; + } + + @Test + public void testSumOfOutNeighborsNoValue() throws Exception { + /* + * Get the sum of out-neighbor values + * for each vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = + graph.reduceOnNeighbors(new SumOutNeighborsNoValue(), EdgeDirection.OUT); + + verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,5\n" + + "2,3\n" + + "3,9\n" + + "4,5\n" + + "5,1\n"; + } + + @Test + public void testSumOfInNeighborsNoValue() throws Exception { + /* + * Get the sum of in-neighbor values + * times the edge weights for each vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Tuple2<Long, Long>> verticesWithSum = + graph.reduceOnNeighbors(new SumInNeighborsNoValue(), EdgeDirection.IN); + + verticesWithSum.writeAsCsv(resultPath); + env.execute(); - private static class GraphProgs { + expectedResult = "1,255\n" + + "2,12\n" + + "3,59\n" + + "4,102\n" + + "5,285\n"; + } + + @Test + public void testSumOfAllNeighborsNoValue() throws Exception { + /* + * Get the sum of all neighbor values + * for each vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = + graph.reduceOnNeighbors(new SumAllNeighborsNoValue(), EdgeDirection.ALL); + + verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); + env.execute(); - @SuppressWarnings("serial") - public static String runProgram(int progId, String resultPath) throws Exception { + expectedResult = "1,10\n" + + "2,4\n" + + "3,12\n" + + "4,8\n" + + "5,8\n"; + } + + @SuppressWarnings("serial") + private static final class SumOutNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long, + Tuple2<Long, Long>> { + + public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> vertex, + Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) { - switch(progId) { - case 1: { - /* - * Get the sum of out-neighbor values - * for each vertex - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = - graph.reduceOnNeighbors(new NeighborsFunctionWithVertexValue<Long, Long, Long, - Tuple2<Long, Long>>() { - public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> vertex, - Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) { - long sum = 0; - for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) { - sum += neighbor.f1.getValue(); - } - return new Tuple2<Long, Long>(vertex.getId(), sum); - } - }, EdgeDirection.OUT); - - verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); - env.execute(); - return "1,5\n" + - "2,3\n" + - "3,9\n" + - "4,5\n" + - "5,1\n"; - } - case 2: { - /* - * Get the sum of in-neighbor values - * times the edge weights for each vertex - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Tuple2<Long, Long>> verticesWithSum = - graph.reduceOnNeighbors(new NeighborsFunctionWithVertexValue<Long, Long, Long, - Tuple2<Long, Long>>() { - public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> vertex, - Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) { - long sum = 0; - for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) { - sum += neighbor.f0.getValue() * neighbor.f1.getValue(); - } - return new Tuple2<Long, Long>(vertex.getId(), sum); - } - }, EdgeDirection.IN); - - verticesWithSum.writeAsCsv(resultPath); - env.execute(); - return "1,255\n" + - "2,12\n" + - "3,59\n" + - "4,102\n" + - "5,285\n"; + long sum = 0; + for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) { + sum += neighbor.f1.getValue(); } - case 3: { - /* - * Get the sum of all neighbor values - * including own vertex value - * for each vertex - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = - graph.reduceOnNeighbors(new NeighborsFunctionWithVertexValue<Long, Long, Long, - Tuple2<Long, Long>>() { - public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> vertex, - Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) { - long sum = 0; - for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) { - sum += neighbor.f1.getValue(); - } - return new Tuple2<Long, Long>(vertex.getId(), sum + vertex.getValue()); - } - }, EdgeDirection.ALL); - - verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); - env.execute(); - return "1,11\n" + - "2,6\n" + - "3,15\n" + - "4,12\n" + - "5,13\n"; + return new Tuple2<Long, Long>(vertex.getId(), sum); + } + } + + @SuppressWarnings("serial") + private static final class SumInNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long, + Tuple2<Long, Long>> { + + public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> vertex, + Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) { + + long sum = 0; + for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) { + sum += neighbor.f0.getValue() * neighbor.f1.getValue(); } - case 4: { - /* - * Get the sum of out-neighbor values - * for each vertex - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = - graph.reduceOnNeighbors(new NeighborsFunction<Long, Long, Long, - Tuple2<Long, Long>>() { - public Tuple2<Long, Long> iterateNeighbors( - Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors) { - long sum = 0; - Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null; - Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator = - neighbors.iterator(); - while(neighborsIterator.hasNext()) { - next = neighborsIterator.next(); - sum += next.f2.getValue(); - } - return new Tuple2<Long, Long>(next.f0, sum); - } - }, EdgeDirection.OUT); - - verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); - env.execute(); - return "1,5\n" + - "2,3\n" + - "3,9\n" + - "4,5\n" + - "5,1\n"; + return new Tuple2<Long, Long>(vertex.getId(), sum); + } + } + + @SuppressWarnings("serial") + private static final class SumAllNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long, + Tuple2<Long, Long>> { + + public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> vertex, + Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) { + + long sum = 0; + for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) { + sum += neighbor.f1.getValue(); } - case 5: { - /* - * Get the sum of in-neighbor values - * times the edge weights for each vertex - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Tuple2<Long, Long>> verticesWithSum = - graph.reduceOnNeighbors(new NeighborsFunction<Long, Long, Long, - Tuple2<Long, Long>>() { - public Tuple2<Long, Long> iterateNeighbors( - Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors) { - long sum = 0; - Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null; - Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator = - neighbors.iterator(); - while(neighborsIterator.hasNext()) { - next = neighborsIterator.next(); - sum += next.f2.getValue() * next.f1.getValue(); - } - return new Tuple2<Long, Long>(next.f0, sum); - } - }, EdgeDirection.IN); - - - verticesWithSum.writeAsCsv(resultPath); - env.execute(); - return "1,255\n" + - "2,12\n" + - "3,59\n" + - "4,102\n" + - "5,285\n"; + return new Tuple2<Long, Long>(vertex.getId(), sum + vertex.getValue()); + } + } + + @SuppressWarnings("serial") + private static final class SumOutNeighborsNoValue implements NeighborsFunction<Long, Long, Long, + Tuple2<Long, Long>> { + + public Tuple2<Long, Long> iterateNeighbors( + Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors) { + + long sum = 0; + Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null; + Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator = + neighbors.iterator(); + while(neighborsIterator.hasNext()) { + next = neighborsIterator.next(); + sum += next.f2.getValue(); } - case 6: { - /* - * Get the sum of all neighbor values - * for each vertex - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = - graph.reduceOnNeighbors(new NeighborsFunction<Long, Long, Long, - Tuple2<Long, Long>>() { - public Tuple2<Long, Long> iterateNeighbors( - Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors) { - long sum = 0; - Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null; - Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator = - neighbors.iterator(); - while(neighborsIterator.hasNext()) { - next = neighborsIterator.next(); - sum += next.f2.getValue(); - } - return new Tuple2<Long, Long>(next.f0, sum); - } - }, EdgeDirection.ALL); - - verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); - env.execute(); - return "1,10\n" + - "2,4\n" + - "3,12\n" + - "4,8\n" + - "5,8\n"; + return new Tuple2<Long, Long>(next.f0, sum); + } + } + + @SuppressWarnings("serial") + private static final class SumInNeighborsNoValue implements NeighborsFunction<Long, Long, Long, + Tuple2<Long, Long>> { + + public Tuple2<Long, Long> iterateNeighbors( + Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors) { + + long sum = 0; + Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null; + Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator = + neighbors.iterator(); + while(neighborsIterator.hasNext()) { + next = neighborsIterator.next(); + sum += next.f2.getValue() * next.f1.getValue(); } - default: - throw new IllegalArgumentException("Invalid program id"); + return new Tuple2<Long, Long>(next.f0, sum); + } + } + + @SuppressWarnings("serial") + private static final class SumAllNeighborsNoValue implements NeighborsFunction<Long, Long, Long, + Tuple2<Long, Long>> { + + public Tuple2<Long, Long> iterateNeighbors( + Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors) { + + long sum = 0; + Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null; + Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator = + neighbors.iterator(); + while(neighborsIterator.hasNext()) { + next = neighborsIterator.next(); + sum += next.f2.getValue(); } + return new Tuple2<Long, Long>(next.f0, sum); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/e6b9cecd/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestWeaklyConnected.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestWeaklyConnected.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestWeaklyConnected.java index 7f1049c..f5b6d9d 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestWeaklyConnected.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestWeaklyConnected.java @@ -18,126 +18,101 @@ package org.apache.flink.graph.test; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.Collection; -import java.util.LinkedList; - import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.configuration.Configuration; import org.apache.flink.graph.Graph; -import org.apache.flink.test.util.JavaProgramTestBase; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; @RunWith(Parameterized.class) -public class TestWeaklyConnected extends JavaProgramTestBase { +public class TestWeaklyConnected extends MultipleProgramsTestBase { - private static int NUM_PROGRAMS = 4; - - private int curProgId = config.getInteger("ProgramId", -1); - private String resultPath; - private String expectedResult; - - public TestWeaklyConnected(Configuration config) { - super(config); - } - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); + public TestWeaklyConnected(MultipleProgramsTestBase.ExecutionMode mode){ + super(mode); } - @Override - protected void testProgram() throws Exception { - expectedResult = GraphProgs.runProgram(curProgId, resultPath); + private String resultPath; + private String expectedResult; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); } - - @Override - protected void postSubmit() throws Exception { + + @After + public void after() throws Exception{ compareResultsByLinesInMemory(expectedResult, resultPath); } - - @Parameters - public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException { - LinkedList<Configuration> tConfigs = new LinkedList<Configuration>(); + @Test + public void testWithConnectedDirected() throws Exception { + /* + * Test isWeaklyConnected() with a connected, directed graph + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + graph.isWeaklyConnected(10).writeAsText(resultPath); + + env.execute(); + expectedResult = "true\n"; + } - for(int i=1; i <= NUM_PROGRAMS; i++) { - Configuration config = new Configuration(); - config.setInteger("ProgramId", i); - tConfigs.add(config); - } + @Test + public void testWithDisconnectedDirected() throws Exception { + /* + * Test isWeaklyConnected() with a disconnected, directed graph + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getDisconnectedLongLongEdgeData(env), env); - return toParameterList(tConfigs); + graph.isWeaklyConnected(10).writeAsText(resultPath); + + env.execute(); + expectedResult = "false\n"; } - - private static class GraphProgs { + + @Test + public void testWithConnectedUndirected() throws Exception { + /* + * Test isWeaklyConnected() with a connected, undirected graph + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env).getUndirected(); + + graph.isWeaklyConnected(10).writeAsText(resultPath); + + env.execute(); + expectedResult = "true\n"; + } + + @Test + public void testWithDisconnectedUndirected() throws Exception { + /* + * Test isWeaklyConnected() with a disconnected, undirected graph + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getDisconnectedLongLongEdgeData(env), env).getUndirected(); + + graph.isWeaklyConnected(10).writeAsText(resultPath); - public static String runProgram(int progId, String resultPath) throws Exception { - - switch(progId) { - case 1: { - /* - * Test isWeaklyConnected() with a connected, directed graph - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - graph.isWeaklyConnected(10).writeAsText(resultPath); - - env.execute(); - return "true\n"; - } - case 2: { - /* - * Test isWeaklyConnected() with a disconnected, directed graph - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getDisconnectedLongLongEdgeData(env), env); - - graph.isWeaklyConnected(10).writeAsText(resultPath); - - env.execute(); - return "false\n"; - } - case 3: { - /* - * Test isWeaklyConnected() with a connected, undirected graph - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env).getUndirected(); - - graph.isWeaklyConnected(10).writeAsText(resultPath); - - env.execute(); - return "true\n"; - } - case 4: { - /* - * Test isWeaklyConnected() with a disconnected, undirected graph - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getDisconnectedLongLongEdgeData(env), env).getUndirected(); - - graph.isWeaklyConnected(10).writeAsText(resultPath); - - env.execute(); - return "false\n"; - } - default: - throw new IllegalArgumentException("Invalid program id"); - } - } + env.execute(); + expectedResult = "false\n"; } - }