[FLINK-1201] [gelly] Renamed tests from TestXyz to XyzITCase

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6296c394
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6296c394
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6296c394

Branch: refs/heads/master
Commit: 6296c394fb04d7e01f8a03a348b7a179c209256b
Parents: e6b9cec
Author: vasia <vasilikikala...@gmail.com>
Authored: Mon Jan 26 18:58:33 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 11 10:46:16 2015 +0100

----------------------------------------------------------------------
 .../apache/flink/graph/test/DegreesITCase.java  | 171 ++++++
 .../flink/graph/test/FromCollectionITCase.java  | 120 +++++
 .../flink/graph/test/GraphCreationITCase.java   | 170 ++++++
 .../test/GraphCreationWithMapperITCase.java     | 158 ++++++
 .../flink/graph/test/GraphMutationsITCase.java  | 273 ++++++++++
 .../flink/graph/test/GraphOperationsITCase.java | 267 ++++++++++
 .../flink/graph/test/JoinWithEdgesITCase.java   | 519 +++++++++++++++++++
 .../graph/test/JoinWithVerticesITCase.java      | 218 ++++++++
 .../apache/flink/graph/test/MapEdgesITCase.java | 223 ++++++++
 .../flink/graph/test/MapVerticesITCase.java     | 233 +++++++++
 .../graph/test/ReduceOnEdgesMethodsITCase.java  | 317 +++++++++++
 .../test/ReduceOnNeighborMethodsITCase.java     | 303 +++++++++++
 .../apache/flink/graph/test/TestDegrees.java    | 171 ------
 .../flink/graph/test/TestFromCollection.java    | 120 -----
 .../flink/graph/test/TestGraphCreation.java     | 170 ------
 .../graph/test/TestGraphCreationWithMapper.java | 158 ------
 .../flink/graph/test/TestGraphMutations.java    | 273 ----------
 .../flink/graph/test/TestGraphOperations.java   | 267 ----------
 .../flink/graph/test/TestJoinWithEdges.java     | 519 -------------------
 .../flink/graph/test/TestJoinWithVertices.java  | 218 --------
 .../apache/flink/graph/test/TestMapEdges.java   | 223 --------
 .../flink/graph/test/TestMapVertices.java       | 233 ---------
 .../graph/test/TestReduceOnEdgesMethods.java    | 317 -----------
 .../graph/test/TestReduceOnNeighborMethods.java | 303 -----------
 .../TestVertexCentricConnectedComponents.java   | 118 -----
 .../flink/graph/test/TestWeaklyConnected.java   | 118 -----
 .../VertexCentricConnectedComponentsITCase.java | 118 +++++
 .../flink/graph/test/WeaklyConnectedITCase.java | 118 +++++
 28 files changed, 3208 insertions(+), 3208 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/DegreesITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/DegreesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/DegreesITCase.java
