[FLINK-1201] [gelly] Renamed tests from TestXyz to XyzITCase
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6296c394 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6296c394 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6296c394 Branch: refs/heads/master Commit: 6296c394fb04d7e01f8a03a348b7a179c209256b Parents: e6b9cec Author: vasia <vasilikikala...@gmail.com> Authored: Mon Jan 26 18:58:33 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Wed Feb 11 10:46:16 2015 +0100 ---------------------------------------------------------------------- .../apache/flink/graph/test/DegreesITCase.java | 171 ++++++ .../flink/graph/test/FromCollectionITCase.java | 120 +++++ .../flink/graph/test/GraphCreationITCase.java | 170 ++++++ .../test/GraphCreationWithMapperITCase.java | 158 ++++++ .../flink/graph/test/GraphMutationsITCase.java | 273 ++++++++++ .../flink/graph/test/GraphOperationsITCase.java | 267 ++++++++++ .../flink/graph/test/JoinWithEdgesITCase.java | 519 +++++++++++++++++++ .../graph/test/JoinWithVerticesITCase.java | 218 ++++++++ .../apache/flink/graph/test/MapEdgesITCase.java | 223 ++++++++ .../flink/graph/test/MapVerticesITCase.java | 233 +++++++++ .../graph/test/ReduceOnEdgesMethodsITCase.java | 317 +++++++++++ .../test/ReduceOnNeighborMethodsITCase.java | 303 +++++++++++ .../apache/flink/graph/test/TestDegrees.java | 171 ------ .../flink/graph/test/TestFromCollection.java | 120 ----- .../flink/graph/test/TestGraphCreation.java | 170 ------ .../graph/test/TestGraphCreationWithMapper.java | 158 ------ .../flink/graph/test/TestGraphMutations.java | 273 ---------- .../flink/graph/test/TestGraphOperations.java | 267 ---------- .../flink/graph/test/TestJoinWithEdges.java | 519 ------------------- .../flink/graph/test/TestJoinWithVertices.java | 218 -------- .../apache/flink/graph/test/TestMapEdges.java | 223 -------- .../flink/graph/test/TestMapVertices.java | 233 --------- .../graph/test/TestReduceOnEdgesMethods.java | 317 ----------- .../graph/test/TestReduceOnNeighborMethods.java | 303 ----------- .../TestVertexCentricConnectedComponents.java | 118 ----- .../flink/graph/test/TestWeaklyConnected.java | 118 ----- .../VertexCentricConnectedComponentsITCase.java | 118 +++++ .../flink/graph/test/WeaklyConnectedITCase.java | 118 +++++ 28 files changed, 3208 insertions(+), 3208 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/DegreesITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/DegreesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/DegreesITCase.java new file mode 100644 index 0000000..96a6d20 --- /dev/null +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/DegreesITCase.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.test; + +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Graph; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.flink.types.NullValue; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class DegreesITCase extends MultipleProgramsTestBase { + + public DegreesITCase(MultipleProgramsTestBase.ExecutionMode mode){ + super(mode); + } + + private String resultPath; + private String expectedResult; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + } + + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expectedResult, resultPath); + } + + @Test + public void testOutDegrees() throws Exception { + /* + * Test outDegrees() + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + graph.outDegrees().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2\n" + + "2,1\n" + + "3,2\n" + + "4,1\n" + + "5,1\n"; + } + + @Test + public void testOutDegreesWithNoOutEdges() throws Exception { + /* + * Test outDegrees() no outgoing edges + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeDataWithZeroDegree(env), env); + + graph.outDegrees().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,3\n" + + "2,1\n" + + "3,1\n" + + "4,1\n" + + "5,0\n"; + } + + @Test + public void testInDegrees() throws Exception { + /* + * Test inDegrees() + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + graph.inDegrees().writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,1\n" + + "2,1\n" + + "3,2\n" + + "4,1\n" + + "5,2\n"; + } + + @Test + public void testInDegreesWithNoInEdge() throws Exception { + /* + * Test inDegrees() no ingoing edge + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeDataWithZeroDegree(env), env); + + graph.inDegrees().writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,0\n" + + "2,1\n" + + "3,1\n" + + "4,1\n" + + "5,3\n"; + } + + @Test + public void testGetDegrees() throws Exception { + /* + * Test getDegrees() + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + graph.getDegrees().writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,3\n" + + "2,2\n" + + "3,4\n" + + "4,2\n" + + "5,3\n"; + } + + @Test + public void testGetDegreesWithDisconnectedData() throws Exception { + /* + * Test getDegrees() with disconnected data + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, NullValue, Long> graph = + Graph.fromDataSet(TestGraphUtils.getDisconnectedLongLongEdgeData(env), env); + + graph.outDegrees().writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,2\n" + + "2,1\n" + + "3,0\n" + + "4,1\n" + + "5,0\n"; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/FromCollectionITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/FromCollectionITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/FromCollectionITCase.java new file mode 100644 index 0000000..5259143 --- /dev/null +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/FromCollectionITCase.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.test; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Graph; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.flink.types.NullValue; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class FromCollectionITCase extends MultipleProgramsTestBase { + + public FromCollectionITCase(MultipleProgramsTestBase.ExecutionMode mode){ + super(mode); + } + + private String resultPath; + private String expectedResult; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + } + + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expectedResult, resultPath); + } + + @Test + public void testFromCollectionVerticesEdges() throws Exception { + /* + * Test fromCollection(vertices, edges): + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(), + TestGraphUtils.getLongLongEdges(), env); + + graph.getEdges().writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,2,12\n" + + "1,3,13\n" + + "2,3,23\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } + + @Test + public void testFromCollectionEdgesNoInitialValue() throws Exception { + /* + * Test fromCollection(edges) with no initial value for the vertices + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, NullValue, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongEdges(), + env); + + graph.getVertices().writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,(null)\n" + + "2,(null)\n" + + "3,(null)\n" + + "4,(null)\n" + + "5,(null)\n"; + } + + @Test + public void testFromCollectionEdgesWithInitialValue() throws Exception { + /* + * Test fromCollection(edges) with vertices initialised by a + * function that takes the id and doubles it + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongEdges(), + new InitVerticesMapper(), env); + + graph.getVertices().writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,2\n" + + "2,4\n" + + "3,6\n" + + "4,8\n" + + "5,10\n"; + } + + @SuppressWarnings("serial") + private static final class InitVerticesMapper implements MapFunction<Long, Long> { + public Long map(Long vertexId) { + return vertexId * 2; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphCreationITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphCreationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphCreationITCase.java new file mode 100644 index 0000000..4cbdd90 --- /dev/null +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphCreationITCase.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.test; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType; +import org.apache.flink.graph.validation.InvalidVertexIdsValidator; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.flink.types.NullValue; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class GraphCreationITCase extends MultipleProgramsTestBase { + + public GraphCreationITCase(MultipleProgramsTestBase.ExecutionMode mode){ + super(mode); + } + + private String resultPath; + private String expectedResult; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + } + + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expectedResult, resultPath); + } + + @Test + public void testCreateWithoutVertexValues() throws Exception { + /* + * Test create() with edge dataset and no vertex values + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, NullValue, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env), env); + + graph.getVertices().writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,(null)\n" + + "2,(null)\n" + + "3,(null)\n" + + "4,(null)\n" + + "5,(null)\n"; + } + + @Test + public void testCreateWithMapper() throws Exception { + /* + * Test create() with edge dataset and a mapper that assigns the id as value + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env), + new AssignIdAsValueMapper(), env); + + graph.getVertices().writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,1\n" + + "2,2\n" + + "3,3\n" + + "4,4\n" + + "5,5\n"; + } + + @Test + public void testCreateWithCustomVertexValue() throws Exception { + /* + * Test create() with edge dataset and a mapper that assigns a parametrized custom vertex value + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, DummyCustomParameterizedType<Double>, Long> graph = Graph.fromDataSet( + TestGraphUtils.getLongLongEdgeData(env), new AssignCustomVertexValueMapper(), env); + + graph.getVertices().writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,(2.0,0)\n" + + "2,(4.0,1)\n" + + "3,(6.0,2)\n" + + "4,(8.0,3)\n" + + "5,(10.0,4)\n"; + } + + @Test + public void testValidate() throws Exception { + /* + * Test validate(): + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Vertex<Long, Long>> vertices = TestGraphUtils.getLongLongVertexData(env); + DataSet<Edge<Long, Long>> edges = TestGraphUtils.getLongLongEdgeData(env); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(vertices, edges, env); + DataSet<Boolean> result = graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>()); + + result.writeAsText(resultPath); + env.execute(); + + expectedResult = "true\n"; + } + + @Test + public void testValidateWithInvalidIds() throws Exception { + /* + * Test validate() - invalid vertex ids + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Vertex<Long, Long>> vertices = TestGraphUtils.getLongLongInvalidVertexData(env); + DataSet<Edge<Long, Long>> edges = TestGraphUtils.getLongLongEdgeData(env); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(vertices, edges, env); + DataSet<Boolean> result = graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>()); + result.writeAsText(resultPath); + env.execute(); + + expectedResult = "false\n"; + } + + @SuppressWarnings("serial") + private static final class AssignIdAsValueMapper implements MapFunction<Long, Long> { + public Long map(Long vertexId) { + return vertexId; + } + } + + @SuppressWarnings("serial") + private static final class AssignCustomVertexValueMapper implements + MapFunction<Long, DummyCustomParameterizedType<Double>> { + + DummyCustomParameterizedType<Double> dummyValue = + new DummyCustomParameterizedType<Double>(); + + public DummyCustomParameterizedType<Double> map(Long vertexId) { + dummyValue.setIntField(vertexId.intValue()-1); + dummyValue.setTField(vertexId*2.0); + return dummyValue; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphCreationWithMapperITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphCreationWithMapperITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphCreationWithMapperITCase.java new file mode 100644 index 0000000..24f7c82 --- /dev/null +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphCreationWithMapperITCase.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.test; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class GraphCreationWithMapperITCase extends MultipleProgramsTestBase { + + public GraphCreationWithMapperITCase(MultipleProgramsTestBase.ExecutionMode mode){ + super(mode); + } + + private String resultPath; + private String expectedResult; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + } + + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expectedResult, resultPath); + } + + @Test + public void testWithDoubleValueMapper() throws Exception { + /* + * Test create() with edge dataset and a mapper that assigns a double constant as value + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Double, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env), + new AssignDoubleValueMapper(), env); + + graph.getVertices().writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,0.1\n" + + "2,0.1\n" + + "3,0.1\n" + + "4,0.1\n" + + "5,0.1\n"; + } + + @Test + public void testWithTuple2ValueMapper() throws Exception { + /* + * Test create() with edge dataset and a mapper that assigns a Tuple2 as value + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Tuple2<Long, Long>, Long> graph = Graph.fromDataSet( + TestGraphUtils.getLongLongEdgeData(env), new AssignTuple2ValueMapper(), env); + + graph.getVertices().writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,(2,42)\n" + + "2,(4,42)\n" + + "3,(6,42)\n" + + "4,(8,42)\n" + + "5,(10,42)\n"; + } + + @Test + public void testWithConstantValueMapper() throws Exception { + /* + * Test create() with edge dataset with String key type + * and a mapper that assigns a double constant as value + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<String, Double, Long> graph = Graph.fromDataSet(TestGraphUtils.getStringLongEdgeData(env), + new AssignDoubleConstantMapper(), env); + + graph.getVertices().writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,0.1\n" + + "2,0.1\n" + + "3,0.1\n" + + "4,0.1\n" + + "5,0.1\n"; + } + + @Test + public void testWithDCustomValueMapper() throws Exception { + /* + * Test create() with edge dataset and a mapper that assigns a custom vertex value + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, DummyCustomType, Long> graph = Graph.fromDataSet( + TestGraphUtils.getLongLongEdgeData(env), new AssignCustomValueMapper(), env); + + graph.getVertices().writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,(F,0)\n" + + "2,(F,1)\n" + + "3,(F,2)\n" + + "4,(F,3)\n" + + "5,(F,4)\n"; + } + + @SuppressWarnings("serial") + private static final class AssignDoubleValueMapper implements MapFunction<Long, Double> { + public Double map(Long value) { + return 0.1d; + } + } + + @SuppressWarnings("serial") + private static final class AssignTuple2ValueMapper implements MapFunction<Long, Tuple2<Long, Long>> { + public Tuple2<Long, Long> map(Long vertexId) { + return new Tuple2<Long, Long>(vertexId*2, 42l); + } + } + + @SuppressWarnings("serial") + private static final class AssignDoubleConstantMapper implements MapFunction<String, Double> { + public Double map(String value) { + return 0.1d; + } + } + + @SuppressWarnings("serial") + private static final class AssignCustomValueMapper implements MapFunction<Long, DummyCustomType> { + public DummyCustomType map(Long vertexId) { + return new DummyCustomType(vertexId.intValue()-1, false); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphMutationsITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphMutationsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphMutationsITCase.java new file mode 100644 index 0000000..3af8943 --- /dev/null +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphMutationsITCase.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.test; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class GraphMutationsITCase extends MultipleProgramsTestBase { + + public GraphMutationsITCase(MultipleProgramsTestBase.ExecutionMode mode){ + super(mode); + } + + private String resultPath; + private String expectedResult; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + } + + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expectedResult, resultPath); + } + + @Test + public void testAddVertex() throws Exception { + /* + * Test addVertex() -- simple case + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>(); + edges.add(new Edge<Long, Long>(6L, 1L, 61L)); + graph = graph.addVertex(new Vertex<Long, Long>(6L, 6L), edges); + graph.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,12\n" + + "1,3,13\n" + + "2,3,23\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n" + + "6,1,61\n"; + } + + @Test + public void testAddVertexExisting() throws Exception { + /* + * Test addVertex() -- add an existing vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>(); + edges.add(new Edge<Long, Long>(1L, 5L, 15L)); + graph = graph.addVertex(new Vertex<Long, Long>(1L, 1L), edges); + graph.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,12\n" + + "1,3,13\n" + + "1,5,15\n" + + "2,3,23\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } + + @Test + public void testAddVertexNoEdges() throws Exception { + /* + * Test addVertex() -- add vertex with empty edge set + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>(); + graph = graph.addVertex(new Vertex<Long, Long>(6L, 6L), edges); + graph.getVertices().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,1\n" + + "2,2\n" + + "3,3\n" + + "4,4\n" + + "5,5\n" + + "6,6\n"; + } + + @Test + public void testRemoveVertex() throws Exception { + /* + * Test removeVertex() -- simple case + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + graph = graph.removeVertex(new Vertex<Long, Long>(5L, 5L)); + graph.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,12\n" + + "1,3,13\n" + + "2,3,23\n" + + "3,4,34\n"; + } + + @Test + public void testRemoveInvalidVertex() throws Exception { + /* + * Test removeVertex() -- remove an invalid vertex + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + graph = graph.removeVertex(new Vertex<Long, Long>(6L, 6L)); + graph.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,12\n" + + "1,3,13\n" + + "2,3,23\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } + + @Test + public void testAddEdge() throws Exception { + /* + * Test addEdge() -- simple case + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + graph = graph.addEdge(new Vertex<Long, Long>(6L, 6L), new Vertex<Long, Long>(1L, 1L), + 61L); + graph.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,12\n" + + "1,3,13\n" + + "2,3,23\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n" + + "6,1,61\n"; + } + + @Test + public void testAddExistingEdge() throws Exception { + /* + * Test addEdge() -- add already existing edge + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + graph = graph.addEdge(new Vertex<Long, Long>(1L, 1L), new Vertex<Long, Long>(2L, 2L), + 12L); + graph.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,12\n" + + "1,2,12\n" + + "1,3,13\n" + + "2,3,23\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } + + @Test + public void testRemoveVEdge() throws Exception { + /* + * Test removeEdge() -- simple case + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + graph = graph.removeEdge(new Edge<Long, Long>(5L, 1L, 51L)); + graph.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,12\n" + + "1,3,13\n" + + "2,3,23\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n"; + } + + @Test + public void testRemoveInvalidEdge() throws Exception { + /* + * Test removeEdge() -- invalid edge + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + graph = graph.removeEdge(new Edge<Long, Long>(6L, 1L, 61L)); + graph.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,12\n" + + "1,3,13\n" + + "2,3,23\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphOperationsITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphOperationsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphOperationsITCase.java new file mode 100644 index 0000000..f194a60 --- /dev/null +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphOperationsITCase.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.test; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class GraphOperationsITCase extends MultipleProgramsTestBase { + + public GraphOperationsITCase(MultipleProgramsTestBase.ExecutionMode mode){ + super(mode); + } + + private String resultPath; + private String expectedResult; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + } + + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expectedResult, resultPath); + } + + @Test + public void testUndirected() throws Exception { + /* + * Test getUndirected() + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + graph.getUndirected().getEdges().writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,2,12\n" + "2,1,12\n" + + "1,3,13\n" + "3,1,13\n" + + "2,3,23\n" + "3,2,23\n" + + "3,4,34\n" + "4,3,34\n" + + "3,5,35\n" + "5,3,35\n" + + "4,5,45\n" + "5,4,45\n" + + "5,1,51\n" + "1,5,51\n"; + } + + @Test + public void testReverse() throws Exception { + /* + * Test reverse() + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + graph.reverse().getEdges().writeAsCsv(resultPath); + env.execute(); + expectedResult = "2,1,12\n" + + "3,1,13\n" + + "3,2,23\n" + + "4,3,34\n" + + "5,3,35\n" + + "5,4,45\n" + + "1,5,51\n"; + } + + @SuppressWarnings("serial") + @Test + public void testSubGraph() throws Exception { + /* + * Test subgraph: + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + graph.subgraph(new FilterFunction<Vertex<Long, Long>>() { + public boolean filter(Vertex<Long, Long> vertex) throws Exception { + return (vertex.getValue() > 2); + } + }, + new FilterFunction<Edge<Long, Long>>() { + public boolean filter(Edge<Long, Long> edge) throws Exception { + return (edge.getValue() > 34); + } + }).getEdges().writeAsCsv(resultPath); + + env.execute(); + expectedResult = "3,5,35\n" + + "4,5,45\n"; + } + + @SuppressWarnings("serial") + @Test + public void testFilterVertices() throws Exception { + /* + * Test filterOnVertices: + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + graph.filterOnVertices(new FilterFunction<Vertex<Long, Long>>() { + public boolean filter(Vertex<Long, Long> vertex) throws Exception { + return (vertex.getValue() > 2); + } + }).getEdges().writeAsCsv(resultPath); + + env.execute(); + expectedResult = "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n"; + } + + @SuppressWarnings("serial") + @Test + public void testFilterEdges() throws Exception { + /* + * Test filterOnEdges: + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + graph.filterOnEdges(new FilterFunction<Edge<Long, Long>>() { + public boolean filter(Edge<Long, Long> edge) throws Exception { + return (edge.getValue() > 34); + } + }).getEdges().writeAsCsv(resultPath); + + env.execute(); + expectedResult = "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } + + @Test + public void testNumberOfVertices() throws Exception { + /* + * Test numberOfVertices() + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + graph.numberOfVertices().writeAsText(resultPath); + + env.execute(); + expectedResult = "5"; + } + + @Test + public void testNumberOfEdges() throws Exception { + /* + * Test numberOfEdges() + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + graph.numberOfEdges().writeAsText(resultPath); + + env.execute(); + expectedResult = "7"; + } + + @Test + public void testVertexIds() throws Exception { + /* + * Test getVertexIds() + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + graph.getVertexIds().writeAsText(resultPath); + + env.execute(); + expectedResult = "1\n2\n3\n4\n5\n"; + } + + @Test + public void testEdgesIds() throws Exception { + /* + * Test getEdgeIds() + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + graph.getEdgeIds().writeAsCsv(resultPath); + + env.execute(); + expectedResult = "1,2\n" + "1,3\n" + + "2,3\n" + "3,4\n" + + "3,5\n" + "4,5\n" + + "5,1\n"; + } + + @Test + public void testUnion() throws Exception { + /* + * Test union() + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + List<Vertex<Long, Long>> vertices = new ArrayList<Vertex<Long, Long>>(); + List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>(); + + vertices.add(new Vertex<Long, Long>(6L, 6L)); + edges.add(new Edge<Long, Long>(6L, 1L, 61L)); + + graph = graph.union(Graph.fromCollection(vertices, edges, env)); + + graph.getEdges().writeAsCsv(resultPath); + + env.execute(); + + expectedResult = "1,2,12\n" + + "1,3,13\n" + + "2,3,23\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n" + + "6,1,61\n"; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/JoinWithEdgesITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/JoinWithEdgesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/JoinWithEdgesITCase.java new file mode 100644 index 0000000..6f4f6a8 --- /dev/null +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/JoinWithEdgesITCase.java @@ -0,0 +1,519 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.test; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType; +import org.apache.flink.graph.utils.EdgeToTuple3Map; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class JoinWithEdgesITCase extends MultipleProgramsTestBase { + + public JoinWithEdgesITCase(MultipleProgramsTestBase.ExecutionMode mode){ + super(mode); + } + + private String resultPath; + private String expectedResult; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + } + + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expectedResult, resultPath); + } + + @Test + public void testWithEdgesInputDataset() throws Exception { + /* + * Test joinWithEdges with the input DataSet parameter identical + * to the edge DataSet + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdges(graph.getEdges() + .map(new EdgeToTuple3Map<Long, Long>()), new AddValuesMapper()); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,24\n" + + "1,3,26\n" + + "2,3,46\n" + + "3,4,68\n" + + "3,5,70\n" + + "4,5,90\n" + + "5,1,102\n"; + } + + @Test + public void testWithLessElements() throws Exception { + /* + * Test joinWithEdges with the input DataSet passed as a parameter containing + * less elements than the edge DataSet, but of the same type + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdges(graph.getEdges().first(3) + .map(new EdgeToTuple3Map<Long, Long>()), new AddValuesMapper()); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,24\n" + + "1,3,26\n" + + "2,3,46\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } + + @Test + public void testWithLessElementsDifferentType() throws Exception { + /* + * Test joinWithEdges with the input DataSet passed as a parameter containing + * less elements than the edge DataSet and of a different type(Boolean) + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdges(graph.getEdges().first(3) + .map(new BooleanEdgeValueMapper()), new DoubleIfTrueMapper()); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,24\n" + + "1,3,26\n" + + "2,3,46\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } + + @Test + public void testWithNoCommonKeys() throws Exception { + /* + * Test joinWithEdges with the input DataSet containing different keys than the edge DataSet + * - the iterator becomes empty. + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdges(TestGraphUtils.getLongLongLongTuple3Data(env), + new DoubleValueMapper()); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,24\n" + + "1,3,26\n" + + "2,3,46\n" + + "3,4,68\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } + + @Test + public void testWithCustomType() throws Exception { + /* + * Test joinWithEdges with a DataSet containing custom parametrised type input values + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdges(TestGraphUtils.getLongLongCustomTuple3Data(env), + new CustomValueMapper()); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,10\n" + + "1,3,20\n" + + "2,3,30\n" + + "3,4,40\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } + + @Test + public void testWithEdgesOnSource() throws Exception { + /* + * Test joinWithEdgesOnSource with the input DataSet parameter identical + * to the edge DataSet + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(graph.getEdges() + .map(new ProjectSourceAndValueMapper()), new AddValuesMapper()); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,24\n" + + "1,3,25\n" + + "2,3,46\n" + + "3,4,68\n" + + "3,5,69\n" + + "4,5,90\n" + + "5,1,102\n"; + } + + @Test + public void testOnSourceWithLessElements() throws Exception { + /* + * Test joinWithEdgesOnSource with the input DataSet passed as a parameter containing + * less elements than the edge DataSet, but of the same type + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(graph.getEdges().first(3) + .map(new ProjectSourceAndValueMapper()), new AddValuesMapper()); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,24\n" + + "1,3,25\n" + + "2,3,46\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } + + @Test + public void testOnSourceWithDifferentType() throws Exception { + /* + * Test joinWithEdgesOnSource with the input DataSet passed as a parameter containing + * less elements than the edge DataSet and of a different type(Boolean) + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(graph.getEdges().first(3) + .map(new ProjectSourceWithTrueMapper()), new DoubleIfTrueMapper()); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,24\n" + + "1,3,26\n" + + "2,3,46\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } + + @Test + public void testOnSourceWithNoCommonKeys() throws Exception { + /* + * Test joinWithEdgesOnSource with the input DataSet containing different keys than the edge DataSet + * - the iterator becomes empty. + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(TestGraphUtils.getLongLongTuple2SourceData(env), + new DoubleValueMapper()); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,20\n" + + "1,3,20\n" + + "2,3,60\n" + + "3,4,80\n" + + "3,5,80\n" + + "4,5,120\n" + + "5,1,51\n"; + } + + @Test + public void testOnSourceWithCustom() throws Exception { + /* + * Test joinWithEdgesOnSource with a DataSet containing custom parametrised type input values + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(TestGraphUtils.getLongCustomTuple2SourceData(env), + new CustomValueMapper()); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,10\n" + + "1,3,10\n" + + "2,3,30\n" + + "3,4,40\n" + + "3,5,40\n" + + "4,5,45\n" + + "5,1,51\n"; + } + + @Test + public void testWithEdgesOnTarget() throws Exception { + /* + * Test joinWithEdgesOnTarget with the input DataSet parameter identical + * to the edge DataSet + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(graph.getEdges() + .map(new ProjectTargetAndValueMapper()), new AddValuesMapper()); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,24\n" + + "1,3,26\n" + + "2,3,36\n" + + "3,4,68\n" + + "3,5,70\n" + + "4,5,80\n" + + "5,1,102\n"; + } + + @Test + public void testWithOnTargetWithLessElements() throws Exception { + /* + * Test joinWithEdgesOnTarget with the input DataSet passed as a parameter containing + * less elements than the edge DataSet, but of the same type + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(graph.getEdges().first(3) + .map(new ProjectTargetAndValueMapper()), new AddValuesMapper()); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,24\n" + + "1,3,26\n" + + "2,3,36\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } + + @Test + public void testOnTargetWithDifferentType() throws Exception { + /* + * Test joinWithEdgesOnTarget with the input DataSet passed as a parameter containing + * less elements than the edge DataSet and of a different type(Boolean) + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(graph.getEdges().first(3) + .map(new ProjectTargetWithTrueMapper()), new DoubleIfTrueMapper()); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,24\n" + + "1,3,26\n" + + "2,3,46\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } + + @Test + public void testOnTargetWithNoCommonKeys() throws Exception { + /* + * Test joinWithEdgesOnTarget with the input DataSet containing different keys than the edge DataSet + * - the iterator becomes empty. + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(TestGraphUtils.getLongLongTuple2TargetData(env), + new DoubleValueMapper()); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,20\n" + + "1,3,40\n" + + "2,3,40\n" + + "3,4,80\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,140\n"; + } + + @Test + public void testOnTargetWithCustom() throws Exception { + /* + * Test joinWithEdgesOnTarget with a DataSet containing custom parametrised type input values + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(TestGraphUtils.getLongCustomTuple2TargetData(env), + new CustomValueMapper()); + + result.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,10\n" + + "1,3,20\n" + + "2,3,20\n" + + "3,4,40\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } + + @SuppressWarnings("serial") + private static final class AddValuesMapper implements MapFunction<Tuple2<Long, Long>, Long> { + public Long map(Tuple2<Long, Long> tuple) throws Exception { + return tuple.f0 + tuple.f1; + } + } + + @SuppressWarnings("serial") + private static final class BooleanEdgeValueMapper implements MapFunction<Edge<Long, Long>, Tuple3<Long, Long, Boolean>> { + public Tuple3<Long, Long, Boolean> map(Edge<Long, Long> edge) throws Exception { + return new Tuple3<Long, Long, Boolean>(edge.getSource(), + edge.getTarget(), true); + } + } + + @SuppressWarnings("serial") + private static final class DoubleIfTrueMapper implements MapFunction<Tuple2<Long, Boolean>, Long> { + public Long map(Tuple2<Long, Boolean> tuple) throws Exception { + if(tuple.f1) { + return tuple.f0 * 2; + } + else { + return tuple.f0; + } + } + } + + @SuppressWarnings("serial") + private static final class DoubleValueMapper implements MapFunction<Tuple2<Long, Long>, Long> { + public Long map(Tuple2<Long, Long> tuple) throws Exception { + return tuple.f1 * 2; + } + } + + @SuppressWarnings("serial") + private static final class CustomValueMapper implements MapFunction<Tuple2<Long, DummyCustomParameterizedType<Float>>, Long> { + public Long map(Tuple2<Long, DummyCustomParameterizedType<Float>> tuple) throws Exception { + return (long) tuple.f1.getIntField(); + } + } + + @SuppressWarnings("serial") + private static final class ProjectSourceAndValueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Long>> { + public Tuple2<Long, Long> map(Edge<Long, Long> edge) throws Exception { + return new Tuple2<Long, Long>(edge.getSource(), edge.getValue()); + } + } + + @SuppressWarnings("serial") + private static final class ProjectSourceWithTrueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Boolean>> { + public Tuple2<Long, Boolean> map(Edge<Long, Long> edge) throws Exception { + return new Tuple2<Long, Boolean>(edge.getSource(), true); + } + } + + @SuppressWarnings("serial") + private static final class ProjectTargetAndValueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Long>> { + public Tuple2<Long, Long> map(Edge<Long, Long> edge) throws Exception { + return new Tuple2<Long, Long>(edge.getTarget(), edge.getValue()); + } + } + + @SuppressWarnings("serial") + private static final class ProjectTargetWithTrueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Boolean>> { + public Tuple2<Long, Boolean> map(Edge<Long, Long> edge) throws Exception { + return new Tuple2<Long, Boolean>(edge.getTarget(), true); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/JoinWithVerticesITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/JoinWithVerticesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/JoinWithVerticesITCase.java new file mode 100644 index 0000000..0574265 --- /dev/null +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/JoinWithVerticesITCase.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.test; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType; +import org.apache.flink.graph.utils.VertexToTuple2Map; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class JoinWithVerticesITCase extends MultipleProgramsTestBase { + + public JoinWithVerticesITCase(MultipleProgramsTestBase.ExecutionMode mode){ + super(mode); + } + + private String resultPath; + private String expectedResult; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + } + + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expectedResult, resultPath); + } + + @Test + public void testJoinWithVertexSet() throws Exception { + /* + * Test joinWithVertices with the input DataSet parameter identical + * to the vertex DataSet + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithVertices(graph.getVertices() + .map(new VertexToTuple2Map<Long, Long>()), new AddValuesMapper()); + + result.getVertices().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2\n" + + "2,4\n" + + "3,6\n" + + "4,8\n" + + "5,10\n"; + } + + @Test + public void testWithLessElements() throws Exception { + /* + * Test joinWithVertices with the input DataSet passed as a parameter containing + * less elements than the vertex DataSet, but of the same type + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithVertices(graph.getVertices().first(3) + .map(new VertexToTuple2Map<Long, Long>()), new AddValuesMapper()); + + result.getVertices().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2\n" + + "2,4\n" + + "3,6\n" + + "4,4\n" + + "5,5\n"; + } + + @Test + public void testWithDifferentType() throws Exception { + /* + * Test joinWithVertices with the input DataSet passed as a parameter containing + * less elements than the vertex DataSet and of a different type(Boolean) + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithVertices(graph.getVertices().first(3) + .map(new ProjectIdWithTrue()), new DoubleIfTrueMapper()); + + result.getVertices().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2\n" + + "2,4\n" + + "3,6\n" + + "4,4\n" + + "5,5\n"; + } + + @Test + public void testWithDifferentKeys() throws Exception { + /* + * Test joinWithVertices with an input DataSet containing different keys than the vertex DataSet + * - the iterator becomes empty. + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithVertices(TestGraphUtils.getLongLongTuple2Data(env), + new ProjectSecondMapper()); + + result.getVertices().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,10\n" + + "2,20\n" + + "3,30\n" + + "4,40\n" + + "5,5\n"; + } + + @Test + public void testWithCustomType() throws Exception { + /* + * Test joinWithVertices with a DataSet containing custom parametrised type input values + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + Graph<Long, Long, Long> result = graph.joinWithVertices(TestGraphUtils.getLongCustomTuple2Data(env), + new CustomValueMapper()); + + result.getVertices().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,10\n" + + "2,20\n" + + "3,30\n" + + "4,40\n" + + "5,5\n"; + } + + @SuppressWarnings("serial") + private static final class AddValuesMapper implements MapFunction<Tuple2<Long, Long>, Long> { + public Long map(Tuple2<Long, Long> tuple) throws Exception { + return tuple.f0 + tuple.f1; + } + } + + @SuppressWarnings("serial") + private static final class ProjectIdWithTrue implements MapFunction<Vertex<Long, Long>, Tuple2<Long, Boolean>> { + public Tuple2<Long, Boolean> map(Vertex<Long, Long> vertex) throws Exception { + return new Tuple2<Long, Boolean>(vertex.getId(), true); + } + } + + @SuppressWarnings("serial") + private static final class DoubleIfTrueMapper implements MapFunction<Tuple2<Long, Boolean>, Long> { + public Long map(Tuple2<Long, Boolean> tuple) throws Exception { + if(tuple.f1) { + return tuple.f0 * 2; + } + else { + return tuple.f0; + } + } + } + + @SuppressWarnings("serial") + private static final class ProjectSecondMapper implements MapFunction<Tuple2<Long, Long>, Long> { + public Long map(Tuple2<Long, Long> tuple) throws Exception { + return tuple.f1; + } + } + + @SuppressWarnings("serial") + private static final class CustomValueMapper implements MapFunction<Tuple2<Long, DummyCustomParameterizedType<Float>>, Long> { + public Long map(Tuple2<Long, DummyCustomParameterizedType<Float>> tuple) throws Exception { + return (long) tuple.f1.getIntField(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/MapEdgesITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/MapEdgesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/MapEdgesITCase.java new file mode 100644 index 0000000..f7a585d --- /dev/null +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/MapEdgesITCase.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.test; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType; +import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class MapEdgesITCase extends MultipleProgramsTestBase { + + public MapEdgesITCase(MultipleProgramsTestBase.ExecutionMode mode){ + super(mode); + } + + private String resultPath; + private String expectedResult; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + } + + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expectedResult, resultPath); + } + + @Test + public void testWithSameValue() throws Exception { + /* + * Test mapEdges() keeping the same value type + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Edge<Long, Long>> mappedEdges = graph.mapEdges(new AddOneMapper()).getEdges(); + + mappedEdges.writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,2,13\n" + + "1,3,14\n" + + "2,3,24\n" + + "3,4,35\n" + + "3,5,36\n" + + "4,5,46\n" + + "5,1,52\n"; + } + + @Test + public void testWithStringValue() throws Exception { + /* + * Test mapEdges() and change the value type to String + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Edge<Long, String>> mappedEdges = graph.mapEdges(new ToStringMapper()).getEdges(); + + mappedEdges.writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,2,string(12)\n" + + "1,3,string(13)\n" + + "2,3,string(23)\n" + + "3,4,string(34)\n" + + "3,5,string(35)\n" + + "4,5,string(45)\n" + + "5,1,string(51)\n"; + } + + @Test + public void testWithTuple1Type() throws Exception { + /* + * Test mapEdges() and change the value type to a Tuple1 + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Edge<Long, Tuple1<Long>>> mappedEdges = graph.mapEdges(new ToTuple1Mapper()).getEdges(); + + mappedEdges.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,(12)\n" + + "1,3,(13)\n" + + "2,3,(23)\n" + + "3,4,(34)\n" + + "3,5,(35)\n" + + "4,5,(45)\n" + + "5,1,(51)\n"; + } + + @Test + public void testWithCustomType() throws Exception { + /* + * Test mapEdges() and change the value type to a custom type + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Edge<Long, DummyCustomType>> mappedEdges = graph.mapEdges(new ToCustomTypeMapper()).getEdges(); + + mappedEdges.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,(T,12)\n" + + "1,3,(T,13)\n" + + "2,3,(T,23)\n" + + "3,4,(T,34)\n" + + "3,5,(T,35)\n" + + "4,5,(T,45)\n" + + "5,1,(T,51)\n"; + } + + @Test + public void testWithParametrizedCustomType() throws Exception { + /* + * Test mapEdges() and change the value type to a parameterized custom type + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Edge<Long, DummyCustomParameterizedType<Double>>> mappedEdges = graph.mapEdges( + new ToCustomParametrizedTypeMapper()).getEdges(); + + mappedEdges.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,(12.0,12)\n" + + "1,3,(13.0,13)\n" + + "2,3,(23.0,23)\n" + + "3,4,(34.0,34)\n" + + "3,5,(35.0,35)\n" + + "4,5,(45.0,45)\n" + + "5,1,(51.0,51)\n"; + } + + @SuppressWarnings("serial") + private static final class AddOneMapper implements MapFunction<Edge<Long, Long>, Long> { + public Long map(Edge<Long, Long> edge) throws Exception { + return edge.getValue()+1; + } + } + + @SuppressWarnings("serial") + private static final class ToStringMapper implements MapFunction<Edge<Long, Long>, String> { + public String map(Edge<Long, Long> edge) throws Exception { + return String.format("string(%d)", edge.getValue()); + } + } + + @SuppressWarnings("serial") + private static final class ToTuple1Mapper implements MapFunction<Edge<Long, Long>, Tuple1<Long>> { + public Tuple1<Long> map(Edge<Long, Long> edge) throws Exception { + Tuple1<Long> tupleValue = new Tuple1<Long>(); + tupleValue.setFields(edge.getValue()); + return tupleValue; + } + } + + @SuppressWarnings("serial") + private static final class ToCustomTypeMapper implements MapFunction<Edge<Long, Long>, DummyCustomType> { + public DummyCustomType map(Edge<Long, Long> edge) throws Exception { + DummyCustomType dummyValue = new DummyCustomType(); + dummyValue.setIntField(edge.getValue().intValue()); + return dummyValue; + } + } + + @SuppressWarnings("serial") + private static final class ToCustomParametrizedTypeMapper implements MapFunction<Edge<Long, Long>, + DummyCustomParameterizedType<Double>> { + + public DummyCustomParameterizedType<Double> map(Edge<Long, Long> edge) throws Exception { + DummyCustomParameterizedType<Double> dummyValue = new DummyCustomParameterizedType<Double>(); + dummyValue.setIntField(edge.getValue().intValue()); + dummyValue.setTField(new Double(edge.getValue())); + return dummyValue; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/MapVerticesITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/MapVerticesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/MapVerticesITCase.java new file mode 100644 index 0000000..4e2c858 --- /dev/null +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/MapVerticesITCase.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.test; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType; +import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class MapVerticesITCase extends MultipleProgramsTestBase { + + public MapVerticesITCase(MultipleProgramsTestBase.ExecutionMode mode){ + super(mode); + } + + private String resultPath; + private String expectedResult; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + } + + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expectedResult, resultPath); + } + + @Test + public void testWithSameValue() throws Exception { + /* + * Test mapVertices() keeping the same value type + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Vertex<Long, Long>> mappedVertices = graph.mapVertices(new AddOneMapper()).getVertices(); + + mappedVertices.writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,2\n" + + "2,3\n" + + "3,4\n" + + "4,5\n" + + "5,6\n"; + } + + @Test + public void testWithStringValue() throws Exception { + /* + * Test mapVertices() and change the value type to String + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Vertex<Long, String>> mappedVertices = graph.mapVertices(new ToStringMapper()).getVertices(); + + mappedVertices.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,one\n" + + "2,two\n" + + "3,three\n" + + "4,four\n" + + "5,five\n"; + } + + @Test + public void testWithtuple1Value() throws Exception { + /* + * Test mapVertices() and change the value type to a Tuple1 + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Vertex<Long, Tuple1<Long>>> mappedVertices = graph.mapVertices(new ToTuple1Mapper()).getVertices(); + + mappedVertices.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,(1)\n" + + "2,(2)\n" + + "3,(3)\n" + + "4,(4)\n" + + "5,(5)\n"; + } + + @Test + public void testWithCustomType() throws Exception { + /* + * Test mapVertices() and change the value type to a custom type + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Vertex<Long, DummyCustomType>> mappedVertices = graph.mapVertices(new ToCustomTypeMapper()).getVertices(); + + mappedVertices.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,(T,1)\n" + + "2,(T,2)\n" + + "3,(T,3)\n" + + "4,(T,4)\n" + + "5,(T,5)\n"; + } + + @Test + public void testWithCustomParametrizedType() throws Exception { + /* + * Test mapVertices() and change the value type to a parameterized custom type + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Vertex<Long, DummyCustomParameterizedType<Double>>> mappedVertices = graph.mapVertices( + new ToCustomParametrizedTypeMapper()).getVertices(); + + mappedVertices.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,(1.0,1)\n" + + "2,(2.0,2)\n" + + "3,(3.0,3)\n" + + "4,(4.0,4)\n" + + "5,(5.0,5)\n"; + } + + @SuppressWarnings("serial") + private static final class AddOneMapper implements MapFunction<Vertex<Long, Long>, Long> { + public Long map(Vertex<Long, Long> value) throws Exception { + return value.getValue()+1; + } + } + + @SuppressWarnings("serial") + private static final class ToStringMapper implements MapFunction<Vertex<Long, Long>, String> { + public String map(Vertex<Long, Long> vertex) throws Exception { + String stringValue; + if (vertex.getValue() == 1) { + stringValue = "one"; + } + else if (vertex.getValue() == 2) { + stringValue = "two"; + } + else if (vertex.getValue() == 3) { + stringValue = "three"; + } + else if (vertex.getValue() == 4) { + stringValue = "four"; + } + else if (vertex.getValue() == 5) { + stringValue = "five"; + } + else { + stringValue = ""; + } + return stringValue; + } + } + + @SuppressWarnings("serial") + private static final class ToTuple1Mapper implements MapFunction<Vertex<Long, Long>, Tuple1<Long>> { + public Tuple1<Long> map(Vertex<Long, Long> vertex) throws Exception { + Tuple1<Long> tupleValue = new Tuple1<Long>(); + tupleValue.setFields(vertex.getValue()); + return tupleValue; + } + } + + @SuppressWarnings("serial") + private static final class ToCustomTypeMapper implements MapFunction<Vertex<Long, Long>, DummyCustomType> { + public DummyCustomType map(Vertex<Long, Long> vertex) throws Exception { + DummyCustomType dummyValue = new DummyCustomType(); + dummyValue.setIntField(vertex.getValue().intValue()); + return dummyValue; + } + } + + @SuppressWarnings("serial") + private static final class ToCustomParametrizedTypeMapper implements MapFunction<Vertex<Long, Long>, + DummyCustomParameterizedType<Double>> { + + public DummyCustomParameterizedType<Double> map(Vertex<Long, Long> vertex) throws Exception { + DummyCustomParameterizedType<Double> dummyValue = new DummyCustomParameterizedType<Double>(); + dummyValue.setIntField(vertex.getValue().intValue()); + dummyValue.setTField(new Double(vertex.getValue())); + return dummyValue; + } + } +}