[FLINK-1201] [gelly] remove isUndirected flag; methods will always consider the 
graph as directed


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2b98bebc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2b98bebc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2b98bebc

Branch: refs/heads/master
Commit: 2b98bebc28e13b9bd933884961ced757b653936f
Parents: 878118a
Author: vasia <vasilikikala...@gmail.com>
Authored: Mon Jan 19 16:18:08 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 11 10:46:15 2015 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/graph/Graph.java | 469 ++++++++-----------
 1 file changed, 196 insertions(+), 273 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2b98bebc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 6561803..3caf13d 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -72,230 +72,18 @@ public class Graph<K extends Comparable<K> & Serializable, 
VV extends Serializab
     private final ExecutionEnvironment context;
        private final DataSet<Vertex<K, VV>> vertices;
        private final DataSet<Edge<K, EV>> edges;
-       private boolean isUndirected;
 
        /**
-        * Creates a graph from two DataSets: vertices and edges
+        * Creates a graph from two datasets: vertices and edges and allow 
setting the undirected property
         *
         * @param vertices a DataSet of vertices.
         * @param edges a DataSet of vertices.
         * @param context the flink execution environment.
         */
-       private Graph(DataSet<Vertex<K, VV>> vertices, DataSet<Edge<K, EV>> 
edges, ExecutionEnvironment context) {
-
-               /** a graph is directed by default */
-               this(vertices, edges, context, false);
-       }
-
-       /**
-        * Creates a graph from two DataSets: vertices and edges and allow 
setting the undirected property
-        *
-        * @param vertices a DataSet of vertices.
-        * @param edges a DataSet of vertices.
-        * @param context the flink execution environment.
-        * @param undirected whether this is an undirected graph
-        */
-       private Graph(DataSet<Vertex<K, VV>> vertices, DataSet<Edge<K, EV>> 
edges, ExecutionEnvironment context,
-                       boolean undirected) {
+       public Graph(DataSet<Vertex<K, VV>> vertices, DataSet<Edge<K, EV>> 
edges, ExecutionEnvironment context) {
                this.vertices = vertices;
                this.edges = edges;
         this.context = context;
-               this.isUndirected = undirected;
-       }
-
-       /**
-        * Creates a graph from a Collection of vertices and a Collection of 
edges.
-        * @param vertices a Collection of vertices.
-        * @param edges a Collection of vertices.
-        * @param context the flink execution environment.
-        * @return the newly created graph.
-        */
-       public static <K extends Comparable<K> & Serializable, VV extends 
Serializable, EV extends Serializable>
-               Graph<K, VV, EV> fromCollection (Collection<Vertex<K,VV>> 
vertices,
-                                                                               
 Collection<Edge<K,EV>> edges,
-                                                                               
 ExecutionEnvironment context) {
-
-               return fromDataSet(context.fromCollection(vertices), 
context.fromCollection(edges), context);
-       }
-
-       /**
-        * Creates a graph from a Collection of edges, vertices are induced 
from the edges.
-        * Vertices are created automatically and their values are set to 
NullValue.
-        * @param edges a Collection of vertices.
-        * @param context the flink execution environment.
-        * @return the newly created graph.
-        */
-       public static <K extends Comparable<K> & Serializable, EV extends 
Serializable>
-               Graph<K, NullValue, EV> fromCollection (Collection<Edge<K,EV>> 
edges, ExecutionEnvironment context) {
-
-               return fromDataSet(context.fromCollection(edges), context);
-       }
-
-       /**
-        * Creates a graph from a Collection of edges, vertices are induced 
from the edges and
-        * vertex values are calculated by a mapper function.
-        * Vertices are created automatically and their values are set
-        * by applying the provided map function to the vertex ids.
-        * @param edges a Collection of vertices.
-        * @param mapper the mapper function.
-        * @param context the flink execution environment.
-        * @return the newly created graph.
-        */
-       public static <K extends Comparable<K> & Serializable, VV extends 
Serializable, EV extends Serializable>
-               Graph<K, VV, EV> fromCollection (Collection<Edge<K,EV>> edges,
-                                                                               
 final MapFunction<K, VV> mapper,
-                                                                               
 ExecutionEnvironment context) {
-
-               return fromDataSet(context.fromCollection(edges), mapper, 
context);
-       }
-
-       /**
-        * Creates a graph from a DataSet of vertices and a DataSet of edges.
-        * @param vertices a DataSet of vertices.
-        * @param edges a DataSet of vertices.
-        * @param context the flink execution environment.
-        * @return the newly created graph.
-        */
-       public static <K extends Comparable<K> & Serializable, VV extends 
Serializable, EV extends Serializable>
-               Graph<K, VV, EV> fromDataSet (DataSet<Vertex<K,VV>> vertices,
-                                                                         
DataSet<Edge<K,EV>> edges,
-                                                                         
ExecutionEnvironment context) {
-
-               return new Graph<K, VV, EV>(vertices, edges, context);
-       }
-
-       /**
-        * Creates a graph from a DataSet of edges, vertices are induced from 
the edges.
-        * Vertices are created automatically and their values are set to 
NullValue.
-        * @param edges a DataSet of vertices.
-        * @param context the flink execution environment.
-        * @return the newly created graph.
-        */
-       public static <K extends Comparable<K> & Serializable, EV extends 
Serializable>
-               Graph<K, NullValue, EV> fromDataSet (DataSet<Edge<K,EV>> edges,
-                                                                               
         ExecutionEnvironment context) {
-
-               DataSet<Vertex<K, NullValue>> vertices =
-                               edges.flatMap(new EmitSrcAndTarget<K, 
EV>()).distinct();
-
-               return new Graph<K, NullValue, EV>(vertices, edges, context);
-       }
-
-       private static final class EmitSrcAndTarget<K extends Comparable<K> & 
Serializable, EV extends Serializable>
-                       implements FlatMapFunction<Edge<K, EV>, Vertex<K, 
NullValue>> {
-
-               public void flatMap(Edge<K, EV> edge, Collector<Vertex<K, 
NullValue>> out) {
-                       out.collect(new Vertex<K, NullValue>(edge.f0, 
NullValue.getInstance()));
-                       out.collect(new Vertex<K, NullValue>(edge.f1, 
NullValue.getInstance()));
-               }
-       }
-
-       /**
-        * Creates a graph from a DataSet of edges, vertices are induced from 
the edges and
-        * vertex values are calculated by a mapper function.
-        * Vertices are created automatically and their values are set
-        * by applying the provided map function to the vertex ids.
-        * @param edges a DataSet of vertices.
-        * @param mapper the mapper function.
-        * @param context the flink execution environment.
-        * @return the newly created graph.
-        */
-       public static <K extends Comparable<K> & Serializable, VV extends 
Serializable, EV extends Serializable>
-               Graph<K, VV, EV> fromDataSet (DataSet<Edge<K,EV>> edges,
-                                                                         final 
MapFunction<K, VV> mapper,
-                                                                         
ExecutionEnvironment context) {
-
-               TypeInformation<K> keyType = ((TupleTypeInfo<?>) 
edges.getType()).getTypeAt(0);
-
-               TypeInformation<VV> valueType = TypeExtractor
-                               .createTypeInfo(MapFunction.class, 
mapper.getClass(), 1, null, null);
-
-               @SuppressWarnings({ "unchecked", "rawtypes" })
-               TypeInformation<Vertex<K, VV>> returnType = 
(TypeInformation<Vertex<K, VV>>)
-                               new TupleTypeInfo(Vertex.class, keyType, 
valueType);
-
-               DataSet<Vertex<K, VV>> vertices =
-                               edges.flatMap(new EmitSrcAndTargetAsTuple1<K, 
EV>())
-                                    .distinct()
-                                    .map(new MapFunction<Tuple1<K>, Vertex<K, 
VV>>() {
-                                                public Vertex<K, VV> 
map(Tuple1<K> value) throws Exception {
-                                                        return new Vertex<K, 
VV>(value.f0, mapper.map(value.f0));
-                                                }
-                                        })
-                                    .returns(returnType);
-
-               return new Graph<K, VV, EV>(vertices, edges, context);
-       }
-
-       private static final class EmitSrcAndTargetAsTuple1<K extends 
Comparable<K> & Serializable,
-                       EV extends Serializable> implements 
FlatMapFunction<Edge<K, EV>, Tuple1<K>> {
-
-               public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) 
{
-                       out.collect(new Tuple1<K>(edge.f0));
-                       out.collect(new Tuple1<K>(edge.f1));
-               }
-       }
-
-       /**
-        * Creates a graph from a DataSet of Tuple objects for vertices and 
edges.
-        *
-        * Vertices with value are created from Tuple2,
-        * Edges with value are created from Tuple3.
-        *
-        * @param vertices a DataSet of vertices.
-        * @param edges a DataSet of vertices.
-        * @param context the flink execution environment.
-        * @return the newly created graph.
-        */
-       @SuppressWarnings({ "unchecked" })
-       public static <K extends Comparable<K> & Serializable, VV extends 
Serializable, EV extends Serializable>
-               Graph<K, VV, EV> fromTupleDataSet (DataSet<Tuple2<K, VV>> 
vertices,
-                                                                               
   DataSet<Tuple3<K, K, EV>> edges,
-                                                                               
   ExecutionEnvironment context) {
-
-               DataSet<Vertex<K, VV>> vertexDataSet = (DataSet<Vertex<K, VV>>) 
(DataSet<?>) vertices;
-               DataSet<Edge<K, EV>> edgeDataSet = (DataSet<Edge<K, EV>>) 
(DataSet<?>) edges;
-               return fromDataSet(vertexDataSet, edgeDataSet, context);
-       }
-
-       /**
-        * Creates a graph from a DataSet of Tuple objects for edges, vertices 
are induced from the edges.
-        *
-        * Edges with value are created from Tuple3.
-        * Vertices are created automatically and their values are set to 
NullValue.
-        *
-        * @param edges a DataSet of vertices.
-        * @param context the flink execution environment.
-        * @return the newly created graph.
-        */
-       @SuppressWarnings({ "unchecked" })
-       public static <K extends Comparable<K> & Serializable, EV extends 
Serializable>
-               Graph<K, NullValue, EV> fromTupleDataSet (DataSet<Tuple3<K, K, 
EV>> edges,
-                                                                               
                  ExecutionEnvironment context) {
-
-               DataSet<Edge<K, EV>> edgeDataSet = (DataSet<Edge<K, EV>>) 
(DataSet<?>) edges;
-               return fromDataSet(edgeDataSet, context);
-       }
-
-       /**
-        * Creates a graph from a DataSet of Tuple objects for edges, vertices 
are induced from the edges and
-        * vertex values are calculated by a mapper function.
-        * Edges with value are created from Tuple3.
-        * Vertices are created automatically and their values are set
-        * by applying the provided map function to the vertex ids.
-        * @param edges a DataSet of vertices.
-        * @param mapper the mapper function.
-        * @param context the flink execution environment.
-        * @return the newly created graph.
-        */
-       @SuppressWarnings({ "unchecked" })
-       public static <K extends Comparable<K> & Serializable, VV extends 
Serializable, EV extends Serializable>
-       Graph<K, VV, EV> fromTupleDataSet (DataSet<Tuple3<K, K, EV>> edges,
-                                                                          
final MapFunction<K, VV> mapper,
-                                                                          
ExecutionEnvironment context) {
-
-               DataSet<Edge<K, EV>> edgeDataSet = (DataSet<Edge<K, EV>>) 
(DataSet<?>) edges;
-               return fromDataSet(edgeDataSet, mapper, context);
        }
 
        /**
@@ -314,35 +102,19 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
        }
 
        /**
-        * @return the vertex DataSet.
+        * @return the vertex dataset.
         */
        public DataSet<Vertex<K, VV>> getVertices() {
                return vertices;
        }
 
        /**
-        * @return the edge DataSet.
+        * @return the edge dataset.
         */
        public DataSet<Edge<K, EV>> getEdges() {
                return edges;
        }
 