new file mode 100644
index 0000000..96a6d20
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/DegreesITCase.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.NullValue;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class DegreesITCase extends MultipleProgramsTestBase {
+
+       public DegreesITCase(MultipleProgramsTestBase.ExecutionMode mode){
+               super(mode);
+       }
+
+    private String resultPath;
+    private String expectedResult;
+
+    @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Before
+       public void before() throws Exception{
+               resultPath = tempFolder.newFile().toURI().toString();
+       }
+
+       @After
+       public void after() throws Exception{
+               compareResultsByLinesInMemory(expectedResult, resultPath);
+       }
+
+       @Test
+       public void testOutDegrees() throws Exception {
+               /*
+               * Test outDegrees()
+               */
+        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                TestGraphUtils.getLongLongEdgeData(env), env);
+
+        graph.outDegrees().writeAsCsv(resultPath);
+        env.execute();
+
+        expectedResult = "1,2\n" +
+                    "2,1\n" +
+                    "3,2\n" +
+                    "4,1\n" +
+                    "5,1\n";
+    }
+
+       @Test
+       public void testOutDegreesWithNoOutEdges() throws Exception {
+               /*
+                * Test outDegrees() no outgoing edges
+                */
+        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                TestGraphUtils.getLongLongEdgeDataWithZeroDegree(env), env);
+
+        graph.outDegrees().writeAsCsv(resultPath);
+        env.execute();
+
+        expectedResult = "1,3\n" +
+                "2,1\n" +
+                "3,1\n" +
+                "4,1\n" +
+                "5,0\n";
+    }
+
+       @Test
+       public void testInDegrees() throws Exception {
+               /*
+                * Test inDegrees()
+                */
+           final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+           Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                   TestGraphUtils.getLongLongEdgeData(env), env);
+
+           graph.inDegrees().writeAsCsv(resultPath);
+           env.execute();
+           expectedResult = "1,1\n" +
+                           "2,1\n" +
+                           "3,2\n" +
+                           "4,1\n" +
+                           "5,2\n";
+    }
+
+       @Test
+       public void testInDegreesWithNoInEdge() throws Exception {
+               /*
+                * Test inDegrees() no ingoing edge
+                */
+        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                TestGraphUtils.getLongLongEdgeDataWithZeroDegree(env), env);
+
+        graph.inDegrees().writeAsCsv(resultPath);
+        env.execute();
+        expectedResult = "1,0\n" +
+                       "2,1\n" +
+                       "3,1\n" +
+                       "4,1\n" +
+                       "5,3\n";
+    }
+
+       @Test
+       public void testGetDegrees() throws Exception {
+               /*
+                * Test getDegrees()
+                */
+        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                TestGraphUtils.getLongLongEdgeData(env), env);
+
+        graph.getDegrees().writeAsCsv(resultPath);
+        env.execute();
+        expectedResult = "1,3\n" +
+                       "2,2\n" +
+                       "3,4\n" +
+                       "4,2\n" +
+                       "5,3\n";
+    }
+
+       @Test
+       public void testGetDegreesWithDisconnectedData() throws Exception {
+        /*
+                * Test getDegrees() with disconnected data
+                */
+        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        Graph<Long, NullValue, Long> graph =
+                Graph.fromDataSet(TestGraphUtils.getDisconnectedLongLongEdgeData(env), env);
+
+        graph.outDegrees().writeAsCsv(resultPath);
+        env.execute();
+        expectedResult = "1,2\n" +
+                "2,1\n" +
+                "3,0\n" +
+                "4,1\n" +
+                "5,0\n";
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/FromCollectionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/FromCollectionITCase.java
 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/FromCollectionITCase.java
new file mode 100644
index 0000000..5259143
--- /dev/null
+++ 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/FromCollectionITCase.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.NullValue;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class FromCollectionITCase extends MultipleProgramsTestBase {
+
+       public FromCollectionITCase(MultipleProgramsTestBase.ExecutionMode mode){
+               super(mode);
+       }
+
+    private String resultPath;
+    private String expectedResult;
+
+    @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Before
+       public void before() throws Exception{
+               resultPath = tempFolder.newFile().toURI().toString();
+       }
+
+       @After
+       public void after() throws Exception{
+               compareResultsByLinesInMemory(expectedResult, resultPath);
+       }
+
+       @Test
+       public void testFromCollectionVerticesEdges() throws Exception {
+               /*
+                * Test fromCollection(vertices, edges):
+                */
+        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
+                TestGraphUtils.getLongLongEdges(), env);
+
+        graph.getEdges().writeAsCsv(resultPath);
+        env.execute();
+        expectedResult = "1,2,12\n" +
+                       "1,3,13\n" +
+                       "2,3,23\n" +
+                       "3,4,34\n" +
+                       "3,5,35\n" +
+                       "4,5,45\n" +
+                       "5,1,51\n";
+    }
+
+       @Test
+       public void testFromCollectionEdgesNoInitialValue() throws Exception {
+        /*
+         * Test fromCollection(edges) with no initial value for the vertices
+         */
+        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        Graph<Long, NullValue, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongEdges(),
+                       env);
+
+        graph.getVertices().writeAsCsv(resultPath);
+        env.execute();
+        expectedResult = "1,(null)\n" +
+                       "2,(null)\n" +
+                       "3,(null)\n" +
+                       "4,(null)\n" +
+                       "5,(null)\n";
+    }
+
+       @Test
+       public void testFromCollectionEdgesWithInitialValue() throws Exception {
+        /*
+         * Test fromCollection(edges) with vertices initialised by a
+         * function that takes the id and doubles it
+         */
+        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongEdges(),
+                new InitVerticesMapper(), env);
+
+        graph.getVertices().writeAsCsv(resultPath);
+        env.execute();
+        expectedResult = "1,2\n" +
+                       "2,4\n" +
+                       "3,6\n" +
+                       "4,8\n" +
+                       "5,10\n";
+    }
+
+       @SuppressWarnings("serial")
+       private static final class InitVerticesMapper implements MapFunction<Long, Long> {
+        public Long map(Long vertexId) {
+            return vertexId * 2;
+        }
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphCreationITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphCreationITCase.java
 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphCreationITCase.java
new file mode 100644
index 0000000..4cbdd90
--- /dev/null
+++ 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphCreationITCase.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
+import org.apache.flink.graph.validation.InvalidVertexIdsValidator;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.NullValue;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class GraphCreationITCase extends MultipleProgramsTestBase {
+
+       public GraphCreationITCase(MultipleProgramsTestBase.ExecutionMode mode){
+               super(mode);
+       }
+
+    private String resultPath;
+    private String expectedResult;
+
+    @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Before
+       public void before() throws Exception{
+               resultPath = tempFolder.newFile().toURI().toString();
+       }
+
+       @After
+       public void after() throws Exception{
+               compareResultsByLinesInMemory(expectedResult, resultPath);
+       }
+
+       @Test
+       public void testCreateWithoutVertexValues() throws Exception {
+       /*
+        * Test create() with edge dataset and no vertex values
+     */
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               Graph<Long, NullValue, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env), env);
+
+               graph.getVertices().writeAsCsv(resultPath);
+               env.execute();
+               expectedResult = "1,(null)\n" +
+                                       "2,(null)\n" +
+                                       "3,(null)\n" +
+                                       "4,(null)\n" +
+                                       "5,(null)\n";
+       }
+
+       @Test
+       public void testCreateWithMapper() throws Exception {
+       /*
+        * Test create() with edge dataset and a mapper that assigns the id as value
+     */
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env),
+                               new AssignIdAsValueMapper(), env);
+
+               graph.getVertices().writeAsCsv(resultPath);
+               env.execute();
+               expectedResult = "1,1\n" +
+                                       "2,2\n" +
+                                       "3,3\n" +
+                                       "4,4\n" +
+                                       "5,5\n";
+       }
+
+       @Test
+       public void testCreateWithCustomVertexValue() throws Exception {
+               /*
+                * Test create() with edge dataset and a mapper that assigns a parametrized custom vertex value
+                */
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               Graph<Long, DummyCustomParameterizedType<Double>, Long> graph = Graph.fromDataSet(
+                               TestGraphUtils.getLongLongEdgeData(env), new AssignCustomVertexValueMapper(), env);
+
+               graph.getVertices().writeAsCsv(resultPath);
+               env.execute();
+               expectedResult = "1,(2.0,0)\n" +
+                               "2,(4.0,1)\n" +
+                               "3,(6.0,2)\n" +
+                               "4,(8.0,3)\n" +
+                               "5,(10.0,4)\n";
+       }
+
+       @Test
+       public void testValidate() throws Exception {
+               /*
+                * Test validate():
+                */
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Vertex<Long, Long>> vertices = TestGraphUtils.getLongLongVertexData(env);
+               DataSet<Edge<Long, Long>> edges = TestGraphUtils.getLongLongEdgeData(env);
+
+               Graph<Long, Long, Long> graph = Graph.fromDataSet(vertices, edges, env);
+               DataSet<Boolean> result = graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>());
+
+               result.writeAsText(resultPath);
+               env.execute();
+
+               expectedResult = "true\n";
+       }
+
+       @Test
+       public void testValidateWithInvalidIds() throws Exception {
+               /*
+                * Test validate() - invalid vertex ids
+                */
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Vertex<Long, Long>> vertices = TestGraphUtils.getLongLongInvalidVertexData(env);
+               DataSet<Edge<Long, Long>> edges = TestGraphUtils.getLongLongEdgeData(env);
+
+               Graph<Long, Long, Long> graph = Graph.fromDataSet(vertices, edges, env);
+               DataSet<Boolean> result = graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>());
+               result.writeAsText(resultPath);
+               env.execute();
+
+               expectedResult = "false\n";
+       }
+
+       @SuppressWarnings("serial")
+       private static final class AssignIdAsValueMapper implements MapFunction<Long, Long> {
+               public Long map(Long vertexId) {
+                       return vertexId;
+               }
+       }
+
+       @SuppressWarnings("serial")
+       private static final class AssignCustomVertexValueMapper implements
+               MapFunction<Long, DummyCustomParameterizedType<Double>> {
+
+               DummyCustomParameterizedType<Double> dummyValue =
+                               new DummyCustomParameterizedType<Double>();
+
+               public DummyCustomParameterizedType<Double> map(Long vertexId) {
+                       dummyValue.setIntField(vertexId.intValue()-1);
+                       dummyValue.setTField(vertexId*2.0);
+                       return dummyValue;
+               }
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphCreationWithMapperITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphCreationWithMapperITCase.java
 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphCreationWithMapperITCase.java
new file mode 100644
index 0000000..24f7c82
--- /dev/null
+++ 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphCreationWithMapperITCase.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class GraphCreationWithMapperITCase extends MultipleProgramsTestBase {
+
+       public GraphCreationWithMapperITCase(MultipleProgramsTestBase.ExecutionMode mode){
+               super(mode);
+       }
+
+    private String resultPath;
+    private String expectedResult;
+
+    @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Before
+       public void before() throws Exception{
+               resultPath = tempFolder.newFile().toURI().toString();
+       }
+
+       @After
+       public void after() throws Exception{
+               compareResultsByLinesInMemory(expectedResult, resultPath);
+       }
+
+       @Test
+       public void testWithDoubleValueMapper() throws Exception {
+               /*
+                * Test create() with edge dataset and a mapper that assigns a double constant as value
+            */
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               Graph<Long, Double, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env),
+                               new AssignDoubleValueMapper(), env);
+
+               graph.getVertices().writeAsCsv(resultPath);
+               env.execute();
+               expectedResult = "1,0.1\n" +
+                               "2,0.1\n" +
+                               "3,0.1\n" +
+                               "4,0.1\n" +
+                               "5,0.1\n";
+       }
+
+       @Test
+       public void testWithTuple2ValueMapper() throws Exception {
+               /*
+                * Test create() with edge dataset and a mapper that assigns a Tuple2 as value
+                */
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               Graph<Long, Tuple2<Long, Long>, Long> graph = Graph.fromDataSet(
+                               TestGraphUtils.getLongLongEdgeData(env), new AssignTuple2ValueMapper(), env);
+
+               graph.getVertices().writeAsCsv(resultPath);
+               env.execute();
+               expectedResult = "1,(2,42)\n" +
+                               "2,(4,42)\n" +
+                               "3,(6,42)\n" +
+                               "4,(8,42)\n" +
+                               "5,(10,42)\n";
+       }
+
+       @Test
+       public void testWithConstantValueMapper() throws Exception {
+       /*
+        * Test create() with edge dataset with String key type
+        * and a mapper that assigns a double constant as value
+        */
+       final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+       Graph<String, Double, Long> graph = Graph.fromDataSet(TestGraphUtils.getStringLongEdgeData(env),
+                       new AssignDoubleConstantMapper(), env);
+
+       graph.getVertices().writeAsCsv(resultPath);
+       env.execute();
+       expectedResult = "1,0.1\n" +
+                       "2,0.1\n" +
+                       "3,0.1\n" +
+                       "4,0.1\n" +
+                       "5,0.1\n";
+       }
+
+       @Test
+       public void testWithDCustomValueMapper() throws Exception {
+               /*
+                * Test create() with edge dataset and a mapper that assigns a custom vertex value
+                */
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               Graph<Long, DummyCustomType, Long> graph = Graph.fromDataSet(
+                               TestGraphUtils.getLongLongEdgeData(env), new AssignCustomValueMapper(), env);
+
+               graph.getVertices().writeAsCsv(resultPath);
+               env.execute();
+               expectedResult = "1,(F,0)\n" +
+                               "2,(F,1)\n" +
+                               "3,(F,2)\n" +
+                               "4,(F,3)\n" +
+                               "5,(F,4)\n";
+       }
+
+       @SuppressWarnings("serial")
+       private static final class AssignDoubleValueMapper implements MapFunction<Long, Double> {
+               public Double map(Long value) {
+                       return 0.1d;
+               }
+       }
+
+       @SuppressWarnings("serial")
+       private static final class AssignTuple2ValueMapper implements MapFunction<Long, Tuple2<Long, Long>> {
+               public Tuple2<Long, Long> map(Long vertexId) {
+                       return new Tuple2<Long, Long>(vertexId*2, 42l);
+               }
+       }
+
+       @SuppressWarnings("serial")
+       private static final class AssignDoubleConstantMapper implements MapFunction<String, Double> {
+               public Double map(String value) {
+                       return 0.1d;
+               }
+       }
+
+       @SuppressWarnings("serial")
+       private static final class AssignCustomValueMapper implements MapFunction<Long, DummyCustomType> {
+               public DummyCustomType map(Long vertexId) {
+                       return new DummyCustomType(vertexId.intValue()-1, false);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphMutationsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphMutationsITCase.java
 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphMutationsITCase.java
new file mode 100644
index 0000000..3af8943
--- /dev/null
+++ 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphMutationsITCase.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Integration tests for the graph mutation methods exercised below:
+ * addVertex(), removeVertex(), addEdge() and removeEdge().
+ *
+ * Every test writes its output as CSV to a fresh temporary file and stores the lines it
+ * expects in {@code expectedResult}; {@link #after()} then compares the two.
+ */
+@RunWith(Parameterized.class)
+public class GraphMutationsITCase extends MultipleProgramsTestBase {
+
+       /** Expected CSV lines for the seven edges of the test graph when no edge was added or removed. */
+       private static final String INITIAL_EDGES =
+                       "1,2,12\n" +
+                       "1,3,13\n" +
+                       "2,3,23\n" +
+                       "3,4,34\n" +
+                       "3,5,35\n" +
+                       "4,5,45\n" +
+                       "5,1,51\n";
+
+       /** URI string of the temporary file the current test writes to. */
+       private String resultPath;
+
+       /** Lines the current test expects to find in the file at {@code resultPath}. */
+       private String expectedResult;
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       public GraphMutationsITCase(MultipleProgramsTestBase.ExecutionMode mode) {
+               super(mode);
+       }
+
+       @Before
+       public void before() throws Exception {
+               resultPath = tempFolder.newFile().toURI().toString();
+       }
+
+       @After
+       public void after() throws Exception {
+               compareResultsByLinesInMemory(expectedResult, resultPath);
+       }
+
+       /** Builds the graph under test from the Long/Long vertex and edge data sets of TestGraphUtils. */
+       private static Graph<Long, Long, Long> createTestGraph(ExecutionEnvironment env) {
+               return Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                               TestGraphUtils.getLongLongEdgeData(env), env);
+       }
+
+       /** Test addVertex() -- simple case. */
+       @Test
+       public void testAddVertex() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               List<Edge<Long, Long>> newEdges = new ArrayList<Edge<Long, Long>>();
+               newEdges.add(new Edge<Long, Long>(6L, 1L, 61L));
+
+               Graph<Long, Long, Long> mutated =
+                               createTestGraph(env).addVertex(new Vertex<Long, Long>(6L, 6L), newEdges);
+               mutated.getEdges().writeAsCsv(resultPath);
+               env.execute();
+
+               expectedResult = INITIAL_EDGES + "6,1,61\n";
+       }
+
+       /** Test addVertex() -- add an existing vertex. */
+       @Test
+       public void testAddVertexExisting() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               List<Edge<Long, Long>> newEdges = new ArrayList<Edge<Long, Long>>();
+               newEdges.add(new Edge<Long, Long>(1L, 5L, 15L));
+
+               Graph<Long, Long, Long> mutated =
+                               createTestGraph(env).addVertex(new Vertex<Long, Long>(1L, 1L), newEdges);
+               mutated.getEdges().writeAsCsv(resultPath);
+               env.execute();
+
+               expectedResult =
+                               "1,2,12\n" +
+                               "1,3,13\n" +
+                               "1,5,15\n" +
+                               "2,3,23\n" +
+                               "3,4,34\n" +
+                               "3,5,35\n" +
+                               "4,5,45\n" +
+                               "5,1,51\n";
+       }
+
+       /** Test addVertex() -- add vertex with empty edge set. */
+       @Test
+       public void testAddVertexNoEdges() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               List<Edge<Long, Long>> noEdges = new ArrayList<Edge<Long, Long>>();
+
+               Graph<Long, Long, Long> mutated =
+                               createTestGraph(env).addVertex(new Vertex<Long, Long>(6L, 6L), noEdges);
+               mutated.getVertices().writeAsCsv(resultPath);
+               env.execute();
+
+               expectedResult =
+                               "1,1\n" +
+                               "2,2\n" +
+                               "3,3\n" +
+                               "4,4\n" +
+                               "5,5\n" +
+                               "6,6\n";
+       }
+
+       /** Test removeVertex() -- simple case. */
+       @Test
+       public void testRemoveVertex() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               Graph<Long, Long, Long> mutated =
+                               createTestGraph(env).removeVertex(new Vertex<Long, Long>(5L, 5L));
+               mutated.getEdges().writeAsCsv(resultPath);
+               env.execute();
+
+               expectedResult =
+                               "1,2,12\n" +
+                               "1,3,13\n" +
+                               "2,3,23\n" +
+                               "3,4,34\n";
+       }
+
+       /** Test removeVertex() -- remove an invalid vertex. */
+       @Test
+       public void testRemoveInvalidVertex() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               Graph<Long, Long, Long> mutated =
+                               createTestGraph(env).removeVertex(new Vertex<Long, Long>(6L, 6L));
+               mutated.getEdges().writeAsCsv(resultPath);
+               env.execute();
+
+               expectedResult = INITIAL_EDGES;
+       }
+
+       /** Test addEdge() -- simple case. */
+       @Test
+       public void testAddEdge() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               Vertex<Long, Long> source = new Vertex<Long, Long>(6L, 6L);
+               Vertex<Long, Long> target = new Vertex<Long, Long>(1L, 1L);
+
+               Graph<Long, Long, Long> mutated = createTestGraph(env).addEdge(source, target, 61L);
+               mutated.getEdges().writeAsCsv(resultPath);
+               env.execute();
+
+               expectedResult = INITIAL_EDGES + "6,1,61\n";
+       }
+
+       /** Test addEdge() -- add already existing edge. */
+       @Test
+       public void testAddExistingEdge() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               Vertex<Long, Long> source = new Vertex<Long, Long>(1L, 1L);
+               Vertex<Long, Long> target = new Vertex<Long, Long>(2L, 2L);
+
+               Graph<Long, Long, Long> mutated = createTestGraph(env).addEdge(source, target, 12L);
+               mutated.getEdges().writeAsCsv(resultPath);
+               env.execute();
+
+               // The expectation lists the added edge a second time.
+               expectedResult = "1,2,12\n" + INITIAL_EDGES;
+       }
+
+       /** Test removeEdge() -- simple case. */
+       @Test
+       public void testRemoveVEdge() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               Graph<Long, Long, Long> mutated =
+                               createTestGraph(env).removeEdge(new Edge<Long, Long>(5L, 1L, 51L));
+               mutated.getEdges().writeAsCsv(resultPath);
+               env.execute();
+
+               expectedResult =
+                               "1,2,12\n" +
+                               "1,3,13\n" +
+                               "2,3,23\n" +
+                               "3,4,34\n" +
+                               "3,5,35\n" +
+                               "4,5,45\n";
+       }
+
+       /** Test removeEdge() -- invalid edge. */
+       @Test
+       public void testRemoveInvalidEdge() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               Graph<Long, Long, Long> mutated =
+                               createTestGraph(env).removeEdge(new Edge<Long, Long>(6L, 1L, 61L));
+               mutated.getEdges().writeAsCsv(resultPath);
+               env.execute();
+
+               expectedResult = INITIAL_EDGES;
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphOperationsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphOperationsITCase.java
 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphOperationsITCase.java
