http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java deleted file mode 100644 index c815ca4..0000000 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesBasic.java +++ /dev/null @@ -1,231 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.examples.java.graph; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; - -import org.apache.flink.api.common.functions.GroupReduceFunction; -import org.apache.flink.api.common.functions.JoinFunction; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.operators.Order; -import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.util.Collector; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.examples.java.graph.util.EnumTrianglesData; -import org.apache.flink.examples.java.graph.util.EnumTrianglesDataTypes.Edge; -import org.apache.flink.examples.java.graph.util.EnumTrianglesDataTypes.Triad; - -/** - * Triangle enumeration is a pre-processing step to find closely connected parts in graphs. - * A triangle consists of three edges that connect three vertices with each other. - * - * <p> - * The algorithm works as follows: - * It groups all edges that share a common vertex and builds triads, i.e., triples of vertices - * that are connected by two edges. Finally, all triads are filtered for which no third edge exists - * that closes the triangle. - * - * <p> - * Input files are plain text files and must be formatted as follows: - * <ul> - * <li>Edges are represented as pairs for vertex IDs which are separated by space - * characters. Edges are separated by new-line characters.<br> - * For example <code>"1 2\n2 12\n1 12\n42 63"</code> gives four (undirected) edges (1)-(2), (2)-(12), (1)-(12), and (42)-(63) - * that include a triangle - * </ul> - * <pre> - * (1) - * / \ - * (2)-(12) - * </pre> - * - * Usage: <code>EnumTriangleBasic <edge path> <result path></code><br> - * If no parameters are provided, the program is run with default data from {@link EnumTrianglesData}. 
- * - * <p> - * This example shows how to use: - * <ul> - * <li>Custom Java objects which extend Tuple - * <li>Group Sorting - * </ul> - * - */ -@SuppressWarnings("serial") -public class EnumTrianglesBasic { - - static boolean fileOutput = false; - static String edgePath = null; - static String outputPath = null; - - // ************************************************************************* - // PROGRAM - // ************************************************************************* - - public static void main(String[] args) throws Exception { - - if(!parseParameters(args)) { - return; - } - - // set up execution environment - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - // read input data - DataSet<Edge> edges = getEdgeDataSet(env); - - // project edges by vertex id - DataSet<Edge> edgesById = edges - .map(new EdgeByIdProjector()); - - DataSet<Triad> triangles = edgesById - // build triads - .groupBy(Edge.V1).sortGroup(Edge.V2, Order.ASCENDING).reduceGroup(new TriadBuilder()) - // filter triads - .join(edgesById).where(Triad.V2, Triad.V3).equalTo(Edge.V1, Edge.V2).with(new TriadFilter()); - - // emit result - if (fileOutput) { - triangles.writeAsCsv(outputPath, "\n", ","); - // execute program - env.execute("Basic Triangle Enumeration Example"); - } else { - triangles.print(); - } - } - - // ************************************************************************* - // USER FUNCTIONS - // ************************************************************************* - - /** Converts a Tuple2 into an Edge */ - @ForwardedFields("0;1") - public static class TupleEdgeConverter implements MapFunction<Tuple2<Integer, Integer>, Edge> { - private final Edge outEdge = new Edge(); - - @Override - public Edge map(Tuple2<Integer, Integer> t) throws Exception { - outEdge.copyVerticesFromTuple2(t); - return outEdge; - } - } - - /** Projects an edge (pair of vertices) such that the id of the first is smaller than the id of the second. 
*/ - private static class EdgeByIdProjector implements MapFunction<Edge, Edge> { - - @Override - public Edge map(Edge inEdge) throws Exception { - - // flip vertices if necessary - if(inEdge.getFirstVertex() > inEdge.getSecondVertex()) { - inEdge.flipVertices(); - } - - return inEdge; - } - } - - /** - * Builds triads (triples of vertices) from pairs of edges that share a vertex. - * The first vertex of a triad is the shared vertex, the second and third vertex are ordered by vertexId. - * Assumes that input edges share the first vertex and are in ascending order of the second vertex. - */ - @ForwardedFields("0") - private static class TriadBuilder implements GroupReduceFunction<Edge, Triad> { - private final List<Integer> vertices = new ArrayList<Integer>(); - private final Triad outTriad = new Triad(); - - @Override - public void reduce(Iterable<Edge> edgesIter, Collector<Triad> out) throws Exception { - - final Iterator<Edge> edges = edgesIter.iterator(); - - // clear vertex list - vertices.clear(); - - // read first edge - Edge firstEdge = edges.next(); - outTriad.setFirstVertex(firstEdge.getFirstVertex()); - vertices.add(firstEdge.getSecondVertex()); - - // build and emit triads - while (edges.hasNext()) { - Integer higherVertexId = edges.next().getSecondVertex(); - - // combine vertex with all previously read vertices - for (Integer lowerVertexId : vertices) { - outTriad.setSecondVertex(lowerVertexId); - outTriad.setThirdVertex(higherVertexId); - out.collect(outTriad); - } - vertices.add(higherVertexId); - } - } - } - - /** Filters triads (three vertices connected by two edges) without a closing third edge. 
*/ - private static class TriadFilter implements JoinFunction<Triad, Edge, Triad> { - - @Override - public Triad join(Triad triad, Edge edge) throws Exception { - return triad; - } - } - - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - - private static boolean parseParameters(String[] args) { - - if(args.length > 0) { - // parse input arguments - fileOutput = true; - if(args.length == 2) { - edgePath = args[0]; - outputPath = args[1]; - } else { - System.err.println("Usage: EnumTriangleBasic <edge path> <result path>"); - return false; - } - } else { - System.out.println("Executing Enum Triangles Basic example with built-in default data."); - System.out.println(" Provide parameters to read input data from files."); - System.out.println(" See the documentation for the correct format of input files."); - System.out.println(" Usage: EnumTriangleBasic <edge path> <result path>"); - } - return true; - } - - private static DataSet<Edge> getEdgeDataSet(ExecutionEnvironment env) { - if(fileOutput) { - return env.readCsvFile(edgePath) - .fieldDelimiter(" ") - .includeFields(true, true) - .types(Integer.class, Integer.class) - .map(new TupleEdgeConverter()); - } else { - return EnumTrianglesData.getDefaultEdgeDataSet(env); - } - } - -}
http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java deleted file mode 100644 index 3937161..0000000 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/EnumTrianglesOpt.java +++ /dev/null @@ -1,358 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.examples.java.graph; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; - -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.functions.GroupReduceFunction; -import org.apache.flink.api.common.functions.JoinFunction; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.common.operators.Order; -import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.util.Collector; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.examples.java.graph.util.EnumTrianglesData; -import org.apache.flink.examples.java.graph.util.EnumTrianglesDataTypes.Edge; -import org.apache.flink.examples.java.graph.util.EnumTrianglesDataTypes.EdgeWithDegrees; -import org.apache.flink.examples.java.graph.util.EnumTrianglesDataTypes.Triad; - -/** - * Triangle enumeration is a pre-processing step to find closely connected parts in graphs. - * A triangle consists of three edges that connect three vertices with each other. - * - * <p> - * The basic algorithm works as follows: - * It groups all edges that share a common vertex and builds triads, i.e., triples of vertices - * that are connected by two edges. Finally, all triads are filtered for which no third edge exists - * that closes the triangle. - * - * <p> - * For a group of <i>n</i> edges that share a common vertex, the number of built triads is quadratic <i>((n*(n-1))/2)</i>. - * Therefore, an optimization of the algorithm is to group edges on the vertex with the smaller output degree to - * reduce the number of triads. - * This implementation extends the basic algorithm by computing output degrees of edge vertices and - * grouping on edges on the vertex with the smaller degree. 
- * - * <p> - * Input files are plain text files and must be formatted as follows: - * <ul> - * <li>Edges are represented as pairs for vertex IDs which are separated by space - * characters. Edges are separated by new-line characters.<br> - * For example <code>"1 2\n2 12\n1 12\n42 63"</code> gives four (undirected) edges (1)-(2), (2)-(12), (1)-(12), and (42)-(63) - * that include a triangle - * </ul> - * <pre> - * (1) - * / \ - * (2)-(12) - * </pre> - * - * Usage: <code>EnumTriangleOpt <edge path> <result path></code><br> - * If no parameters are provided, the program is run with default data from {@link EnumTrianglesData}. - * - * <p> - * This example shows how to use: - * <ul> - * <li>Custom Java objects which extend Tuple - * <li>Group Sorting - * </ul> - * - */ -@SuppressWarnings("serial") -public class EnumTrianglesOpt { - - // ************************************************************************* - // PROGRAM - // ************************************************************************* - - public static void main(String[] args) throws Exception { - - if(!parseParameters(args)) { - return; - } - - // set up execution environment - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - // read input data - DataSet<Edge> edges = getEdgeDataSet(env); - - // annotate edges with degrees - DataSet<EdgeWithDegrees> edgesWithDegrees = edges - .flatMap(new EdgeDuplicator()) - .groupBy(Edge.V1).sortGroup(Edge.V2, Order.ASCENDING).reduceGroup(new DegreeCounter()) - .groupBy(EdgeWithDegrees.V1,EdgeWithDegrees.V2).reduce(new DegreeJoiner()); - - // project edges by degrees - DataSet<Edge> edgesByDegree = edgesWithDegrees - .map(new EdgeByDegreeProjector()); - // project edges by vertex id - DataSet<Edge> edgesById = edgesByDegree - .map(new EdgeByIdProjector()); - - DataSet<Triad> triangles = edgesByDegree - // build triads - .groupBy(Edge.V1).sortGroup(Edge.V2, Order.ASCENDING).reduceGroup(new TriadBuilder()) - // filter triads - 
.join(edgesById).where(Triad.V2,Triad.V3).equalTo(Edge.V1,Edge.V2).with(new TriadFilter()); - - // emit result - if(fileOutput) { - triangles.writeAsCsv(outputPath, "\n", ","); - // execute program - env.execute("Triangle Enumeration Example"); - } else { - triangles.print(); - } - - - } - - // ************************************************************************* - // USER FUNCTIONS - // ************************************************************************* - - /** Converts a Tuple2 into an Edge */ - @ForwardedFields("0;1") - public static class TupleEdgeConverter implements MapFunction<Tuple2<Integer, Integer>, Edge> { - private final Edge outEdge = new Edge(); - - @Override - public Edge map(Tuple2<Integer, Integer> t) throws Exception { - outEdge.copyVerticesFromTuple2(t); - return outEdge; - } - } - - /** Emits for an edge the original edge and its switched version. */ - private static class EdgeDuplicator implements FlatMapFunction<Edge, Edge> { - - @Override - public void flatMap(Edge edge, Collector<Edge> out) throws Exception { - out.collect(edge); - edge.flipVertices(); - out.collect(edge); - } - } - - /** - * Counts the number of edges that share a common vertex. - * Emits one edge for each input edge with a degree annotation for the shared vertex. - * For each emitted edge, the first vertex is the vertex with the smaller id. 
- */ - private static class DegreeCounter implements GroupReduceFunction<Edge, EdgeWithDegrees> { - - final ArrayList<Integer> otherVertices = new ArrayList<Integer>(); - final EdgeWithDegrees outputEdge = new EdgeWithDegrees(); - - @Override - public void reduce(Iterable<Edge> edgesIter, Collector<EdgeWithDegrees> out) { - - Iterator<Edge> edges = edgesIter.iterator(); - otherVertices.clear(); - - // get first edge - Edge edge = edges.next(); - Integer groupVertex = edge.getFirstVertex(); - this.otherVertices.add(edge.getSecondVertex()); - - // get all other edges (assumes edges are sorted by second vertex) - while (edges.hasNext()) { - edge = edges.next(); - Integer otherVertex = edge.getSecondVertex(); - // collect unique vertices - if(!otherVertices.contains(otherVertex) && otherVertex != groupVertex) { - this.otherVertices.add(otherVertex); - } - } - int degree = this.otherVertices.size(); - - // emit edges - for(Integer otherVertex : this.otherVertices) { - if(groupVertex < otherVertex) { - outputEdge.setFirstVertex(groupVertex); - outputEdge.setFirstDegree(degree); - outputEdge.setSecondVertex(otherVertex); - outputEdge.setSecondDegree(0); - } else { - outputEdge.setFirstVertex(otherVertex); - outputEdge.setFirstDegree(0); - outputEdge.setSecondVertex(groupVertex); - outputEdge.setSecondDegree(degree); - } - out.collect(outputEdge); - } - } - } - - /** - * Builds an edge with degree annotation from two edges that have the same vertices and only one - * degree annotation. 
- */ - @ForwardedFields("0;1") - private static class DegreeJoiner implements ReduceFunction<EdgeWithDegrees> { - private final EdgeWithDegrees outEdge = new EdgeWithDegrees(); - - @Override - public EdgeWithDegrees reduce(EdgeWithDegrees edge1, EdgeWithDegrees edge2) throws Exception { - - // copy first edge - outEdge.copyFrom(edge1); - - // set missing degree - if(edge1.getFirstDegree() == 0 && edge1.getSecondDegree() != 0) { - outEdge.setFirstDegree(edge2.getFirstDegree()); - } else if (edge1.getFirstDegree() != 0 && edge1.getSecondDegree() == 0) { - outEdge.setSecondDegree(edge2.getSecondDegree()); - } - return outEdge; - } - } - - /** Projects an edge (pair of vertices) such that the first vertex is the vertex with the smaller degree. */ - private static class EdgeByDegreeProjector implements MapFunction<EdgeWithDegrees, Edge> { - - private final Edge outEdge = new Edge(); - - @Override - public Edge map(EdgeWithDegrees inEdge) throws Exception { - - // copy vertices to simple edge - outEdge.copyVerticesFromEdgeWithDegrees(inEdge); - - // flip vertices if first degree is larger than second degree. - if(inEdge.getFirstDegree() > inEdge.getSecondDegree()) { - outEdge.flipVertices(); - } - - // return edge - return outEdge; - } - } - - /** Projects an edge (pair of vertices) such that the id of the first is smaller than the id of the second. */ - private static class EdgeByIdProjector implements MapFunction<Edge, Edge> { - - @Override - public Edge map(Edge inEdge) throws Exception { - - // flip vertices if necessary - if(inEdge.getFirstVertex() > inEdge.getSecondVertex()) { - inEdge.flipVertices(); - } - - return inEdge; - } - } - - /** - * Builds triads (triples of vertices) from pairs of edges that share a vertex. - * The first vertex of a triad is the shared vertex, the second and third vertex are ordered by vertexId. - * Assumes that input edges share the first vertex and are in ascending order of the second vertex. 
- */ - @ForwardedFields("0") - private static class TriadBuilder implements GroupReduceFunction<Edge, Triad> { - - private final List<Integer> vertices = new ArrayList<Integer>(); - private final Triad outTriad = new Triad(); - - @Override - public void reduce(Iterable<Edge> edgesIter, Collector<Triad> out) throws Exception { - final Iterator<Edge> edges = edgesIter.iterator(); - - // clear vertex list - vertices.clear(); - - // read first edge - Edge firstEdge = edges.next(); - outTriad.setFirstVertex(firstEdge.getFirstVertex()); - vertices.add(firstEdge.getSecondVertex()); - - // build and emit triads - while (edges.hasNext()) { - Integer higherVertexId = edges.next().getSecondVertex(); - - // combine vertex with all previously read vertices - for(Integer lowerVertexId : vertices) { - outTriad.setSecondVertex(lowerVertexId); - outTriad.setThirdVertex(higherVertexId); - out.collect(outTriad); - } - vertices.add(higherVertexId); - } - } - } - - /** Filters triads (three vertices connected by two edges) without a closing third edge. 
*/ - private static class TriadFilter implements JoinFunction<Triad, Edge, Triad> { - - @Override - public Triad join(Triad triad, Edge edge) throws Exception { - return triad; - } - } - - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - - private static boolean fileOutput = false; - private static String edgePath = null; - private static String outputPath = null; - - private static boolean parseParameters(String[] args) { - - if(args.length > 0) { - // parse input arguments - fileOutput = true; - if(args.length == 2) { - edgePath = args[0]; - outputPath = args[1]; - } else { - System.err.println("Usage: EnumTriangleBasic <edge path> <result path>"); - return false; - } - } else { - System.out.println("Executing Enum Triangles Opt example with built-in default data."); - System.out.println(" Provide parameters to read input data from files."); - System.out.println(" See the documentation for the correct format of input files."); - System.out.println(" Usage: EnumTriangleOpt <edge path> <result path>"); - } - return true; - } - - private static DataSet<Edge> getEdgeDataSet(ExecutionEnvironment env) { - if(fileOutput) { - return env.readCsvFile(edgePath) - .fieldDelimiter(" ") - .includeFields(true, true) - .types(Integer.class, Integer.class) - .map(new TupleEdgeConverter()); - } else { - return EnumTrianglesData.getDefaultEdgeDataSet(env); - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java deleted file mode 100644 index 
7b05158..0000000 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java +++ /dev/null @@ -1,288 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.examples.java.graph; - -import static org.apache.flink.api.java.aggregation.Aggregations.SUM; - -import java.util.ArrayList; - -import org.apache.flink.api.common.functions.FilterFunction; -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.functions.GroupReduceFunction; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; -import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.util.Collector; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.operators.IterativeDataSet; -import org.apache.flink.examples.java.graph.util.PageRankData; - -/** - * A basic implementation of the Page Rank algorithm using a bulk iteration. 
- * - * <p> - * This implementation requires a set of pages and a set of directed links as input and works as follows. <br> - * In each iteration, the rank of every page is evenly distributed to all pages it points to. - * Each page collects the partial ranks of all pages that point to it, sums them up, and applies a dampening factor to the sum. - * The result is the new rank of the page. A new iteration is started with the new ranks of all pages. - * This implementation terminates after a fixed number of iterations.<br> - * This is the Wikipedia entry for the <a href="http://en.wikipedia.org/wiki/Page_rank">Page Rank algorithm</a>. - * - * <p> - * Input files are plain text files and must be formatted as follows: - * <ul> - * <li>Pages represented as an (long) ID separated by new-line characters.<br> - * For example <code>"1\n2\n12\n42\n63"</code> gives five pages with IDs 1, 2, 12, 42, and 63. - * <li>Links are represented as pairs of page IDs which are separated by space - * characters. Links are separated by new-line characters.<br> - * For example <code>"1 2\n2 12\n1 12\n42 63"</code> gives four (directed) links (1)->(2), (2)->(12), (1)->(12), and (42)->(63).<br> - * For this simple implementation it is required that each page has at least one incoming and one outgoing link (a page can point to itself). - * </ul> - * - * <p> - * Usage: <code>PageRankBasic <pages path> <links path> <output path> <num pages> <num iterations></code><br> - * If no parameters are provided, the program is run with default data from {@link PageRankData} and 10 iterations. - * - * <p> - * This example shows how to use: - * <ul> - * <li>Bulk Iterations - * <li>Default Join - * <li>Configure user-defined functions using constructor parameters. 
- * </ul> - * - * - */ -@SuppressWarnings("serial") -public class PageRankBasic { - - private static final double DAMPENING_FACTOR = 0.85; - private static final double EPSILON = 0.0001; - - // ************************************************************************* - // PROGRAM - // ************************************************************************* - - public static void main(String[] args) throws Exception { - - if(!parseParameters(args)) { - return; - } - - // set up execution environment - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - // get input data - DataSet<Long> pagesInput = getPagesDataSet(env); - DataSet<Tuple2<Long, Long>> linksInput = getLinksDataSet(env); - - // assign initial rank to pages - DataSet<Tuple2<Long, Double>> pagesWithRanks = pagesInput. - map(new RankAssigner((1.0d / numPages))); - - // build adjacency list from link input - DataSet<Tuple2<Long, Long[]>> adjacencyListInput = - linksInput.groupBy(0).reduceGroup(new BuildOutgoingEdgeList()); - - // set iterative data set - IterativeDataSet<Tuple2<Long, Double>> iteration = pagesWithRanks.iterate(maxIterations); - - DataSet<Tuple2<Long, Double>> newRanks = iteration - // join pages with outgoing edges and distribute rank - .join(adjacencyListInput).where(0).equalTo(0).flatMap(new JoinVertexWithEdgesMatch()) - // collect and sum ranks - .groupBy(0).aggregate(SUM, 1) - // apply dampening factor - .map(new Dampener(DAMPENING_FACTOR, numPages)); - - DataSet<Tuple2<Long, Double>> finalPageRanks = iteration.closeWith( - newRanks, - newRanks.join(iteration).where(0).equalTo(0) - // termination condition - .filter(new EpsilonFilter())); - - // emit result - if(fileOutput) { - finalPageRanks.writeAsCsv(outputPath, "\n", " "); - // execute program - env.execute("Basic Page Rank Example"); - } else { - finalPageRanks.print(); - } - - - } - - // ************************************************************************* - // USER FUNCTIONS - // 
************************************************************************* - - /** - * A map function that assigns an initial rank to all pages. - */ - public static final class RankAssigner implements MapFunction<Long, Tuple2<Long, Double>> { - Tuple2<Long, Double> outPageWithRank; - - public RankAssigner(double rank) { - this.outPageWithRank = new Tuple2<Long, Double>(-1l, rank); - } - - @Override - public Tuple2<Long, Double> map(Long page) { - outPageWithRank.f0 = page; - return outPageWithRank; - } - } - - /** - * A reduce function that takes a sequence of edges and builds the adjacency list for the vertex where the edges - * originate. Run as a pre-processing step. - */ - @ForwardedFields("0") - public static final class BuildOutgoingEdgeList implements GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long[]>> { - - private final ArrayList<Long> neighbors = new ArrayList<Long>(); - - @Override - public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long[]>> out) { - neighbors.clear(); - Long id = 0L; - - for (Tuple2<Long, Long> n : values) { - id = n.f0; - neighbors.add(n.f1); - } - out.collect(new Tuple2<Long, Long[]>(id, neighbors.toArray(new Long[neighbors.size()]))); - } - } - - /** - * Join function that distributes a fraction of a vertex's rank to all neighbors. 
- */ - public static final class JoinVertexWithEdgesMatch implements FlatMapFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>>, Tuple2<Long, Double>> { - - @Override - public void flatMap(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>> value, Collector<Tuple2<Long, Double>> out){ - Long[] neigbors = value.f1.f1; - double rank = value.f0.f1; - double rankToDistribute = rank / ((double) neigbors.length); - - for (int i = 0; i < neigbors.length; i++) { - out.collect(new Tuple2<Long, Double>(neigbors[i], rankToDistribute)); - } - } - } - - /** - * The function that applies the page rank dampening formula - */ - @ForwardedFields("0") - public static final class Dampener implements MapFunction<Tuple2<Long,Double>, Tuple2<Long,Double>> { - - private final double dampening; - private final double randomJump; - - public Dampener(double dampening, double numVertices) { - this.dampening = dampening; - this.randomJump = (1 - dampening) / numVertices; - } - - @Override - public Tuple2<Long, Double> map(Tuple2<Long, Double> value) { - value.f1 = (value.f1 * dampening) + randomJump; - return value; - } - } - - /** - * Filter that filters vertices where the rank difference is below a threshold. 
- */ - public static final class EpsilonFilter implements FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> { - - @Override - public boolean filter(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>> value) { - return Math.abs(value.f0.f1 - value.f1.f1) > EPSILON; - } - } - - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - - private static boolean fileOutput = false; - private static String pagesInputPath = null; - private static String linksInputPath = null; - private static String outputPath = null; - private static long numPages = 0; - private static int maxIterations = 10; - - private static boolean parseParameters(String[] args) { - - if(args.length > 0) { - if(args.length == 5) { - fileOutput = true; - pagesInputPath = args[0]; - linksInputPath = args[1]; - outputPath = args[2]; - numPages = Integer.parseInt(args[3]); - maxIterations = Integer.parseInt(args[4]); - } else { - System.err.println("Usage: PageRankBasic <pages path> <links path> <output path> <num pages> <num iterations>"); - return false; - } - } else { - System.out.println("Executing PageRank Basic example with default parameters and built-in default data."); - System.out.println(" Provide parameters to read input data from files."); - System.out.println(" See the documentation for the correct format of input files."); - System.out.println(" Usage: PageRankBasic <pages path> <links path> <output path> <num pages> <num iterations>"); - - numPages = PageRankData.getNumberOfPages(); - } - return true; - } - - private static DataSet<Long> getPagesDataSet(ExecutionEnvironment env) { - if(fileOutput) { - return env - .readCsvFile(pagesInputPath) - .fieldDelimiter(" ") - .lineDelimiter("\n") - .types(Long.class) - .map(new MapFunction<Tuple1<Long>, Long>() { - @Override - public Long map(Tuple1<Long> v) { return v.f0; } - }); - } else { - return 
PageRankData.getDefaultPagesDataSet(env); - } - } - - private static DataSet<Tuple2<Long, Long>> getLinksDataSet(ExecutionEnvironment env) { - if(fileOutput) { - return env.readCsvFile(linksInputPath) - .fieldDelimiter(" ") - .lineDelimiter("\n") - .types(Long.class, Long.class); - } else { - return PageRankData.getDefaultEdgeDataSet(env); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java deleted file mode 100644 index 5306895..0000000 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/TransitiveClosureNaive.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -package org.apache.flink.examples.java.graph; - -import org.apache.flink.api.common.functions.CoGroupFunction; -import org.apache.flink.api.common.functions.GroupReduceFunction; -import org.apache.flink.api.common.functions.JoinFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.operators.IterativeDataSet; -import org.apache.flink.api.common.ProgramDescription; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.examples.java.graph.util.ConnectedComponentsData; -import org.apache.flink.util.Collector; - -import java.util.HashSet; -import java.util.Set; - -@SuppressWarnings("serial") -public class TransitiveClosureNaive implements ProgramDescription { - - - public static void main (String... args) throws Exception{ - - if (!parseParameters(args)) { - return; - } - - // set up execution environment - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple2<Long, Long>> edges = getEdgeDataSet(env); - - IterativeDataSet<Tuple2<Long,Long>> paths = edges.iterate(maxIterations); - - DataSet<Tuple2<Long,Long>> nextPaths = paths - .join(edges) - .where(1) - .equalTo(0) - .with(new JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() { - @Override - /** - left: Path (z,x) - x is reachable by z - right: Edge (x,y) - edge x-->y exists - out: Path (z,y) - y is reachable by z - */ - public Tuple2<Long, Long> join(Tuple2<Long, Long> left, Tuple2<Long, Long> right) throws Exception { - return new Tuple2<Long, Long>(left.f0, right.f1); - } - }).withForwardedFieldsFirst("0").withForwardedFieldsSecond("1") - .union(paths) - .groupBy(0, 1) - .reduceGroup(new GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() { - @Override - public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) throws Exception { - out.collect(values.iterator().next()); - } - 
}).withForwardedFields("0;1"); - - DataSet<Tuple2<Long,Long>> newPaths = paths - .coGroup(nextPaths) - .where(0).equalTo(0) - .with(new CoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() { - Set<Tuple2<Long,Long>> prevSet = new HashSet<Tuple2<Long,Long>>(); - @Override - public void coGroup(Iterable<Tuple2<Long, Long>> prevPaths, Iterable<Tuple2<Long, Long>> nextPaths, Collector<Tuple2<Long, Long>> out) throws Exception { - for (Tuple2<Long,Long> prev : prevPaths) { - prevSet.add(prev); - } - for (Tuple2<Long,Long> next: nextPaths) { - if (!prevSet.contains(next)) { - out.collect(next); - } - } - } - }).withForwardedFieldsFirst("0").withForwardedFieldsSecond("0"); - - DataSet<Tuple2<Long, Long>> transitiveClosure = paths.closeWith(nextPaths, newPaths); - - - // emit result - if (fileOutput) { - transitiveClosure.writeAsCsv(outputPath, "\n", " "); - - // execute program explicitly, because file sinks are lazy - env.execute("Transitive Closure Example"); - } else { - transitiveClosure.print(); - } - } - - @Override - public String getDescription() { - return "Parameters: <edges-path> <result-path> <max-number-of-iterations>"; - } - - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - - private static boolean fileOutput = false; - private static String edgesPath = null; - private static String outputPath = null; - private static int maxIterations = 10; - - private static boolean parseParameters(String[] programArguments) { - - if (programArguments.length > 0) { - // parse input arguments - fileOutput = true; - if (programArguments.length == 3) { - edgesPath = programArguments[0]; - outputPath = programArguments[1]; - maxIterations = Integer.parseInt(programArguments[2]); - } else { - System.err.println("Usage: TransitiveClosure <edges path> <result path> <max number of iterations>"); - return false; - } - } else { - 
System.out.println("Executing TransitiveClosure example with default parameters and built-in default data."); - System.out.println(" Provide parameters to read input data from files."); - System.out.println(" See the documentation for the correct format of input files."); - System.out.println(" Usage: TransitiveClosure <edges path> <result path> <max number of iterations>"); - } - return true; - } - - - private static DataSet<Tuple2<Long, Long>> getEdgeDataSet(ExecutionEnvironment env) { - - if(fileOutput) { - return env.readCsvFile(edgesPath).fieldDelimiter(" ").types(Long.class, Long.class); - } else { - return ConnectedComponentsData.getDefaultEdgeDataSet(env); - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/ConnectedComponentsData.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/ConnectedComponentsData.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/ConnectedComponentsData.java deleted file mode 100644 index bd68244..0000000 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/ConnectedComponentsData.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.examples.java.graph.util; - -import java.util.LinkedList; -import java.util.List; - -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; - -/** - * Provides the default data sets used for the Connected Components example program. - * The default data sets are used, if no parameters are given to the program. - * - */ -public class ConnectedComponentsData { - - public static final long[] VERTICES = new long[] { - 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}; - - public static DataSet<Long> getDefaultVertexDataSet(ExecutionEnvironment env) { - List<Long> verticesList = new LinkedList<Long>(); - for (long vertexId : VERTICES) { - verticesList.add(vertexId); - } - return env.fromCollection(verticesList); - } - - public static final Object[][] EDGES = new Object[][] { - new Object[]{1L, 2L}, - new Object[]{2L, 3L}, - new Object[]{2L, 4L}, - new Object[]{3L, 5L}, - new Object[]{6L, 7L}, - new Object[]{8L, 9L}, - new Object[]{8L, 10L}, - new Object[]{5L, 11L}, - new Object[]{11L, 12L}, - new Object[]{10L, 13L}, - new Object[]{9L, 14L}, - new Object[]{13L, 14L}, - new Object[]{1L, 15L}, - new Object[]{16L, 1L} - }; - - public static DataSet<Tuple2<Long, Long>> getDefaultEdgeDataSet(ExecutionEnvironment env) { - - List<Tuple2<Long, Long>> edgeList = new LinkedList<Tuple2<Long, Long>>(); - for (Object[] edge : EDGES) { - edgeList.add(new Tuple2<Long, Long>((Long) edge[0], (Long) edge[1])); - } - return 
env.fromCollection(edgeList); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesData.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesData.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesData.java deleted file mode 100644 index 331f0a5..0000000 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesData.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.examples.java.graph.util; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.examples.java.graph.util.EnumTrianglesDataTypes.Edge; - -/** - * Provides the default data sets used for the Triangle Enumeration example programs. 
- * The default data sets are used, if no parameters are given to the program. - * - */ -public class EnumTrianglesData { - - public static final Object[][] EDGES = { - {1, 2}, - {1, 3}, - {1 ,4}, - {1, 5}, - {2, 3}, - {2, 5}, - {3, 4}, - {3, 7}, - {3, 8}, - {5, 6}, - {7, 8} - }; - - public static DataSet<Edge> getDefaultEdgeDataSet(ExecutionEnvironment env) { - - List<Edge> edges = new ArrayList<Edge>(); - for(Object[] e : EDGES) { - edges.add(new Edge((Integer)e[0], (Integer)e[1])); - } - - return env.fromCollection(edges); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesDataTypes.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesDataTypes.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesDataTypes.java deleted file mode 100644 index acd3f03..0000000 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/EnumTrianglesDataTypes.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.examples.java.graph.util; - -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.tuple.Tuple4; - -public class EnumTrianglesDataTypes { - - public static class Edge extends Tuple2<Integer, Integer> { - private static final long serialVersionUID = 1L; - - public static final int V1 = 0; - public static final int V2 = 1; - - public Edge() {} - - public Edge(final Integer v1, final Integer v2) { - this.setFirstVertex(v1); - this.setSecondVertex(v2); - } - - public Integer getFirstVertex() { return this.getField(V1); } - - public Integer getSecondVertex() { return this.getField(V2); } - - public void setFirstVertex(final Integer vertex1) { this.setField(vertex1, V1); } - - public void setSecondVertex(final Integer vertex2) { this.setField(vertex2, V2); } - - public void copyVerticesFromTuple2(Tuple2<Integer, Integer> t) { - this.setFirstVertex(t.f0); - this.setSecondVertex(t.f1); - } - - public void copyVerticesFromEdgeWithDegrees(EdgeWithDegrees ewd) { - this.setFirstVertex(ewd.getFirstVertex()); - this.setSecondVertex(ewd.getSecondVertex()); - } - - public void flipVertices() { - Integer tmp = this.getFirstVertex(); - this.setFirstVertex(this.getSecondVertex()); - this.setSecondVertex(tmp); - } - } - - public static class Triad extends Tuple3<Integer, Integer, Integer> { - private static final long serialVersionUID = 1L; - - public static final int V1 = 0; - public static final int V2 = 1; - public static final int V3 = 2; - - public Triad() {} - - public void setFirstVertex(final Integer vertex1) { this.setField(vertex1, V1); } - - public void setSecondVertex(final Integer vertex2) { this.setField(vertex2, V2); } - - public void setThirdVertex(final Integer vertex3) { this.setField(vertex3, V3); } - } - - public static class EdgeWithDegrees extends 
Tuple4<Integer, Integer, Integer, Integer> { - private static final long serialVersionUID = 1L; - - public static final int V1 = 0; - public static final int V2 = 1; - public static final int D1 = 2; - public static final int D2 = 3; - - public EdgeWithDegrees() { } - - public Integer getFirstVertex() { return this.getField(V1); } - - public Integer getSecondVertex() { return this.getField(V2); } - - public Integer getFirstDegree() { return this.getField(D1); } - - public Integer getSecondDegree() { return this.getField(D2); } - - public void setFirstVertex(final Integer vertex1) { this.setField(vertex1, V1); } - - public void setSecondVertex(final Integer vertex2) { this.setField(vertex2, V2); } - - public void setFirstDegree(final Integer degree1) { this.setField(degree1, D1); } - - public void setSecondDegree(final Integer degree2) { this.setField(degree2, D2); } - - public void copyFrom(final EdgeWithDegrees edge) { - this.setFirstVertex(edge.getFirstVertex()); - this.setSecondVertex(edge.getSecondVertex()); - this.setFirstDegree(edge.getFirstDegree()); - this.setSecondDegree(edge.getSecondDegree()); - } - } - - -} http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/PageRankData.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/PageRankData.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/PageRankData.java deleted file mode 100644 index d4e8a80..0000000 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/util/PageRankData.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.examples.java.graph.util; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; - -/** - * Provides the default data sets used for the PageRank example program. - * The default data sets are used, if no parameters are given to the program. 
- * - */ -public class PageRankData { - - public static final Object[][] EDGES = { - {1L, 2L}, - {1L, 15L}, - {2L, 3L}, - {2L, 4L}, - {2L, 5L}, - {2L, 6L}, - {2L, 7L}, - {3L, 13L}, - {4L, 2L}, - {5L, 11L}, - {5L, 12L}, - {6L, 1L}, - {6L, 7L}, - {6L, 8L}, - {7L, 1L}, - {7L, 8L}, - {8L, 1L}, - {8L, 9L}, - {8L, 10L}, - {9L, 14L}, - {9L, 1L}, - {10L, 1L}, - {10L, 13L}, - {11L, 12L}, - {11L, 1L}, - {12L, 1L}, - {13L, 14L}, - {14L, 12L}, - {15L, 1L}, - }; - - private static long numPages = 15; - - public static DataSet<Tuple2<Long, Long>> getDefaultEdgeDataSet(ExecutionEnvironment env) { - - List<Tuple2<Long, Long>> edges = new ArrayList<Tuple2<Long, Long>>(); - for(Object[] e : EDGES) { - edges.add(new Tuple2<Long, Long>((Long)e[0], (Long)e[1])); - } - return env.fromCollection(edges); - } - - public static DataSet<Long> getDefaultPagesDataSet(ExecutionEnvironment env) { - return env.generateSequence(1, 15); - } - - public static long getNumberOfPages() { - return numPages; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/CollectionExecutionExample.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/CollectionExecutionExample.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/CollectionExecutionExample.java deleted file mode 100644 index e5b94e9..0000000 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/CollectionExecutionExample.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.examples.java.misc; - -import java.util.List; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; - -/** - * This example shows how to use the collection based execution of Flink. - * - * The collection based execution is a local mode that is not using the full Flink runtime. - * DataSet transformations are executed on Java collections. - * - * See the "Local Execution" section in the documentation for more details: - * http://flink.apache.org/docs/latest/apis/local_execution.html - * - */ -public class CollectionExecutionExample { - - /** - * POJO class representing a user - */ - public static class User { - public int userIdentifier; - public String name; - public User() {} - public User(int userIdentifier, String name) { - this.userIdentifier = userIdentifier; this.name = name; - } - public String toString() { - return "User{userIdentifier="+userIdentifier+" name="+name+"}"; - } - } - - /** - * POJO for an EMail. 
- */ - public static class EMail { - public int userId; - public String subject; - public String body; - public EMail() {} - public EMail(int userId, String subject, String body) { - this.userId = userId; this.subject = subject; this.body = body; - } - public String toString() { - return "eMail{userId="+userId+" subject="+subject+" body="+body+"}"; - } - - } - public static void main(String[] args) throws Exception { - // initialize a new Collection-based execution environment - final ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment(); - - // create objects for users and emails - User[] usersArray = { new User(1, "Peter"), new User(2, "John"), new User(3, "Bill") }; - - EMail[] emailsArray = {new EMail(1, "Re: Meeting", "How about 1pm?"), - new EMail(1, "Re: Meeting", "Sorry, I'm not availble"), - new EMail(3, "Re: Re: Project proposal", "Give me a few more days to think about it.")}; - - // convert objects into a DataSet - DataSet<User> users = env.fromElements(usersArray); - DataSet<EMail> emails = env.fromElements(emailsArray); - - // join the two DataSets - DataSet<Tuple2<User,EMail>> joined = users.join(emails).where("userIdentifier").equalTo("userId"); - - // retrieve the resulting Tuple2 elements into a ArrayList. - List<Tuple2<User,EMail>> result = joined.collect(); - - // Do some work with the resulting ArrayList (=Collection). 
- for(Tuple2<User, EMail> t : result) { - System.err.println("Result = " + t); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/PiEstimation.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/PiEstimation.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/PiEstimation.java deleted file mode 100644 index fc85110..0000000 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/PiEstimation.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.examples.java.misc; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; - -/** - * Estimates the value of Pi using the Monte Carlo method. 
- * The area of a circle is Pi * R^2, R being the radius of the circle - * The area of a square is 4 * R^2, where the length of the square's edge is 2*R. - * - * Thus Pi = 4 * (area of circle / area of square). - * - * The idea is to find a way to estimate the circle to square area ratio. - * The Monte Carlo method suggests collecting random points (within the square) - * and then counting the number of points that fall within the circle - * - * <pre> - * {@code - * x = Math.random() - * y = Math.random() - * - * x * x + y * y < 1 - * } - * </pre> - */ -@SuppressWarnings("serial") -public class PiEstimation implements java.io.Serializable { - - - public static void main(String[] args) throws Exception { - - final long numSamples = args.length > 0 ? Long.parseLong(args[0]) : 1000000; - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - // count how many of the samples would randomly fall into - // the unit circle - DataSet<Long> count = - env.generateSequence(1, numSamples) - .map(new Sampler()) - .reduce(new SumReducer()); - - long theCount = count.collect().get(0); - - System.out.println("We estimate Pi to be: " + (theCount * 4.0 / numSamples)); - } - - //************************************************************************* - // USER FUNCTIONS - //************************************************************************* - - - /** - * Sampler randomly emits points that fall within a square of edge x * y. - * It calculates the distance to the center of a virtually centered circle of radius x = y = 1 - * If the distance is less than 1, then and only then does it returns a 1. - */ - public static class Sampler implements MapFunction<Long, Long> { - - @Override - public Long map(Long value) { - double x = Math.random(); - double y = Math.random(); - return (x * x + y * y) < 1 ? 1L : 0L; - } - } - - - /** - * Simply sums up all long values. 
- */ - public static final class SumReducer implements ReduceFunction<Long>{ - - @Override - public Long reduce(Long value1, Long value2) { - return value1 + value2; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java deleted file mode 100644 index 9c3356c..0000000 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java +++ /dev/null @@ -1,317 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -package org.apache.flink.examples.java.ml; - -import java.io.Serializable; -import java.util.Collection; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.examples.java.ml.util.LinearRegressionData; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.operators.IterativeDataSet; - -/** - * This example implements a basic Linear Regression to solve the y = theta0 + theta1*x problem using batch gradient descent algorithm. - * - * <p> - * Linear Regression with BGD(batch gradient descent) algorithm is an iterative clustering algorithm and works as follows:<br> - * Giving a data set and target set, the BGD try to find out the best parameters for the data set to fit the target set. - * In each iteration, the algorithm computes the gradient of the cost function and use it to update all the parameters. - * The algorithm terminates after a fixed number of iterations (as in this implementation) - * With enough iteration, the algorithm can minimize the cost function and find the best parameters - * This is the Wikipedia entry for the <a href = "http://en.wikipedia.org/wiki/Linear_regression">Linear regression</a> and <a href = "http://en.wikipedia.org/wiki/Gradient_descent">Gradient descent algorithm</a>. - * - * <p> - * This implementation works on one-dimensional data. And find the two-dimensional theta.<br> - * It find the best Theta parameter to fit the target. - * - * <p> - * Input files are plain text files and must be formatted as follows: - * <ul> - * <li>Data points are represented as two double values separated by a blank character. 
The first one represent the X(the training data) and the second represent the Y(target). - * Data points are separated by newline characters.<br> - * For example <code>"-0.02 -0.04\n5.3 10.6\n"</code> gives two data points (x=-0.02, y=-0.04) and (x=5.3, y=10.6). - * </ul> - * - * <p> - * This example shows how to use: - * <ul> - * <li> Bulk iterations - * <li> Broadcast variables in bulk iterations - * <li> Custom Java objects (PoJos) - * </ul> - */ -@SuppressWarnings("serial") -public class LinearRegression { - - // ************************************************************************* - // PROGRAM - // ************************************************************************* - - public static void main(String[] args) throws Exception{ - - if(!parseParameters(args)) { - return; - } - - // set up execution environment - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - // get input x data from elements - DataSet<Data> data = getDataSet(env); - - // get the parameters from elements - DataSet<Params> parameters = getParamsDataSet(env); - - // set number of bulk iterations for SGD linear Regression - IterativeDataSet<Params> loop = parameters.iterate(numIterations); - - DataSet<Params> new_parameters = data - // compute a single step using every sample - .map(new SubUpdate()).withBroadcastSet(loop, "parameters") - // sum up all the steps - .reduce(new UpdateAccumulator()) - // average the steps and update all parameters - .map(new Update()); - - // feed new parameters back into next iteration - DataSet<Params> result = loop.closeWith(new_parameters); - - // emit result - if(fileOutput) { - result.writeAsText(outputPath); - // execute program - env.execute("Linear Regression example"); - } else { - result.print(); - } - } - - // ************************************************************************* - // DATA TYPES - // ************************************************************************* - - /** - * A simple data sample, x 
means the input, and y means the target. - */ - public static class Data implements Serializable{ - public double x,y; - - public Data() {}; - - public Data(double x ,double y){ - this.x = x; - this.y = y; - } - - @Override - public String toString() { - return "(" + x + "|" + y + ")"; - } - - } - - /** - * A set of parameters -- theta0, theta1. - */ - public static class Params implements Serializable{ - - private double theta0,theta1; - - public Params(){}; - - public Params(double x0, double x1){ - this.theta0 = x0; - this.theta1 = x1; - } - - @Override - public String toString() { - return theta0 + " " + theta1; - } - - public double getTheta0() { - return theta0; - } - - public double getTheta1() { - return theta1; - } - - public void setTheta0(double theta0) { - this.theta0 = theta0; - } - - public void setTheta1(double theta1) { - this.theta1 = theta1; - } - - public Params div(Integer a){ - this.theta0 = theta0 / a ; - this.theta1 = theta1 / a ; - return this; - } - - } - - // ************************************************************************* - // USER FUNCTIONS - // ************************************************************************* - - /** Converts a {@code Tuple2<Double,Double>} into a Data. */ - @ForwardedFields("0->x; 1->y") - public static final class TupleDataConverter implements MapFunction<Tuple2<Double, Double>, Data> { - - @Override - public Data map(Tuple2<Double, Double> t) throws Exception { - return new Data(t.f0, t.f1); - } - } - - /** Converts a {@code Tuple2<Double,Double>} into a Params. */ - @ForwardedFields("0->theta0; 1->theta1") - public static final class TupleParamsConverter implements MapFunction<Tuple2<Double, Double>,Params> { - - @Override - public Params map(Tuple2<Double, Double> t)throws Exception { - return new Params(t.f0,t.f1); - } - } - - /** - * Compute a single BGD type update for every parameters. 
- */ - public static class SubUpdate extends RichMapFunction<Data,Tuple2<Params,Integer>> { - - private Collection<Params> parameters; - - private Params parameter; - - private int count = 1; - - /** Reads the parameters from a broadcast variable into a collection. */ - @Override - public void open(Configuration parameters) throws Exception { - this.parameters = getRuntimeContext().getBroadcastVariable("parameters"); - } - - @Override - public Tuple2<Params, Integer> map(Data in) throws Exception { - - for(Params p : parameters){ - this.parameter = p; - } - - double theta_0 = parameter.theta0 - 0.01*((parameter.theta0 + (parameter.theta1*in.x)) - in.y); - double theta_1 = parameter.theta1 - 0.01*(((parameter.theta0 + (parameter.theta1*in.x)) - in.y) * in.x); - - return new Tuple2<Params,Integer>(new Params(theta_0,theta_1),count); - } - } - - /** - * Accumulator all the update. - * */ - public static class UpdateAccumulator implements ReduceFunction<Tuple2<Params, Integer>> { - - @Override - public Tuple2<Params, Integer> reduce(Tuple2<Params, Integer> val1, Tuple2<Params, Integer> val2) { - - double new_theta0 = val1.f0.theta0 + val2.f0.theta0; - double new_theta1 = val1.f0.theta1 + val2.f0.theta1; - Params result = new Params(new_theta0,new_theta1); - return new Tuple2<Params, Integer>( result, val1.f1 + val2.f1); - - } - } - - /** - * Compute the final update by average them. 
- */ - public static class Update implements MapFunction<Tuple2<Params, Integer>,Params> { - - @Override - public Params map(Tuple2<Params, Integer> arg0) throws Exception { - - return arg0.f0.div(arg0.f1); - - } - - } - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - - private static boolean fileOutput = false; - private static String dataPath = null; - private static String outputPath = null; - private static int numIterations = 10; - - private static boolean parseParameters(String[] programArguments) { - - if(programArguments.length > 0) { - // parse input arguments - fileOutput = true; - if(programArguments.length == 3) { - dataPath = programArguments[0]; - outputPath = programArguments[1]; - numIterations = Integer.parseInt(programArguments[2]); - } else { - System.err.println("Usage: LinearRegression <data path> <result path> <num iterations>"); - return false; - } - } else { - System.out.println("Executing Linear Regression example with default parameters and built-in default data."); - System.out.println(" Provide parameters to read input data from files."); - System.out.println(" See the documentation for the correct format of input files."); - System.out.println(" We provide a data generator to create synthetic input files for this program."); - System.out.println(" Usage: LinearRegression <data path> <result path> <num iterations>"); - } - return true; - } - - private static DataSet<Data> getDataSet(ExecutionEnvironment env) { - if(fileOutput) { - // read data from CSV file - return env.readCsvFile(dataPath) - .fieldDelimiter(" ") - .includeFields(true, true) - .types(Double.class, Double.class) - .map(new TupleDataConverter()); - } else { - return LinearRegressionData.getDefaultDataDataSet(env); - } - } - - private static DataSet<Params> getParamsDataSet(ExecutionEnvironment env) { - - return 
LinearRegressionData.getDefaultParamsDataSet(env); - - } - -} - http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionData.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionData.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionData.java deleted file mode 100644 index 006b8b5..0000000 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionData.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.examples.java.ml.util; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.examples.java.ml.LinearRegression.Data; -import org.apache.flink.examples.java.ml.LinearRegression.Params; - -import java.util.LinkedList; -import java.util.List; - -/** - * Provides the default data sets used for the Linear Regression example - * program. 
The default data sets are used, if no parameters are given to the - * program. - */ -public class LinearRegressionData { - - // We have the data as object arrays so that we can also generate Scala Data - // Sources from it. - public static final Object[][] PARAMS = new Object[][] { new Object[] { - 0.0, 0.0 } }; - - public static final Object[][] DATA = new Object[][] { - new Object[] { 0.5, 1.0 }, new Object[] { 1.0, 2.0 }, - new Object[] { 2.0, 4.0 }, new Object[] { 3.0, 6.0 }, - new Object[] { 4.0, 8.0 }, new Object[] { 5.0, 10.0 }, - new Object[] { 6.0, 12.0 }, new Object[] { 7.0, 14.0 }, - new Object[] { 8.0, 16.0 }, new Object[] { 9.0, 18.0 }, - new Object[] { 10.0, 20.0 }, new Object[] { -0.08, -0.16 }, - new Object[] { 0.13, 0.26 }, new Object[] { -1.17, -2.35 }, - new Object[] { 1.72, 3.45 }, new Object[] { 1.70, 3.41 }, - new Object[] { 1.20, 2.41 }, new Object[] { -0.59, -1.18 }, - new Object[] { 0.28, 0.57 }, new Object[] { 1.65, 3.30 }, - new Object[] { -0.55, -1.08 } }; - - public static DataSet<Params> getDefaultParamsDataSet( - ExecutionEnvironment env) { - List<Params> paramsList = new LinkedList<Params>(); - for (Object[] params : PARAMS) { - paramsList.add(new Params((Double) params[0], (Double) params[1])); - } - return env.fromCollection(paramsList); - } - - public static DataSet<Data> getDefaultDataDataSet(ExecutionEnvironment env) { - - List<Data> dataList = new LinkedList<Data>(); - for (Object[] data : DATA) { - dataList.add(new Data((Double) data[0], (Double) data[1])); - } - return env.fromCollection(dataList); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java deleted file mode 100644 index d95467d..0000000 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.examples.java.ml.util; - -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileWriter; -import java.io.IOException; -import java.text.DecimalFormat; -import java.util.Locale; -import java.util.Random; - -/** - * Generates data for the {@link org.apache.flink.examples.java.ml.LinearRegression} example program. 
/**
 * Generates data for the {@link org.apache.flink.examples.java.ml.LinearRegression} example program.
 */
public class LinearRegressionDataGenerator {

    static {
        // must run before FORMAT is created so that numbers are written with a '.' decimal separator
        Locale.setDefault(Locale.US);
    }

    private static final String POINTS_FILE = "data";
    private static final long DEFAULT_SEED = 4650285087650871364L;
    private static final int DIMENSIONALITY = 1;
    private static final DecimalFormat FORMAT = new DecimalFormat("#0.00");
    private static final char DELIMITER = ' ';

    /**
     * Main method to generate data for the {@link org.apache.flink.examples.java.ml.LinearRegression} example program.
     * <p>
     * The generator creates one file:
     * <ul>
     * <li><code>{tmp.dir}/data</code> for the data points
     * </ul>
     * Each line holds one point as <code>x y</code>, where <code>x</code> is a Gaussian random value
     * and <code>y = 2 * x + 0.01 * noise</code> with Gaussian noise.
     *
     * @param args
     * <ol>
     * <li>Int: Number of data points
     * <li><b>Optional</b> Long: Random seed
     * </ol>
     * @throws IOException if the output file cannot be written
     */
    public static void main(String[] args) throws IOException {

        // check parameter count
        if (args.length < 1) {
            System.out.println("LinearRegressionDataGenerator <numberOfDataPoints> [<seed>]");
            System.exit(1);
        }

        // parse parameters
        final int numDataPoints = Integer.parseInt(args[0]);
        // the optional seed is the SECOND argument; reading args[4] here always failed with
        // an ArrayIndexOutOfBoundsException as soon as a seed was supplied
        final long firstSeed = args.length > 1 ? Long.parseLong(args[1]) : DEFAULT_SEED;
        final Random random = new Random(firstSeed);
        final String tmpDir = System.getProperty("java.io.tmpdir");
        final String pointsPath = tmpDir + "/" + POINTS_FILE;

        // write the points out
        BufferedWriter pointsOut = null;
        try {
            pointsOut = new BufferedWriter(new FileWriter(new File(pointsPath)));
            StringBuilder buffer = new StringBuilder();

            // DIMENSIONALITY + 1: one slot per x dimension plus one for the target y
            double[] point = new double[DIMENSIONALITY + 1];

            for (int i = 1; i <= numDataPoints; i++) {
                point[0] = random.nextGaussian();
                point[1] = 2 * point[0] + 0.01 * random.nextGaussian();
                writePoint(point, buffer, pointsOut);
            }
        }
        finally {
            // close (and thereby flush) the writer even if writing failed
            if (pointsOut != null) {
                pointsOut.close();
            }
        }

        System.out.println("Wrote " + numDataPoints + " data points to " + pointsPath);
    }

    /**
     * Writes one point as a single line: all values formatted with two decimals and
     * separated by {@link #DELIMITER}.
     *
     * @param data   the values of the point
     * @param buffer scratch buffer; its previous content is discarded
     * @param out    the writer the line is appended to
     * @throws IOException if writing fails
     */
    private static void writePoint(double[] data, StringBuilder buffer, BufferedWriter out) throws IOException {
        buffer.setLength(0);

        // write coordinates
        for (int j = 0; j < data.length; j++) {
            buffer.append(FORMAT.format(data[j]));
            if (j < data.length - 1) {
                buffer.append(DELIMITER);
            }
        }

        out.write(buffer.toString());
        out.newLine();
    }
}
or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.examples.java.relational; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -import org.apache.commons.lang3.StringUtils; -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.common.accumulators.Accumulator; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.common.functions.RichFilterFunction; -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.configuration.Configuration; - -/** - * This program filters lines from a CSV file with empty fields. In doing so, it counts the number of empty fields per - * column within a CSV file using a custom accumulator for vectors. In this context, empty fields are those, that at - * most contain whitespace characters like space and tab. - * <p> - * The input file is a plain text CSV file with the semicolon as field separator and double quotes as field delimiters - * and three columns. See {@link #getDataSet(ExecutionEnvironment)} for configuration. 
- * <p> - * Usage: <code>FilterAndCountIncompleteLines [<input file path> [<result path>]]</code> <br> - * <p> - * This example shows how to use: - * <ul> - * <li>custom accumulators - * <li>tuple data types - * <li>inline-defined functions - * </ul> - */ -@SuppressWarnings("serial") -public class EmptyFieldsCountAccumulator { - - // ************************************************************************* - // PROGRAM - // ************************************************************************* - - private static final String EMPTY_FIELD_ACCUMULATOR = "empty-fields"; - - public static void main(final String[] args) throws Exception { - - if (!parseParameters(args)) { - return; - } - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - // get the data set - final DataSet<Tuple> file = getDataSet(env); - - // filter lines with empty fields - final DataSet<Tuple> filteredLines = file.filter(new EmptyFieldFilter()); - - // Here, we could do further processing with the filtered lines... 
- JobExecutionResult result; - // output the filtered lines - if (outputPath == null) { - filteredLines.print(); - result = env.getLastJobExecutionResult(); - } else { - filteredLines.writeAsCsv(outputPath); - // execute program - result = env.execute("Accumulator example"); - } - - // get the accumulator result via its registration key - final List<Integer> emptyFields = result.getAccumulatorResult(EMPTY_FIELD_ACCUMULATOR); - System.out.format("Number of detected empty fields per column: %s\n", emptyFields); - } - - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - - private static String filePath; - private static String outputPath; - - private static boolean parseParameters(final String[] programArguments) { - - if (programArguments.length >= 3) { - System.err.println("Usage: FilterAndCountIncompleteLines [<input file path> [<result path>]]"); - return false; - } - - if (programArguments.length >= 1) { - filePath = programArguments[0]; - if (programArguments.length == 2) { - outputPath = programArguments[1]; - } - } - - return true; - } - - @SuppressWarnings("unchecked") - private static DataSet<Tuple> getDataSet(final ExecutionEnvironment env) { - - DataSet<? 
extends Tuple> source; - if (filePath == null) { - source = env.fromCollection(getExampleInputTuples()); - - } else { - source = env - .readCsvFile(filePath) - .fieldDelimiter(";") - .types(String.class, String.class, String.class); - - } - - return (DataSet<Tuple>) source; - } - - private static Collection<Tuple3<String, String, String>> getExampleInputTuples() { - Collection<Tuple3<String, String, String>> inputTuples = new ArrayList<Tuple3<String, String, String>>(); - inputTuples.add(new Tuple3<String, String, String>("John", "Doe", "Foo Str.")); - inputTuples.add(new Tuple3<String, String, String>("Joe", "Johnson", "")); - inputTuples.add(new Tuple3<String, String, String>(null, "Kate Morn", "Bar Blvd.")); - inputTuples.add(new Tuple3<String, String, String>("Tim", "Rinny", "")); - inputTuples.add(new Tuple3<String, String, String>("Alicia", "Jackson", " ")); - return inputTuples; - } - - /** - * This function filters all incoming tuples that have one or more empty fields. - * In doing so, it also counts the number of empty fields per attribute with an accumulator (registered under - * {@link EmptyFieldsCountAccumulator#EMPTY_FIELD_ACCUMULATOR}). 
- */ - public static final class EmptyFieldFilter extends RichFilterFunction<Tuple> { - - // create a new accumulator in each filter function instance - // accumulators can be merged later on - private final VectorAccumulator emptyFieldCounter = new VectorAccumulator(); - - @Override - public void open(final Configuration parameters) throws Exception { - super.open(parameters); - - // register the accumulator instance - getRuntimeContext().addAccumulator(EMPTY_FIELD_ACCUMULATOR, - this.emptyFieldCounter); - } - - @Override - public boolean filter(final Tuple t) { - boolean containsEmptyFields = false; - - // iterate over the tuple fields looking for empty ones - for (int pos = 0; pos < t.getArity(); pos++) { - - final String field = t.getField(pos); - if (field == null || field.trim().isEmpty()) { - containsEmptyFields = true; - - // if an empty field is encountered, update the - // accumulator - this.emptyFieldCounter.add(pos); - } - } - - return !containsEmptyFields; - } - } - - /** - * This accumulator maintains a vector of counts. Calling {@link #add(Integer)} increments the - * <i>n</i>-th vector component. The size of the vector is automatically managed. - */ - public static class VectorAccumulator implements Accumulator<Integer, ArrayList<Integer>> { - - /** Stores the accumulated vector components. */ - private final ArrayList<Integer> resultVector; - - public VectorAccumulator(){ - this(new ArrayList<Integer>()); - } - - public VectorAccumulator(ArrayList<Integer> resultVector){ - this.resultVector = resultVector; - } - - /** - * Increases the result vector component at the specified position by 1. - */ - @Override - public void add(Integer position) { - updateResultVector(position, 1); - } - - /** - * Increases the result vector component at the specified position by the specified delta. 
- */ - private void updateResultVector(int position, int delta) { - // inflate the vector to contain the given position - while (this.resultVector.size() <= position) { - this.resultVector.add(0); - } - - // increment the component value - final int component = this.resultVector.get(position); - this.resultVector.set(position, component + delta); - } - - @Override - public ArrayList<Integer> getLocalValue() { - return this.resultVector; - } - - @Override - public void resetLocal() { - // clear the result vector if the accumulator instance shall be reused - this.resultVector.clear(); - } - - @Override - public void merge(final Accumulator<Integer, ArrayList<Integer>> other) { - // merge two vector accumulators by adding their up their vector components - final List<Integer> otherVector = other.getLocalValue(); - for (int index = 0; index < otherVector.size(); index++) { - updateResultVector(index, otherVector.get(index)); - } - } - - @Override - public Accumulator<Integer, ArrayList<Integer>> clone() { - return new VectorAccumulator(new ArrayList<Integer>(resultVector)); - } - - @Override - public String toString() { - return StringUtils.join(resultVector, ','); - } - } -}