-       /**
-        * @return the vertex DataSet as Tuple2.
-        */
-       @SuppressWarnings({ "unchecked" })
-       public DataSet<Tuple2<K, VV>> getVerticesAsTuple2() {
-               return (DataSet<Tuple2<K, VV>>) (DataSet<?>) vertices;
-       }
-
-       /**
-        * @return the edge DataSet as Tuple3.
-        */
-       @SuppressWarnings({ "unchecked" })
-       public DataSet<Tuple3<K, K, EV>> getEdgesAsTuple3() {
-               return (DataSet<Tuple3<K, K, EV>>) (DataSet<?>) edges;
-       }
-
     /**
      * Apply a function to the attribute of each vertex in the graph.
      * @param mapper the map function to apply.
@@ -367,7 +139,7 @@ public class Graph<K extends Comparable<K> & Serializable, 
VV extends Serializab
                                })
                        .returns(returnType);
 
-        return new Graph<K, NV, EV>(mappedVertices, this.edges, this.context);
+        return new Graph<K, NV, EV>(mappedVertices, this.getEdges(), 
this.context);
     }
 
     /**
@@ -408,7 +180,7 @@ public class Graph<K extends Comparable<K> & Serializable, 
VV extends Serializab
                DataSet<Vertex<K, VV>> resultedVertices = this.getVertices()
                                .coGroup(inputDataSet).where(0).equalTo(0)
                                .with(new ApplyCoGroupToVertexValues<K, VV, 
T>(mapper));
-               return new Graph(resultedVertices, this.edges, this.context);
+               return Graph.create(resultedVertices, this.getEdges(), 
this.getContext());
        }
 
        private static final class ApplyCoGroupToVertexValues<K extends 
Comparable<K> & Serializable,
@@ -454,7 +226,7 @@ public class Graph<K extends Comparable<K> & Serializable, 
VV extends Serializab
                DataSet<Edge<K, EV>> resultedEdges = this.getEdges()
                                .coGroup(inputDataSet).where(0,1).equalTo(0,1)
                                .with(new ApplyCoGroupToEdgeValues<K, EV, 
T>(mapper));
-               return new Graph(this.vertices, resultedEdges, this.context);
+               return Graph.create(this.getVertices(), resultedEdges, 
this.getContext());
        }
 
        private static final class ApplyCoGroupToEdgeValues<K extends 
Comparable<K> & Serializable,
@@ -503,7 +275,7 @@ public class Graph<K extends Comparable<K> & Serializable, 
VV extends Serializab
                                .coGroup(inputDataSet).where(0).equalTo(0)
                                .with(new 
ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>(mapper));
 
-               return new Graph(this.vertices, resultedEdges, this.context);
+               return Graph.create(this.getVertices(), resultedEdges, 
this.getContext());
        }
 
        private static final class 
ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K extends Comparable<K> & 
Serializable,
@@ -555,7 +327,7 @@ public class Graph<K extends Comparable<K> & Serializable, 
VV extends Serializab
                                .coGroup(inputDataSet).where(1).equalTo(0)
                                .with(new 
ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>(mapper));
 
-               return new Graph(this.vertices, resultedEdges, this.context);
+               return Graph.create(this.getVertices(), resultedEdges, 
this.getContext());
        }
 
        /**
@@ -667,19 +439,14 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
        }
 
        /**
-        * Convert the directed graph into an undirected graph
-        * by adding all inverse-direction edges.
+        * This operation adds all inverse-direction edges
+        * to the graph.
         * @return the undirected graph.
         */
        public Graph<K, VV, EV> getUndirected() throws 