new file mode 100644
index 0000000..f194a60
--- /dev/null
+++ 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GraphOperationsITCase.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Integration tests for the graph operations exercised below: getUndirected(), reverse(),
+ * subgraph(), filterOnVertices(), filterOnEdges(), numberOfVertices(), numberOfEdges(),
+ * getVertexIds(), getEdgeIds() and union().
+ *
+ * Every test writes its output to a fresh temporary file and stores the lines it expects
+ * in {@code expectedResult}; {@link #after()} then compares the two.
+ */
+@RunWith(Parameterized.class)
+public class GraphOperationsITCase extends MultipleProgramsTestBase {
+
+       /** URI string of the temporary file the current test writes to. */
+       private String resultPath;
+
+       /** Lines the current test expects to find in the file at {@code resultPath}. */
+       private String expectedResult;
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       public GraphOperationsITCase(MultipleProgramsTestBase.ExecutionMode mode) {
+               super(mode);
+       }
+
+       @Before
+       public void before() throws Exception {
+               resultPath = tempFolder.newFile().toURI().toString();
+       }
+
+       @After
+       public void after() throws Exception {
+               compareResultsByLinesInMemory(expectedResult, resultPath);
+       }
+
+       /** Builds the graph under test from the Long/Long vertex and edge data sets of TestGraphUtils. */
+       private static Graph<Long, Long, Long> createTestGraph(ExecutionEnvironment env) {
+               return Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                               TestGraphUtils.getLongLongEdgeData(env), env);
+       }
+
+       /** Accepts vertices whose value is greater than 2. */
+       @SuppressWarnings("serial")
+       private static final class VertexValueAboveTwo implements FilterFunction<Vertex<Long, Long>> {
+               public boolean filter(Vertex<Long, Long> vertex) throws Exception {
+                       return vertex.getValue() > 2;
+               }
+       }
+
+       /** Accepts edges whose value is greater than 34. */
+       @SuppressWarnings("serial")
+       private static final class EdgeValueAbove34 implements FilterFunction<Edge<Long, Long>> {
+               public boolean filter(Edge<Long, Long> edge) throws Exception {
+                       return edge.getValue() > 34;
+               }
+       }
+
+       /** Test getUndirected(). */
+       @Test
+       public void testUndirected() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               createTestGraph(env).getUndirected().getEdges().writeAsCsv(resultPath);
+               env.execute();
+
+               // Each edge is expected once per direction.
+               expectedResult =
+                               "1,2,12\n" + "2,1,12\n" +
+                               "1,3,13\n" + "3,1,13\n" +
+                               "2,3,23\n" + "3,2,23\n" +
+                               "3,4,34\n" + "4,3,34\n" +
+                               "3,5,35\n" + "5,3,35\n" +
+                               "4,5,45\n" + "5,4,45\n" +
+                               "5,1,51\n" + "1,5,51\n";
+       }
+
+       /** Test reverse(). */
+       @Test
+       public void testReverse() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               createTestGraph(env).reverse().getEdges().writeAsCsv(resultPath);
+               env.execute();
+
+               expectedResult =
+                               "2,1,12\n" +
+                               "3,1,13\n" +
+                               "3,2,23\n" +
+                               "4,3,34\n" +
+                               "5,3,35\n" +
+                               "5,4,45\n" +
+                               "1,5,51\n";
+       }
+
+       /** Test subgraph() with a vertex filter and an edge filter. */
+       @Test
+       public void testSubGraph() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               createTestGraph(env)
+                               .subgraph(new VertexValueAboveTwo(), new EdgeValueAbove34())
+                               .getEdges()
+                               .writeAsCsv(resultPath);
+               env.execute();
+
+               expectedResult =
+                               "3,5,35\n" +
+                               "4,5,45\n";
+       }
+
+       /** Test filterOnVertices(). */
+       @Test
+       public void testFilterVertices() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               createTestGraph(env)
+                               .filterOnVertices(new VertexValueAboveTwo())
+                               .getEdges()
+                               .writeAsCsv(resultPath);
+               env.execute();
+
+               expectedResult =
+                               "3,4,34\n" +
+                               "3,5,35\n" +
+                               "4,5,45\n";
+       }
+
+       /** Test filterOnEdges(). */
+       @Test
+       public void testFilterEdges() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               createTestGraph(env)
+                               .filterOnEdges(new EdgeValueAbove34())
+                               .getEdges()
+                               .writeAsCsv(resultPath);
+               env.execute();
+
+               expectedResult =
+                               "3,5,35\n" +
+                               "4,5,45\n" +
+                               "5,1,51\n";
+       }
+
+       /** Test numberOfVertices(). */
+       @Test
+       public void testNumberOfVertices() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               createTestGraph(env).numberOfVertices().writeAsText(resultPath);
+               env.execute();
+
+               expectedResult = "5";
+       }
+
+       /** Test numberOfEdges(). */
+       @Test
+       public void testNumberOfEdges() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               createTestGraph(env).numberOfEdges().writeAsText(resultPath);
+               env.execute();
+
+               expectedResult = "7";
+       }
+
+       /** Test getVertexIds(). */
+       @Test
+       public void testVertexIds() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               createTestGraph(env).getVertexIds().writeAsText(resultPath);
+               env.execute();
+
+               expectedResult = "1\n2\n3\n4\n5\n";
+       }
+
+       /** Test getEdgeIds(). */
+       @Test
+       public void testEdgesIds() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               createTestGraph(env).getEdgeIds().writeAsCsv(resultPath);
+               env.execute();
+
+               expectedResult =
+                               "1,2\n" + "1,3\n" +
+                               "2,3\n" + "3,4\n" +
+                               "3,5\n" + "4,5\n" +
+                               "5,1\n";
+       }
+
+       /** Test union() with a second graph consisting of one vertex and one edge. */
+       @Test
+       public void testUnion() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               List<Vertex<Long, Long>> extraVertices = new ArrayList<Vertex<Long, Long>>();
+               extraVertices.add(new Vertex<Long, Long>(6L, 6L));
+
+               List<Edge<Long, Long>> extraEdges = new ArrayList<Edge<Long, Long>>();
+               extraEdges.add(new Edge<Long, Long>(6L, 1L, 61L));
+
+               Graph<Long, Long, Long> combined =
+                               createTestGraph(env).union(Graph.fromCollection(extraVertices, extraEdges, env));
+               combined.getEdges().writeAsCsv(resultPath);
+               env.execute();
+
+               expectedResult =
+                               "1,2,12\n" +
+                               "1,3,13\n" +
+                               "2,3,23\n" +
+                               "3,4,34\n" +
+                               "3,5,35\n" +
+                               "4,5,45\n" +
+                               "5,1,51\n" +
+                               "6,1,61\n";
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/JoinWithEdgesITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/JoinWithEdgesITCase.java
 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/JoinWithEdgesITCase.java
