http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCountPojo.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCountPojo.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCountPojo.java new file mode 100644 index 0000000..b0653ca --- /dev/null +++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCountPojo.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.examples.java.wordcount; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.core.fs.FileSystem.WriteMode; +import org.apache.flink.examples.java.wordcount.util.WordCountData; +import org.apache.flink.util.Collector; + +/** + * This example shows an implementation of WordCount without using the + * Tuple2 type, but a custom class. + */ +@SuppressWarnings("serial") +public class WordCountPojo { + + /** + * This is the POJO (Plain Old Java Object) that is being used + * for all the operations. 
+ * As long as all fields are public or have a getter/setter, the system can handle them. + */ + public static class Word { + + // fields + private String word; + private int frequency; + + // constructors + public Word() {} + + public Word(String word, int i) { + this.word = word; + this.frequency = i; + } + + // getters and setters + public String getWord() { + return word; + } + + public void setWord(String word) { + this.word = word; + } + + public int getFrequency() { + return frequency; + } + + public void setFrequency(int frequency) { + this.frequency = frequency; + } + + @Override + public String toString() { + return "Word="+word+" freq="+frequency; + } + } + + public static void main(String[] args) throws Exception { + if (!parseParameters(args)) { + return; + } + + // set up the execution environment + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + // get input data + DataSet<String> text = getTextDataSet(env); + + DataSet<Word> counts = + // split up the lines into Word objects (with frequency = 1) + text.flatMap(new Tokenizer()) + // group by the field word and sum up the frequency + .groupBy("word") + .reduce(new ReduceFunction<Word>() { + @Override + public Word reduce(Word value1, Word value2) throws Exception { + return new Word(value1.word, value1.frequency + value2.frequency); + } + }); + + if (fileOutput) { + counts.writeAsText(outputPath, WriteMode.OVERWRITE); + // execute program + env.execute("WordCount-Pojo Example"); + } else { + counts.print(); + } + + } + + // ************************************************************************* + // USER FUNCTIONS + // ************************************************************************* + + /** + * Implements the string tokenizer that splits sentences into words as a user-defined + * FlatMapFunction. The function takes a line (String) and splits it into + * multiple Word objects. + */ + public static final class Tokenizer implements FlatMapFunction<String, Word> { + + @Override + public void flatMap(String value, Collector<Word> out) { + // normalize and split the line + String[] tokens = value.toLowerCase().split("\\W+"); + + // emit the pairs + for (String token : tokens) { + if (token.length() > 0) { + out.collect(new Word(token, 1)); + } + } + } + } + + // ************************************************************************* + // UTIL METHODS + // ************************************************************************* + + private static boolean fileOutput = false; + private static String textPath; + private static String outputPath; + + private static boolean parseParameters(String[] args) { + + if (args.length > 0) { + // parse input arguments + fileOutput = true; + if (args.length == 2) { + textPath = args[0]; + outputPath = args[1]; + } else { + System.err.println("Usage: WordCount <text path> <result path>"); + return false; + } + } else { + System.out.println("Executing WordCount example with built-in default data."); + System.out.println(" Provide parameters to read input data from a file."); + System.out.println(" Usage: WordCount <text path> <result path>"); + } + return true; + } + + private static DataSet<String> getTextDataSet(ExecutionEnvironment env) { + if (fileOutput) { + // read the text file from given input path + return env.readTextFile(textPath); + } else { + // get default test text data + return WordCountData.getDefaultTextLineDataSet(env); + } + } +}
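For comparison, the POJO-keyed grouping above can be sketched with Flink's Scala API using a case class; this is an illustrative sketch, not part of this commit, and the object and value names are assumed:

import org.apache.flink.api.scala._

object WordCountCaseClass {

  case class Word(word: String, frequency: Int)

  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // a tiny inline data set keeps the sketch self-contained
    val text = env.fromElements("to be or not to be")

    val counts = text
      .flatMap { _.toLowerCase.split("\\W+").filter(_.nonEmpty) }
      .map { token => Word(token, 1) }
      // case class fields serve as grouping keys, like POJO fields in Java
      .groupBy("word")
      .reduce { (w1, w2) => Word(w1.word, w1.frequency + w2.frequency) }

    counts.print()
  }
}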
http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/util/WordCountData.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/util/WordCountData.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/util/WordCountData.java new file mode 100644 index 0000000..52efee7 --- /dev/null +++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/util/WordCountData.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.examples.java.wordcount.util; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; + +/** + * Provides the default data sets used for the WordCount example program. + * The default data sets are used, if no parameters are given to the program. + * + */ +public class WordCountData { + + public static final String[] WORDS = new String[] { + "To be, or not to be,--that is the question:--", + "Whether 'tis nobler in the mind to suffer", + "The slings and arrows of outrageous fortune", + "Or to take arms against a sea of troubles,", + "And by opposing end them?--To die,--to sleep,--", + "No more; and by a sleep to say we end", + "The heartache, and the thousand natural shocks", + "That flesh is heir to,--'tis a consummation", + "Devoutly to be wish'd. To die,--to sleep;--", + "To sleep! perchance to dream:--ay, there's the rub;", + "For in that sleep of death what dreams may come,", + "When we have shuffled off this mortal coil,", + "Must give us pause: there's the respect", + "That makes calamity of so long life;", + "For who would bear the whips and scorns of time,", + "The oppressor's wrong, the proud man's contumely,", + "The pangs of despis'd love, the law's delay,", + "The insolence of office, and the spurns", + "That patient merit of the unworthy takes,", + "When he himself might his quietus make", + "With a bare bodkin? 
who would these fardels bear,", + "To grunt and sweat under a weary life,", + "But that the dread of something after death,--", + "The undiscover'd country, from whose bourn", + "No traveller returns,--puzzles the will,", + "And makes us rather bear those ills we have", + "Than fly to others that we know not of?", + "Thus conscience does make cowards of us all;", + "And thus the native hue of resolution", + "Is sicklied o'er with the pale cast of thought;", + "And enterprises of great pith and moment,", + "With this regard, their currents turn awry,", + "And lose the name of action.--Soft you now!", + "The fair Ophelia!--Nymph, in thy orisons", + "Be all my sins remember'd." + }; + + public static DataSet<String> getDefaultTextLineDataSet(ExecutionEnvironment env) { + return env.fromElements(WORDS); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-batch/src/main/resources/log4j-test.properties b/flink-examples/flink-examples-batch/src/main/resources/log4j-test.properties new file mode 100644 index 0000000..65bd0b8 --- /dev/null +++ b/flink-examples/flink-examples-batch/src/main/resources/log4j-test.properties @@ -0,0 +1,23 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=INFO, console + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-batch/src/main/resources/log4j.properties b/flink-examples/flink-examples-batch/src/main/resources/log4j.properties new file mode 100644 index 0000000..da32ea0 --- /dev/null +++ b/flink-examples/flink-examples-batch/src/main/resources/log4j.properties @@ -0,0 +1,23 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=INFO, console + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/resources/logback.xml ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-batch/src/main/resources/logback.xml b/flink-examples/flink-examples-batch/src/main/resources/logback.xml new file mode 100644 index 0000000..95f2d04 --- /dev/null +++ b/flink-examples/flink-examples-batch/src/main/resources/logback.xml @@ -0,0 +1,29 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<configuration> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> + </encoder> + </appender> + + <root level="INFO"> + <appender-ref ref="STDOUT"/> + </root> +</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala new file mode 100644 index 0000000..08a3e62 --- /dev/null +++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.examples.scala.clustering + +import org.apache.flink.api.common.functions._ +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields +import org.apache.flink.api.scala._ +import org.apache.flink.configuration.Configuration +import org.apache.flink.examples.java.clustering.util.KMeansData + +import scala.collection.JavaConverters._ + +/** + * This example implements a basic K-Means clustering algorithm. + * + * K-Means is an iterative clustering algorithm and works as follows: + * K-Means is given a set of data points to be clustered and an initial set of ''K'' cluster + * centers. + * In each iteration, the algorithm computes the distance of each data point to each cluster center. + * Each point is assigned to the cluster center which is closest to it. + * Subsequently, each cluster center is moved to the center (''mean'') of all points that have + * been assigned to it. + * The moved cluster centers are fed into the next iteration. + * The algorithm terminates after a fixed number of iterations (as in this implementation) + * or if cluster centers do not (significantly) move in an iteration. + * See the Wikipedia entry for the + * [[http://en.wikipedia.org/wiki/K-means_clustering K-Means Clustering algorithm]]. + * + * This implementation works on two-dimensional data points. + * It computes an assignment of data points to cluster centers, i.e., + * each data point is annotated with the id of the final cluster (center) it belongs to. + * + * Input files are plain text files and must be formatted as follows: + * + * - Data points are represented as two double values separated by a blank character. + * Data points are separated by newline characters. + * For example `"1.2 2.3\n5.3 7.2\n"` gives two data points (x=1.2, y=2.3) and (x=5.3, + * y=7.2). + * - Cluster centers are represented by an integer id and a point value. + * For example `"1 6.2 3.2\n2 2.9 5.7\n"` gives two centers (id=1, x=6.2, + * y=3.2) and (id=2, x=2.9, y=5.7). + * + * Usage: + * {{{ + * KMeans <points path> <centers path> <result path> <num iterations> + * }}} + * If no parameters are provided, the program is run with default data from + * [[org.apache.flink.examples.java.clustering.util.KMeansData]] + * and 10 iterations.
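+ *
+ * The heart of the program is a bulk iteration over the cluster centers, condensed here
+ * from the implementation below as a sketch (forwarded-field hints omitted):
+ * {{{
+ *   val finalCentroids = centroids.iterate(numIterations) { currentCentroids =>
+ *     points
+ *       .map(new SelectNearestCenter).withBroadcastSet(currentCentroids, "centroids")
+ *       .map { x => (x._1, x._2, 1L) }
+ *       .groupBy(0)
+ *       .reduce { (p1, p2) => (p1._1, p1._2.add(p2._2), p1._3 + p2._3) }
+ *       .map { x => new Centroid(x._1, x._2.div(x._3)) }
+ *   }
+ * }}}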
+ * + * This example shows how to use: + * + * - Bulk iterations + * - Broadcast variables in bulk iterations + * - Custom Java objects (POJOs) + */ +object KMeans { + + def main(args: Array[String]) { + if (!parseParameters(args)) { + return + } + + val env = ExecutionEnvironment.getExecutionEnvironment + + val points: DataSet[Point] = getPointDataSet(env) + val centroids: DataSet[Centroid] = getCentroidDataSet(env) + + val finalCentroids = centroids.iterate(numIterations) { currentCentroids => + val newCentroids = points + .map(new SelectNearestCenter).withBroadcastSet(currentCentroids, "centroids") + .map { x => (x._1, x._2, 1L) }.withForwardedFields("_1; _2") + .groupBy(0) + .reduce { (p1, p2) => (p1._1, p1._2.add(p2._2), p1._3 + p2._3) }.withForwardedFields("_1") + .map { x => new Centroid(x._1, x._2.div(x._3)) }.withForwardedFields("_1->id") + newCentroids + } + + val clusteredPoints: DataSet[(Int, Point)] = + points.map(new SelectNearestCenter).withBroadcastSet(finalCentroids, "centroids") + + if (fileOutput) { + clusteredPoints.writeAsCsv(outputPath, "\n", " ") + env.execute("Scala KMeans Example") + } + else { + clusteredPoints.print() + } + + } + + private def parseParameters(programArguments: Array[String]): Boolean = { + if (programArguments.length > 0) { + fileOutput = true + if (programArguments.length == 4) { + pointsPath = programArguments(0) + centersPath = programArguments(1) + outputPath = programArguments(2) + numIterations = Integer.parseInt(programArguments(3)) + + true + } + else { + System.err.println("Usage: KMeans <points path> <centers path> <result path> <num " + + "iterations>") + + false + } + } + else { + System.out.println("Executing K-Means example with default parameters and built-in default " + + "data.") + System.out.println(" Provide parameters to read input data from files.") + System.out.println(" See the documentation for the correct format of input files.") + System.out.println(" We provide a data generator to create synthetic input files for this " + + "program.") + System.out.println(" Usage: KMeans <points path> <centers path> <result path> <num " + + "iterations>") + + true + } + } + + private def getPointDataSet(env: ExecutionEnvironment): DataSet[Point] = { + if (fileOutput) { + env.readCsvFile[(Double, Double)]( + pointsPath, + fieldDelimiter = " ", + includedFields = Array(0, 1)) + .map { x => new Point(x._1, x._2)} + } + else { + val points = KMeansData.POINTS map { + case Array(x, y) => new Point(x.asInstanceOf[Double], y.asInstanceOf[Double]) + } + env.fromCollection(points) + } + } + + private def getCentroidDataSet(env: ExecutionEnvironment): DataSet[Centroid] = { + if (fileOutput) { + env.readCsvFile[(Int, Double, Double)]( + centersPath, + fieldDelimiter = " ", + includedFields = Array(0, 1, 2)) + .map { x => new Centroid(x._1, x._2, x._3)} + } + else { + val centroids = KMeansData.CENTROIDS map { + case Array(id, x, y) => + new Centroid(id.asInstanceOf[Int], x.asInstanceOf[Double], y.asInstanceOf[Double]) + } + env.fromCollection(centroids) + } + } + + private var fileOutput: Boolean = false + private var pointsPath: String = null + private var centersPath: String = null + private var outputPath: String = null + private var numIterations: Int = 10 + + /** + * A simple two-dimensional point.
+ */ + class Point(var x: Double, var y: Double) extends Serializable { + def this() { + this(0, 0) + } + + def add(other: Point): Point = { + x += other.x + y += other.y + this + } + + def div(other: Long): Point = { + x /= other + y /= other + this + } + + def euclideanDistance(other: Point): Double = { + Math.sqrt((x - other.x) * (x - other.x) + (y - other.y) * (y - other.y)) + } + + def clear(): Unit = { + x = 0 + y = 0 + } + + override def toString: String = { + x + " " + y + } + } + + /** + * A simple two-dimensional centroid, basically a point with an ID. + */ + class Centroid(var id: Int, x: Double, y: Double) extends Point(x, y) { + def this() { + this(0, 0, 0) + } + + def this(id: Int, p: Point) { + this(id, p.x, p.y) + } + + override def toString: String = { + id + " " + super.toString + } + } + + /** Determines the closest cluster center for a data point. */ + @ForwardedFields(Array("*->_2")) + final class SelectNearestCenter extends RichMapFunction[Point, (Int, Point)] { + private var centroids: Traversable[Centroid] = null + + /** Reads the centroid values from a broadcast variable into a collection. */ + override def open(parameters: Configuration) { + centroids = getRuntimeContext.getBroadcastVariable[Centroid]("centroids").asScala + } + + def map(p: Point): (Int, Point) = { + var minDistance: Double = Double.MaxValue + var closestCentroidId: Int = -1 + for (centroid <- centroids) { + val distance = p.euclideanDistance(centroid) + if (distance < minDistance) { + minDistance = distance + closestCentroidId = centroid.id + } + } + (closestCentroidId, p) + } + + } +} + + http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala new file mode 100644 index 0000000..3df9791 --- /dev/null +++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.examples.scala.graph + +import org.apache.flink.api.scala._ +import org.apache.flink.examples.java.graph.util.ConnectedComponentsData +import org.apache.flink.util.Collector + +/** + * An implementation of the connected components algorithm, using a delta iteration. + * + * Initially, the algorithm assigns each vertex a unique ID.
In each step, a vertex picks the + * minimum of its own ID and its neighbors' IDs as its new ID and tells its neighbors about its + * new ID. After the algorithm has completed, all vertices in the same component will have the same + * ID. + * + * A vertex whose component ID did not change need not propagate its information in the next + * step. Because of that, the algorithm is easily expressible via a delta iteration. Here we model + * the solution set as the vertices with their current component ids, and the workset as the changed + * vertices. Because we see all vertices initially as changed, the initial workset and the initial + * solution set are identical. The delta to the solution set is consequently also the next + * workset. + * + * Input files are plain text files and must be formatted as follows: + * + * - Vertices represented as IDs and separated by new-line characters. For example, + * `"1\n2\n12\n42\n63"` gives five vertices (1), (2), (12), (42), and (63). + * - Edges are represented as pairs of vertex IDs which are separated by space characters. Edges + * are separated by new-line characters. For example `"1 2\n2 12\n1 12\n42 63"` + * gives four (undirected) edges (1)-(2), (2)-(12), (1)-(12), and (42)-(63). + * + * Usage: + * {{{ + * ConnectedComponents <vertices path> <edges path> <result path> <max number of iterations> + * }}} + * + * If no parameters are provided, the program is run with default data from + * [[org.apache.flink.examples.java.graph.util.ConnectedComponentsData]] and 10 iterations. + * + * + * This example shows how to use: + * + * - Delta Iterations + * - Generic-typed Functions + * + */ +object ConnectedComponents { + def main(args: Array[String]) { + if (!parseParameters(args)) { + return + } + // set up execution environment + val env = ExecutionEnvironment.getExecutionEnvironment + + // read vertex and edge data + // assign the initial components (equal to the vertex id) + val vertices = getVerticesDataSet(env).map { id => (id, id) }.withForwardedFields("*->_1;*->_2") + + // undirected edges by emitting for each input edge the input edge itself and an inverted + // version + val edges = getEdgesDataSet(env).flatMap { edge => Seq(edge, (edge._2, edge._1)) } + + // open a delta iteration + val verticesWithComponents = vertices.iterateDelta(vertices, maxIterations, Array("_1")) { + (s, ws) => + + // apply the step logic: join with the edges + val allNeighbors = ws.join(edges).where(0).equalTo(0) { (vertex, edge) => + (edge._2, vertex._2) + }.withForwardedFieldsFirst("_2->_2").withForwardedFieldsSecond("_2->_1") + + // select the minimum neighbor + val minNeighbors = allNeighbors.groupBy(0).min(1) + + // update if the component of the candidate is smaller + val updatedComponents = minNeighbors.join(s).where(0).equalTo(0) { + (newVertex, oldVertex, out: Collector[(Long, Long)]) => + if (newVertex._2 < oldVertex._2) out.collect(newVertex) + }.withForwardedFieldsFirst("*") + + // delta and new workset are identical + (updatedComponents, updatedComponents) + } + if (fileOutput) { + verticesWithComponents.writeAsCsv(outputPath, "\n", " ") + env.execute("Scala Connected Components Example") + } else { + verticesWithComponents.print() + } + + } + + private def parseParameters(args: Array[String]): Boolean = { + if (args.length > 0) { + fileOutput = true + if (args.length == 4) { + verticesPath = args(0) + edgesPath = args(1) + outputPath = args(2) + maxIterations = args(3).toInt + + true + } else { + System.err.println("Usage: ConnectedComponents 
<vertices path> <edges path> <result path>" + + " <max number of iterations>") + + false + } + } else { + System.out.println("Executing Connected Components example with built-in default data.") + System.out.println(" Provide parameters to read input data from a file.") + System.out.println(" Usage: ConnectedComponents <vertices path> <edges path> <result path>" + + " <max number of iterations>") + + true + } + } + + private def getVerticesDataSet(env: ExecutionEnvironment): DataSet[Long] = { + if (fileOutput) { + env.readCsvFile[Tuple1[Long]]( + verticesPath, + includedFields = Array(0)) + .map { x => x._1 } + } + else { + env.fromCollection(ConnectedComponentsData.VERTICES) + } + } + + private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[(Long, Long)] = { + if (fileOutput) { + env.readCsvFile[(Long, Long)]( + edgesPath, + fieldDelimiter = " ", + includedFields = Array(0, 1)) + .map { x => (x._1, x._2)} + } + else { + val edgeData = ConnectedComponentsData.EDGES map { + case Array(x, y) => (x.asInstanceOf[Long], y.asInstanceOf[Long]) + } + env.fromCollection(edgeData) + } + } + + private var fileOutput: Boolean = false + private var verticesPath: String = null + private var edgesPath: String = null + private var maxIterations: Int = 10 + private var outputPath: String = null +} http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala new file mode 100644 index 0000000..41fb307 --- /dev/null +++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.examples.scala.graph + +import org.apache.flink.api.scala._ +import org.apache.flink.util.Collector + +object DeltaPageRank { + + private final val DAMPENING_FACTOR: Double = 0.85 + private final val NUM_VERTICES = 5 + private final val INITIAL_RANK = 1.0 / NUM_VERTICES + private final val RANDOM_JUMP = (1 - DAMPENING_FACTOR) / NUM_VERTICES + private final val THRESHOLD = 0.0001 / NUM_VERTICES + + type Page = (Long, Double) + type Adjacency = (Long, Array[Long]) + + def main(args: Array[String]) { + + val maxIterations = 100 + + val env = ExecutionEnvironment.getExecutionEnvironment + + val rawLines: DataSet[String] = env.fromElements( + "1 2 3 4", + "2 1", + "3 5", + "4 2 3", + "5 2 4") + val adjacency: DataSet[Adjacency] = rawLines + .map(str => { + val elements = str.split(' ') + val id = elements(0).toLong + val neighbors = elements.slice(1, elements.length).map(_.toLong) + (id, neighbors) + }) + + val initialRanks: DataSet[Page] = adjacency.flatMap { + (adj, out: Collector[Page]) => + { + val targets = adj._2 + val rankPerTarget = INITIAL_RANK * DAMPENING_FACTOR / targets.length + + // dampened fraction to targets + for (target <- targets) { + out.collect((target, rankPerTarget)) + } + + // random jump to self + out.collect((adj._1, RANDOM_JUMP)) + } + } + .groupBy(0).sum(1) + + val initialDeltas = initialRanks.map { (page) => (page._1, page._2 - INITIAL_RANK) } + .withForwardedFields("_1") + + val iteration = initialRanks.iterateDelta(initialDeltas, maxIterations, Array(0)) { + + (solutionSet, workset) => + { + val deltas = workset.join(adjacency).where(0).equalTo(0) { + (lastDeltas, adj, out: Collector[Page]) => + { + val targets = adj._2 + val deltaPerTarget = DAMPENING_FACTOR * lastDeltas._2 / targets.length + + for (target <- targets) { + out.collect((target, deltaPerTarget)) + } + } + } + .groupBy(0).sum(1) + .filter(x => Math.abs(x._2) > THRESHOLD) + + val rankUpdates = solutionSet.join(deltas).where(0).equalTo(0) { + (current, delta) => (current._1, current._2 + delta._2) + }.withForwardedFieldsFirst("_1") + + (rankUpdates, deltas) + } + } + + iteration.print() + + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala new file mode 100644 index 0000000..170aa1d --- /dev/null +++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesBasic.scala @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.examples.scala.graph + +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields +import org.apache.flink.api.scala._ +import scala.collection.JavaConverters._ +import org.apache.flink.api.scala.ExecutionEnvironment +import org.apache.flink.api.common.functions.GroupReduceFunction +import org.apache.flink.util.Collector +import org.apache.flink.examples.java.graph.util.EnumTrianglesData +import org.apache.flink.api.common.operators.Order + +import scala.collection.mutable + +/** + * Triangle enumeration is a pre-processing step to find closely connected parts in graphs. + * A triangle consists of three edges that connect three vertices with each other. + * + * The algorithm works as follows: + * It groups all edges that share a common vertex and builds triads, i.e., triples of vertices + * that are connected by two edges. Finally, all triads are filtered for which no third edge exists + * that closes the triangle. + * + * Input files are plain text files and must be formatted as follows: + * + * - Edges are represented as pairs of vertex IDs which are separated by space + * characters. Edges are separated by new-line characters. + * For example `"1 2\n2 12\n1 12\n42 63"` gives four (undirected) edges (1)-(2), (2)-(12), + * (1)-(12), and (42)-(63) that include a triangle + * + * <pre> + * (1) + * / \ + * (2)-(12) + * </pre> + * + * Usage: + * {{{ + * EnumTriangleBasic <edge path> <result path> + * }}} + * <br> + * If no parameters are provided, the program is run with default data from + * [[org.apache.flink.examples.java.graph.util.EnumTrianglesData]] + * + * This example shows how to use: + * + * - Custom Java objects which extend Tuple + * - Group Sorting + * + */ +object EnumTrianglesBasic { + + def main(args: Array[String]) { + if (!parseParameters(args)) { + return + } + + // set up execution environment + val env = ExecutionEnvironment.getExecutionEnvironment + + // read input data + val edges = getEdgeDataSet(env) + + // project edges by vertex id + val edgesById = edges map(e => if (e.v1 < e.v2) e else Edge(e.v2, e.v1) ) + + val triangles = edgesById + // build triads + .groupBy("v1").sortGroup("v2", Order.ASCENDING).reduceGroup(new TriadBuilder()) + // filter triads + .join(edgesById).where("v2", "v3").equalTo("v1", "v2") { (t, _) => t } + .withForwardedFieldsFirst("*") + + // emit result + if (fileOutput) { + triangles.writeAsCsv(outputPath, "\n", ",") + // execute program + env.execute("TriangleEnumeration Example") + } else { + triangles.print() + } + + + } + + // ************************************************************************* + // USER DATA TYPES + // ************************************************************************* + + case class Edge(v1: Int, v2: Int) extends Serializable + case class Triad(v1: Int, v2: Int, v3: Int) extends Serializable + + + // ************************************************************************* + // USER FUNCTIONS + // ************************************************************************* + + /** + * Builds triads (triples of vertices) from pairs of edges that share a vertex. The first vertex + * of a triad is the shared vertex, the second and third vertex are ordered by vertexId. Assumes + * that input edges share the first vertex and are in ascending order of the second vertex.
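+ *
+ * For example, given the sorted input group of edges sharing vertex 1
+ * {{{
+ *   Edge(1,2), Edge(1,3), Edge(1,4)
+ * }}}
+ * the function emits the triads Triad(1,2,3), Triad(1,2,4), and Triad(1,3,4).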
+ */ + @ForwardedFields(Array("v1->v1")) + class TriadBuilder extends GroupReduceFunction[Edge, Triad] { + + val vertices = mutable.MutableList[Integer]() + + override def reduce(edges: java.lang.Iterable[Edge], out: Collector[Triad]) = { + + // clear vertex list + vertices.clear() + + // build and emit triads + for(e <- edges.asScala) { + + // combine vertex with all previously read vertices + for(v <- vertices) { + out.collect(Triad(e.v1, v, e.v2)) + } + vertices += e.v2 + } + } + } + + // ************************************************************************* + // UTIL METHODS + // ************************************************************************* + + private def parseParameters(args: Array[String]): Boolean = { + if (args.length > 0) { + fileOutput = true + if (args.length == 2) { + edgePath = args(0) + outputPath = args(1) + + true + } else { + System.err.println("Usage: EnumTriangleBasic <edge path> <result path>") + + false + } + } else { + System.out.println("Executing Enum Triangles Basic example with built-in default data.") + System.out.println(" Provide parameters to read input data from files.") + System.out.println(" See the documentation for the correct format of input files.") + System.out.println(" Usage: EnumTriangleBasic <edge path> <result path>") + + true + } + } + + private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge] = { + if (fileOutput) { + env.readCsvFile[Edge](edgePath, fieldDelimiter = " ", includedFields = Array(0, 1)) + } else { + val edges = EnumTrianglesData.EDGES.map { + case Array(v1, v2) => new Edge(v1.asInstanceOf[Int], v2.asInstanceOf[Int]) + } + env.fromCollection(edges) + } + } + + + private var fileOutput: Boolean = false + private var edgePath: String = null + private var outputPath: String = null + +} http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala new file mode 100644 index 0000000..060a5f9 --- /dev/null +++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/EnumTrianglesOpt.scala @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.examples.scala.graph + +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields +import org.apache.flink.api.scala._ +import scala.collection.JavaConverters._ +import org.apache.flink.api.scala.ExecutionEnvironment +import org.apache.flink.api.common.functions.GroupReduceFunction +import org.apache.flink.util.Collector +import org.apache.flink.examples.java.graph.util.EnumTrianglesData +import org.apache.flink.api.common.operators.Order + +import scala.collection.mutable + + +/** + * Triangle enumeration is a pre-processing step to find closely connected parts in graphs. + * A triangle consists of three edges that connect three vertices with each other. + * + * The basic algorithm works as follows: + * It groups all edges that share a common vertex and builds triads, i.e., triples of vertices + * that are connected by two edges. Finally, all triads are filtered for which no third edge exists + * that closes the triangle. + * + * For a group of ''n'' edges that share a common vertex, the number of built triads is quadratic, + * ''n*(n-1)/2''. Therefore, an optimization of the algorithm is to group edges on the vertex + * with the smaller output degree to reduce the number of triads. + * This implementation extends the basic algorithm by computing output degrees of edge vertices and + * grouping edges on the vertex with the smaller degree. + * + * Input files are plain text files and must be formatted as follows: + * + * - Edges are represented as pairs of vertex IDs which are separated by space + * characters. Edges are separated by new-line characters. + * For example `"1 2\n2 12\n1 12\n42 63"` gives four (undirected) edges (1)-(2), (2)-(12), + * (1)-(12), and (42)-(63) that include a triangle + * + * <pre> + * (1) + * / \ + * (2)-(12) + * </pre> + * + * Usage: + * {{{ + * EnumTriangleOpt <edge path> <result path> + * }}} + * + * If no parameters are provided, the program is run with default data from + * [[org.apache.flink.examples.java.graph.util.EnumTrianglesData]].
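+ *
+ * To illustrate the degree optimization with hypothetical degrees: an edge between a
+ * vertex ''a'' of degree 100 and a vertex ''b'' of degree 3 is reoriented so that the
+ * smaller-degree vertex comes first, and triads are then built over the 3 edges incident
+ * to ''b'' rather than the 100 incident to ''a'':
+ * {{{
+ *   // after DegreeCounter and the degree join: EdgeWithDegrees(a, 100, b, 3)
+ *   // orientation by degree:                   Edge(b, a)
+ * }}}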
+ * + * This example shows how to use: + * + * - Custom Java objects which extend Tuple + * - Group Sorting + * + */ +object EnumTrianglesOpt { + + def main(args: Array[String]) { + if (!parseParameters(args)) { + return + } + + // set up execution environment + val env = ExecutionEnvironment.getExecutionEnvironment + + // read input data + val edges = getEdgeDataSet(env) + + val edgesWithDegrees = edges + // duplicate and switch edges + .flatMap(e => Seq(e, Edge(e.v2, e.v1))) + // add degree of first vertex + .groupBy("v1").sortGroup("v2", Order.ASCENDING).reduceGroup(new DegreeCounter()) + // join degrees of vertices + .groupBy("v1", "v2").reduce { + (e1, e2) => + if (e1.d2 == 0) { + new EdgeWithDegrees(e1.v1, e1.d1, e1.v2, e2.d2) + } else { + new EdgeWithDegrees(e1.v1, e2.d1, e1.v2, e1.d2) + } + }.withForwardedFields("v1;v2") + + // project edges by degrees, vertex with smaller degree comes first + val edgesByDegree = edgesWithDegrees + .map(e => if (e.d1 <= e.d2) Edge(e.v1, e.v2) else Edge(e.v2, e.v1)) + // project edges by Id, vertex with smaller Id comes first + val edgesById = edgesByDegree + .map(e => if (e.v1 < e.v2) e else Edge(e.v2, e.v1)) + + val triangles = edgesByDegree + // build triads + .groupBy("v1").sortGroup("v2", Order.ASCENDING).reduceGroup(new TriadBuilder()) + // filter triads + .join(edgesById).where("v2", "v3").equalTo("v1", "v2") { (t, _) => t} + .withForwardedFieldsFirst("*") + + // emit result + if (fileOutput) { + triangles.writeAsCsv(outputPath, "\n", ",") + // execute program + env.execute("TriangleEnumeration Example") + } else { + triangles.print() + } + + } + + // ************************************************************************* + // USER DATA TYPES + // ************************************************************************* + + case class Edge(v1: Int, v2: Int) extends Serializable + + case class Triad(v1: Int, v2: Int, v3: Int) extends Serializable + + case class EdgeWithDegrees(v1: Int, d1: Int, v2: Int, d2: Int) extends Serializable + + + // ************************************************************************* + // USER FUNCTIONS + // ************************************************************************* + + /** + * Counts the number of edges that share a common vertex. + * Emits one edge for each input edge with a degree annotation for the shared vertex. + * For each emitted edge, the first vertex is the vertex with the smaller id. + */ + class DegreeCounter extends GroupReduceFunction[Edge, EdgeWithDegrees] { + + val vertices = mutable.MutableList[Integer]() + var groupVertex = 0 + + override def reduce(edges: java.lang.Iterable[Edge], out: Collector[EdgeWithDegrees]) = { + + // empty vertex list + vertices.clear() + + // collect all vertices + for (e <- edges.asScala) { + groupVertex = e.v1 + if (!vertices.contains(e.v2) && e.v1 != e.v2) { + vertices += e.v2 + } + } + + // count vertices to obtain degree of groupVertex + val degree = vertices.length + + // create and emit edges with degrees + for (v <- vertices) { + if (v < groupVertex) { + out.collect(new EdgeWithDegrees(v, 0, groupVertex, degree)) + } else { + out.collect(new EdgeWithDegrees(groupVertex, degree, v, 0)) + } + } + } + } + + /** + * Builds triads (triples of vertices) from pairs of edges that share a vertex. + * The first vertex of a triad is the shared vertex, the second and third vertex are ordered by + * vertexId. + * Assumes that input edges share the first vertex and are in ascending order of the second + * vertex. 
+ */ + @ForwardedFields(Array("v1")) + class TriadBuilder extends GroupReduceFunction[Edge, Triad] { + + val vertices = mutable.MutableList[Integer]() + + override def reduce(edges: java.lang.Iterable[Edge], out: Collector[Triad]) = { + + // clear vertex list + vertices.clear() + + // build and emit triads + for (e <- edges.asScala) { + // combine vertex with all previously read vertices + for (v <- vertices) { + out.collect(Triad(e.v1, v, e.v2)) + } + vertices += e.v2 + } + } + } + + // ************************************************************************* + // UTIL METHODS + // ************************************************************************* + + private def parseParameters(args: Array[String]): Boolean = { + if (args.length > 0) { + fileOutput = true + if (args.length == 2) { + edgePath = args(0) + outputPath = args(1) + + true + } else { + System.err.println("Usage: EnumTriangleOpt <edge path> <result path>") + + false + } + } else { + System.out.println("Executing Enum Triangles Optimized example with built-in default data.") + System.out.println(" Provide parameters to read input data from files.") + System.out.println(" See the documentation for the correct format of input files.") + System.out.println(" Usage: EnumTriangleOpt <edge path> <result path>") + + true + } + } + + private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge] = { + if (fileOutput) { + env.readCsvFile[Edge]( + edgePath, + fieldDelimiter = " ", + includedFields = Array(0, 1)) + } else { + val edges = EnumTrianglesData.EDGES.map { + case Array(v1, v2) => new Edge(v1.asInstanceOf[Int], v2.asInstanceOf[Int])} + env.fromCollection(edges) + } + } + + + private var fileOutput: Boolean = false + private var edgePath: String = null + private var outputPath: String = null + +} http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala new file mode 100644 index 0000000..e1d4af6 --- /dev/null +++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.flink.examples.scala.graph + +import java.lang.Iterable + +import org.apache.flink.api.common.functions.GroupReduceFunction +import org.apache.flink.api.scala._ +import org.apache.flink.examples.java.graph.util.PageRankData +import org.apache.flink.api.java.aggregation.Aggregations.SUM + +import org.apache.flink.util.Collector + +import scala.collection.JavaConverters._ + +/** + * A basic implementation of the Page Rank algorithm using a bulk iteration. + * + * This implementation requires a set of pages and a set of directed links as input and works as + * follows. + * + * In each iteration, the rank of every page is evenly distributed to all pages it points to. Each + * page collects the partial ranks of all pages that point to it, sums them up, and applies a + * dampening factor to the sum. The result is the new rank of the page. A new iteration is started + * with the new ranks of all pages. This implementation terminates after a fixed number of + * iterations. See the Wikipedia entry for the + * [[http://en.wikipedia.org/wiki/Page_rank Page Rank algorithm]]. + * + * Input files are plain text files and must be formatted as follows: + * + * - Pages are represented by a (long) ID, separated by new-line characters. + * For example `"1\n2\n12\n42\n63"` gives five pages with IDs 1, 2, 12, 42, and 63. + * - Links are represented as pairs of page IDs which are separated by space characters. Links + * are separated by new-line characters. + * For example `"1 2\n2 12\n1 12\n42 63"` gives four (directed) links (1)->(2), (2)->(12), + * (1)->(12), and (42)->(63). For this simple implementation it is required that each page has + * at least one incoming and one outgoing link (a page can point to itself). + * + * Usage: + * {{{ + * PageRankBasic <pages path> <links path> <output path> <num pages> <num iterations> + * }}} + * + * If no parameters are provided, the program is run with default data from + * [[org.apache.flink.examples.java.graph.util.PageRankData]] and 10 iterations. + * + * This example shows how to use: + * + * - Bulk Iterations + * - Default Join + * - Configure user-defined functions using constructor parameters.
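+ *
+ * The termination criterion of the bulk iteration, condensed from the implementation
+ * below (a sketch, not additional API): the iteration stops after the given number of
+ * iterations, or earlier once the `termination` data set becomes empty, i.e., once no
+ * rank changed by more than EPSILON:
+ * {{{
+ *   val termination = currentRanks.join(newRanks).where("pageId").equalTo("pageId") {
+ *     (current, next, out: Collector[Int]) =>
+ *       if (math.abs(current.rank - next.rank) > EPSILON) out.collect(1)
+ *   }
+ *   (newRanks, termination)
+ * }}}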
+ * + */ +object PageRankBasic { + + private final val DAMPENING_FACTOR: Double = 0.85 + private final val EPSILON: Double = 0.0001 + + def main(args: Array[String]) { + if (!parseParameters(args)) { + return + } + + // set up execution environment + val env = ExecutionEnvironment.getExecutionEnvironment + + // read input data + val pages = getPagesDataSet(env) + val links = getLinksDataSet(env) + + // assign initial ranks to pages + val pagesWithRanks = pages.map(p => Page(p, 1.0 / numPages)).withForwardedFields("*->pageId") + + // build adjacency list from link input + val adjacencyLists = links + .groupBy("sourceId").reduceGroup( new GroupReduceFunction[Link, AdjacencyList] { + override def reduce(values: Iterable[Link], out: Collector[AdjacencyList]): Unit = { + var outputId = -1L + val outputList = values.asScala map { t => outputId = t.sourceId; t.targetId } + out.collect(new AdjacencyList(outputId, outputList.toArray)) + } + }) + + // start iteration + val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) { + currentRanks => + val newRanks = currentRanks + // distribute ranks to target pages + .join(adjacencyLists).where("pageId").equalTo("sourceId") { + (page, adjacent, out: Collector[Page]) => + val targets = adjacent.targetIds + val len = targets.length + adjacent.targetIds foreach { t => out.collect(Page(t, page.rank /len )) } + } + // collect ranks and sum them up + .groupBy("pageId").aggregate(SUM, "rank") + // apply dampening factor + .map { p => + Page(p.pageId, (p.rank * DAMPENING_FACTOR) + ((1 - DAMPENING_FACTOR) / numPages)) + }.withForwardedFields("pageId") + + // terminate if no rank update was significant + val termination = currentRanks.join(newRanks).where("pageId").equalTo("pageId") { + (current, next, out: Collector[Int]) => + // check for significant update + if (math.abs(current.rank - next.rank) > EPSILON) out.collect(1) + } + (newRanks, termination) + } + + val result = finalRanks + + // emit result + if (fileOutput) { + result.writeAsCsv(outputPath, "\n", " ") + // execute program + env.execute("Basic PageRank Example") + } else { + result.print() + } + } + + // ************************************************************************* + // USER TYPES + // ************************************************************************* + + case class Link(sourceId: Long, targetId: Long) + + case class Page(pageId: Long, rank: Double) + + case class AdjacencyList(sourceId: Long, targetIds: Array[Long]) + + // ************************************************************************* + // UTIL METHODS + // ************************************************************************* + + private def parseParameters(args: Array[String]): Boolean = { + if (args.length > 0) { + fileOutput = true + if (args.length == 5) { + pagesInputPath = args(0) + linksInputPath = args(1) + outputPath = args(2) + numPages = args(3).toLong + maxIterations = args(4).toInt + + true + } else { + System.err.println("Usage: PageRankBasic <pages path> <links path> <output path> <num " + + "pages> <num iterations>") + + false + } + } else { + System.out.println("Executing PageRank Basic example with default parameters and built-in " + + "default data.") + System.out.println(" Provide parameters to read input data from files.") + System.out.println(" See the documentation for the correct format of input files.") + System.out.println(" Usage: PageRankBasic <pages path> <links path> <output path> <num " + + "pages> <num iterations>") + + numPages = PageRankData.getNumberOfPages + + true + 
} + } + + private def getPagesDataSet(env: ExecutionEnvironment): DataSet[Long] = { + if (fileOutput) { + env.readCsvFile[Tuple1[Long]](pagesInputPath, fieldDelimiter = " ", lineDelimiter = "\n") + .map(x => x._1) + } else { + env.generateSequence(1, 15) + } + } + + private def getLinksDataSet(env: ExecutionEnvironment): DataSet[Link] = { + if (fileOutput) { + env.readCsvFile[Link](linksInputPath, fieldDelimiter = " ", + includedFields = Array(0, 1)) + } else { + val edges = PageRankData.EDGES.map { case Array(v1, v2) => Link(v1.asInstanceOf[Long], + v2.asInstanceOf[Long])} + env.fromCollection(edges) + } + } + + private var fileOutput: Boolean = false + private var pagesInputPath: String = null + private var linksInputPath: String = null + private var outputPath: String = null + private var numPages: Long = 0 + private var maxIterations: Int = 10 + +} http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala new file mode 100644 index 0000000..b7c0714 --- /dev/null +++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.examples.scala.graph + +import org.apache.flink.api.scala._ +import org.apache.flink.examples.java.graph.util.ConnectedComponentsData +import org.apache.flink.util.Collector + +object TransitiveClosureNaive { + + def main(args: Array[String]): Unit = { + if (!parseParameters(args)) { + return + } + + val env = ExecutionEnvironment.getExecutionEnvironment + + val edges = getEdgesDataSet(env) + + val paths = edges.iterateWithTermination(maxIterations) { prevPaths: DataSet[(Long, Long)] => + + // extend the known paths by one hop and deduplicate + val nextPaths = prevPaths + .join(edges) + .where(1).equalTo(0) { + (left, right) => (left._1, right._2) + }.withForwardedFieldsFirst("_1").withForwardedFieldsSecond("_2") + .union(prevPaths) + .groupBy(0, 1) + .reduce((l, r) => l).withForwardedFields("_1; _2") + + // terminate as soon as an iteration discovers no new paths + val terminate = prevPaths + .coGroup(nextPaths) + .where(0).equalTo(0) { + (prev, next, out: Collector[(Long, Long)]) => { + val knownPaths = prev.toSet + for (n <- next) + if (!knownPaths.contains(n)) out.collect(n) + } + }.withForwardedFieldsSecond("*") + (nextPaths, terminate) + } + + if (fileOutput) { + paths.writeAsCsv(outputPath, "\n", " ") + env.execute("Scala Transitive Closure Example") + } else { + paths.print() + } + } + + private var fileOutput: Boolean = false + private var edgesPath: String = null + private var outputPath: String = null + private var maxIterations: Int = 10 + + private def parseParameters(programArguments: Array[String]): Boolean = { + if (programArguments.length > 0) { + fileOutput = true + if (programArguments.length == 3) { + edgesPath = programArguments(0) + outputPath = programArguments(1) + maxIterations = Integer.parseInt(programArguments(2)) + } + else { + System.err.println("Usage: TransitiveClosure <edges path> <result path> <max number of " + + "iterations>") + return false + } + } + else { + System.out.println("Executing TransitiveClosure example with default parameters and " + + "built-in default data.") + System.out.println(" Provide parameters to read input data from files.") + System.out.println(" See the documentation for the correct format of input files.") + System.out.println(" Usage: TransitiveClosure <edges path> <result path> <max number of " + + "iterations>") + } + true + } + + private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[(Long, Long)] = { + if (fileOutput) { + env.readCsvFile[(Long, Long)]( + edgesPath, + fieldDelimiter = " ", + includedFields = Array(0, 1)) + .map { x => (x._1, x._2) } + } + else { + val edgeData = ConnectedComponentsData.EDGES map { + case Array(x, y) => (x.asInstanceOf[Long], y.asInstanceOf[Long]) + } + env.fromCollection(edgeData) + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/misc/PiEstimation.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/misc/PiEstimation.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/misc/PiEstimation.scala new file mode 100644 index 0000000..3453ee8 --- /dev/null +++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/misc/PiEstimation.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.examples.scala.misc + +import org.apache.flink.api.scala._ + +object PiEstimation { + + def main(args: Array[String]) { + + val numSamples: Long = if (args.length > 0) args(0).toLong else 1000000 + + val env = ExecutionEnvironment.getExecutionEnvironment + + // count how many of the random samples fall into + // the upper right quadrant of the unit circle + val count = + env.generateSequence(1, numSamples) + .map { sample => + val x = Math.random() + val y = Math.random() + if (x * x + y * y < 1) 1L else 0L + } + .reduce(_ + _) + + // the ratio of samples inside the quarter disk to the total number of samples + // approximates its area (pi / 4); multiplying by 4 yields an estimate of pi + val pi = count + .map(_ * 4.0 / numSamples) + + println("We estimate Pi to be:") + + pi.print() + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala new file mode 100644 index 0000000..2a7b786 --- /dev/null +++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/ml/LinearRegression.scala @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.examples.scala.ml + +import org.apache.flink.api.common.functions._ +import org.apache.flink.api.scala._ +import org.apache.flink.configuration.Configuration +import org.apache.flink.examples.java.ml.util.LinearRegressionData + +import scala.collection.JavaConverters._ + +/** + * This example implements a basic linear regression to solve the y = theta0 + theta1*x problem + * using the batch gradient descent algorithm. 
+ * + * Linear regression with the BGD (batch gradient descent) algorithm is iterative and + * works as follows: + * + * Given a data set and a target set, BGD tries to find the parameters that best fit the data + * set to the target set. + * In each iteration, the algorithm computes the gradient of the cost function and uses it to + * update all the parameters. + * The algorithm terminates after a fixed number of iterations (as in this implementation). + * With enough iterations, the algorithm minimizes the cost function and approaches the best + * parameters. + * See the Wikipedia entries on + * [[http://en.wikipedia.org/wiki/Linear_regression Linear regression]] and the + * [[http://en.wikipedia.org/wiki/Gradient_descent Gradient descent algorithm]]. + * + * This implementation works on one-dimensional data and finds the best two-dimensional theta to + * fit the target. + * + * Input files are plain text files and must be formatted as follows: + * + * - Data points are represented as two double values separated by a blank character. The first + * one represents the x value (the training data) and the second represents the y value (the + * target). Data points are separated by newline characters. + * For example `"-0.02 -0.04\n5.3 10.6\n"` gives two data points + * (x=-0.02, y=-0.04) and (x=5.3, y=10.6). + * + * This example shows how to use: + * + * - Bulk iterations + * - Broadcast variables in bulk iterations + */ +object LinearRegression { + + def main(args: Array[String]) { + if (!parseParameters(args)) { + return + } + + val env = ExecutionEnvironment.getExecutionEnvironment + val data = getDataSet(env) + val parameters = getParamsDataSet(env) + + val result = parameters.iterate(numIterations) { currentParameters => + val newParameters = data + .map(new SubUpdate).withBroadcastSet(currentParameters, "parameters") + .reduce { (p1, p2) => + val result = p1._1 + p2._1 + (result, p1._2 + p2._2) + } + .map { x => x._1.div(x._2) } + newParameters + } + + if (fileOutput) { + result.writeAsText(outputPath) + env.execute("Scala Linear Regression example") + } + else { + result.print() + } + } + + /** + * A simple data sample: x is the input and y is the target. + */ + case class Data(var x: Double, var y: Double) + + /** + * A set of parameters -- theta0, theta1. + */ + case class Params(theta0: Double, theta1: Double) { + def div(a: Int): Params = { + Params(theta0 / a, theta1 / a) + } + + def + (other: Params) = { + Params(theta0 + other.theta0, theta1 + other.theta1) + } + } + + // ************************************************************************* + // USER FUNCTIONS + // ************************************************************************* + + /** + * Computes a single BGD-type update for all parameters. + */ + class SubUpdate extends RichMapFunction[Data, (Params, Int)] { + + private var parameter: Params = null + + /** Reads the parameters from a broadcast variable into a collection. 
*/ + override def open(parameters: Configuration) { + // read the broadcast set under a name that does not shadow the method argument + val broadcastParams = getRuntimeContext.getBroadcastVariable[Params]("parameters").asScala + parameter = broadcastParams.head + } + + def map(in: Data): (Params, Int) = { + // gradient step with a fixed learning rate of 0.01 + val theta0 = + parameter.theta0 - 0.01 * ((parameter.theta0 + (parameter.theta1 * in.x)) - in.y) + val theta1 = + parameter.theta1 - 0.01 * (((parameter.theta0 + (parameter.theta1 * in.x)) - in.y) * in.x) + (Params(theta0, theta1), 1) + } + } + + // ************************************************************************* + // UTIL METHODS + // ************************************************************************* + private var fileOutput: Boolean = false + private var dataPath: String = null + private var outputPath: String = null + private var numIterations: Int = 10 + + private def parseParameters(programArguments: Array[String]): Boolean = { + if (programArguments.length > 0) { + fileOutput = true + if (programArguments.length == 3) { + dataPath = programArguments(0) + outputPath = programArguments(1) + numIterations = programArguments(2).toInt + + true + } + else { + System.err.println("Usage: LinearRegression <data path> <result path> <num iterations>") + + false + } + } + else { + System.out.println("Executing Linear Regression example with default parameters and " + + "built-in default data.") + System.out.println(" Provide parameters to read input data from files.") + System.out.println(" See the documentation for the correct format of input files.") + System.out.println(" We provide a data generator to create synthetic input files for this " + + "program.") + System.out.println(" Usage: LinearRegression <data path> <result path> <num iterations>") + + true + } + } + + private def getDataSet(env: ExecutionEnvironment): DataSet[Data] = { + if (fileOutput) { + env.readCsvFile[(Double, Double)]( + dataPath, + fieldDelimiter = " ", + includedFields = Array(0, 1)) + .map { t => Data(t._1, t._2) } + } + else { + val data = LinearRegressionData.DATA map { + case Array(x, y) => Data(x.asInstanceOf[Double], y.asInstanceOf[Double]) + } + env.fromCollection(data) + } + } + + private def getParamsDataSet(env: ExecutionEnvironment): DataSet[Params] = { + val params = LinearRegressionData.PARAMS map { + case Array(x, y) => Params(x.asInstanceOf[Double], y.asInstanceOf[Double]) + } + env.fromCollection(params) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala new file mode 100644 index 0000000..9d4d2ee --- /dev/null +++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.examples.scala.relational + +import org.apache.flink.api.scala._ + +import org.apache.flink.api.java.aggregation.Aggregations + +/** + * This program implements a modified version of TPC-H query 10. + * + * The original query can be found at + * [http://www.tpc.org/tpch/spec/tpch2.16.0.pdf](http://www.tpc.org/tpch/spec/tpch2.16.0.pdf) + * (page 45). + * + * This program implements the following SQL equivalent: + * + * {{{ + * SELECT + * c_custkey, + * c_name, + * c_address, + * n_name, + * c_acctbal, + * SUM(l_extendedprice * (1 - l_discount)) AS revenue + * FROM + * customer, + * orders, + * lineitem, + * nation + * WHERE + * c_custkey = o_custkey + * AND l_orderkey = o_orderkey + * AND YEAR(o_orderdate) > 1990 + * AND l_returnflag = 'R' + * AND c_nationkey = n_nationkey + * GROUP BY + * c_custkey, + * c_name, + * c_acctbal, + * n_name, + * c_address + * }}} + * + * Compared to the original TPC-H query this version does not print + * c_phone and c_comment, filters only by years greater than 1990 instead of + * a period of 3 months, and does not sort the result by revenue. + * + * Input files are plain text CSV files using the pipe character ('|') as field separator, + * as generated by the TPC-H data generator which is available at + * [http://www.tpc.org/tpch/](http://www.tpc.org/tpch/). 
+ * + * Usage: + * {{{ + * TPCHQuery10 <customer-csv path> <orders-csv path> <lineitem-csv path> <nation-csv path> <result path> + * }}} + * + * This example shows how to use: + * - tuple data types + * - built-in aggregation functions + * - joins with size hints + * + */ +object TPCHQuery10 { + + def main(args: Array[String]) { + if (!parseParameters(args)) { + return + } + + // get execution environment + val env = ExecutionEnvironment.getExecutionEnvironment + + // get customer data set: (custkey, name, address, nationkey, acctbal) + val customers = getCustomerDataSet(env) + // get orders data set: (orderkey, custkey, orderdate) + val orders = getOrdersDataSet(env) + // get lineitem data set: (orderkey, extendedprice, discount, returnflag) + val lineitems = getLineitemDataSet(env) + // get nation data set: (nationkey, name) + val nations = getNationDataSet(env) + + // filter orders by year + val orders1990 = orders.filter(o => o._3.substring(0, 4).toInt > 1990) + .map(o => (o._1, o._2)) + + // filter lineitems by return status and compute the discounted price per item + val lineitemsReturn = lineitems.filter(l => l._4.equals("R")) + .map(l => (l._1, l._2 * (1 - l._3))) + + // compute revenue by customer + val revenueByCustomer = orders1990.joinWithHuge(lineitemsReturn).where(0).equalTo(0) + .apply((o, l) => (o._2, l._2)) + .groupBy(0) + .aggregate(Aggregations.SUM, 1) + + // compute final result by joining customer and nation information with revenue + val result = customers.joinWithTiny(nations).where(3).equalTo(0) + .apply((c, n) => (c._1, c._2, c._3, n._2, c._5)) + .join(revenueByCustomer).where(0).equalTo(0) + .apply((c, r) => (c._1, c._2, c._3, c._4, c._5, r._2)) + + // emit result + result.writeAsCsv(outputPath, "\n", "|") + + // execute program + env.execute("Scala TPCH Query 10 Example") + } + + // ************************************************************************* + // UTIL METHODS + // ************************************************************************* + + private var customerPath: String = null + private var ordersPath: String = null + private var lineitemPath: String = null + private var nationPath: String = null + private var outputPath: String = null + + private def parseParameters(args: Array[String]): Boolean = { + if (args.length == 5) { + customerPath = args(0) + ordersPath = args(1) + lineitemPath = args(2) + nationPath = args(3) + outputPath = args(4) + true + } else { + System.err.println("This program expects data from the TPC-H benchmark as input data.\n" + + " Due to legal restrictions, we can not ship generated data.\n" + + " You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" + + " Usage: TPCHQuery10 <customer-csv path> <orders-csv path> " + + "<lineitem-csv path> <nation-csv path> <result path>") + false + } + } + + private def getCustomerDataSet(env: ExecutionEnvironment): + DataSet[Tuple5[Int, String, String, Int, Double]] = { + env.readCsvFile[Tuple5[Int, String, String, Int, Double]]( + customerPath, + fieldDelimiter = "|", + includedFields = Array(0, 1, 2, 3, 5) ) + } + + private def getOrdersDataSet(env: ExecutionEnvironment): DataSet[Tuple3[Int, Int, String]] = { + env.readCsvFile[Tuple3[Int, Int, String]]( + ordersPath, + fieldDelimiter = "|", + includedFields = Array(0, 1, 4) ) + } + + private def getLineitemDataSet(env: ExecutionEnvironment): + DataSet[Tuple4[Int, Double, Double, String]] = { + env.readCsvFile[Tuple4[Int, Double, Double, String]]( + lineitemPath, + fieldDelimiter = "|", + includedFields = Array(0, 5, 6, 8) ) + } + + private def 
getNationDataSet(env: ExecutionEnvironment): DataSet[Tuple2[Int, String]] = { + env.readCsvFile[Tuple2[Int, String]]( + nationPath, + fieldDelimiter = "|", + includedFields = Array(0, 1) ) + } + +}
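
Note (editorial, not part of the commit): two quick single-machine sanity checks for the logic added above. Both are plain Scala sketches; the object name PageRankCheck, the three-page graph, and the line-item triples below are made-up test scaffolding, not the examples' built-in data sets.

The PageRank job implements the update rank'(p) = (1 - d) / numPages + d * sum over pages q linking to p of rank(q) / outDegree(q), with d = DAMPENING_FACTOR = 0.85, and terminates once no rank changes by more than EPSILON. A minimal local reference implementation of the same rule, useful for cross-checking the distributed program on a handful of pages:

object PageRankCheck {
  val Damping = 0.85   // mirrors DAMPENING_FACTOR in the job above
  val Eps = 0.0001     // mirrors EPSILON

  def main(args: Array[String]): Unit = {
    // hypothetical tiny graph: page id -> outgoing links
    val links: Map[Long, Seq[Long]] = Map(1L -> Seq(2L, 3L), 2L -> Seq(3L), 3L -> Seq(1L))
    val n = links.size
    var ranks: Map[Long, Double] = links.keys.map(_ -> 1.0 / n).toMap

    var converged = false
    var iteration = 0
    while (!converged && iteration < 10) {
      // distribute each page's rank evenly over its outgoing links (the join step)
      val contributions = for ((src, targets) <- links.toSeq; t <- targets)
        yield t -> ranks(src) / targets.size
      // sum the contributions per target page (the aggregate step), then apply damping
      val newRanks = contributions.groupBy(_._1)
        .map { case (p, cs) => p -> (cs.map(_._2).sum * Damping + (1 - Damping) / n) }
      // same termination criterion as the termination data set: stop when no update exceeds Eps
      converged = newRanks.forall { case (p, r) => math.abs(r - ranks(p)) <= Eps }
      ranks = newRanks
      iteration += 1
    }
    ranks.toSeq.sortBy(_._1).foreach { case (p, r) => println(s"$p $r") }
  }
}

Similarly, the revenue that TPCHQuery10 aggregates per customer is SUM(l_extendedprice * (1 - l_discount)) over 'R'-flagged line items of post-1990 orders. A spot check with invented prices:

val items = Seq((100.0, 0.1, "R"), (50.0, 0.0, "R"), (70.0, 0.2, "N"))
val revenue = items.collect { case (price, discount, "R") => price * (1 - discount) }.sum
// expected: 100 * 0.9 + 50 * 1.0 = 140.0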