UnsupportedOperationException {
-               if (this.isUndirected) {
-                       throw new UnsupportedOperationException("The graph is 
already undirected.");
-               }
-               else {
                        DataSet<Edge<K, EV>> undirectedEdges =
                                        edges.union(edges.map(new 
ReverseEdgesMap<K, EV>()));
-                       return new Graph<K, VV, EV>(vertices, undirectedEdges, 
this.context, true);
-                       }
+                       return new Graph<K, VV, EV>(vertices, undirectedEdges, 
this.context);
        }
        
        /**
@@ -878,13 +645,122 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
         * @throws UnsupportedOperationException
         */
        public Graph<K, VV, EV> reverse() throws UnsupportedOperationException {
-               if (this.isUndirected) {
-                       throw new UnsupportedOperationException("The graph is 
already undirected.");
-               }
-               else {
-                       DataSet<Edge<K, EV>> undirectedEdges = edges.map(new 
ReverseEdgesMap<K, EV>());
-                       return new Graph<K, VV, EV>(this.vertices, 
undirectedEdges, this.context, true);
-               }
+               DataSet<Edge<K, EV>> reversedEdges = edges.map(new 
ReverseEdgesMap<K, EV>());
+               return new Graph<K, VV, EV>(vertices, reversedEdges, 
this.context);
+       }
+
+       /**
+        * Creates a graph from a dataset of vertices and a dataset of edges
+        * @param vertices a DataSet of vertices.
+        * @param edges a DataSet of vertices.
+        * @param context the flink execution environment.
+        * @return the newly created graph
+        */
+       public static <K extends Comparable<K> & Serializable, VV extends 
Serializable,
+               EV extends Serializable> Graph<K, VV, EV>
+               create(DataSet<Vertex<K, VV>> vertices, DataSet<Edge<K, EV>> 
edges, 
+                               ExecutionEnvironment context) {
+               return new Graph<K, VV, EV>(vertices, edges, context);
+       }
+       
+       /**
+        * Creates a graph from a DataSet of edges.
+        * Vertices are created automatically and their values are set to 
NullValue.
+        * @param edges a DataSet of vertices.
+        * @param context the flink execution environment.
+        * @return the newly created graph
+        */
+       public static <K extends Comparable<K> & Serializable, EV extends 
Serializable> 
+               Graph<K, NullValue, EV> create(DataSet<Edge<K, EV>> edges, 
ExecutionEnvironment context) {
+               DataSet<Vertex<K, NullValue>> vertices = 
+                               edges.flatMap(new EmitSrcAndTarget<K, 
EV>()).distinct(); 
+               return new Graph<K, NullValue, EV>(vertices, edges, context);
+       }
+       
+       /**
+        * Creates a graph from a DataSet of edges.
+        * Vertices are created automatically and their values are set
+        * by applying the provided map function to the vertex ids.
+        * @param edges the input edges
+        * @param mapper the map function to set the initial vertex value
+        * @return the newly created graph
+        */
+       public static <K extends Comparable<K> & Serializable, VV extends 
Serializable, EV extends Serializable> 
+               Graph<K, VV, EV> create(DataSet<Edge<K, EV>> edges, final 
MapFunction<K, VV> mapper, 
+                               ExecutionEnvironment context) {
+               TypeInformation<K> keyType = ((TupleTypeInfo<?>) 
edges.getType()).getTypeAt(0);
+
+               TypeInformation<VV> valueType = TypeExtractor
+                               .createTypeInfo(MapFunction.class, 
mapper.getClass(), 1, null, null);
+
+               @SuppressWarnings({ "unchecked", "rawtypes" })
+               TypeInformation<Vertex<K, VV>> returnType = 
(TypeInformation<Vertex<K, VV>>)
+                               new TupleTypeInfo(Vertex.class, keyType, 
valueType);
+
+               DataSet<Vertex<K, VV>> vertices = 
+                               edges.flatMap(new EmitSrcAndTargetAsTuple1<K, 
EV>())
+                               .distinct().map(new MapFunction<Tuple1<K>, 
Vertex<K, VV>>(){
+                                       public Vertex<K, VV> map(Tuple1<K> 
value) throws Exception {
+                                               return new Vertex<K, 
VV>(value.f0, mapper.map(value.f0));
+                                       }
+                               }).returns(returnType);
+               return new Graph<K, VV, EV>(vertices, edges, context);
+       }
+
+       private static final class EmitSrcAndTarget<K extends Comparable<K> & 
Serializable, EV extends Serializable>
+               implements FlatMapFunction<Edge<K, EV>, Vertex<K, NullValue>> {
+               public void flatMap(Edge<K, EV> edge,
+                               Collector<Vertex<K, NullValue>> out) {
+
+                               out.collect(new Vertex<K, NullValue>(edge.f0, 
NullValue.getInstance()));
+                               out.collect(new Vertex<K, NullValue>(edge.f1, 
NullValue.getInstance()));
+               }       
+       }
+
+       private static final class EmitSrcAndTargetAsTuple1<K extends 
Comparable<K> & Serializable, 
+               EV extends Serializable> implements FlatMapFunction<Edge<K, 
EV>, Tuple1<K>> {
+               public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) 
{
+
+                       out.collect(new Tuple1<K>(edge.f0));
+                       out.collect(new Tuple1<K>(edge.f1));
+               }       
+       }
+
+       /**
+        * Read and create the graph vertex Tuple2 DataSet from a csv file
+        *
+        * The CSV file should be of the following format:
+        *
+        * <vertexID><delimiter><vertexValue>
+        *
+        * For example, with space delimiter:
+        *
+        * 1 57
+        * 2 45
+        * 3 77
+        * 4 12
+        *
+        * @param context the flink execution environment.
+        * @param filePath the path to the CSV file.
+        * @param delimiter the CSV delimiter.
+        * @param Tuple2IdClass The class to use for Vertex IDs
+        * @param Tuple2ValueClass The class to use for Vertex Values
+        * @return a set of vertices and their values.
+        */
+       public static <K extends Comparable<K> & Serializable, VV extends 
Serializable>
+               DataSet<Tuple2<K, VV>>
+               readTuple2CsvFile(ExecutionEnvironment context, String filePath,
+                       char delimiter, Class<K> Tuple2IdClass, Class<VV> 
Tuple2ValueClass) {
+
+               CsvReader reader = new CsvReader(filePath, context);
+               DataSet<Tuple2<K, VV>> vertices = 
reader.fieldDelimiter(delimiter).types(Tuple2IdClass, Tuple2ValueClass)
+               .map(new MapFunction<Tuple2<K, VV>, Tuple2<K, VV>>() {
+
+                       public Tuple2<K, VV> map(Tuple2<K, VV> value) throws 
Exception {
+                               return (Tuple2<K, VV>)value;
+                       }
+               });
+               return vertices;
        }
 
        /**
@@ -937,17 +813,10 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
         * @return true if the graph is weakly connected.
         */
        public DataSet<Boolean> isWeaklyConnected (int maxIterations) {
-               Graph<K, VV, EV> graph;
-               
-               if (!(this.isUndirected)) {
-                       // first, convert to an undirected graph
-                       graph = this.getUndirected();
-               }
-               else {
-                       graph = this;
-               }
+               // first, convert to an undirected graph
+               Graph<K, VV, EV> graph = this.getUndirected();
 
-        DataSet<K> vertexIds = graph.getVertexIds();
+               DataSet<K> vertexIds = graph.getVertexIds();
         DataSet<Tuple2<K,K>> verticesWithInitialIds = vertexIds
                 .map(new DuplicateVertexIDMapper<K>());
 
@@ -1011,6 +880,14 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
         }
        }
        
