http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java
deleted file mode 100644
index c815ca4..0000000
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.examples.java.graph;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.examples.java.graph.util.EnumTrianglesData;
-import org.apache.flink.examples.java.graph.util.EnumTrianglesDataTypes.Edge;
-import org.apache.flink.examples.java.graph.util.EnumTrianglesDataTypes.Triad;
-
-/**
- * Triangle enumeration is a pre-processing step to find closely connected 
parts in graphs.
- * A triangle consists of three edges that connect three vertices with each 
other.
- * 
- * <p>
- * The algorithm works as follows: 
- * It groups all edges that share a common vertex and builds triads, i.e., 
triples of vertices 
- * that are connected by two edges. Finally, all triads are filtered for which 
no third edge exists 
- * that closes the triangle.
- *  
- * <p>
- * Input files are plain text files and must be formatted as follows:
- * <ul>
- * <li>Edges are represented as pairs of vertex IDs which are separated by 
space 
- * characters. Edges are separated by new-line characters.<br>
- * For example <code>"1 2\n2 12\n1 12\n42 63"</code> gives four (undirected) 
edges (1)-(2), (2)-(12), (1)-(12), and (42)-(63)
- * that include a triangle
- * </ul>
- * <pre>
- *     (1)
- *     /  \
- *   (2)-(12)
- * </pre>
- * 
- * Usage: <code>EnumTriangleBasic &lt;edge path&gt; &lt;result 
path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from 
{@link EnumTrianglesData}. 
- * 
- * <p>
- * This example shows how to use:
- * <ul>
- * <li>Custom Java objects which extend Tuple
- * <li>Group Sorting
- * </ul>
- * 
- */
-@SuppressWarnings("serial")
-public class EnumTrianglesBasic {
-
-       static boolean fileOutput = false;
-       static String edgePath = null;
-       static String outputPath = null;
-       
-       // 
*************************************************************************
-       //     PROGRAM
-       // 
*************************************************************************
-       
-       public static void main(String[] args) throws Exception {
-               
-               if(!parseParameters(args)) {
-                       return;
-               }
-               
-               // set up execution environment
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-       
-               // read input data
-               DataSet<Edge> edges = getEdgeDataSet(env);
-               
-               // project edges by vertex id
-               DataSet<Edge> edgesById = edges
-                               .map(new EdgeByIdProjector());
-               
-               DataSet<Triad> triangles = edgesById
-                               // build triads
-                               .groupBy(Edge.V1).sortGroup(Edge.V2, 
Order.ASCENDING).reduceGroup(new TriadBuilder())
-                               // filter triads
-                               .join(edgesById).where(Triad.V2, 
Triad.V3).equalTo(Edge.V1, Edge.V2).with(new TriadFilter());
-
-               // emit result
-               if (fileOutput) {
-                       triangles.writeAsCsv(outputPath, "\n", ",");
-                       // execute program
-                       env.execute("Basic Triangle Enumeration Example");
-               } else {
-                       triangles.print();
-               }
-       }
-       
-       // 
*************************************************************************
-       //     USER FUNCTIONS
-       // 
*************************************************************************
-
-       /** Converts a Tuple2 into an Edge */
-       @ForwardedFields("0;1")
-       public static class TupleEdgeConverter implements 
MapFunction<Tuple2<Integer, Integer>, Edge> {
-               private final Edge outEdge = new Edge();
-               
-               @Override
-               public Edge map(Tuple2<Integer, Integer> t) throws Exception {
-                       outEdge.copyVerticesFromTuple2(t);
-                       return outEdge;
-               }
-       }
-       
-       /** Projects an edge (pair of vertices) such that the id of the first 
is smaller than the id of the second. */
-       private static class EdgeByIdProjector implements MapFunction<Edge, 
Edge> {
-       
-               @Override
-               public Edge map(Edge inEdge) throws Exception {
-                       
-                       // flip vertices if necessary
-                       if(inEdge.getFirstVertex() > inEdge.getSecondVertex()) {
-                               inEdge.flipVertices();
-                       }
-                       
-                       return inEdge;
-               }
-       }
-       
-       /**
-        *  Builds triads (triples of vertices) from pairs of edges that share 
a vertex.
-        *  The first vertex of a triad is the shared vertex, the second and 
third vertex are ordered by vertexId. 
-        *  Assumes that input edges share the first vertex and are in 
ascending order of the second vertex.
-        */
-       @ForwardedFields("0")
-       private static class TriadBuilder implements GroupReduceFunction<Edge, 
Triad> {
-               private final List<Integer> vertices = new ArrayList<Integer>();
-               private final Triad outTriad = new Triad();
-               
-               @Override
-               public void reduce(Iterable<Edge> edgesIter, Collector<Triad> 
out) throws Exception {
-                       
-                       final Iterator<Edge> edges = edgesIter.iterator();
-                       
-                       // clear vertex list
-                       vertices.clear();
-                       
-                       // read first edge
-                       Edge firstEdge = edges.next();
-                       outTriad.setFirstVertex(firstEdge.getFirstVertex());
-                       vertices.add(firstEdge.getSecondVertex());
-                       
-                       // build and emit triads
-                       while (edges.hasNext()) {
-                               Integer higherVertexId = 
edges.next().getSecondVertex();
-                               
-                               // combine vertex with all previously read 
vertices
-                               for (Integer lowerVertexId : vertices) {
-                                       outTriad.setSecondVertex(lowerVertexId);
-                                       outTriad.setThirdVertex(higherVertexId);
-                                       out.collect(outTriad);
-                               }
-                               vertices.add(higherVertexId);
-                       }
-               }
-       }
-       
-       /** Filters triads (three vertices connected by two edges) without a 
closing third edge. */
-       private static class TriadFilter implements JoinFunction<Triad, Edge, 
Triad> {
-               
-               @Override
-               public Triad join(Triad triad, Edge edge) throws Exception {
-                       return triad;
-               }
-       }
-       
-       // 
*************************************************************************
-       //     UTIL METHODS
-       // 
*************************************************************************
-       
-       private static boolean parseParameters(String[] args) {
-       
-               if(args.length > 0) {
-                       // parse input arguments
-                       fileOutput = true;
-                       if(args.length == 2) {
-                               edgePath = args[0];
-                               outputPath = args[1];
-                       } else {
-                               System.err.println("Usage: EnumTriangleBasic 
<edge path> <result path>");
-                               return false;
-                       }
-               } else {
-                       System.out.println("Executing Enum Triangles Basic 
example with built-in default data.");
-                       System.out.println("  Provide parameters to read input 
data from files.");
-                       System.out.println("  See the documentation for the 
correct format of input files.");
-                       System.out.println("  Usage: EnumTriangleBasic <edge 
path> <result path>");
-               }
-               return true;
-       }
-       
-       private static DataSet<Edge> getEdgeDataSet(ExecutionEnvironment env) {
-               if(fileOutput) {
-                       return env.readCsvFile(edgePath)
-                                               .fieldDelimiter(" ")
-                                               .includeFields(true, true)
-                                               .types(Integer.class, 
Integer.class)
-                                               .map(new TupleEdgeConverter());
-               } else {
-                       return EnumTrianglesData.getDefaultEdgeDataSet(env);
-               }
-       }
-       
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java
deleted file mode 100644
index 3937161..0000000
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java
+++ /dev/null
@@ -1,358 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.examples.java.graph;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.examples.java.graph.util.EnumTrianglesData;
-import org.apache.flink.examples.java.graph.util.EnumTrianglesDataTypes.Edge;
-import 
org.apache.flink.examples.java.graph.util.EnumTrianglesDataTypes.EdgeWithDegrees;
-import org.apache.flink.examples.java.graph.util.EnumTrianglesDataTypes.Triad;
-
-/**
- * Triangle enumeration is a pre-processing step to find closely connected 
parts in graphs.
- * A triangle consists of three edges that connect three vertices with each 
other.
- * 
- * <p>
- * The basic algorithm works as follows: 
- * It groups all edges that share a common vertex and builds triads, i.e., 
triples of vertices 
- * that are connected by two edges. Finally, all triads are filtered for which 
no third edge exists 
- * that closes the triangle.
- * 
- * <p>
- * For a group of <i>n</i> edges that share a common vertex, the number of 
built triads is quadratic <i>((n*(n-1))/2)</i>.
- * Therefore, an optimization of the algorithm is to group edges on the vertex 
with the smaller output degree to 
- * reduce the number of triads. 
- * This implementation extends the basic algorithm by computing output degrees 
of edge vertices and 
- * grouping on edges on the vertex with the smaller degree.
- * 
- * <p>
- * Input files are plain text files and must be formatted as follows:
- * <ul>
- * <li>Edges are represented as pairs of vertex IDs which are separated by 
space 
- * characters. Edges are separated by new-line characters.<br>
- * For example <code>"1 2\n2 12\n1 12\n42 63"</code> gives four (undirected) 
edges (1)-(2), (2)-(12), (1)-(12), and (42)-(63)
- * that include a triangle
- * </ul>
- * <pre>
- *     (1)
- *     /  \
- *   (2)-(12)
- * </pre>
- * 
- * Usage: <code>EnumTriangleOpt &lt;edge path&gt; &lt;result 
path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from 
{@link EnumTrianglesData}.
- * 
- * <p>
- * This example shows how to use:
- * <ul>
- * <li>Custom Java objects which extend Tuple
- * <li>Group Sorting
- * </ul>
- * 
- */
-@SuppressWarnings("serial")
-public class EnumTrianglesOpt {
-
-       // 
*************************************************************************
-       //     PROGRAM
-       // 
*************************************************************************
-       
-       public static void main(String[] args) throws Exception {
-               
-               if(!parseParameters(args)) {
-                       return;
-               }
-               
-               // set up execution environment
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               
-               // read input data
-               DataSet<Edge> edges = getEdgeDataSet(env);
-               
-               // annotate edges with degrees
-               DataSet<EdgeWithDegrees> edgesWithDegrees = edges
-                               .flatMap(new EdgeDuplicator())
-                               .groupBy(Edge.V1).sortGroup(Edge.V2, 
Order.ASCENDING).reduceGroup(new DegreeCounter())
-                               
.groupBy(EdgeWithDegrees.V1,EdgeWithDegrees.V2).reduce(new DegreeJoiner());
-               
-               // project edges by degrees
-               DataSet<Edge> edgesByDegree = edgesWithDegrees
-                               .map(new EdgeByDegreeProjector());
-               // project edges by vertex id
-               DataSet<Edge> edgesById = edgesByDegree
-                               .map(new EdgeByIdProjector());
-               
-               DataSet<Triad> triangles = edgesByDegree
-                               // build triads
-                               .groupBy(Edge.V1).sortGroup(Edge.V2, 
Order.ASCENDING).reduceGroup(new TriadBuilder())
-                               // filter triads
-                               
.join(edgesById).where(Triad.V2,Triad.V3).equalTo(Edge.V1,Edge.V2).with(new 
TriadFilter());
-
-               // emit result
-               if(fileOutput) {
-                       triangles.writeAsCsv(outputPath, "\n", ",");
-                       // execute program
-                       env.execute("Triangle Enumeration Example");
-               } else {
-                       triangles.print();
-               }
-
-               
-       }
-       
-       // 
*************************************************************************
-       //     USER FUNCTIONS
-       // 
*************************************************************************
-       
-       /** Converts a Tuple2 into an Edge */
-       @ForwardedFields("0;1")
-       public static class TupleEdgeConverter implements 
MapFunction<Tuple2<Integer, Integer>, Edge> {
-               private final Edge outEdge = new Edge();
-               
-               @Override
-               public Edge map(Tuple2<Integer, Integer> t) throws Exception {
-                       outEdge.copyVerticesFromTuple2(t);
-                       return outEdge;
-               }
-       }
-       
-       /** Emits for an edge the original edge and its switched version. */
-       private static class EdgeDuplicator implements FlatMapFunction<Edge, 
Edge> {
-               
-               @Override
-               public void flatMap(Edge edge, Collector<Edge> out) throws 
Exception {
-                       out.collect(edge);
-                       edge.flipVertices();
-                       out.collect(edge);
-               }
-       }
-       
-       /**
-        * Counts the number of distinct neighbors of the vertex shared by a group of edges.
-        * Emits one edge per collected neighbor, annotated with the degree of the
-        * shared vertex only; the degree of the other vertex is set to 0.
-        * For each emitted edge, the first vertex is the vertex with the smaller id.
-        */
-       private static class DegreeCounter
-                       implements GroupReduceFunction<Edge, EdgeWithDegrees> {
-               
-               // both objects are reused: the list is cleared per group,
-               // the output edge is overwritten before every collect
-               final ArrayList<Integer> otherVertices = new ArrayList<Integer>();
-               final EdgeWithDegrees outputEdge = new EdgeWithDegrees();
-               
-               @Override
-               public void reduce(Iterable<Edge> edgesIter, Collector<EdgeWithDegrees> out) {
-                       
-                       Iterator<Edge> edges = edgesIter.iterator();
-                       otherVertices.clear();
-                       
-                       // get first edge
-                       // NOTE(review): the first edge is added without the self-loop
-                       // check applied to the following edges - confirm this is intended
-                       Edge edge = edges.next();
-                       Integer groupVertex = edge.getFirstVertex();
-                       this.otherVertices.add(edge.getSecondVertex());
-                       
-                       // get all other edges (assumes edges are sorted by second vertex)
-                       while (edges.hasNext()) {
-                               edge = edges.next();
-                               Integer otherVertex = edge.getSecondVertex();
-                               // collect unique vertices; compare ids by value, since
-                               // '!=' on boxed Integers compares object references
-                               if(!otherVertices.contains(otherVertex)
-                                               && !otherVertex.equals(groupVertex)) {
-                                       this.otherVertices.add(otherVertex);
-                               }
-                       }
-                       int degree = this.otherVertices.size();
-                       
-                       // emit edges, ordered such that the smaller vertex id comes first;
-                       // only the degree of the group vertex is known at this point
-                       for(Integer otherVertex : this.otherVertices) {
-                               if(groupVertex < otherVertex) {
-                                       outputEdge.setFirstVertex(groupVertex);
-                                       outputEdge.setFirstDegree(degree);
-                                       outputEdge.setSecondVertex(otherVertex);
-                                       outputEdge.setSecondDegree(0);
-                               } else {
-                                       outputEdge.setFirstVertex(otherVertex);
-                                       outputEdge.setFirstDegree(0);
-                                       outputEdge.setSecondVertex(groupVertex);
-                                       outputEdge.setSecondDegree(degree);
-                               }
-                               out.collect(outputEdge);
-                       }
-               }
-       }
-       
-       /**
-        * Builds an edge with degree annotation from two edges that have the 
same vertices and only one 
-        * degree annotation.
-        */
-       @ForwardedFields("0;1")
-       private static class DegreeJoiner implements 
ReduceFunction<EdgeWithDegrees> {
-               private final EdgeWithDegrees outEdge = new EdgeWithDegrees();
-               
-               @Override
-               public EdgeWithDegrees reduce(EdgeWithDegrees edge1, 
EdgeWithDegrees edge2) throws Exception {
-                       
-                       // copy first edge
-                       outEdge.copyFrom(edge1);
-                       
-                       // set missing degree
-                       if(edge1.getFirstDegree() == 0 && 
edge1.getSecondDegree() != 0) {
-                               outEdge.setFirstDegree(edge2.getFirstDegree());
-                       } else if (edge1.getFirstDegree() != 0 && 
edge1.getSecondDegree() == 0) {
-                               
outEdge.setSecondDegree(edge2.getSecondDegree());
-                       }
-                       return outEdge;
-               }
-       }
-               
-       /** Projects an edge (pair of vertices) such that the first vertex is 
the vertex with the smaller degree. */
-       private static class EdgeByDegreeProjector implements 
MapFunction<EdgeWithDegrees, Edge> {
-               
-               private final Edge outEdge = new Edge();
-               
-               @Override
-               public Edge map(EdgeWithDegrees inEdge) throws Exception {
-
-                       // copy vertices to simple edge
-                       outEdge.copyVerticesFromEdgeWithDegrees(inEdge);
-
-                       // flip vertices if first degree is larger than second 
degree.
-                       if(inEdge.getFirstDegree() > inEdge.getSecondDegree()) {
-                               outEdge.flipVertices();
-                       }
-
-                       // return edge
-                       return outEdge;
-               }
-       }
-       
-       /** Projects an edge (pair of vertices) such that the id of the first 
is smaller than the id of the second. */
-       private static class EdgeByIdProjector implements MapFunction<Edge, 
Edge> {
-       
-               @Override
-               public Edge map(Edge inEdge) throws Exception {
-                       
-                       // flip vertices if necessary
-                       if(inEdge.getFirstVertex() > inEdge.getSecondVertex()) {
-                               inEdge.flipVertices();
-                       }
-                       
-                       return inEdge;
-               }
-       }
-       
-       /**
-        *  Builds triads (triples of vertices) from pairs of edges that share 
a vertex.
-        *  The first vertex of a triad is the shared vertex, the second and 
third vertex are ordered by vertexId. 
-        *  Assumes that input edges share the first vertex and are in 
ascending order of the second vertex.
-        */
-       @ForwardedFields("0")
-       private static class TriadBuilder implements GroupReduceFunction<Edge, 
Triad> {
-               
-               private final List<Integer> vertices = new ArrayList<Integer>();
-               private final Triad outTriad = new Triad();
-               
-               @Override
-               public void reduce(Iterable<Edge> edgesIter, Collector<Triad> 
out) throws Exception {
-                       final Iterator<Edge> edges = edgesIter.iterator();
-                       
-                       // clear vertex list
-                       vertices.clear();
-                       
-                       // read first edge
-                       Edge firstEdge = edges.next();
-                       outTriad.setFirstVertex(firstEdge.getFirstVertex());
-                       vertices.add(firstEdge.getSecondVertex());
-                       
-                       // build and emit triads
-                       while (edges.hasNext()) {
-                               Integer higherVertexId = 
edges.next().getSecondVertex();
-                               
-                               // combine vertex with all previously read 
vertices
-                               for(Integer lowerVertexId : vertices) {
-                                       outTriad.setSecondVertex(lowerVertexId);
-                                       outTriad.setThirdVertex(higherVertexId);
-                                       out.collect(outTriad);
-                               }
-                               vertices.add(higherVertexId);
-                       }
-               }
-       }
-       
-       /** Filters triads (three vertices connected by two edges) without a 
closing third edge. */
-       private static class TriadFilter implements JoinFunction<Triad, Edge, 
Triad> {
-               
-               @Override
-               public Triad join(Triad triad, Edge edge) throws Exception {
-                       return triad;
-               }
-       }
-       
-       // 
*************************************************************************
-       //     UTIL METHODS
-       // 
*************************************************************************
-       
-       private static boolean fileOutput = false;
-       private static String edgePath = null;
-       private static String outputPath = null;
-       
-       /**
-        * Parses the command line arguments of the example.
-        * Without arguments, a hint is printed and the method returns true with
-        * file output left disabled. With exactly two arguments, they are taken
-        * as edge path and result path.
-        *
-        * @param args the command line arguments
-        * @return false if a wrong number of arguments was given, true otherwise
-        */
-       private static boolean parseParameters(String[] args) {
-               
-               if(args.length > 0) {
-                       // parse input arguments
-                       fileOutput = true;
-                       if(args.length == 2) {
-                               edgePath = args[0];
-                               outputPath = args[1];
-                       } else {
-                               // name this program, consistent with the usage hint below
-                               System.err.println("Usage: EnumTriangleOpt <edge path> <result path>");
-                               return false;
-                       }
-               } else {
-                       System.out.println("Executing Enum Triangles Opt example with built-in default data.");
-                       System.out.println("  Provide parameters to read input data from files.");
-                       System.out.println("  See the documentation for the correct format of input files.");
-                       System.out.println("  Usage: EnumTriangleOpt <edge path> <result path>");
-               }
-               return true;
-       }
-       
-       private static DataSet<Edge> getEdgeDataSet(ExecutionEnvironment env) {
-               if(fileOutput) {
-                       return env.readCsvFile(edgePath)
-                                               .fieldDelimiter(" ")
-                                               .includeFields(true, true)
-                                               .types(Integer.class, 
Integer.class)
-                                               .map(new TupleEdgeConverter());
-               } else {
-                       return EnumTrianglesData.getDefaultEdgeDataSet(env);
-               }
-       }
-       
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java
deleted file mode 100644
index 7b05158..0000000
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java
+++ /dev/null
@@ -1,288 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.examples.java.graph;
-
-import static org.apache.flink.api.java.aggregation.Aggregations.SUM;
-
-import java.util.ArrayList;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operators.IterativeDataSet;
-import org.apache.flink.examples.java.graph.util.PageRankData;
-
-/**
- * A basic implementation of the Page Rank algorithm using a bulk iteration.
- * 
- * <p>
- * This implementation requires a set of pages and a set of directed links as 
input and works as follows. <br> 
- * In each iteration, the rank of every page is evenly distributed to all 
pages it points to.
- * Each page collects the partial ranks of all pages that point to it, sums 
them up, and applies a dampening factor to the sum.
- * The result is the new rank of the page. A new iteration is started with the 
new ranks of all pages.
- * This implementation terminates after a fixed number of iterations.<br>
- * This is the Wikipedia entry for the <a 
href="http://en.wikipedia.org/wiki/Page_rank">Page Rank algorithm</a>. 
- * 
- * <p>
- * Input files are plain text files and must be formatted as follows:
- * <ul>
- * <li>Pages represented as an (long) ID separated by new-line characters.<br> 
- * For example <code>"1\n2\n12\n42\n63"</code> gives five pages with IDs 1, 2, 
12, 42, and 63.
- * <li>Links are represented as pairs of page IDs which are separated by space 
- * characters. Links are separated by new-line characters.<br>
- * For example <code>"1 2\n2 12\n1 12\n42 63"</code> gives four (directed) 
links (1)-&gt;(2), (2)-&gt;(12), (1)-&gt;(12), and (42)-&gt;(63).<br>
- * For this simple implementation it is required that each page has at least 
one incoming and one outgoing link (a page can point to itself).
- * </ul>
- * 
- * <p>
- * Usage: <code>PageRankBasic &lt;pages path&gt; &lt;links path&gt; &lt;output 
path&gt; &lt;num pages&gt; &lt;num iterations&gt;</code><br>
- * If no parameters are provided, the program is run with default data from 
{@link PageRankData} and 10 iterations.
- * 
- * <p>
- * This example shows how to use:
- * <ul>
- * <li>Bulk Iterations
- * <li>Default Join
- * <li>Configure user-defined functions using constructor parameters.
- * </ul> 
- * 
- *
- */
-@SuppressWarnings("serial")
-public class PageRankBasic {
-       
-       private static final double DAMPENING_FACTOR = 0.85;
-       private static final double EPSILON = 0.0001;
-       
-       // 
*************************************************************************
-       //     PROGRAM
-       // 
*************************************************************************
-       
-       public static void main(String[] args) throws Exception {
-               
-               if(!parseParameters(args)) {
-                       return;
-               }
-               
-               // set up execution environment
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               
-               // get input data
-               DataSet<Long> pagesInput = getPagesDataSet(env);
-               DataSet<Tuple2<Long, Long>> linksInput = getLinksDataSet(env);
-               
-               // assign initial rank to pages
-               DataSet<Tuple2<Long, Double>> pagesWithRanks = pagesInput.
-                               map(new RankAssigner((1.0d / numPages)));
-               
-               // build adjacency list from link input
-               DataSet<Tuple2<Long, Long[]>> adjacencyListInput = 
-                               linksInput.groupBy(0).reduceGroup(new 
BuildOutgoingEdgeList());
-               
-               // set iterative data set
-               IterativeDataSet<Tuple2<Long, Double>> iteration = 
pagesWithRanks.iterate(maxIterations);
-               
-               DataSet<Tuple2<Long, Double>> newRanks = iteration
-                               // join pages with outgoing edges and 
distribute rank
-                               
.join(adjacencyListInput).where(0).equalTo(0).flatMap(new 
JoinVertexWithEdgesMatch())
-                               // collect and sum ranks
-                               .groupBy(0).aggregate(SUM, 1)
-                               // apply dampening factor
-                               .map(new Dampener(DAMPENING_FACTOR, numPages));
-               
-               DataSet<Tuple2<Long, Double>> finalPageRanks = 
iteration.closeWith(
-                               newRanks, 
-                               newRanks.join(iteration).where(0).equalTo(0)
-                               // termination condition
-                               .filter(new EpsilonFilter()));
-
-               // emit result
-               if(fileOutput) {
-                       finalPageRanks.writeAsCsv(outputPath, "\n", " ");
-                       // execute program
-                       env.execute("Basic Page Rank Example");
-               } else {
-                       finalPageRanks.print();
-               }
-
-               
-       }
-       
-       // 
*************************************************************************
-       //     USER FUNCTIONS
-       // 
*************************************************************************
-
-       /** 
-        * A map function that assigns an initial rank to all pages. 
-        */
-       public static final class RankAssigner implements MapFunction<Long, 
Tuple2<Long, Double>> {
-               Tuple2<Long, Double> outPageWithRank;
-               
-               public RankAssigner(double rank) {
-                       this.outPageWithRank = new Tuple2<Long, Double>(-1l, 
rank);
-               }
-               
-               @Override
-               public Tuple2<Long, Double> map(Long page) {
-                       outPageWithRank.f0 = page;
-                       return outPageWithRank;
-               }
-       }
-       
-       /**
-        * A reduce function that takes a sequence of edges and builds the 
adjacency list for the vertex where the edges
-        * originate. Run as a pre-processing step.
-        */
-       @ForwardedFields("0")
-       public static final class BuildOutgoingEdgeList implements 
GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long[]>> {
-               
-               private final ArrayList<Long> neighbors = new ArrayList<Long>();
-               
-               @Override
-               public void reduce(Iterable<Tuple2<Long, Long>> values, 
Collector<Tuple2<Long, Long[]>> out) {
-                       neighbors.clear();
-                       Long id = 0L;
-                       
-                       for (Tuple2<Long, Long> n : values) {
-                               id = n.f0;
-                               neighbors.add(n.f1);
-                       }
-                       out.collect(new Tuple2<Long, Long[]>(id, 
neighbors.toArray(new Long[neighbors.size()])));
-               }
-       }
-       
-       /**
-        * Join function that distributes a fraction of a vertex's rank to all 
neighbors.
-        */
-       public static final class JoinVertexWithEdgesMatch implements 
FlatMapFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>>, 
Tuple2<Long, Double>> {
-
-               @Override
-               public void flatMap(Tuple2<Tuple2<Long, Double>, Tuple2<Long, 
Long[]>> value, Collector<Tuple2<Long, Double>> out){
-                       Long[] neigbors = value.f1.f1;
-                       double rank = value.f0.f1;
-                       double rankToDistribute = rank / ((double) 
neigbors.length);
-                               
-                       for (int i = 0; i < neigbors.length; i++) {
-                               out.collect(new Tuple2<Long, 
Double>(neigbors[i], rankToDistribute));
-                       }
-               }
-       }
-       
-       /**
-        * The function that applies the page rank dampening formula
-        */
-       @ForwardedFields("0")
-       public static final class Dampener implements 
MapFunction<Tuple2<Long,Double>, Tuple2<Long,Double>> {
-
-               private final double dampening;
-               private final double randomJump;
-               
-               public Dampener(double dampening, double numVertices) {
-                       this.dampening = dampening;
-                       this.randomJump = (1 - dampening) / numVertices;
-               }
-
-               @Override
-               public Tuple2<Long, Double> map(Tuple2<Long, Double> value) {
-                       value.f1 = (value.f1 * dampening) + randomJump;
-                       return value;
-               }
-       }
-       
-       /**
-        * Filter that filters vertices where the rank difference is below a 
threshold.
-        */
-       public static final class EpsilonFilter implements 
FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> {
-
-               @Override
-               public boolean filter(Tuple2<Tuple2<Long, Double>, Tuple2<Long, 
Double>> value) {
-                       return Math.abs(value.f0.f1 - value.f1.f1) > EPSILON;
-               }
-       }
-       
-       // 
*************************************************************************
-       //     UTIL METHODS
-       // 
*************************************************************************
-       
-       private static boolean fileOutput = false;
-       private static String pagesInputPath = null;
-       private static String linksInputPath = null;
-       private static String outputPath = null;
-       private static long numPages = 0;
-       private static int maxIterations = 10;
-       
-       private static boolean parseParameters(String[] args) {
-               
-               if(args.length > 0) {
-                       if(args.length == 5) {
-                               fileOutput = true;
-                               pagesInputPath = args[0];
-                               linksInputPath = args[1];
-                               outputPath = args[2];
-                               numPages = Integer.parseInt(args[3]);
-                               maxIterations = Integer.parseInt(args[4]);
-                       } else {
-                               System.err.println("Usage: PageRankBasic <pages 
path> <links path> <output path> <num pages> <num iterations>");
-                               return false;
-                       }
-               } else {
-                       System.out.println("Executing PageRank Basic example 
with default parameters and built-in default data.");
-                       System.out.println("  Provide parameters to read input 
data from files.");
-                       System.out.println("  See the documentation for the 
correct format of input files.");
-                       System.out.println("  Usage: PageRankBasic <pages path> 
<links path> <output path> <num pages> <num iterations>");
-                       
-                       numPages = PageRankData.getNumberOfPages();
-               }
-               return true;
-       }
-       
-       private static DataSet<Long> getPagesDataSet(ExecutionEnvironment env) {
-               if(fileOutput) {
-                       return env
-                                               .readCsvFile(pagesInputPath)
-                                                       .fieldDelimiter(" ")
-                                                       .lineDelimiter("\n")
-                                                       .types(Long.class)
-                                               .map(new 
MapFunction<Tuple1<Long>, Long>() {
-                                                       @Override
-                                                       public Long 
map(Tuple1<Long> v) { return v.f0; }
-                                               });
-               } else {
-                       return PageRankData.getDefaultPagesDataSet(env);
-               }
-       }
-       
-       private static DataSet<Tuple2<Long, Long>> 
getLinksDataSet(ExecutionEnvironment env) {
-               if(fileOutput) {
-                       return env.readCsvFile(linksInputPath)
-                                               .fieldDelimiter(" ")
-                                               .lineDelimiter("\n")
-                                               .types(Long.class, Long.class);
-               } else {
-                       return PageRankData.getDefaultEdgeDataSet(env);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
deleted file mode 100644
index 5306895..0000000
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.examples.java.graph;
-
-import org.apache.flink.api.common.functions.CoGroupFunction;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operators.IterativeDataSet;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
-import org.apache.flink.util.Collector;
-
-import java.util.HashSet;
-import java.util.Set;
-
-@SuppressWarnings("serial")
-public class TransitiveClosureNaive implements ProgramDescription {
-
-
-       public static void main (String... args) throws Exception{
-
-               if (!parseParameters(args)) {
-                       return;
-               }
-
-               // set up execution environment
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSet<Tuple2<Long, Long>> edges = getEdgeDataSet(env);
-
-               IterativeDataSet<Tuple2<Long,Long>> paths = 
edges.iterate(maxIterations);
-
-               DataSet<Tuple2<Long,Long>> nextPaths = paths
-                               .join(edges)
-                               .where(1)
-                               .equalTo(0)
-                               .with(new JoinFunction<Tuple2<Long, Long>, 
Tuple2<Long, Long>, Tuple2<Long, Long>>() {
-                                       @Override
-                                       /**
-                                               left: Path (z,x) - x is 
reachable by z
-                                               right: Edge (x,y) - edge x-->y 
exists
-                                               out: Path (z,y) - y is 
reachable by z
-                                        */
-                                       public Tuple2<Long, Long> 
join(Tuple2<Long, Long> left, Tuple2<Long, Long> right) throws Exception {
-                                               return new Tuple2<Long, 
Long>(left.f0, right.f1);
-                                       }
-                               
}).withForwardedFieldsFirst("0").withForwardedFieldsSecond("1")
-                               .union(paths)
-                               .groupBy(0, 1)
-                               .reduceGroup(new 
GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
-                                       @Override
-                                       public void 
reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) 
throws Exception {
-                                               
out.collect(values.iterator().next());
-                                       }
-                               }).withForwardedFields("0;1");
-
-               DataSet<Tuple2<Long,Long>> newPaths = paths
-                               .coGroup(nextPaths)
-                               .where(0).equalTo(0)
-                               .with(new CoGroupFunction<Tuple2<Long, Long>, 
Tuple2<Long, Long>, Tuple2<Long, Long>>() {
-                                       Set<Tuple2<Long,Long>> prevSet = new 
HashSet<Tuple2<Long,Long>>();
-                                       @Override
-                                       public void 
coGroup(Iterable<Tuple2<Long, Long>> prevPaths, Iterable<Tuple2<Long, Long>> 
nextPaths, Collector<Tuple2<Long, Long>> out) throws Exception {
-                                               for (Tuple2<Long,Long> prev : 
prevPaths) {
-                                                       prevSet.add(prev);
-                                               }
-                                               for (Tuple2<Long,Long> next: 
nextPaths) {
-                                                       if 
(!prevSet.contains(next)) {
-                                                               
out.collect(next);
-                                                       }
-                                               }
-                                       }
-                               
}).withForwardedFieldsFirst("0").withForwardedFieldsSecond("0");
-
-               DataSet<Tuple2<Long, Long>> transitiveClosure = 
paths.closeWith(nextPaths, newPaths);
-
-
-               // emit result
-               if (fileOutput) {
-                       transitiveClosure.writeAsCsv(outputPath, "\n", " ");
-
-                       // execute program explicitly, because file sinks are 
lazy
-                       env.execute("Transitive Closure Example");
-               } else {
-                       transitiveClosure.print();
-               }
-       }
-
-       @Override
-       public String getDescription() {
-               return "Parameters: <edges-path> <result-path> 
<max-number-of-iterations>";
-       }
-
-       // 
*************************************************************************
-       //     UTIL METHODS
-       // 
*************************************************************************
-
-       private static boolean fileOutput = false;
-       private static String edgesPath = null;
-       private static String outputPath = null;
-       private static int maxIterations = 10;
-
-       private static boolean parseParameters(String[] programArguments) {
-
-               if (programArguments.length > 0) {
-                       // parse input arguments
-                       fileOutput = true;
-                       if (programArguments.length == 3) {
-                               edgesPath = programArguments[0];
-                               outputPath = programArguments[1];
-                               maxIterations = 
Integer.parseInt(programArguments[2]);
-                       } else {
-                               System.err.println("Usage: TransitiveClosure 
<edges path> <result path> <max number of iterations>");
-                               return false;
-                       }
-               } else {
-                       System.out.println("Executing TransitiveClosure example 
with default parameters and built-in default data.");
-                       System.out.println("  Provide parameters to read input 
data from files.");
-                       System.out.println("  See the documentation for the 
correct format of input files.");
-                       System.out.println("  Usage: TransitiveClosure <edges 
path> <result path> <max number of iterations>");
-               }
-               return true;
-       }
-
-
-       private static DataSet<Tuple2<Long, Long>> 
getEdgeDataSet(ExecutionEnvironment env) {
-
-               if(fileOutput) {
-                       return env.readCsvFile(edgesPath).fieldDelimiter(" 
").types(Long.class, Long.class);
-               } else {
-                       return 
ConnectedComponentsData.getDefaultEdgeDataSet(env);
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/ConnectedComponentsData.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/ConnectedComponentsData.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/ConnectedComponentsData.java
deleted file mode 100644
index bd68244..0000000
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/ConnectedComponentsData.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.examples.java.graph.util;
-
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-/**
- * Provides the default data sets used for the Connected Components example 
program.
- * The default data sets are used, if no parameters are given to the program.
- *
- */
-public class ConnectedComponentsData {
-       
-       public static final long[] VERTICES  = new long[] {
-                       1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16};
-
-       public static DataSet<Long> 
getDefaultVertexDataSet(ExecutionEnvironment env) {
-               List<Long> verticesList = new LinkedList<Long>();
-               for (long vertexId : VERTICES) {
-                       verticesList.add(vertexId);
-               }
-               return env.fromCollection(verticesList);
-       }
-       
-       public static final Object[][] EDGES = new Object[][] {
-               new Object[]{1L, 2L},
-               new Object[]{2L, 3L},
-               new Object[]{2L, 4L},
-               new Object[]{3L, 5L},
-               new Object[]{6L, 7L},
-               new Object[]{8L, 9L},
-               new Object[]{8L, 10L},
-               new Object[]{5L, 11L},
-               new Object[]{11L, 12L},
-               new Object[]{10L, 13L},
-               new Object[]{9L, 14L},
-               new Object[]{13L, 14L},
-               new Object[]{1L, 15L},
-               new Object[]{16L, 1L}
-       };
-       
-       public static DataSet<Tuple2<Long, Long>> 
getDefaultEdgeDataSet(ExecutionEnvironment env) {
-               
-               List<Tuple2<Long, Long>> edgeList = new LinkedList<Tuple2<Long, 
Long>>();
-               for (Object[] edge : EDGES) {
-                       edgeList.add(new Tuple2<Long, Long>((Long) edge[0], 
(Long) edge[1]));
-               }
-               return env.fromCollection(edgeList);
-       }
-       
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesData.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesData.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesData.java
deleted file mode 100644
index 331f0a5..0000000
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesData.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.examples.java.graph.util;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.examples.java.graph.util.EnumTrianglesDataTypes.Edge;
-
-/**
- * Provides the default data sets used for the Triangle Enumeration example 
programs.
- * The default data sets are used, if no parameters are given to the program.
- *
- */
-public class EnumTrianglesData {
-
-       public static final Object[][] EDGES = {
-               {1, 2},
-               {1, 3},
-               {1 ,4},
-               {1, 5},
-               {2, 3},
-               {2, 5},
-               {3, 4},
-               {3, 7},
-               {3, 8},
-               {5, 6},
-               {7, 8}
-       };
-       
-       public static DataSet<Edge> getDefaultEdgeDataSet(ExecutionEnvironment 
env) {
-               
-               List<Edge> edges = new ArrayList<Edge>();
-               for(Object[] e : EDGES) {
-                       edges.add(new Edge((Integer)e[0], (Integer)e[1]));
-               }
-               
-               return env.fromCollection(edges);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesDataTypes.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesDataTypes.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesDataTypes.java
deleted file mode 100644
index acd3f03..0000000
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesDataTypes.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.examples.java.graph.util;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-
-public class EnumTrianglesDataTypes {
-
-       public static class Edge extends Tuple2<Integer, Integer> {
-               private static final long serialVersionUID = 1L;
-               
-               public static final int V1 = 0;
-               public static final int V2 = 1;
-               
-               public Edge() {}
-               
-               public Edge(final Integer v1, final Integer v2) {
-                       this.setFirstVertex(v1);
-                       this.setSecondVertex(v2);
-               }
-               
-               public Integer getFirstVertex() { return this.getField(V1); }
-               
-               public Integer getSecondVertex() { return this.getField(V2); }
-               
-               public void setFirstVertex(final Integer vertex1) { 
this.setField(vertex1, V1); }
-               
-               public void setSecondVertex(final Integer vertex2) { 
this.setField(vertex2, V2); }
-               
-               public void copyVerticesFromTuple2(Tuple2<Integer, Integer> t) {
-                       this.setFirstVertex(t.f0);
-                       this.setSecondVertex(t.f1);
-               }
-               
-               public void copyVerticesFromEdgeWithDegrees(EdgeWithDegrees 
ewd) {
-                       this.setFirstVertex(ewd.getFirstVertex());
-                       this.setSecondVertex(ewd.getSecondVertex());
-               }
-               
-               public void flipVertices() {
-                       Integer tmp = this.getFirstVertex();
-                       this.setFirstVertex(this.getSecondVertex());
-                       this.setSecondVertex(tmp);
-               }
-       }
-       
-       public static class Triad extends Tuple3<Integer, Integer, Integer> {
-               private static final long serialVersionUID = 1L;
-               
-               public static final int V1 = 0;
-               public static final int V2 = 1;
-               public static final int V3 = 2;
-               
-               public Triad() {}
-               
-               public void setFirstVertex(final Integer vertex1) { 
this.setField(vertex1, V1); }
-               
-               public void setSecondVertex(final Integer vertex2) { 
this.setField(vertex2, V2); }
-               
-               public void setThirdVertex(final Integer vertex3) { 
this.setField(vertex3, V3); }
-       }
-       
-       public static class EdgeWithDegrees extends Tuple4<Integer, Integer, 
Integer, Integer> {
-               private static final long serialVersionUID = 1L;
-
-               public static final int V1 = 0;
-               public static final int V2 = 1;
-               public static final int D1 = 2;
-               public static final int D2 = 3;
-               
-               public EdgeWithDegrees() { }
-                       
-               public Integer getFirstVertex() { return this.getField(V1); }
-               
-               public Integer getSecondVertex() { return this.getField(V2); }
-               
-               public Integer getFirstDegree() { return this.getField(D1); }
-               
-               public Integer getSecondDegree() { return this.getField(D2); }
-               
-               public void setFirstVertex(final Integer vertex1) { 
this.setField(vertex1, V1); }
-               
-               public void setSecondVertex(final Integer vertex2) { 
this.setField(vertex2, V2); }
-               
-               public void setFirstDegree(final Integer degree1) { 
this.setField(degree1, D1); }
-               
-               public void setSecondDegree(final Integer degree2) { 
this.setField(degree2, D2); }
-               
-               public void copyFrom(final EdgeWithDegrees edge) {
-                       this.setFirstVertex(edge.getFirstVertex());
-                       this.setSecondVertex(edge.getSecondVertex());
-                       this.setFirstDegree(edge.getFirstDegree());
-                       this.setSecondDegree(edge.getSecondDegree());
-               }
-       }
-       
-       
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/PageRankData.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/PageRankData.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/PageRankData.java
deleted file mode 100644
index d4e8a80..0000000
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/PageRankData.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.examples.java.graph.util;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-/**
- * Provides the default data sets used for the PageRank example program.
- * The default data sets are used, if no parameters are given to the program.
- *
- */
-public class PageRankData {
-
-       public static final Object[][] EDGES = {
-               {1L, 2L},
-               {1L, 15L},
-               {2L, 3L},
-               {2L, 4L},
-               {2L, 5L},
-               {2L, 6L},
-               {2L, 7L},
-               {3L, 13L},
-               {4L, 2L},
-               {5L, 11L},
-               {5L, 12L},
-               {6L, 1L},
-               {6L, 7L},
-               {6L, 8L},
-               {7L, 1L},
-               {7L, 8L},
-               {8L, 1L},
-               {8L, 9L},
-               {8L, 10L},
-               {9L, 14L},
-               {9L, 1L},
-               {10L, 1L},
-               {10L, 13L},
-               {11L, 12L},
-               {11L, 1L},
-               {12L, 1L},
-               {13L, 14L},
-               {14L, 12L},
-               {15L, 1L},
-       };
-       
-       private static long numPages = 15;
-       
-       public static DataSet<Tuple2<Long, Long>> 
getDefaultEdgeDataSet(ExecutionEnvironment env) {
-               
-               List<Tuple2<Long, Long>> edges = new ArrayList<Tuple2<Long, 
Long>>();
-               for(Object[] e : EDGES) {
-                       edges.add(new Tuple2<Long, Long>((Long)e[0], 
(Long)e[1]));
-               }
-               return env.fromCollection(edges);
-       }
-       
-       public static DataSet<Long> getDefaultPagesDataSet(ExecutionEnvironment 
env) {
-               return env.generateSequence(1, 15);
-       }
-       
-       public static long getNumberOfPages() {
-               return numPages;
-       }
-       
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/CollectionExecutionExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/CollectionExecutionExample.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/CollectionExecutionExample.java
deleted file mode 100644
index e5b94e9..0000000
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/CollectionExecutionExample.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.examples.java.misc;
-
-import java.util.List;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-/** 
- * This example shows how to use the collection based execution of Flink.
- * 
- * The collection based execution is a local mode that is not using the full Flink runtime.
- * DataSet transformations are executed on Java collections.
- * 
- * See the "Local Execution" section in the documentation for more details: 
- *     http://flink.apache.org/docs/latest/apis/local_execution.html
- * 
- */
-public class CollectionExecutionExample {
-       
-       /**
-        * POJO class representing a user
-        */
-       public static class User {
-               public int userIdentifier;
-               public String name;
-               public User() {}
-               public User(int userIdentifier, String name) {
-                       this.userIdentifier = userIdentifier; this.name = name;
-               }
-               public String toString() {
-                       return "User{userIdentifier="+userIdentifier+" name="+name+"}";
-               }
-       }
-       
-       /**
-        * POJO for an EMail.
-        */
-       public static class EMail {
-               public int userId;
-               public String subject;
-               public String body;
-               public EMail() {}
-               public EMail(int userId, String subject, String body) {
-                       this.userId = userId; this.subject = subject; this.body 
= body;
-               }
-               public String toString() {
-                       return "eMail{userId="+userId+" subject="+subject+" body="+body+"}";
-               }
-               
-       }
-       public static void main(String[] args) throws Exception {
-               // initialize a new Collection-based execution environment
-               final ExecutionEnvironment env = 
ExecutionEnvironment.createCollectionsEnvironment();
-               
-               // create objects for users and emails
-               User[] usersArray = { new User(1, "Peter"), new User(2, "John"), new User(3, "Bill") };
-               
-               EMail[] emailsArray = {new EMail(1, "Re: Meeting", "How about 1pm?"),
-                                                       new EMail(1, "Re: Meeting", "Sorry, I'm not availble"),
-                                                       new EMail(3, "Re: Re: Project proposal", "Give me a few more days to think about it.")};
-               
-               // convert objects into a DataSet
-               DataSet<User> users = env.fromElements(usersArray);
-               DataSet<EMail> emails = env.fromElements(emailsArray);
-               
-               // join the two DataSets
-               DataSet<Tuple2<User,EMail>> joined = 
users.join(emails).where("userIdentifier").equalTo("userId");
-               
-               // retrieve the resulting Tuple2 elements into a ArrayList.
-               List<Tuple2<User,EMail>> result = joined.collect();
-               
-               // Do some work with the resulting ArrayList (=Collection).
-               for(Tuple2<User, EMail> t : result) {
-                       System.err.println("Result = " + t);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/PiEstimation.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/PiEstimation.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/PiEstimation.java
deleted file mode 100644
index fc85110..0000000
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/PiEstimation.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.examples.java.misc;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-/** 
- * Estimates the value of Pi using the Monte Carlo method.
- * The area of a circle is Pi * R^2, R being the radius of the circle 
- * The area of a square is 4 * R^2, where the length of the square's edge is 2*R.
- * 
- * Thus Pi = 4 * (area of circle / area of square).
- * 
- * The idea is to find a way to estimate the circle to square area ratio.
- * The Monte Carlo method suggests collecting random points (within the square)
- * and then counting the number of points that fall within the circle
- * 
- * <pre>
- * {@code
- * x = Math.random()
- * y = Math.random()
- * 
- * x * x + y * y < 1
- * }
- * </pre>
- */
-@SuppressWarnings("serial")
-public class PiEstimation implements java.io.Serializable {
-       
-       
-       public static void main(String[] args) throws Exception {
-
-               final long numSamples = args.length > 0 ? 
Long.parseLong(args[0]) : 1000000;
-               
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               
-               // count how many of the samples would randomly fall into
-               // the unit circle
-               DataSet<Long> count = 
-                               env.generateSequence(1, numSamples)
-                               .map(new Sampler())
-                               .reduce(new SumReducer());
-
-               long theCount = count.collect().get(0);
-
-               System.out.println("We estimate Pi to be: " + (theCount * 4.0 / 
numSamples));
-       }
-
-       //*************************************************************************
-       //     USER FUNCTIONS
-       //*************************************************************************
-       
-       
-       /** 
-        * Sampler randomly emits points that fall within a square of edge x * y.
-        * It calculates the distance to the center of a virtually centered circle of radius x = y = 1
-        * If the distance is less than 1, then and only then does it returns a 1.
-        */
-       public static class Sampler implements MapFunction<Long, Long> {
-
-               @Override
-               public Long map(Long value) {
-                       double x = Math.random();
-                       double y = Math.random();
-                       return (x * x + y * y) < 1 ? 1L : 0L;
-               }
-       }
-
-       
-       /** 
-        * Simply sums up all long values.
-        */
-       public static final class SumReducer implements ReduceFunction<Long>{
-
-               @Override
-               public Long reduce(Long value1, Long value2) {
-                       return value1 + value2;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java
deleted file mode 100644
index 9c3356c..0000000
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java
+++ /dev/null
@@ -1,317 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.examples.java.ml;
-
-import java.io.Serializable;
-import java.util.Collection;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.examples.java.ml.util.LinearRegressionData;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operators.IterativeDataSet;
-
-/**
- * This example implements a basic Linear Regression  to solve the y = theta0 + theta1*x problem using batch gradient descent algorithm.
- *
- * <p>
- * Linear Regression with BGD(batch gradient descent) algorithm is an iterative clustering algorithm and works as follows:<br>
- * Giving a data set and target set, the BGD try to find out the best parameters for the data set to fit the target set.
- * In each iteration, the algorithm computes the gradient of the cost function and use it to update all the parameters.
- * The algorithm terminates after a fixed number of iterations (as in this implementation)
- * With enough iteration, the algorithm can minimize the cost function and find the best parameters
- * This is the Wikipedia entry for the <a href = "http://en.wikipedia.org/wiki/Linear_regression">Linear regression</a> and <a href = "http://en.wikipedia.org/wiki/Gradient_descent">Gradient descent algorithm</a>.
- * 
- * <p>
- * This implementation works on one-dimensional data. And find the two-dimensional theta.<br>
- * It find the best Theta parameter to fit the target.
- * 
- * <p>
- * Input files are plain text files and must be formatted as follows:
- * <ul>
- * <li>Data points are represented as two double values separated by a blank character. The first one represent the X(the training data) and the second represent the Y(target).
- * Data points are separated by newline characters.<br>
- * For example <code>"-0.02 -0.04\n5.3 10.6\n"</code> gives two data points (x=-0.02, y=-0.04) and (x=5.3, y=10.6).
- * </ul>
- * 
- * <p>
- * This example shows how to use:
- * <ul>
- * <li> Bulk iterations
- * <li> Broadcast variables in bulk iterations
- * <li> Custom Java objects (PoJos)
- * </ul>
- */
-@SuppressWarnings("serial")
-public class LinearRegression {
-
-       // *************************************************************************
-       //     PROGRAM
-       // *************************************************************************
-
-       public static void main(String[] args) throws Exception{
-
-               if(!parseParameters(args)) {
-                       return;
-               }
-
-               // set up execution environment
-
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               // get input x data from elements
-               DataSet<Data> data = getDataSet(env);
-
-               // get the parameters from elements
-               DataSet<Params> parameters = getParamsDataSet(env);
-
-               // set number of bulk iterations for SGD linear Regression
-               IterativeDataSet<Params> loop = 
parameters.iterate(numIterations);
-
-               DataSet<Params> new_parameters = data
-                               // compute a single step using every sample
-                               .map(new SubUpdate()).withBroadcastSet(loop, 
"parameters")
-                               // sum up all the steps
-                               .reduce(new UpdateAccumulator())
-                               // average the steps and update all parameters
-                               .map(new Update());
-
-               // feed new parameters back into next iteration
-               DataSet<Params> result = loop.closeWith(new_parameters);
-
-               // emit result
-               if(fileOutput) {
-                       result.writeAsText(outputPath);
-                       // execute program
-                       env.execute("Linear Regression example");
-               } else {
-                       result.print();
-               }
-       }
-
-       // *************************************************************************
-       //     DATA TYPES
-       // *************************************************************************
-
-       /**
-        * A simple data sample, x means the input, and y means the target.
-        */
-       public static class Data implements Serializable{
-               public double x,y;
-
-               public Data() {};
-
-               public Data(double x ,double y){
-                       this.x = x;
-                       this.y = y;
-               }
-
-               @Override
-               public String toString() {
-                       return "(" + x + "|" + y + ")";
-               }
-
-       }
-
-       /**
-        * A set of parameters -- theta0, theta1.
-        */
-       public static class Params implements Serializable{
-
-               private double theta0,theta1;
-
-               public Params(){};
-
-               public Params(double x0, double x1){
-                       this.theta0 = x0;
-                       this.theta1 = x1;
-               }
-
-               @Override
-               public String toString() {
-                       return theta0 + " " + theta1;
-               }
-
-               public double getTheta0() {
-                       return theta0;
-               }
-
-               public double getTheta1() {
-                       return theta1;
-               }
-
-               public void setTheta0(double theta0) {
-                       this.theta0 = theta0;
-               }
-
-               public void setTheta1(double theta1) {
-                       this.theta1 = theta1;
-               }
-
-               public Params div(Integer a){
-                       this.theta0 = theta0 / a ;
-                       this.theta1 = theta1 / a ;
-                       return this;
-               }
-
-       }
-
-       // *************************************************************************
-       //     USER FUNCTIONS
-       // *************************************************************************
-
-       /** Converts a {@code Tuple2<Double,Double>} into a Data. */
-       @ForwardedFields("0->x; 1->y")
-       public static final class TupleDataConverter implements 
MapFunction<Tuple2<Double, Double>, Data> {
-
-               @Override
-               public Data map(Tuple2<Double, Double> t) throws Exception {
-                       return new Data(t.f0, t.f1);
-               }
-       }
-
-       /** Converts a {@code Tuple2<Double,Double>} into a Params. */
-       @ForwardedFields("0->theta0; 1->theta1")
-       public static final class TupleParamsConverter implements 
MapFunction<Tuple2<Double, Double>,Params> {
-
-               @Override
-               public Params map(Tuple2<Double, Double> t)throws Exception {
-                       return new Params(t.f0,t.f1);
-               }
-       }
-
-       /**
-        * Compute a single BGD type update for every parameters.
-        */
-       public static class SubUpdate extends 
RichMapFunction<Data,Tuple2<Params,Integer>> {
-
-               private Collection<Params> parameters; 
-
-               private Params parameter;
-
-               private int count = 1;
-
-               /** Reads the parameters from a broadcast variable into a 
collection. */
-               @Override
-               public void open(Configuration parameters) throws Exception {
-                       this.parameters = 
getRuntimeContext().getBroadcastVariable("parameters");
-               }
-
-               @Override
-               public Tuple2<Params, Integer> map(Data in) throws Exception {
-
-                       for(Params p : parameters){
-                               this.parameter = p; 
-                       }
-
-                       double theta_0 = parameter.theta0 - 
0.01*((parameter.theta0 + (parameter.theta1*in.x)) - in.y);
-                       double theta_1 = parameter.theta1 - 
0.01*(((parameter.theta0 + (parameter.theta1*in.x)) - in.y) * in.x);
-
-                       return new Tuple2<Params,Integer>(new 
Params(theta_0,theta_1),count);
-               }
-       }
-
-       /**  
-        * Accumulator all the update.
-        * */
-       public static class UpdateAccumulator implements 
ReduceFunction<Tuple2<Params, Integer>> {
-
-               @Override
-               public Tuple2<Params, Integer> reduce(Tuple2<Params, Integer> 
val1, Tuple2<Params, Integer> val2) {
-
-                       double new_theta0 = val1.f0.theta0 + val2.f0.theta0;
-                       double new_theta1 = val1.f0.theta1 + val2.f0.theta1;
-                       Params result = new Params(new_theta0,new_theta1);
-                       return new Tuple2<Params, Integer>( result, val1.f1 + 
val2.f1);
-
-               }
-       }
-
-       /**
-        * Compute the final update by average them.
-        */
-       public static class Update implements MapFunction<Tuple2<Params, 
Integer>,Params> {
-
-               @Override
-               public Params map(Tuple2<Params, Integer> arg0) throws 
Exception {
-
-                       return arg0.f0.div(arg0.f1);
-
-               }
-
-       }
-       // *************************************************************************
-       //     UTIL METHODS
-       // *************************************************************************
-
-       private static boolean fileOutput = false;
-       private static String dataPath = null;
-       private static String outputPath = null;
-       private static int numIterations = 10;
-
-       private static boolean parseParameters(String[] programArguments) {
-
-               if(programArguments.length > 0) {
-                       // parse input arguments
-                       fileOutput = true;
-                       if(programArguments.length == 3) {
-                               dataPath = programArguments[0];
-                               outputPath = programArguments[1];
-                               numIterations = 
Integer.parseInt(programArguments[2]);
-                       } else {
-                               System.err.println("Usage: LinearRegression <data path> <result path> <num iterations>");
-                               return false;
-                       }
-               } else {
-                       System.out.println("Executing Linear Regression example with default parameters and built-in default data.");
-                       System.out.println("  Provide parameters to read input data from files.");
-                       System.out.println("  See the documentation for the correct format of input files.");
-                       System.out.println("  We provide a data generator to create synthetic input files for this program.");
-                       System.out.println("  Usage: LinearRegression <data path> <result path> <num iterations>");
-               }
-               return true;
-       }
-
-       private static DataSet<Data> getDataSet(ExecutionEnvironment env) {
-               if(fileOutput) {
-                       // read data from CSV file
-                       return env.readCsvFile(dataPath)
-                                       .fieldDelimiter(" ")
-                                       .includeFields(true, true)
-                                       .types(Double.class, Double.class)
-                                       .map(new TupleDataConverter());
-               } else {
-                       return LinearRegressionData.getDefaultDataDataSet(env);
-               }
-       }
-
-       private static DataSet<Params> getParamsDataSet(ExecutionEnvironment 
env) {
-
-               return LinearRegressionData.getDefaultParamsDataSet(env);
-
-       }
-
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionData.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionData.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionData.java
deleted file mode 100644
index 006b8b5..0000000
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionData.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.examples.java.ml.util;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.examples.java.ml.LinearRegression.Data;
-import org.apache.flink.examples.java.ml.LinearRegression.Params;
-
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * Provides the default data sets used for the Linear Regression example
- * program. The default data sets are used, if no parameters are given to the
- * program.
- */
-public class LinearRegressionData {
-
-       // We have the data as object arrays so that we can also generate Scala 
Data
-       // Sources from it.
-       public static final Object[][] PARAMS = new Object[][] { new Object[] {
-                       0.0, 0.0 } };
-
-       public static final Object[][] DATA = new Object[][] {
-                       new Object[] { 0.5, 1.0 }, new Object[] { 1.0, 2.0 },
-                       new Object[] { 2.0, 4.0 }, new Object[] { 3.0, 6.0 },
-                       new Object[] { 4.0, 8.0 }, new Object[] { 5.0, 10.0 },
-                       new Object[] { 6.0, 12.0 }, new Object[] { 7.0, 14.0 },
-                       new Object[] { 8.0, 16.0 }, new Object[] { 9.0, 18.0 },
-                       new Object[] { 10.0, 20.0 }, new Object[] { -0.08, 
-0.16 },
-                       new Object[] { 0.13, 0.26 }, new Object[] { -1.17, 
-2.35 },
-                       new Object[] { 1.72, 3.45 }, new Object[] { 1.70, 3.41 
},
-                       new Object[] { 1.20, 2.41 }, new Object[] { -0.59, 
-1.18 },
-                       new Object[] { 0.28, 0.57 }, new Object[] { 1.65, 3.30 
},
-                       new Object[] { -0.55, -1.08 } };
-
-       public static DataSet<Params> getDefaultParamsDataSet(
-                       ExecutionEnvironment env) {
-               List<Params> paramsList = new LinkedList<Params>();
-               for (Object[] params : PARAMS) {
-                       paramsList.add(new Params((Double) params[0], (Double) 
params[1]));
-               }
-               return env.fromCollection(paramsList);
-       }
-
-       public static DataSet<Data> getDefaultDataDataSet(ExecutionEnvironment 
env) {
-
-               List<Data> dataList = new LinkedList<Data>();
-               for (Object[] data : DATA) {
-                       dataList.add(new Data((Double) data[0], (Double) 
data[1]));
-               }
-               return env.fromCollection(dataList);
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java
deleted file mode 100644
index d95467d..0000000
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.examples.java.ml.util;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.text.DecimalFormat;
-import java.util.Locale;
-import java.util.Random;
-
-/**
- * Generates data for the {@link 
org.apache.flink.examples.java.ml.LinearRegression} example program.
- */
-public class LinearRegressionDataGenerator {
-
-       static {
-               Locale.setDefault(Locale.US);
-       }
-
-       private static final String POINTS_FILE = "data";
-       private static final long DEFAULT_SEED = 4650285087650871364L;
-       private static final int DIMENSIONALITY = 1;
-       private static final DecimalFormat FORMAT = new DecimalFormat("#0.00");
-       private static final char DELIMITER = ' ';
-
-       /**
-        * Main method to generate data for the {@link 
org.apache.flink.examples.java.ml.LinearRegression} example program.
-        * <p>
-        * The generator creates one file:
-        * <ul>
-        * <li><code>{tmp.dir}/data</code> for the data points
-        * </ul> 
-        * 
-        * @param args 
-        * <ol>
-        * <li>Int: Number of data points
-        * <li><b>Optional</b> Long: Random seed
-        * </ol>
-        */
-       public static void main(String[] args) throws IOException {
-
-               // check parameter count
-               if (args.length < 1) {
-                       System.out.println("LinearRegressionDataGenerator 
<numberOfDataPoints> [<seed>]");
-                       System.exit(1);
-               }
-
-               // parse parameters
-               final int numDataPoints = Integer.parseInt(args[0]);
-               // the optional seed is the second argument (index 1), matching the
-               // usage string above and the args.length > 1 guard
-               final long firstSeed = args.length > 1 ? 
Long.parseLong(args[1]) : DEFAULT_SEED;
-               final Random random = new Random(firstSeed);
-               final String tmpDir = System.getProperty("java.io.tmpdir");
-
-               // write the points out
-               BufferedWriter pointsOut = null;
-               try {
-                       pointsOut = new BufferedWriter(new FileWriter(new 
File(tmpDir+"/"+POINTS_FILE)));
-                       StringBuilder buffer = new StringBuilder();
-
-                       // one slot per x dimension plus one slot for the target y
-                       double[] point = new double[DIMENSIONALITY+1];
-
-                       for (int i = 1; i <= numDataPoints; i++) {
-                               // x is drawn from a standard Gaussian; y = 2x plus small Gaussian noise
-                               point[0] = random.nextGaussian();
-                               point[1] = 2 * point[0] + 
0.01*random.nextGaussian();
-                               writePoint(point, buffer, pointsOut);
-                       }
-
-               }
-               finally {
-                       // close (and thereby flush) the writer even if writing failed
-                       if (pointsOut != null) {
-                               pointsOut.close();
-                       }
-               }
-
-               System.out.println("Wrote "+numDataPoints+" data points to 
"+tmpDir+"/"+POINTS_FILE);
-       }
-
-
-       private static void writePoint(double[] data, StringBuilder buffer, 
BufferedWriter out) throws IOException {
-               buffer.setLength(0);
-
-               // write coordinates
-               for (int j = 0; j < data.length; j++) {
-                       buffer.append(FORMAT.format(data[j]));
-                       if(j < data.length - 1) {
-                               buffer.append(DELIMITER);
-                       }
-               }
-
-               out.write(buffer.toString());
-               out.newLine();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
deleted file mode 100644
index 6813c62..0000000
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.examples.java.relational;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.common.functions.RichFilterFunction;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.Configuration;
-
-/**
- * This program filters out lines of a CSV file that contain empty fields. In doing so, 
it counts the number of empty fields per
- * column within a CSV file using a custom accumulator for vectors. In this 
context, empty fields are those that contain
- * at most whitespace characters such as space and tab.
- * <p>
- * The input file is a plain text CSV file with the semicolon as field 
separator and double quotes as field delimiters
- * and three columns. See {@link #getDataSet(ExecutionEnvironment)} for 
configuration.
- * <p>
- * Usage: <code>FilterAndCountIncompleteLines [&lt;input file path&gt; 
[&lt;result path&gt;]]</code> <br>
- * <p>
- * This example shows how to use:
- * <ul>
- * <li>custom accumulators
- * <li>tuple data types
- * <li>inline-defined functions
- * </ul>
- */
-@SuppressWarnings("serial")
-public class EmptyFieldsCountAccumulator {
-
-       // 
*************************************************************************
-       // PROGRAM
-       // 
*************************************************************************
-
-       private static final String EMPTY_FIELD_ACCUMULATOR = "empty-fields";
-
-       public static void main(final String[] args) throws Exception {
-
-               if (!parseParameters(args)) {
-                       return;
-               }
-
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               // get the data set
-               final DataSet<Tuple> file = getDataSet(env);
-
-               // filter lines with empty fields
-               final DataSet<Tuple> filteredLines = file.filter(new 
EmptyFieldFilter());
-
-               // Here, we could do further processing with the filtered 
lines...
-               JobExecutionResult result;
-               // output the filtered lines
-               if (outputPath == null) {
-                       filteredLines.print();
-                       result = env.getLastJobExecutionResult();
-               } else {
-                       filteredLines.writeAsCsv(outputPath);
-                       // execute program
-                       result = env.execute("Accumulator example");
-               }
-
-               // get the accumulator result via its registration key
-               final List<Integer> emptyFields = 
result.getAccumulatorResult(EMPTY_FIELD_ACCUMULATOR);
-               System.out.format("Number of detected empty fields per column: 
%s\n", emptyFields);
-       }
-
-       // 
*************************************************************************
-       // UTIL METHODS
-       // 
*************************************************************************
-
-       private static String filePath;
-       private static String outputPath;
-
-       private static boolean parseParameters(final String[] programArguments) 
{
-
-               if (programArguments.length >= 3) {
-                       System.err.println("Usage: 
FilterAndCountIncompleteLines [<input file path> [<result path>]]");
-                       return false;
-               }
-
-               if (programArguments.length >= 1) {
-                       filePath = programArguments[0];
-                       if (programArguments.length == 2) {
-                               outputPath = programArguments[1];
-                       }
-               }
-
-               return true;
-       }
-
-       @SuppressWarnings("unchecked")
-       private static DataSet<Tuple> getDataSet(final ExecutionEnvironment 
env) {
-
-               DataSet<? extends Tuple> source;
-               if (filePath == null) {
-                       source = env.fromCollection(getExampleInputTuples());
-
-               } else {
-                       source = env
-                                       .readCsvFile(filePath)
-                                       .fieldDelimiter(";")
-                                       .types(String.class, String.class, 
String.class);
-
-               }
-
-               return (DataSet<Tuple>) source;
-       }
-
-       private static Collection<Tuple3<String, String, String>> 
getExampleInputTuples() {
-               Collection<Tuple3<String, String, String>> inputTuples = new 
ArrayList<Tuple3<String, String, String>>();
-               inputTuples.add(new Tuple3<String, String, String>("John", 
"Doe", "Foo Str."));
-               inputTuples.add(new Tuple3<String, String, String>("Joe", 
"Johnson", ""));
-               inputTuples.add(new Tuple3<String, String, String>(null, "Kate 
Morn", "Bar Blvd."));
-               inputTuples.add(new Tuple3<String, String, String>("Tim", 
"Rinny", ""));
-               inputTuples.add(new Tuple3<String, String, String>("Alicia", 
"Jackson", "  "));
-               return inputTuples;
-       }
-
-       /**
-        * This function filters all incoming tuples that have one or more 
empty fields.
-        * In doing so, it also counts the number of empty fields per attribute 
with an accumulator (registered under 
-        * {@link EmptyFieldsCountAccumulator#EMPTY_FIELD_ACCUMULATOR}).
-        */
-       public static final class EmptyFieldFilter extends 
RichFilterFunction<Tuple> {
-
-               // create a new accumulator in each filter function instance
-               // accumulators can be merged later on
-               private final VectorAccumulator emptyFieldCounter = new 
VectorAccumulator();
-
-               @Override
-               public void open(final Configuration parameters) throws 
Exception {
-                       super.open(parameters);
-
-                       // register the accumulator instance
-                       
getRuntimeContext().addAccumulator(EMPTY_FIELD_ACCUMULATOR,
-                                       this.emptyFieldCounter);
-               }
-
-               @Override
-               public boolean filter(final Tuple t) {
-                       boolean containsEmptyFields = false;
-
-                       // iterate over the tuple fields looking for empty ones
-                       for (int pos = 0; pos < t.getArity(); pos++) {
-
-                               final String field = t.getField(pos);
-                               if (field == null || field.trim().isEmpty()) {
-                                       containsEmptyFields = true;
-
-                                       // if an empty field is encountered, 
update the
-                                       // accumulator
-                                       this.emptyFieldCounter.add(pos);
-                               }
-                       }
-
-                       return !containsEmptyFields;
-               }
-       }
-
-       /**
-        * This accumulator maintains a vector of counts. Calling {@link 
#add(Integer)} increments the
-        * <i>n</i>-th vector component. The size of the vector is 
automatically managed.
-        */
-       public static class VectorAccumulator implements Accumulator<Integer, 
ArrayList<Integer>> {
-
-               /** Stores the accumulated vector components. */
-               private final ArrayList<Integer> resultVector;
-
-               public VectorAccumulator(){
-                       this(new ArrayList<Integer>());
-               }
-
-               public VectorAccumulator(ArrayList<Integer> resultVector){
-                       this.resultVector = resultVector;
-               }
-
-               /**
-                * Increases the result vector component at the specified 
position by 1.
-                */
-               @Override
-               public void add(Integer position) {
-                       updateResultVector(position, 1);
-               }
-
-               /**
-                * Increases the result vector component at the specified 
position by the specified delta.
-                */
-               private void updateResultVector(int position, int delta) {
-                       // inflate the vector to contain the given position
-                       while (this.resultVector.size() <= position) {
-                               this.resultVector.add(0);
-                       }
-
-                       // increment the component value
-                       final int component = this.resultVector.get(position);
-                       this.resultVector.set(position, component + delta);
-               }
-
-               @Override
-               public ArrayList<Integer> getLocalValue() {
-                       return this.resultVector;
-               }
-
-               @Override
-               public void resetLocal() {
-                       // clear the result vector if the accumulator instance 
shall be reused
-                       this.resultVector.clear();
-               }
-
-               @Override
-               public void merge(final Accumulator<Integer, 
ArrayList<Integer>> other) {
-                       // merge two vector accumulators by adding up 
their vector components
-                       final List<Integer> otherVector = other.getLocalValue();
-                       for (int index = 0; index < otherVector.size(); 
index++) {
-                               updateResultVector(index, 
otherVector.get(index));
-                       }
-               }
-
-               @Override
-               public Accumulator<Integer, ArrayList<Integer>> clone() {
-                       return new VectorAccumulator(new 
ArrayList<Integer>(resultVector));
-               }
-
-               @Override
-               public String toString() {
-                       return StringUtils.join(resultVector, ',');
-               }
-       }
-}

Reply via email to