http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithVertices.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithVertices.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithVertices.java deleted file mode 100644 index 8b0db35..0000000 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithVertices.java +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.test; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType; -import org.apache.flink.graph.utils.VertexToTuple2Map; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class TestJoinWithVertices extends MultipleProgramsTestBase { - - public TestJoinWithVertices(MultipleProgramsTestBase.ExecutionMode mode){ - super(mode); - } - - private String resultPath; - private String expectedResult; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); - } - - @Test - public void testJoinWithVertexSet() throws Exception { - /* - * Test joinWithVertices with the input DataSet parameter identical - * to the vertex DataSet - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithVertices(graph.getVertices() - .map(new VertexToTuple2Map<Long, Long>()), new AddValuesMapper()); - - result.getVertices().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2\n" + - "2,4\n" + - "3,6\n" + - "4,8\n" + - "5,10\n"; - } - - @Test - public void testWithLessElements() throws Exception { - /* - * Test joinWithVertices with the input DataSet passed as a parameter containing - * less elements than the vertex DataSet, but of the same type - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithVertices(graph.getVertices().first(3) - .map(new VertexToTuple2Map<Long, Long>()), new AddValuesMapper()); - - result.getVertices().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2\n" + - "2,4\n" + - "3,6\n" + - "4,4\n" + - "5,5\n"; - } - - @Test - public void testWithDifferentType() throws Exception { - /* - * Test joinWithVertices with the input DataSet passed as a parameter containing - * less elements than the vertex DataSet and of a different type(Boolean) - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithVertices(graph.getVertices().first(3) - .map(new ProjectIdWithTrue()), new DoubleIfTrueMapper()); - - result.getVertices().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2\n" + - "2,4\n" + - "3,6\n" + - "4,4\n" + - "5,5\n"; - } - - @Test - public void testWithDifferentKeys() throws Exception { - /* - * Test joinWithVertices with an input DataSet containing different keys than the vertex DataSet - * - the iterator becomes empty. - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithVertices(TestGraphUtils.getLongLongTuple2Data(env), - new ProjectSecondMapper()); - - result.getVertices().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,10\n" + - "2,20\n" + - "3,30\n" + - "4,40\n" + - "5,5\n"; - } - - @Test - public void testWithCustomType() throws Exception { - /* - * Test joinWithVertices with a DataSet containing custom parametrised type input values - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithVertices(TestGraphUtils.getLongCustomTuple2Data(env), - new CustomValueMapper()); - - result.getVertices().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,10\n" + - "2,20\n" + - "3,30\n" + - "4,40\n" + - "5,5\n"; - } - - @SuppressWarnings("serial") - private static final class AddValuesMapper implements MapFunction<Tuple2<Long, Long>, Long> { - public Long map(Tuple2<Long, Long> tuple) throws Exception { - return tuple.f0 + tuple.f1; - } - } - - @SuppressWarnings("serial") - private static final class ProjectIdWithTrue implements MapFunction<Vertex<Long, Long>, Tuple2<Long, Boolean>> { - public Tuple2<Long, Boolean> map(Vertex<Long, Long> vertex) throws Exception { - return new Tuple2<Long, Boolean>(vertex.getId(), true); - } - } - - @SuppressWarnings("serial") - private static final class DoubleIfTrueMapper implements MapFunction<Tuple2<Long, Boolean>, Long> { - public Long map(Tuple2<Long, Boolean> tuple) throws Exception { - if(tuple.f1) { - return tuple.f0 * 2; - } - else { - return tuple.f0; - } - } - } - - @SuppressWarnings("serial") - private static final class ProjectSecondMapper implements MapFunction<Tuple2<Long, Long>, Long> { - public Long map(Tuple2<Long, Long> tuple) throws Exception { - return tuple.f1; - } - } - - @SuppressWarnings("serial") - private static final class CustomValueMapper implements MapFunction<Tuple2<Long, DummyCustomParameterizedType<Float>>, Long> { - public Long map(Tuple2<Long, DummyCustomParameterizedType<Float>> tuple) throws Exception { - return (long) tuple.f1.getIntField(); - } - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapEdges.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapEdges.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapEdges.java deleted file mode 100644 index 9eccecc..0000000 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapEdges.java +++ /dev/null @@ -1,223 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.test; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType; -import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class TestMapEdges extends MultipleProgramsTestBase { - - public TestMapEdges(MultipleProgramsTestBase.ExecutionMode mode){ - super(mode); - } - - private String resultPath; - private String expectedResult; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); - } - - @Test - public void testWithSameValue() throws Exception { - /* - * Test mapEdges() keeping the same value type - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Edge<Long, Long>> mappedEdges = graph.mapEdges(new AddOneMapper()).getEdges(); - - mappedEdges.writeAsCsv(resultPath); - env.execute(); - expectedResult = "1,2,13\n" + - "1,3,14\n" + - "2,3,24\n" + - "3,4,35\n" + - "3,5,36\n" + - "4,5,46\n" + - "5,1,52\n"; - } - - @Test - public void testWithStringValue() throws Exception { - /* - * Test mapEdges() and change the value type to String - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Edge<Long, String>> mappedEdges = graph.mapEdges(new ToStringMapper()).getEdges(); - - mappedEdges.writeAsCsv(resultPath); - env.execute(); - expectedResult = "1,2,string(12)\n" + - "1,3,string(13)\n" + - "2,3,string(23)\n" + - "3,4,string(34)\n" + - "3,5,string(35)\n" + - "4,5,string(45)\n" + - "5,1,string(51)\n"; - } - - @Test - public void testWithTuple1Type() throws Exception { - /* - * Test mapEdges() and change the value type to a Tuple1 - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Edge<Long, Tuple1<Long>>> mappedEdges = graph.mapEdges(new ToTuple1Mapper()).getEdges(); - - mappedEdges.writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,(12)\n" + - "1,3,(13)\n" + - "2,3,(23)\n" + - "3,4,(34)\n" + - "3,5,(35)\n" + - "4,5,(45)\n" + - "5,1,(51)\n"; - } - - @Test - public void testWithCustomType() throws Exception { - /* - * Test mapEdges() and change the value type to a custom type - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Edge<Long, DummyCustomType>> mappedEdges = graph.mapEdges(new ToCustomTypeMapper()).getEdges(); - - mappedEdges.writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,(T,12)\n" + - "1,3,(T,13)\n" + - "2,3,(T,23)\n" + - "3,4,(T,34)\n" + - "3,5,(T,35)\n" + - "4,5,(T,45)\n" + - "5,1,(T,51)\n"; - } - - @Test - public void testWithParametrizedCustomType() throws Exception { - /* - * Test mapEdges() and change the value type to a parameterized custom type - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Edge<Long, DummyCustomParameterizedType<Double>>> mappedEdges = graph.mapEdges( - new ToCustomParametrizedTypeMapper()).getEdges(); - - mappedEdges.writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,(12.0,12)\n" + - "1,3,(13.0,13)\n" + - "2,3,(23.0,23)\n" + - "3,4,(34.0,34)\n" + - "3,5,(35.0,35)\n" + - "4,5,(45.0,45)\n" + - "5,1,(51.0,51)\n"; - } - - @SuppressWarnings("serial") - private static final class AddOneMapper implements MapFunction<Edge<Long, Long>, Long> { - public Long map(Edge<Long, Long> edge) throws Exception { - return edge.getValue()+1; - } - } - - @SuppressWarnings("serial") - private static final class ToStringMapper implements MapFunction<Edge<Long, Long>, String> { - public String map(Edge<Long, Long> edge) throws Exception { - return String.format("string(%d)", edge.getValue()); - } - } - - @SuppressWarnings("serial") - private static final class ToTuple1Mapper implements MapFunction<Edge<Long, Long>, Tuple1<Long>> { - public Tuple1<Long> map(Edge<Long, Long> edge) throws Exception { - Tuple1<Long> tupleValue = new Tuple1<Long>(); - tupleValue.setFields(edge.getValue()); - return tupleValue; - } - } - - @SuppressWarnings("serial") - private static final class ToCustomTypeMapper implements MapFunction<Edge<Long, Long>, DummyCustomType> { - public DummyCustomType map(Edge<Long, Long> edge) throws Exception { - DummyCustomType dummyValue = new DummyCustomType(); - dummyValue.setIntField(edge.getValue().intValue()); - return dummyValue; - } - } - - @SuppressWarnings("serial") - private static final class ToCustomParametrizedTypeMapper implements MapFunction<Edge<Long, Long>, - DummyCustomParameterizedType<Double>> { - - public DummyCustomParameterizedType<Double> map(Edge<Long, Long> edge) throws Exception { - DummyCustomParameterizedType<Double> dummyValue = new DummyCustomParameterizedType<Double>(); - dummyValue.setIntField(edge.getValue().intValue()); - dummyValue.setTField(new Double(edge.getValue())); - return dummyValue; - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapVertices.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapVertices.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapVertices.java deleted file mode 100644 index c4d44b0..0000000 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestMapVertices.java +++ /dev/null @@ -1,233 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.test; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType; -import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class TestMapVertices extends MultipleProgramsTestBase { - - public TestMapVertices(MultipleProgramsTestBase.ExecutionMode mode){ - super(mode); - } - - private String resultPath; - private String expectedResult; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); - } - - @Test - public void testWithSameValue() throws Exception { - /* - * Test mapVertices() keeping the same value type - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Vertex<Long, Long>> mappedVertices = graph.mapVertices(new AddOneMapper()).getVertices(); - - mappedVertices.writeAsCsv(resultPath); - env.execute(); - expectedResult = "1,2\n" + - "2,3\n" + - "3,4\n" + - "4,5\n" + - "5,6\n"; - } - - @Test - public void testWithStringValue() throws Exception { - /* - * Test mapVertices() and change the value type to String - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Vertex<Long, String>> mappedVertices = graph.mapVertices(new ToStringMapper()).getVertices(); - - mappedVertices.writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,one\n" + - "2,two\n" + - "3,three\n" + - "4,four\n" + - "5,five\n"; - } - - @Test - public void testWithtuple1Value() throws Exception { - /* - * Test mapVertices() and change the value type to a Tuple1 - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Vertex<Long, Tuple1<Long>>> mappedVertices = graph.mapVertices(new ToTuple1Mapper()).getVertices(); - - mappedVertices.writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,(1)\n" + - "2,(2)\n" + - "3,(3)\n" + - "4,(4)\n" + - "5,(5)\n"; - } - - @Test - public void testWithCustomType() throws Exception { - /* - * Test mapVertices() and change the value type to a custom type - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Vertex<Long, DummyCustomType>> mappedVertices = graph.mapVertices(new ToCustomTypeMapper()).getVertices(); - - mappedVertices.writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,(T,1)\n" + - "2,(T,2)\n" + - "3,(T,3)\n" + - "4,(T,4)\n" + - "5,(T,5)\n"; - } - - @Test - public void testWithCustomParametrizedType() throws Exception { - /* - * Test mapVertices() and change the value type to a parameterized custom type - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Vertex<Long, DummyCustomParameterizedType<Double>>> mappedVertices = graph.mapVertices( - new ToCustomParametrizedTypeMapper()).getVertices(); - - mappedVertices.writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,(1.0,1)\n" + - "2,(2.0,2)\n" + - "3,(3.0,3)\n" + - "4,(4.0,4)\n" + - "5,(5.0,5)\n"; - } - - @SuppressWarnings("serial") - private static final class AddOneMapper implements MapFunction<Vertex<Long, Long>, Long> { - public Long map(Vertex<Long, Long> value) throws Exception { - return value.getValue()+1; - } - } - - @SuppressWarnings("serial") - private static final class ToStringMapper implements MapFunction<Vertex<Long, Long>, String> { - public String map(Vertex<Long, Long> vertex) throws Exception { - String stringValue; - if (vertex.getValue() == 1) { - stringValue = "one"; - } - else if (vertex.getValue() == 2) { - stringValue = "two"; - } - else if (vertex.getValue() == 3) { - stringValue = "three"; - } - else if (vertex.getValue() == 4) { - stringValue = "four"; - } - else if (vertex.getValue() == 5) { - stringValue = "five"; - } - else { - stringValue = ""; - } - return stringValue; - } - } - - @SuppressWarnings("serial") - private static final class ToTuple1Mapper implements MapFunction<Vertex<Long, Long>, Tuple1<Long>> { - public Tuple1<Long> map(Vertex<Long, Long> vertex) throws Exception { - Tuple1<Long> tupleValue = new Tuple1<Long>(); - tupleValue.setFields(vertex.getValue()); - return tupleValue; - } - } - - @SuppressWarnings("serial") - private static final class ToCustomTypeMapper implements MapFunction<Vertex<Long, Long>, DummyCustomType> { - public DummyCustomType map(Vertex<Long, Long> vertex) throws Exception { - DummyCustomType dummyValue = new DummyCustomType(); - dummyValue.setIntField(vertex.getValue().intValue()); - return dummyValue; - } - } - - @SuppressWarnings("serial") - private static final class ToCustomParametrizedTypeMapper implements MapFunction<Vertex<Long, Long>, - DummyCustomParameterizedType<Double>> { - - public DummyCustomParameterizedType<Double> map(Vertex<Long, Long> vertex) throws Exception { - DummyCustomParameterizedType<Double> dummyValue = new DummyCustomParameterizedType<Double>(); - dummyValue.setIntField(vertex.getValue().intValue()); - dummyValue.setTField(new Double(vertex.getValue())); - return dummyValue; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnEdgesMethods.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnEdgesMethods.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnEdgesMethods.java deleted file mode 100644 index 7a02ffe..0000000 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnEdgesMethods.java +++ /dev/null @@ -1,317 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.test; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.EdgeDirection; -import org.apache.flink.graph.EdgesFunction; -import org.apache.flink.graph.EdgesFunctionWithVertexValue; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.Vertex; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class TestReduceOnEdgesMethods extends MultipleProgramsTestBase { - - public TestReduceOnEdgesMethods(MultipleProgramsTestBase.ExecutionMode mode){ - super(mode); - } - - private String resultPath; - private String expectedResult; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); - } - - @Test - public void testLowestWeightOutNeighbor() throws Exception { - /* - * Get the lowest-weight out-neighbor - * for each vertex - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = - graph.reduceOnEdges(new SelectMinWeightNeighbor(), EdgeDirection.OUT); - verticesWithLowestOutNeighbor.writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2\n" + - "2,3\n" + - "3,4\n" + - "4,5\n" + - "5,1\n"; - } - - @Test - public void testLowestWeightInNeighbor() throws Exception { - /* - * Get the lowest-weight in-neighbor - * for each vertex - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = - graph.reduceOnEdges(new SelectMinWeightInNeighbor(), EdgeDirection.IN); - verticesWithLowestOutNeighbor.writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,5\n" + - "2,1\n" + - "3,1\n" + - "4,3\n" + - "5,3\n"; - } - - @Test - public void testMaxWeightEdge() throws Exception { - /* - * Get the maximum weight among all edges - * of a vertex - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Tuple2<Long, Long>> verticesWithMaxEdgeWeight = - graph.reduceOnEdges(new SelectMaxWeightNeighbor(), EdgeDirection.ALL); - verticesWithMaxEdgeWeight.writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,51\n" + - "2,23\n" + - "3,35\n" + - "4,45\n" + - "5,51\n"; - } - - @Test - public void testLowestWeightOutNeighborNoValue() throws Exception { - /* - * Get the lowest-weight out-neighbor - * for each vertex - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = - graph.reduceOnEdges(new SelectMinWeightNeighborNoValue(), EdgeDirection.OUT); - verticesWithLowestOutNeighbor.writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2\n" + - "2,3\n" + - "3,4\n" + - "4,5\n" + - "5,1\n"; - } - - @Test - public void testLowestWeightInNeighborNoValue() throws Exception { - /* - * Get the lowest-weight in-neighbor - * for each vertex - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = - graph.reduceOnEdges(new SelectMinWeightInNeighborNoValue(), EdgeDirection.IN); - verticesWithLowestOutNeighbor.writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,5\n" + - "2,1\n" + - "3,1\n" + - "4,3\n" + - "5,3\n"; - } - - @Test - public void testMaxWeightAllNeighbors() throws Exception { - /* - * Get the maximum weight among all edges - * of a vertex - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Tuple2<Long, Long>> verticesWithMaxEdgeWeight = - graph.reduceOnEdges(new SelectMaxWeightNeighborNoValue(), EdgeDirection.ALL); - verticesWithMaxEdgeWeight.writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,51\n" + - "2,23\n" + - "3,35\n" + - "4,45\n" + - "5,51\n"; - } - - @SuppressWarnings("serial") - private static final class SelectMinWeightNeighbor implements EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> { - - public Tuple2<Long, Long> iterateEdges( - Vertex<Long, Long> v, - Iterable<Edge<Long, Long>> edges) { - - long weight = Long.MAX_VALUE; - long minNeighorId = 0; - - for (Edge<Long, Long> edge: edges) { - if (edge.getValue() < weight) { - weight = edge.getValue(); - minNeighorId = edge.getTarget(); - } - } - return new Tuple2<Long, Long>(v.getId(), minNeighorId); - } - } - - @SuppressWarnings("serial") - private static final class SelectMaxWeightNeighbor implements EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> { - - public Tuple2<Long, Long> iterateEdges(Vertex<Long, Long> v, - Iterable<Edge<Long, Long>> edges) { - - long weight = Long.MIN_VALUE; - - for (Edge<Long, Long> edge: edges) { - if (edge.getValue() > weight) { - weight = edge.getValue(); - } - } - return new Tuple2<Long, Long>(v.getId(), weight); - } - } - - @SuppressWarnings("serial") - private static final class SelectMinWeightNeighborNoValue implements EdgesFunction<Long, Long, Tuple2<Long, Long>> { - - public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) { - - long weight = Long.MAX_VALUE; - long minNeighorId = 0; - long vertexId = -1; - long i=0; - - for (Tuple2<Long, Edge<Long, Long>> edge: edges) { - if (edge.f1.getValue() < weight) { - weight = edge.f1.getValue(); - minNeighorId = edge.f1.getTarget(); - } - if (i==0) { - vertexId = edge.f0; - } i++; - } - return new Tuple2<Long, Long>(vertexId, minNeighorId); - } - } - - @SuppressWarnings("serial") - private static final class SelectMaxWeightNeighborNoValue implements EdgesFunction<Long, Long, Tuple2<Long, Long>> { - - public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) { - - long weight = Long.MIN_VALUE; - long vertexId = -1; - long i=0; - - for (Tuple2<Long, Edge<Long, Long>> edge: edges) { - if (edge.f1.getValue() > weight) { - weight = edge.f1.getValue(); - } - if (i==0) { - vertexId = edge.f0; - } i++; - } - return new Tuple2<Long, Long>(vertexId, weight); - } - } - - @SuppressWarnings("serial") - private static final class SelectMinWeightInNeighbor implements EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> { - - public Tuple2<Long, Long> iterateEdges( - Vertex<Long, Long> v, - Iterable<Edge<Long, Long>> edges) { - - long weight = Long.MAX_VALUE; - long minNeighorId = 0; - - for (Edge<Long, Long> edge: edges) { - if (edge.getValue() < weight) { - weight = edge.getValue(); - minNeighorId = edge.getSource(); - } - } - return new Tuple2<Long, Long>(v.getId(), minNeighorId); - } - } - - @SuppressWarnings("serial") - private static final class SelectMinWeightInNeighborNoValue implements EdgesFunction<Long, Long, Tuple2<Long, Long>> { - - public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) { - - long weight = Long.MAX_VALUE; - long minNeighorId = 0; - long vertexId = -1; - long i=0; - - for (Tuple2<Long, Edge<Long, Long>> edge: edges) { - if (edge.f1.getValue() < weight) { - weight = edge.f1.getValue(); - minNeighorId = edge.f1.getSource(); - } - if (i==0) { - vertexId = edge.f0; - } i++; - } - return new Tuple2<Long, Long>(vertexId, minNeighorId); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnNeighborMethods.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnNeighborMethods.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnNeighborMethods.java deleted file mode 100644 index e64eacf..0000000 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestReduceOnNeighborMethods.java +++ /dev/null @@ -1,303 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.test; - -import java.util.Iterator; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.EdgeDirection; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.NeighborsFunction; -import org.apache.flink.graph.NeighborsFunctionWithVertexValue; -import org.apache.flink.graph.Vertex; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class TestReduceOnNeighborMethods extends MultipleProgramsTestBase { - - public TestReduceOnNeighborMethods(MultipleProgramsTestBase.ExecutionMode mode){ - super(mode); - } - - private String resultPath; - private String expectedResult; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); - } - - @Test - public void testSumOfOutNeighbors() throws Exception { - /* - * Get the sum of out-neighbor values - * for each vertex - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = - graph.reduceOnNeighbors(new SumOutNeighbors(), EdgeDirection.OUT); - - verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); - env.execute(); - expectedResult = "1,5\n" + - "2,3\n" + - "3,9\n" + - "4,5\n" + - "5,1\n"; - } - - @Test - public void testSumOfInNeighbors() throws Exception { - /* - * Get the sum of in-neighbor values - * times the edge weights for each vertex - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Tuple2<Long, Long>> verticesWithSum = - graph.reduceOnNeighbors(new SumInNeighbors(), EdgeDirection.IN); - - verticesWithSum.writeAsCsv(resultPath); - env.execute(); - expectedResult = "1,255\n" + - "2,12\n" + - "3,59\n" + - "4,102\n" + - "5,285\n"; - } - - @Test - public void testSumOfOAllNeighbors() throws Exception { - /* - * Get the sum of all neighbor values - * including own vertex value - * for each vertex - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = - graph.reduceOnNeighbors(new SumAllNeighbors(), EdgeDirection.ALL); - - verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,11\n" + - "2,6\n" + - "3,15\n" + - "4,12\n" + - "5,13\n"; - } - - @Test - public void testSumOfOutNeighborsNoValue() throws Exception { - /* - * Get the sum of out-neighbor values - * for each vertex - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = - graph.reduceOnNeighbors(new SumOutNeighborsNoValue(), EdgeDirection.OUT); - - verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,5\n" + - "2,3\n" + - "3,9\n" + - "4,5\n" + - "5,1\n"; - } - - @Test - public void testSumOfInNeighborsNoValue() throws Exception { - /* - * Get the sum of in-neighbor values - * times the edge weights for each vertex - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Tuple2<Long, Long>> verticesWithSum = - graph.reduceOnNeighbors(new SumInNeighborsNoValue(), EdgeDirection.IN); - - verticesWithSum.writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,255\n" + - "2,12\n" + - "3,59\n" + - "4,102\n" + - "5,285\n"; - } - - @Test - public void testSumOfAllNeighborsNoValue() throws Exception { - /* - * Get the sum of all neighbor values - * for each vertex - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = - graph.reduceOnNeighbors(new SumAllNeighborsNoValue(), EdgeDirection.ALL); - - verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,10\n" + - "2,4\n" + - "3,12\n" + - "4,8\n" + - "5,8\n"; - } - - @SuppressWarnings("serial") - private static final class SumOutNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long, - Tuple2<Long, Long>> { - - public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> vertex, - Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) { - - long sum = 0; - for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) { - sum += neighbor.f1.getValue(); - } - return new Tuple2<Long, Long>(vertex.getId(), sum); - } - } - - @SuppressWarnings("serial") - private static final class SumInNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long, - Tuple2<Long, Long>> { - - public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> vertex, - Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) { - - long sum = 0; - for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) { - sum += neighbor.f0.getValue() * neighbor.f1.getValue(); - } - return new Tuple2<Long, Long>(vertex.getId(), sum); - } - } - - @SuppressWarnings("serial") - private static final class SumAllNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long, - Tuple2<Long, Long>> { - - public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> vertex, - Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) { - - long sum = 0; - for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) { - sum += neighbor.f1.getValue(); - } - return new Tuple2<Long, Long>(vertex.getId(), sum + vertex.getValue()); - } - } - - @SuppressWarnings("serial") - private static final class SumOutNeighborsNoValue implements NeighborsFunction<Long, Long, Long, - Tuple2<Long, Long>> { - - public Tuple2<Long, Long> iterateNeighbors( - Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors) { - - long sum = 0; - Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null; - Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator = - neighbors.iterator(); - while(neighborsIterator.hasNext()) { - next = neighborsIterator.next(); - sum += next.f2.getValue(); - } - return new Tuple2<Long, Long>(next.f0, sum); - } - } - - @SuppressWarnings("serial") - private static final class SumInNeighborsNoValue implements NeighborsFunction<Long, Long, Long, - Tuple2<Long, Long>> { - - public Tuple2<Long, Long> iterateNeighbors( - Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors) { - - long sum = 0; - Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null; - Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator = - neighbors.iterator(); - while(neighborsIterator.hasNext()) { - next = neighborsIterator.next(); - sum += next.f2.getValue() * next.f1.getValue(); - } - return new Tuple2<Long, Long>(next.f0, sum); - } - } - - @SuppressWarnings("serial") - private static final class SumAllNeighborsNoValue implements NeighborsFunction<Long, Long, Long, - Tuple2<Long, Long>> { - - public Tuple2<Long, Long> iterateNeighbors( - Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors) { - - long sum = 0; - Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null; - Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator = - neighbors.iterator(); - while(neighborsIterator.hasNext()) { - next = neighborsIterator.next(); - sum += next.f2.getValue(); - } - return new Tuple2<Long, Long>(next.f0, sum); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestVertexCentricConnectedComponents.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestVertexCentricConnectedComponents.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestVertexCentricConnectedComponents.java deleted file mode 100644 index 614ddd5..0000000 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestVertexCentricConnectedComponents.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.graph.test; - -import java.io.BufferedReader; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.spargel.MessageIterator; -import org.apache.flink.graph.spargel.MessagingFunction; -import org.apache.flink.graph.spargel.VertexUpdateFunction; -import org.apache.flink.test.testdata.ConnectedComponentsData; -import org.apache.flink.test.util.JavaProgramTestBase; -import org.apache.flink.types.NullValue; - -@SuppressWarnings("serial") -public class TestVertexCentricConnectedComponents extends JavaProgramTestBase { - - private static final long SEED = 9487520347802987L; - - private static final int NUM_VERTICES = 1000; - - private static final int NUM_EDGES = 10000; - - private String resultPath; - - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempFilePath("results"); - } - - @Override - protected void testProgram() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Long> vertexIds = env.generateSequence(1, NUM_VERTICES); - DataSet<String> edgeString = env.fromElements(ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED).split("\n")); - - DataSet<Edge<Long, NullValue>> edges = edgeString.map(new EdgeParser()); - - DataSet<Vertex<Long, Long>> initialVertices = vertexIds.map(new IdAssigner()); - Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env); - - Graph<Long, Long, NullValue> result = graph.runVertexCentricIteration(new CCUpdater(), new CCMessager(), 100); - - result.getVertices().writeAsCsv(resultPath, "\n", " "); - env.execute(); - } - - public static final class CCUpdater extends VertexUpdateFunction<Long, Long, Long> { - @Override - public void updateVertex(Long vertexKey, Long vertexValue, MessageIterator<Long> inMessages) { - long min = Long.MAX_VALUE; - for (long msg : inMessages) { - min = Math.min(min, msg); - } - if (min < vertexValue) { - setNewVertexValue(min); - } - } - } - - public static final class CCMessager extends MessagingFunction<Long, Long, Long, NullValue> { - @Override - public void sendMessages(Long vertexId, Long componentId) { - sendMessageToAllNeighbors(componentId); - } - } - - /** - * A map function that takes a Long value and creates a 2-tuple out of it: - * <pre>(Long value) -> (value, value)</pre> - */ - public static final class IdAssigner implements MapFunction<Long, Vertex<Long, Long>> { - @Override - public Vertex<Long, Long> map(Long value) { - return new Vertex<Long, Long>(value, value); - } - } - - @Override - protected void postSubmit() throws Exception { - for (BufferedReader reader : getResultReader(resultPath)) { - ConnectedComponentsData.checkOddEvenResult(reader); - } - } - - public static final class EdgeParser extends RichMapFunction<String, Edge<Long, NullValue>> { - public Edge<Long, NullValue> map(String value) { - String[] nums = value.split(" "); - return new Edge<Long, NullValue>(Long.parseLong(nums[0]), Long.parseLong(nums[1]), - NullValue.getInstance()); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestWeaklyConnected.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestWeaklyConnected.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestWeaklyConnected.java deleted file mode 100644 index f5b6d9d..0000000 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestWeaklyConnected.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.test; - -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.graph.Graph; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class TestWeaklyConnected extends MultipleProgramsTestBase { - - public TestWeaklyConnected(MultipleProgramsTestBase.ExecutionMode mode){ - super(mode); - } - - private String resultPath; - private String expectedResult; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); - } - - @Test - public void testWithConnectedDirected() throws Exception { - /* - * Test isWeaklyConnected() with a connected, directed graph - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - graph.isWeaklyConnected(10).writeAsText(resultPath); - - env.execute(); - expectedResult = "true\n"; - } - - @Test - public void testWithDisconnectedDirected() throws Exception { - /* - * Test isWeaklyConnected() with a disconnected, directed graph - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getDisconnectedLongLongEdgeData(env), env); - - graph.isWeaklyConnected(10).writeAsText(resultPath); - - env.execute(); - expectedResult = "false\n"; - } - - @Test - public void testWithConnectedUndirected() throws Exception { - /* - * Test isWeaklyConnected() with a connected, undirected graph - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env).getUndirected(); - - graph.isWeaklyConnected(10).writeAsText(resultPath); - - env.execute(); - expectedResult = "true\n"; - } - - @Test - public void testWithDisconnectedUndirected() throws Exception { - /* - * Test isWeaklyConnected() with a disconnected, undirected graph - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getDisconnectedLongLongEdgeData(env), env).getUndirected(); - - graph.isWeaklyConnected(10).writeAsText(resultPath); - - env.execute(); - expectedResult = "false\n"; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConnectedComponentsITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConnectedComponentsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConnectedComponentsITCase.java new file mode 100644 index 0000000..380e027 --- /dev/null +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConnectedComponentsITCase.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.graph.test; + +import java.io.BufferedReader; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.spargel.MessageIterator; +import org.apache.flink.graph.spargel.MessagingFunction; +import org.apache.flink.graph.spargel.VertexUpdateFunction; +import org.apache.flink.test.testdata.ConnectedComponentsData; +import org.apache.flink.test.util.JavaProgramTestBase; +import org.apache.flink.types.NullValue; + +@SuppressWarnings("serial") +public class VertexCentricConnectedComponentsITCase extends JavaProgramTestBase { + + private static final long SEED = 9487520347802987L; + + private static final int NUM_VERTICES = 1000; + + private static final int NUM_EDGES = 10000; + + private String resultPath; + + + @Override + protected void preSubmit() throws Exception { + resultPath = getTempFilePath("results"); + } + + @Override + protected void testProgram() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Long> vertexIds = env.generateSequence(1, NUM_VERTICES); + DataSet<String> edgeString = env.fromElements(ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED).split("\n")); + + DataSet<Edge<Long, NullValue>> edges = edgeString.map(new EdgeParser()); + + DataSet<Vertex<Long, Long>> initialVertices = vertexIds.map(new IdAssigner()); + Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env); + + Graph<Long, Long, NullValue> result = graph.runVertexCentricIteration(new CCUpdater(), new CCMessager(), 100); + + result.getVertices().writeAsCsv(resultPath, "\n", " "); + env.execute(); + } + + public static final class CCUpdater extends VertexUpdateFunction<Long, Long, Long> { + @Override + public void updateVertex(Long vertexKey, Long vertexValue, MessageIterator<Long> inMessages) { + long min = Long.MAX_VALUE; + for (long msg : inMessages) { + min = Math.min(min, msg); + } + if (min < vertexValue) { + setNewVertexValue(min); + } + } + } + + public static final class CCMessager extends MessagingFunction<Long, Long, Long, NullValue> { + @Override + public void sendMessages(Long vertexId, Long componentId) { + sendMessageToAllNeighbors(componentId); + } + } + + /** + * A map function that takes a Long value and creates a 2-tuple out of it: + * <pre>(Long value) -> (value, value)</pre> + */ + public static final class IdAssigner implements MapFunction<Long, Vertex<Long, Long>> { + @Override + public Vertex<Long, Long> map(Long value) { + return new Vertex<Long, Long>(value, value); + } + } + + @Override + protected void postSubmit() throws Exception { + for (BufferedReader reader : getResultReader(resultPath)) { + ConnectedComponentsData.checkOddEvenResult(reader); + } + } + + public static final class EdgeParser extends RichMapFunction<String, Edge<Long, NullValue>> { + public Edge<Long, NullValue> map(String value) { + String[] nums = value.split(" "); + return new Edge<Long, NullValue>(Long.parseLong(nums[0]), Long.parseLong(nums[1]), + NullValue.getInstance()); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/WeaklyConnectedITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/WeaklyConnectedITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/WeaklyConnectedITCase.java new file mode 100644 index 0000000..dd08f47 --- /dev/null +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/WeaklyConnectedITCase.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.test; + +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Graph; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class WeaklyConnectedITCase extends MultipleProgramsTestBase { + + public WeaklyConnectedITCase(MultipleProgramsTestBase.ExecutionMode mode){ + super(mode); + } + + private String resultPath; + private String expectedResult; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + } + + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expectedResult, resultPath); + } + + @Test + public void testWithConnectedDirected() throws Exception { + /* + * Test isWeaklyConnected() with a connected, directed graph + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + graph.isWeaklyConnected(10).writeAsText(resultPath); + + env.execute(); + expectedResult = "true\n"; + } + + @Test + public void testWithDisconnectedDirected() throws Exception { + /* + * Test isWeaklyConnected() with a disconnected, directed graph + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getDisconnectedLongLongEdgeData(env), env); + + graph.isWeaklyConnected(10).writeAsText(resultPath); + + env.execute(); + expectedResult = "false\n"; + } + + @Test + public void testWithConnectedUndirected() throws Exception { + /* + * Test isWeaklyConnected() with a connected, undirected graph + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env).getUndirected(); + + graph.isWeaklyConnected(10).writeAsText(resultPath); + + env.execute(); + expectedResult = "true\n"; + } + + @Test + public void testWithDisconnectedUndirected() throws Exception { + /* + * Test isWeaklyConnected() with a disconnected, undirected graph + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getDisconnectedLongLongEdgeData(env), env).getUndirected(); + + graph.isWeaklyConnected(10).writeAsText(resultPath); + + env.execute(); + expectedResult = "false\n"; + } +}