+    public Graph<K, VV, EV> fromCollection (Collection<Vertex<K,VV>> vertices, 
Collection<Edge<K,EV>> edges) {
+
+               DataSet<Vertex<K, VV>> v = context.fromCollection(vertices);
+               DataSet<Edge<K, EV>> e = context.fromCollection(edges);
+
+               return new Graph<K, VV, EV>(v, e, context);
+       }
+
        /**
         * Adds the input vertex and edges to the graph.
         * If the vertex already exists in the graph, it will not be added 
again,
@@ -1025,14 +902,14 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
 
        // Take care of empty edge set
        if (edges.isEmpty()) {
-               return new Graph(this.vertices.union(newVertex).distinct(), 
this.edges, this.context);
+               return Graph.create(getVertices().union(newVertex).distinct(), 
getEdges(), context);
        }
 
        // Add the vertex and its edges
-       DataSet<Vertex<K, VV>> newVertices = 
this.vertices.union(newVertex).distinct();
-       DataSet<Edge<K, EV>> newEdges = 
this.edges.union(context.fromCollection(edges));
+       DataSet<Vertex<K, VV>> newVertices = 
getVertices().union(newVertex).distinct();
+       DataSet<Edge<K, EV>> newEdges = 
getEdges().union(context.fromCollection(edges));
 
-       return new Graph(newVertices, newEdges, this.context);
+       return Graph.create(newVertices, newEdges, context);
     }
 
     /**
@@ -1045,11 +922,8 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
      * @return the new graph containing the existing vertices and edges plus 
the newly added edge
      */
     public Graph<K, VV, EV> addEdge (Vertex<K,VV> source, Vertex<K,VV> target, 
EV edgeValue) {
-       Graph<K,VV,EV> partialGraph = fromCollection(
-                               Arrays.asList(source, target),
-                               Arrays.asList(new Edge<K, EV>(source.f0, 
target.f0, edgeValue)),
-                               this.context
-               );
+       Graph<K,VV,EV> partialGraph = this.fromCollection(Arrays.asList(source, 
target),
+                               Arrays.asList(new Edge<K, EV>(source.f0, 
target.f0, edgeValue)));
         return this.union(partialGraph);
     }
 