new file mode 100644
index 0000000..6f4f6a8
--- /dev/null
+++ 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/JoinWithEdgesITCase.java
@@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
+import org.apache.flink.graph.utils.EdgeToTuple3Map;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class JoinWithEdgesITCase extends MultipleProgramsTestBase {
+
+       /** Creates the test case; the execution mode is handed to the MultipleProgramsTestBase constructor. */
+       public JoinWithEdgesITCase(final MultipleProgramsTestBase.ExecutionMode mode) {
+               super(mode);
+       }
+
+    private String resultPath;
+    private String expectedResult;
+
+    @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       /** Creates a new file in the temporary folder and stores its URI string as the result path. */
+       @Before
+       public void before() throws Exception {
+               final java.io.File outputFile = tempFolder.newFile();
+               resultPath = outputFile.toURI().toString();
+       }
+
+       /** Compares the expected result recorded by the test with the contents written to the result path. */
+       @After
+       public void after() throws Exception {
+               compareResultsByLinesInMemory(expectedResult, resultPath);
+       }
+
+       /**
+        * Test joinWithEdges with the input DataSet parameter identical
+        * to the edge DataSet.
+        */
+       @Test
+       public void testWithEdgesInputDataset() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               final Graph<Long, Long, Long> input = Graph.fromDataSet(
+                               TestGraphUtils.getLongLongVertexData(env),
+                               TestGraphUtils.getLongLongEdgeData(env),
+                               env);
+
+               // The graph's own edges, converted to Tuple3, are the join input.
+               final Graph<Long, Long, Long> joined = input.joinWithEdges(
+                               input.getEdges().map(new EdgeToTuple3Map<Long, Long>()),
+                               new AddValuesMapper());
+
+               joined.getEdges().writeAsCsv(resultPath);
+               env.execute();
+
+               expectedResult =
+                               "1,2,24\n" + "1,3,26\n" + "2,3,46\n" +
+                               "3,4,68\n" + "3,5,70\n" + "4,5,90\n" +
+                               "5,1,102\n";
+       }
+
+       /**
+        * Test joinWithEdges with the input DataSet passed as a parameter containing
+        * fewer elements than the edge DataSet, but of the same type.
+        */
+       @Test
+       public void testWithLessElements() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               final Graph<Long, Long, Long> input = Graph.fromDataSet(
+                               TestGraphUtils.getLongLongVertexData(env),
+                               TestGraphUtils.getLongLongEdgeData(env),
+                               env);
+
+               // Only the first three edges take part in the join.
+               final Graph<Long, Long, Long> joined = input.joinWithEdges(
+                               input.getEdges().first(3).map(new EdgeToTuple3Map<Long, Long>()),
+                               new AddValuesMapper());
+
+               joined.getEdges().writeAsCsv(resultPath);
+               env.execute();
+
+               expectedResult =
+                               "1,2,24\n" + "1,3,26\n" + "2,3,46\n" +
+                               "3,4,34\n" + "3,5,35\n" + "4,5,45\n" +
+                               "5,1,51\n";
+       }
+
+       /**
+        * Test joinWithEdges with the input DataSet passed as a parameter containing
+        * fewer elements than the edge DataSet and of a different type (Boolean).
+        */
+       @Test
+       public void testWithLessElementsDifferentType() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               final Graph<Long, Long, Long> input = Graph.fromDataSet(
+                               TestGraphUtils.getLongLongVertexData(env),
+                               TestGraphUtils.getLongLongEdgeData(env),
+                               env);
+
+               // Only the first three edges, mapped by BooleanEdgeValueMapper, take part in the join.
+               final Graph<Long, Long, Long> joined = input.joinWithEdges(
+                               input.getEdges().first(3).map(new BooleanEdgeValueMapper()),
+                               new DoubleIfTrueMapper());
+
+               joined.getEdges().writeAsCsv(resultPath);
+               env.execute();
+
+               expectedResult =
+                               "1,2,24\n" + "1,3,26\n" + "2,3,46\n" +
+                               "3,4,34\n" + "3,5,35\n" + "4,5,45\n" +
+                               "5,1,51\n";
+       }
+
+       /**
+        * Test joinWithEdges with the input DataSet containing different keys than the
+        * edge DataSet -- the iterator becomes empty.
+        */
+       @Test
+       public void testWithNoCommonKeys() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               final Graph<Long, Long, Long> input = Graph.fromDataSet(
+                               TestGraphUtils.getLongLongVertexData(env),
+                               TestGraphUtils.getLongLongEdgeData(env),
+                               env);
+
+               final Graph<Long, Long, Long> joined = input.joinWithEdges(
+                               TestGraphUtils.getLongLongLongTuple3Data(env),
+                               new DoubleValueMapper());
+
+               joined.getEdges().writeAsCsv(resultPath);
+               env.execute();
+
+               expectedResult =
+                               "1,2,24\n" + "1,3,26\n" + "2,3,46\n" +
+                               "3,4,68\n" + "3,5,35\n" + "4,5,45\n" +
+                               "5,1,51\n";
+       }
+
+       /**
+        * Test joinWithEdges with a DataSet containing custom parametrised type input values.
+        */
+       @Test
+       public void testWithCustomType() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               final Graph<Long, Long, Long> input = Graph.fromDataSet(
+                               TestGraphUtils.getLongLongVertexData(env),
+                               TestGraphUtils.getLongLongEdgeData(env),
+                               env);
+
+               final Graph<Long, Long, Long> joined = input.joinWithEdges(
+                               TestGraphUtils.getLongLongCustomTuple3Data(env),
+                               new CustomValueMapper());
+
+               joined.getEdges().writeAsCsv(resultPath);
+               env.execute();
+
+               expectedResult =
+                               "1,2,10\n" + "1,3,20\n" + "2,3,30\n" +
+                               "3,4,40\n" + "3,5,35\n" + "4,5,45\n" +
+                               "5,1,51\n";
+       }
+
+       @Test
+       public void testWithEdgesOnSource() throws Exception {
+               /*
+                * joinWithEdgesOnSource() where the input DataSet is derived from the
+                * complete edge DataSet (one (source, value) pair per edge).
+                */
+               final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+
+               final Graph<Long, Long, Long> inputGraph = Graph.fromDataSet(
+                               TestGraphUtils.getLongLongVertexData(execEnv),
+                               TestGraphUtils.getLongLongEdgeData(execEnv),
+                               execEnv);
+
+               final Graph<Long, Long, Long> joinedGraph = inputGraph.joinWithEdgesOnSource(
+                               inputGraph.getEdges().map(new ProjectSourceAndValueMapper()),
+                               new AddValuesMapper());
+
+               // Attach the CSV sink to the joined edges, then run the program.
+               joinedGraph.getEdges().writeAsCsv(resultPath);
+               execEnv.execute();
+
+               // Expected CSV content of the edge output.
+               expectedResult = "1,2,24\n" +
+                               "1,3,25\n" +
+                               "2,3,46\n" +
+                               "3,4,68\n" +
+                               "3,5,69\n" +
+                               "4,5,90\n" +
+                               "5,1,102\n";
+       }
+
+       @Test
+       public void testOnSourceWithLessElements() throws Exception {
+               /*
+                * joinWithEdgesOnSource() where the input DataSet has fewer elements
+                * than the edge DataSet (only the first 3 edges), but the same value type.
+                */
+               final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+
+               final Graph<Long, Long, Long> inputGraph = Graph.fromDataSet(
+                               TestGraphUtils.getLongLongVertexData(execEnv),
+                               TestGraphUtils.getLongLongEdgeData(execEnv),
+                               execEnv);
+
+               final Graph<Long, Long, Long> joinedGraph = inputGraph.joinWithEdgesOnSource(
+                               inputGraph.getEdges().first(3).map(new ProjectSourceAndValueMapper()),
+                               new AddValuesMapper());
+
+               // Attach the CSV sink to the joined edges, then run the program.
+               joinedGraph.getEdges().writeAsCsv(resultPath);
+               execEnv.execute();
+
+               // Expected CSV content of the edge output.
+               expectedResult = "1,2,24\n" +
+                               "1,3,25\n" +
+                               "2,3,46\n" +
+                               "3,4,34\n" +
+                               "3,5,35\n" +
+                               "4,5,45\n" +
+                               "5,1,51\n";
+       }
+
+       @Test
+       public void testOnSourceWithDifferentType() throws Exception {
+               /*
+                * joinWithEdgesOnSource() where the input DataSet has fewer elements
+                * than the edge DataSet (only the first 3 edges) and a different
+                * value type (Boolean).
+                */
+               final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+
+               final Graph<Long, Long, Long> inputGraph = Graph.fromDataSet(
+                               TestGraphUtils.getLongLongVertexData(execEnv),
+                               TestGraphUtils.getLongLongEdgeData(execEnv),
+                               execEnv);
+
+               final Graph<Long, Long, Long> joinedGraph = inputGraph.joinWithEdgesOnSource(
+                               inputGraph.getEdges().first(3).map(new ProjectSourceWithTrueMapper()),
+                               new DoubleIfTrueMapper());
+
+               // Attach the CSV sink to the joined edges, then run the program.
+               joinedGraph.getEdges().writeAsCsv(resultPath);
+               execEnv.execute();
+
+               // Expected CSV content of the edge output.
+               expectedResult = "1,2,24\n" +
+                               "1,3,26\n" +
+                               "2,3,46\n" +
+                               "3,4,34\n" +
+                               "3,5,35\n" +
+                               "4,5,45\n" +
+                               "5,1,51\n";
+       }
+
+       @Test
+       public void testOnSourceWithNoCommonKeys() throws Exception {
+               /*
+                * joinWithEdgesOnSource() where the input DataSet contains different
+                * keys than the edge DataSet - for edges without a match the iterator
+                * is empty.
+                */
+               final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+
+               final Graph<Long, Long, Long> inputGraph = Graph.fromDataSet(
+                               TestGraphUtils.getLongLongVertexData(execEnv),
+                               TestGraphUtils.getLongLongEdgeData(execEnv),
+                               execEnv);
+
+               final Graph<Long, Long, Long> joinedGraph = inputGraph.joinWithEdgesOnSource(
+                               TestGraphUtils.getLongLongTuple2SourceData(execEnv),
+                               new DoubleValueMapper());
+
+               // Attach the CSV sink to the joined edges, then run the program.
+               joinedGraph.getEdges().writeAsCsv(resultPath);
+               execEnv.execute();
+
+               // Expected CSV content of the edge output.
+               expectedResult = "1,2,20\n" +
+                               "1,3,20\n" +
+                               "2,3,60\n" +
+                               "3,4,80\n" +
+                               "3,5,80\n" +
+                               "4,5,120\n" +
+                               "5,1,51\n";
+       }
+
+       @Test
+       public void testOnSourceWithCustom() throws Exception {
+               /*
+                * joinWithEdgesOnSource() where the input DataSet carries values of a
+                * custom parameterized type.
+                */
+               final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+
+               final Graph<Long, Long, Long> inputGraph = Graph.fromDataSet(
+                               TestGraphUtils.getLongLongVertexData(execEnv),
+                               TestGraphUtils.getLongLongEdgeData(execEnv),
+                               execEnv);
+
+               final Graph<Long, Long, Long> joinedGraph = inputGraph.joinWithEdgesOnSource(
+                               TestGraphUtils.getLongCustomTuple2SourceData(execEnv),
+                               new CustomValueMapper());
+
+               // Attach the CSV sink to the joined edges, then run the program.
+               joinedGraph.getEdges().writeAsCsv(resultPath);
+               execEnv.execute();
+
+               // Expected CSV content of the edge output.
+               expectedResult = "1,2,10\n" +
+                               "1,3,10\n" +
+                               "2,3,30\n" +
+                               "3,4,40\n" +
+                               "3,5,40\n" +
+                               "4,5,45\n" +
+                               "5,1,51\n";
+       }
+
+       @Test
+       public void testWithEdgesOnTarget() throws Exception {
+               /*
+                * joinWithEdgesOnTarget() where the input DataSet is derived from the
+                * complete edge DataSet (one (target, value) pair per edge).
+                */
+               final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+
+               final Graph<Long, Long, Long> inputGraph = Graph.fromDataSet(
+                               TestGraphUtils.getLongLongVertexData(execEnv),
+                               TestGraphUtils.getLongLongEdgeData(execEnv),
+                               execEnv);
+
+               final Graph<Long, Long, Long> joinedGraph = inputGraph.joinWithEdgesOnTarget(
+                               inputGraph.getEdges().map(new ProjectTargetAndValueMapper()),
+                               new AddValuesMapper());
+
+               // Attach the CSV sink to the joined edges, then run the program.
+               joinedGraph.getEdges().writeAsCsv(resultPath);
+               execEnv.execute();
+
+               // Expected CSV content of the edge output.
+               expectedResult = "1,2,24\n" +
+                               "1,3,26\n" +
+                               "2,3,36\n" +
+                               "3,4,68\n" +
+                               "3,5,70\n" +
+                               "4,5,80\n" +
+                               "5,1,102\n";
+       }
+
+       @Test
+       public void testWithOnTargetWithLessElements() throws Exception {
+               /*
+                * Test joinWithEdgesOnTarget with the input DataSet passed as a parameter
+                * containing fewer elements than the edge DataSet (only the first 3
+                * edges), but of the same type.
+                */
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                               TestGraphUtils.getLongLongEdgeData(env), env);
+
+               Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(graph.getEdges().first(3)
+                               .map(new ProjectTargetAndValueMapper()), new AddValuesMapper());
+
+               // Register the sink and run the job exactly once. The previous version
+               // repeated these two statements, which registered the same sink again
+               // and executed the whole program a second time for no benefit.
+               result.getEdges().writeAsCsv(resultPath);
+               env.execute();
+
+               // Expected CSV content of the edge output.
+               expectedResult = "1,2,24\n" +
+                               "1,3,26\n" +
+                               "2,3,36\n" +
+                               "3,4,34\n" +
+                               "3,5,35\n" +
+                               "4,5,45\n" +
+                               "5,1,51\n";
+       }
+
+       @Test
+       public void testOnTargetWithDifferentType() throws Exception {
+               /*
+                * joinWithEdgesOnTarget() where the input DataSet has fewer elements
+                * than the edge DataSet (only the first 3 edges) and a different
+                * value type (Boolean).
+                */
+               final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+
+               final Graph<Long, Long, Long> inputGraph = Graph.fromDataSet(
+                               TestGraphUtils.getLongLongVertexData(execEnv),
+                               TestGraphUtils.getLongLongEdgeData(execEnv),
+                               execEnv);
+
+               final Graph<Long, Long, Long> joinedGraph = inputGraph.joinWithEdgesOnTarget(
+                               inputGraph.getEdges().first(3).map(new ProjectTargetWithTrueMapper()),
+                               new DoubleIfTrueMapper());
+
+               // Attach the CSV sink to the joined edges, then run the program.
+               joinedGraph.getEdges().writeAsCsv(resultPath);
+               execEnv.execute();
+
+               // Expected CSV content of the edge output.
+               expectedResult = "1,2,24\n" +
+                               "1,3,26\n" +
+                               "2,3,46\n" +
+                               "3,4,34\n" +
+                               "3,5,35\n" +
+                               "4,5,45\n" +
+                               "5,1,51\n";
+       }
+
+       @Test
+       public void testOnTargetWithNoCommonKeys() throws Exception {
+               /*
+                * joinWithEdgesOnTarget() where the input DataSet contains different
+                * keys than the edge DataSet - for edges without a match the iterator
+                * is empty.
+                */
+               final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+
+               final Graph<Long, Long, Long> inputGraph = Graph.fromDataSet(
+                               TestGraphUtils.getLongLongVertexData(execEnv),
+                               TestGraphUtils.getLongLongEdgeData(execEnv),
+                               execEnv);
+
+               final Graph<Long, Long, Long> joinedGraph = inputGraph.joinWithEdgesOnTarget(
+                               TestGraphUtils.getLongLongTuple2TargetData(execEnv),
+                               new DoubleValueMapper());
+
+               // Attach the CSV sink to the joined edges, then run the program.
+               joinedGraph.getEdges().writeAsCsv(resultPath);
+               execEnv.execute();
+
+               // Expected CSV content of the edge output.
+               expectedResult = "1,2,20\n" +
+                               "1,3,40\n" +
+                               "2,3,40\n" +
+                               "3,4,80\n" +
+                               "3,5,35\n" +
+                               "4,5,45\n" +
+                               "5,1,140\n";
+       }
+
+       @Test
+       public void testOnTargetWithCustom() throws Exception {
+               /*
+                * joinWithEdgesOnTarget() where the input DataSet carries values of a
+                * custom parameterized type.
+                */
+               final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+
+               final Graph<Long, Long, Long> inputGraph = Graph.fromDataSet(
+                               TestGraphUtils.getLongLongVertexData(execEnv),
+                               TestGraphUtils.getLongLongEdgeData(execEnv),
+                               execEnv);
+
+               final Graph<Long, Long, Long> joinedGraph = inputGraph.joinWithEdgesOnTarget(
+                               TestGraphUtils.getLongCustomTuple2TargetData(execEnv),
+                               new CustomValueMapper());
+
+               // Attach the CSV sink to the joined edges, then run the program.
+               joinedGraph.getEdges().writeAsCsv(resultPath);
+               execEnv.execute();
+
+               // Expected CSV content of the edge output.
+               expectedResult = "1,2,10\n" +
+                               "1,3,20\n" +
+                               "2,3,20\n" +
+                               "3,4,40\n" +
+                               "3,5,35\n" +
+                               "4,5,45\n" +
+                               "5,1,51\n";
+       }
+
+       /** Maps a pair of Longs to the sum of its two fields. */
+       @SuppressWarnings("serial")
+       private static final class AddValuesMapper implements MapFunction<Tuple2<Long, Long>, Long> {
+               @Override
+               public Long map(Tuple2<Long, Long> pair) throws Exception {
+                       final long sum = pair.f0 + pair.f1;
+                       return sum;
+               }
+       }
+
+       /** Maps an edge to the triple (source, target, true). */
+       @SuppressWarnings("serial")
+       private static final class BooleanEdgeValueMapper implements MapFunction<Edge<Long, Long>, Tuple3<Long, Long, Boolean>> {
+               @Override
+               public Tuple3<Long, Long, Boolean> map(Edge<Long, Long> edge) throws Exception {
+                       final Long sourceId = edge.getSource();
+                       final Long targetId = edge.getTarget();
+                       return new Tuple3<Long, Long, Boolean>(sourceId, targetId, true);
+               }
+       }
+
+       /** Doubles the first field when the Boolean flag in the second field is set; otherwise returns it unchanged. */
+       @SuppressWarnings("serial")
+       private static final class DoubleIfTrueMapper implements MapFunction<Tuple2<Long, Boolean>, Long> {
+               @Override
+               public Long map(Tuple2<Long, Boolean> flagged) throws Exception {
+                       // Guard clause: an unset flag passes the value through untouched.
+                       if (!flagged.f1) {
+                               return flagged.f0;
+                       }
+                       return flagged.f0 * 2;
+               }
+       }
+
+       /** Maps a pair of Longs to twice its second field; the first field is ignored. */
+       @SuppressWarnings("serial")
+       private static final class DoubleValueMapper implements MapFunction<Tuple2<Long, Long>, Long> {
+               @Override
+               public Long map(Tuple2<Long, Long> pair) throws Exception {
+                       final long doubled = pair.f1 * 2;
+                       return doubled;
+               }
+       }
+
+       /** Extracts the int field of the custom-typed second tuple field and returns it as a Long. */
+       @SuppressWarnings("serial")
+       private static final class CustomValueMapper implements MapFunction<Tuple2<Long, DummyCustomParameterizedType<Float>>, Long> {
+               @Override
+               public Long map(Tuple2<Long, DummyCustomParameterizedType<Float>> pair) throws Exception {
+                       final DummyCustomParameterizedType<Float> custom = pair.f1;
+                       final long widened = (long) custom.getIntField();
+                       return widened;
+               }
+       }
+
+       /** Maps an edge to the pair (source, edge value). */
+       @SuppressWarnings("serial")
+       private static final class ProjectSourceAndValueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Long>> {
+               @Override
+               public Tuple2<Long, Long> map(Edge<Long, Long> edge) throws Exception {
+                       final Long sourceId = edge.getSource();
+                       final Long edgeValue = edge.getValue();
+                       return new Tuple2<Long, Long>(sourceId, edgeValue);
+               }
+       }
+
+       /** Maps an edge to the pair (source, true). */
+       @SuppressWarnings("serial")
+       private static final class ProjectSourceWithTrueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Boolean>> {
+               @Override
+               public Tuple2<Long, Boolean> map(Edge<Long, Long> edge) throws Exception {
+                       final Long sourceId = edge.getSource();
+                       return new Tuple2<Long, Boolean>(sourceId, true);
+               }
+       }
+
+       /** Maps an edge to the pair (target, edge value). */
+       @SuppressWarnings("serial")
+       private static final class ProjectTargetAndValueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Long>> {
+               @Override
+               public Tuple2<Long, Long> map(Edge<Long, Long> edge) throws Exception {
+                       final Long targetId = edge.getTarget();
+                       final Long edgeValue = edge.getValue();
+                       return new Tuple2<Long, Long>(targetId, edgeValue);
+               }
+       }
+
+       /** Maps an edge to the pair (target, true). */
+       @SuppressWarnings("serial")
+       private static final class ProjectTargetWithTrueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Boolean>> {
+               @Override
+               public Tuple2<Long, Boolean> map(Edge<Long, Long> edge) throws Exception {
+                       final Long targetId = edge.getTarget();
+                       return new Tuple2<Long, Boolean>(targetId, true);
+               }
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/JoinWithVerticesITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/JoinWithVerticesITCase.java
 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/JoinWithVerticesITCase.java
new file mode 100644
index 0000000..0574265
--- /dev/null
+++ 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/JoinWithVerticesITCase.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
+import org.apache.flink.graph.utils.VertexToTuple2Map;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class JoinWithVerticesITCase extends MultipleProgramsTestBase {
+
+       public JoinWithVerticesITCase(MultipleProgramsTestBase.ExecutionMode mode) {
+               super(mode);
+       }
+
+       /** URI of the temporary file that each test writes its CSV output to. */
+       private String resultPath;
+
+       /** Expected output; assigned by each test and verified in {@link #after()}. */
+       private String expectedResult;
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       /** Creates a fresh temporary output file before every test. */
+       @Before
+       public void before() throws Exception {
+               resultPath = tempFolder.newFile().toURI().toString();
+       }
+
+       /** Compares the written output with the expectation set by the test. */
+       @After
+       public void after() throws Exception {
+               compareResultsByLinesInMemory(expectedResult, resultPath);
+       }
+
+       /** Builds the Long/Long/Long graph from the shared TestGraphUtils vertex and edge data. */
+       private static Graph<Long, Long, Long> createTestGraph(ExecutionEnvironment execEnv) {
+               return Graph.fromDataSet(
+                               TestGraphUtils.getLongLongVertexData(execEnv),
+                               TestGraphUtils.getLongLongEdgeData(execEnv),
+                               execEnv);
+       }
+
+       @Test
+       public void testJoinWithVertexSet() throws Exception {
+               /*
+                * joinWithVertices() where the input DataSet is derived from the
+                * complete vertex DataSet.
+                */
+               final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+               final Graph<Long, Long, Long> inputGraph = createTestGraph(execEnv);
+
+               final Graph<Long, Long, Long> joinedGraph = inputGraph.joinWithVertices(
+                               inputGraph.getVertices().map(new VertexToTuple2Map<Long, Long>()),
+                               new AddValuesMapper());
+
+               joinedGraph.getVertices().writeAsCsv(resultPath);
+               execEnv.execute();
+
+               expectedResult = "1,2\n" +
+                               "2,4\n" +
+                               "3,6\n" +
+                               "4,8\n" +
+                               "5,10\n";
+       }
+
+       @Test
+       public void testWithLessElements() throws Exception {
+               /*
+                * joinWithVertices() where the input DataSet has fewer elements than
+                * the vertex DataSet (only the first 3 vertices), but the same value type.
+                */
+               final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+               final Graph<Long, Long, Long> inputGraph = createTestGraph(execEnv);
+
+               final Graph<Long, Long, Long> joinedGraph = inputGraph.joinWithVertices(
+                               inputGraph.getVertices().first(3).map(new VertexToTuple2Map<Long, Long>()),
+                               new AddValuesMapper());
+
+               joinedGraph.getVertices().writeAsCsv(resultPath);
+               execEnv.execute();
+
+               expectedResult = "1,2\n" +
+                               "2,4\n" +
+                               "3,6\n" +
+                               "4,4\n" +
+                               "5,5\n";
+       }
+
+       @Test
+       public void testWithDifferentType() throws Exception {
+               /*
+                * joinWithVertices() where the input DataSet has fewer elements than
+                * the vertex DataSet (only the first 3 vertices) and a different
+                * value type (Boolean).
+                */
+               final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+               final Graph<Long, Long, Long> inputGraph = createTestGraph(execEnv);
+
+               final Graph<Long, Long, Long> joinedGraph = inputGraph.joinWithVertices(
+                               inputGraph.getVertices().first(3).map(new ProjectIdWithTrue()),
+                               new DoubleIfTrueMapper());
+
+               joinedGraph.getVertices().writeAsCsv(resultPath);
+               execEnv.execute();
+
+               expectedResult = "1,2\n" +
+                               "2,4\n" +
+                               "3,6\n" +
+                               "4,4\n" +
+                               "5,5\n";
+       }
+
+       @Test
+       public void testWithDifferentKeys() throws Exception {
+               /*
+                * joinWithVertices() where the input DataSet contains different keys
+                * than the vertex DataSet - for vertices without a match the iterator
+                * is empty.
+                */
+               final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+               final Graph<Long, Long, Long> inputGraph = createTestGraph(execEnv);
+
+               final Graph<Long, Long, Long> joinedGraph = inputGraph.joinWithVertices(
+                               TestGraphUtils.getLongLongTuple2Data(execEnv),
+                               new ProjectSecondMapper());
+
+               joinedGraph.getVertices().writeAsCsv(resultPath);
+               execEnv.execute();
+
+               expectedResult = "1,10\n" +
+                               "2,20\n" +
+                               "3,30\n" +
+                               "4,40\n" +
+                               "5,5\n";
+       }
+
+       @Test
+       public void testWithCustomType() throws Exception {
+               /*
+                * joinWithVertices() where the input DataSet carries values of a
+                * custom parameterized type.
+                */
+               final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+               final Graph<Long, Long, Long> inputGraph = createTestGraph(execEnv);
+
+               final Graph<Long, Long, Long> joinedGraph = inputGraph.joinWithVertices(
+                               TestGraphUtils.getLongCustomTuple2Data(execEnv),
+                               new CustomValueMapper());
+
+               joinedGraph.getVertices().writeAsCsv(resultPath);
+               execEnv.execute();
+
+               expectedResult = "1,10\n" +
+                               "2,20\n" +
+                               "3,30\n" +
+                               "4,40\n" +
+                               "5,5\n";
+       }
+
+       /** Maps a pair of Longs to the sum of its two fields. */
+       @SuppressWarnings("serial")
+       private static final class AddValuesMapper implements MapFunction<Tuple2<Long, Long>, Long> {
+               @Override
+               public Long map(Tuple2<Long, Long> pair) throws Exception {
+                       final long sum = pair.f0 + pair.f1;
+                       return sum;
+               }
+       }
+
+       /** Maps a vertex to the pair (vertex id, true). */
+       @SuppressWarnings("serial")
+       private static final class ProjectIdWithTrue implements MapFunction<Vertex<Long, Long>, Tuple2<Long, Boolean>> {
+               @Override
+               public Tuple2<Long, Boolean> map(Vertex<Long, Long> vertex) throws Exception {
+                       final Long vertexId = vertex.getId();
+                       return new Tuple2<Long, Boolean>(vertexId, true);
+               }
+       }
+
+       /** Doubles the first field when the Boolean flag in the second field is set; otherwise returns it unchanged. */
+       @SuppressWarnings("serial")
+       private static final class DoubleIfTrueMapper implements MapFunction<Tuple2<Long, Boolean>, Long> {
+               @Override
+               public Long map(Tuple2<Long, Boolean> flagged) throws Exception {
+                       // Guard clause: an unset flag passes the value through untouched.
+                       if (!flagged.f1) {
+                               return flagged.f0;
+                       }
+                       return flagged.f0 * 2;
+               }
+       }
+
+       /** Returns the second field of the pair as is. */
+       @SuppressWarnings("serial")
+       private static final class ProjectSecondMapper implements MapFunction<Tuple2<Long, Long>, Long> {
+               @Override
+               public Long map(Tuple2<Long, Long> pair) throws Exception {
+                       final Long second = pair.f1;
+                       return second;
+               }
+       }
+
+       /** Extracts the int field of the custom-typed second tuple field and returns it as a Long. */
+       @SuppressWarnings("serial")
+       private static final class CustomValueMapper implements MapFunction<Tuple2<Long, DummyCustomParameterizedType<Float>>, Long> {
+               @Override
+               public Long map(Tuple2<Long, DummyCustomParameterizedType<Float>> pair) throws Exception {
+                       final DummyCustomParameterizedType<Float> custom = pair.f1;
+                       final long widened = (long) custom.getIntField();
+                       return widened;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/MapEdgesITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/MapEdgesITCase.java
 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/MapEdgesITCase.java
new file mode 100644
index 0000000..f7a585d
--- /dev/null
+++ 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/MapEdgesITCase.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
+import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class MapEdgesITCase extends MultipleProgramsTestBase {
+
+       public MapEdgesITCase(MultipleProgramsTestBase.ExecutionMode mode) {
+               super(mode);
+       }
+
+       /** URI of the temporary file that each test writes its CSV output to. */
+       private String resultPath;
+
+       /** Expected output; assigned by each test and verified in {@link #after()}. */
+       private String expectedResult;
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       /** Creates a fresh temporary output file before every test. */
+       @Before
+       public void before() throws Exception {
+               resultPath = tempFolder.newFile().toURI().toString();
+       }
+
+       /** Compares the written output with the expectation set by the test. */
+       @After
+       public void after() throws Exception {
+               compareResultsByLinesInMemory(expectedResult, resultPath);
+       }
+
+       /** Builds the Long/Long/Long graph from the shared TestGraphUtils vertex and edge data. */
+       private static Graph<Long, Long, Long> createTestGraph(ExecutionEnvironment env) {
+               return Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                               TestGraphUtils.getLongLongEdgeData(env), env);
+       }
+
+       @Test
+       public void testWithSameValue() throws Exception {
+               /*
+                * Test mapEdges() keeping the same value type
+                */
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               final Graph<Long, Long, Long> graph = createTestGraph(env);
+
+               DataSet<Edge<Long, Long>> mappedEdges = graph.mapEdges(new AddOneMapper()).getEdges();
+
+               mappedEdges.writeAsCsv(resultPath);
+               env.execute();
+
+               expectedResult = "1,2,13\n" +
+                               "1,3,14\n" +
+                               "2,3,24\n" +
+                               "3,4,35\n" +
+                               "3,5,36\n" +
+                               "4,5,46\n" +
+                               "5,1,52\n";
+       }
+
+       @Test
+       public void testWithStringValue() throws Exception {
+               /*
+                * Test mapEdges() and change the value type to String
+                */
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               final Graph<Long, Long, Long> graph = createTestGraph(env);
+
+               DataSet<Edge<Long, String>> mappedEdges = graph.mapEdges(new ToStringMapper()).getEdges();
+
+               mappedEdges.writeAsCsv(resultPath);
+               env.execute();
+
+               expectedResult = "1,2,string(12)\n" +
+                               "1,3,string(13)\n" +
+                               "2,3,string(23)\n" +
+                               "3,4,string(34)\n" +
+                               "3,5,string(35)\n" +
+                               "4,5,string(45)\n" +
+                               "5,1,string(51)\n";
+       }
+
+       @Test
+       public void testWithTuple1Type() throws Exception {
+               /*
+                * Test mapEdges() and change the value type to a Tuple1
+                */
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               final Graph<Long, Long, Long> graph = createTestGraph(env);
+
+               DataSet<Edge<Long, Tuple1<Long>>> mappedEdges = graph.mapEdges(new ToTuple1Mapper()).getEdges();
+
+               mappedEdges.writeAsCsv(resultPath);
+               env.execute();
+
+               expectedResult = "1,2,(12)\n" +
+                               "1,3,(13)\n" +
+                               "2,3,(23)\n" +
+                               "3,4,(34)\n" +
+                               "3,5,(35)\n" +
+                               "4,5,(45)\n" +
+                               "5,1,(51)\n";
+       }
+
+       @Test
+       public void testWithCustomType() throws Exception {
+               /*
+                * Test mapEdges() and change the value type to a custom type
+                */
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               final Graph<Long, Long, Long> graph = createTestGraph(env);
+
+               DataSet<Edge<Long, DummyCustomType>> mappedEdges = graph.mapEdges(new ToCustomTypeMapper()).getEdges();
+
+               mappedEdges.writeAsCsv(resultPath);
+               env.execute();
+
+               expectedResult = "1,2,(T,12)\n" +
+                               "1,3,(T,13)\n" +
+                               "2,3,(T,23)\n" +
+                               "3,4,(T,34)\n" +
+                               "3,5,(T,35)\n" +
+                               "4,5,(T,45)\n" +
+                               "5,1,(T,51)\n";
+       }
+
+       @Test
+       public void testWithParametrizedCustomType() throws Exception {
+               /*
+                * Test mapEdges() and change the value type to a parameterized custom type
+                */
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               final Graph<Long, Long, Long> graph = createTestGraph(env);
+
+               DataSet<Edge<Long, DummyCustomParameterizedType<Double>>> mappedEdges = graph.mapEdges(
+                               new ToCustomParametrizedTypeMapper()).getEdges();
+
+               mappedEdges.writeAsCsv(resultPath);
+               env.execute();
+
+               expectedResult = "1,2,(12.0,12)\n" +
+                               "1,3,(13.0,13)\n" +
+                               "2,3,(23.0,23)\n" +
+                               "3,4,(34.0,34)\n" +
+                               "3,5,(35.0,35)\n" +
+                               "4,5,(45.0,45)\n" +
+                               "5,1,(51.0,51)\n";
+       }
+
+       /** Maps an edge to its value plus one. */
+       @SuppressWarnings("serial")
+       private static final class AddOneMapper implements MapFunction<Edge<Long, Long>, Long> {
+               @Override
+               public Long map(Edge<Long, Long> edge) throws Exception {
+                       return edge.getValue() + 1;
+               }
+       }
+
+       /** Maps an edge to the text "string(<value>)". */
+       @SuppressWarnings("serial")
+       private static final class ToStringMapper implements MapFunction<Edge<Long, Long>, String> {
+               @Override
+               public String map(Edge<Long, Long> edge) throws Exception {
+                       // Plain concatenation instead of String.format("%d"): %d renders digits
+                       // with the default locale, while the expected output uses ASCII digits.
+                       return "string(" + edge.getValue() + ")";
+               }
+       }
+
+       /** Wraps the edge value in a Tuple1. */
+       @SuppressWarnings("serial")
+       private static final class ToTuple1Mapper implements MapFunction<Edge<Long, Long>, Tuple1<Long>> {
+               @Override
+               public Tuple1<Long> map(Edge<Long, Long> edge) throws Exception {
+                       Tuple1<Long> tupleValue = new Tuple1<Long>();
+                       tupleValue.setFields(edge.getValue());
+                       return tupleValue;
+               }
+       }
+
+       /** Creates a DummyCustomType whose int field holds the edge value. */
+       @SuppressWarnings("serial")
+       private static final class ToCustomTypeMapper implements MapFunction<Edge<Long, Long>, DummyCustomType> {
+               @Override
+               public DummyCustomType map(Edge<Long, Long> edge) throws Exception {
+                       DummyCustomType dummyValue = new DummyCustomType();
+                       dummyValue.setIntField(edge.getValue().intValue());
+                       return dummyValue;
+               }
+       }
+
+       /** Creates a DummyCustomParameterizedType holding the edge value as both int and Double. */
+       @SuppressWarnings("serial")
+       private static final class ToCustomParametrizedTypeMapper implements MapFunction<Edge<Long, Long>,
+               DummyCustomParameterizedType<Double>> {
+
+               @Override
+               public DummyCustomParameterizedType<Double> map(Edge<Long, Long> edge) throws Exception {
+                       DummyCustomParameterizedType<Double> dummyValue = new DummyCustomParameterizedType<Double>();
+                       dummyValue.setIntField(edge.getValue().intValue());
+                       // doubleValue() + autoboxing replaces the deprecated new Double(...) constructor.
+                       dummyValue.setTField(edge.getValue().doubleValue());
+                       return dummyValue;
+               }
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/MapVerticesITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/MapVerticesITCase.java
 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/MapVerticesITCase.java
new file mode 100644
index 0000000..4e2c858
--- /dev/null
+++ 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/MapVerticesITCase.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
+import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class MapVerticesITCase extends MultipleProgramsTestBase { // tests for Graph.mapVertices(): each test writes the mapped vertices as CSV and after() compares the file with expectedResult
+
+       public MapVerticesITCase(MultipleProgramsTestBase.ExecutionMode mode){ // mode is handed straight to the MultipleProgramsTestBase constructor
+               super(mode);
+       }
+
+    private String resultPath; // URI string of the temporary output file, assigned in before()
+    private String expectedResult; // expected output text, assigned by each test and read in after()
+
+    @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder(); // supplies the temp file created in before()
+
+       @Before
+       public void before() throws Exception{ // points resultPath at a newly created temp file
+               resultPath = tempFolder.newFile().toURI().toString();
+       }
+
+       @After
+       public void after() throws Exception{ // the actual assertion of every test happens here, not in the test method
+               compareResultsByLinesInMemory(expectedResult, resultPath); // NOTE(review): if a test throws before assigning expectedResult this presumably receives null - confirm how the helper (defined outside this file) handles that
+       }
+
+       @Test
+       public void testWithSameValue() throws Exception {
+               /*
+                * Test mapVertices() keeping the same value type
+                */
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               
+               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                               TestGraphUtils.getLongLongEdgeData(env), env);
+               
+               DataSet<Vertex<Long, Long>> mappedVertices = 
graph.mapVertices(new AddOneMapper()).getVertices();
+               
+               mappedVertices.writeAsCsv(resultPath);
+               env.execute(); // run the program so the CSV file exists before after() reads it
+               expectedResult = "1,2\n" + // each line is <vertex id>,<value + 1>; implies the input vertices carry value == id (data comes from TestGraphUtils, not shown here)
+                       "2,3\n" +
+                       "3,4\n" +
+                       "4,5\n" +
+                       "5,6\n";
+       }
+
+       @Test
+       public void testWithStringValue() throws Exception {
+               /*
+                * Test mapVertices() and change the value type to String
+                */
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               
+               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                               TestGraphUtils.getLongLongEdgeData(env), env);
+               
+               DataSet<Vertex<Long, String>> mappedVertices = 
graph.mapVertices(new ToStringMapper()).getVertices();
+               
+               mappedVertices.writeAsCsv(resultPath);
+               env.execute(); // run the program so the CSV file exists before after() reads it
+
+               expectedResult = "1,one\n" + // each line is <vertex id>,<English word produced by ToStringMapper>
+                       "2,two\n" +
+                       "3,three\n" +
+                       "4,four\n" +
+                       "5,five\n";
+       }
+
+       @Test
+       public void testWithtuple1Value() throws Exception {
+               /*
+                * Test mapVertices() and change the value type to a Tuple1
+                */
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               
+               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                               TestGraphUtils.getLongLongEdgeData(env), env);
+               
+               DataSet<Vertex<Long, Tuple1<Long>>> mappedVertices = 
graph.mapVertices(new ToTuple1Mapper()).getVertices();
+               
+               mappedVertices.writeAsCsv(resultPath);
+               env.execute(); // run the program so the CSV file exists before after() reads it
+
+               expectedResult = "1,(1)\n" + // each line is <vertex id>,<Tuple1 holding the original value>; "(n)" is presumably Tuple1's string form
+                       "2,(2)\n" +
+                       "3,(3)\n" +
+                       "4,(4)\n" +
+                       "5,(5)\n";
+       }
+
+       @Test
+       public void testWithCustomType() throws Exception {
+               /*
+                * Test mapVertices() and change the value type to a custom type
+                */
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               
+               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                               TestGraphUtils.getLongLongEdgeData(env), env);
+               
+               DataSet<Vertex<Long, DummyCustomType>> mappedVertices = 
graph.mapVertices(new ToCustomTypeMapper()).getVertices();
+               
+               mappedVertices.writeAsCsv(resultPath);
+               env.execute(); // run the program so the CSV file exists before after() reads it
+
+               expectedResult = "1,(T,1)\n" + // "(T,n)" is presumably DummyCustomType's string form (declared in TestGraphUtils) - only the int field is set by the mapper
+                       "2,(T,2)\n" +
+                       "3,(T,3)\n" +
+                       "4,(T,4)\n" +
+                       "5,(T,5)\n";
+       }
+
+       @Test
+       public void testWithCustomParametrizedType() throws Exception {
+               /*
+                * Test mapVertices() and change the value type to a 
parameterized custom type
+                */
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               
+               Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+                               TestGraphUtils.getLongLongEdgeData(env), env);
+               
+               DataSet<Vertex<Long, DummyCustomParameterizedType<Double>>> 
mappedVertices = graph.mapVertices(
+                               new 
ToCustomParametrizedTypeMapper()).getVertices();
+               
+               mappedVertices.writeAsCsv(resultPath);
+               env.execute(); // run the program so the CSV file exists before after() reads it
+       
+               expectedResult = "1,(1.0,1)\n" + // "(d,n)" is presumably the string form of DummyCustomParameterizedType: the Double T field, then the int field
+                       "2,(2.0,2)\n" +
+                       "3,(3.0,3)\n" +
+                       "4,(4.0,4)\n" +
+                       "5,(5.0,5)\n";
+       }
+
+       @SuppressWarnings("serial")
+       private static final class AddOneMapper implements 
MapFunction<Vertex<Long, Long>, Long> {
+               public Long map(Vertex<Long, Long> value) throws Exception { // returns the vertex value incremented by one
+                       return value.getValue()+1;
+               }
+       }
+
+       @SuppressWarnings("serial")
+       private static final class ToStringMapper implements 
MapFunction<Vertex<Long, Long>, String> {
+               public String map(Vertex<Long, Long> vertex) throws Exception { // spells out values 1..5 in English; any other value maps to ""
+                       String stringValue;
+                       if (vertex.getValue() == 1) { // getValue() is presumably a Long here, so == against an int literal unboxes and compares numerically
+                               stringValue = "one";
+                       }
+                       else if (vertex.getValue() == 2) {
+                               stringValue = "two";
+                       }
+                       else if (vertex.getValue() == 3) {
+                               stringValue = "three";
+                       }
+                       else if (vertex.getValue() == 4) {
+                               stringValue = "four";
+                       }
+                       else if (vertex.getValue() == 5) {
+                               stringValue = "five";
+                       }
+                       else {
+                               stringValue = ""; // fallback for values outside 1..5
+                       }
+                       return stringValue;
+               }
+       }
+
+       @SuppressWarnings("serial")
+       private static final class ToTuple1Mapper implements 
MapFunction<Vertex<Long, Long>, Tuple1<Long>> {
+               public Tuple1<Long> map(Vertex<Long, Long> vertex) throws 
Exception { // wraps the vertex value in a new Tuple1
+                       Tuple1<Long> tupleValue = new Tuple1<Long>();
+                       tupleValue.setFields(vertex.getValue());
+                       return tupleValue;
+               }
+       }
+
+       @SuppressWarnings("serial")
+       private static final class ToCustomTypeMapper implements 
MapFunction<Vertex<Long, Long>, DummyCustomType> {
+               public DummyCustomType map(Vertex<Long, Long> vertex) throws 
Exception { // returns a new DummyCustomType whose int field is the vertex value narrowed to int
+                       DummyCustomType dummyValue = new DummyCustomType();
+                       dummyValue.setIntField(vertex.getValue().intValue());   
                                        
+                       return dummyValue;
+               }
+       }
+
+       @SuppressWarnings("serial")
+       private static final class ToCustomParametrizedTypeMapper implements 
MapFunction<Vertex<Long, Long>, 
+               DummyCustomParameterizedType<Double>> {
+               
+               public DummyCustomParameterizedType<Double> map(Vertex<Long, 
Long> vertex) throws Exception { // the same vertex value fills both fields of the result
+                       DummyCustomParameterizedType<Double> dummyValue = new 
DummyCustomParameterizedType<Double>();
+                       dummyValue.setIntField(vertex.getValue().intValue()); // int field: vertex value narrowed to int
+                       dummyValue.setTField(new Double(vertex.getValue()));    
                                        
+                       return dummyValue;
+               }
+       }
+}

Reply via email to