@@ -1112,7 +986,7 @@ public class Graph<K extends Comparable<K> & Serializable, 
VV extends Serializab
     public Graph<K, VV, EV> removeEdge (Edge<K, EV> edge) {
                DataSet<Edge<K, EV>> newEdges = getEdges().filter(
                                new EdgeRemovalEdgeFilter<K, EV>(edge));
-        return new Graph<K, VV, EV>(this.vertices, newEdges, this.context);
+        return new Graph<K, VV, EV>(this.getVertices(), newEdges, 
this.context);
     }
     
     private static final class EdgeRemovalEdgeFilter<K extends Comparable<K> & 
Serializable, 
@@ -1154,9 +1028,58 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
        DataSet<Vertex<K, VV>> newVertices = vertices.runOperation(
                             VertexCentricIteration.withEdges(edges,
                                                vertexUpdateFunction, 
messagingFunction, maximumNumberOfIterations));
-               return new Graph<K, VV, EV>(newVertices, this.edges, 
this.context);
+               return new Graph<K, VV, EV>(newVertices, edges, context);
     }
 
+       /**
+        * Creates a graph from the given vertex and edge collections
+        * @param context the flink execution environment.
+        * @param v the collection of vertices
+        * @param e the collection of edges
+        * @return a new graph formed from the set of edges and vertices
+        */
+       public static <K extends Comparable<K> & Serializable, VV extends 
Serializable,
+                       EV extends Serializable> Graph<K, VV, EV>
+               fromCollection(ExecutionEnvironment context, 
Collection<Vertex<K, VV>> v,
+                                          Collection<Edge<K, EV>> e) throws 
Exception {
+
+               DataSet<Vertex<K, VV>> vertices = context.fromCollection(v);
+               DataSet<Edge<K, EV>> edges = context.fromCollection(e);
+
+               return Graph.create(vertices, edges, context);
+       }
+
+       /**
+        * Vertices may not have a value attached or may receive a value as a 
result of running the algorithm.
+        * @param context the flink execution environment.
+        * @param e the collection of edges
+        * @return a new graph formed from the edges, with no value for the 
vertices
+        */
+       public static <K extends Comparable<K> & Serializable, VV extends 
Serializable,
+                       EV extends Serializable> Graph<K, NullValue, EV>
+               fromCollection(ExecutionEnvironment context, Collection<Edge<K, 
EV>> e) {
+
+               DataSet<Edge<K, EV>> edges = context.fromCollection(e);
+
+               return Graph.create(edges, context);
+       }
+
+       /**
+        * Vertices may have an initial value defined by a function.
+        * @param context the flink execution environment.
+        * @param e the collection of edges
+        * @return a new graph formed from the edges, with a custom value for 
the vertices,
+        * determined by the mapping function
+        */
+       public static <K extends Comparable<K> & Serializable, VV extends 
Serializable,
+                       EV extends Serializable> Graph<K, VV, EV>
+               fromCollection(ExecutionEnvironment context, Collection<Edge<K, 
EV>> e,
+                                          final MapFunction<K, VV> mapper) {
+
+               DataSet<Edge<K, EV>> edges = context.fromCollection(e);
+               return Graph.create(edges, mapper, context);
+       }
+
        public Graph<K, VV, EV> run (GraphAlgorithm<K, VV, EV> algorithm) {
                return algorithm.run(this);
        }

Reply via email to