[FLINK-3195] [examples] Consolidate batch examples into one project, unify batch and streaming examples under on parent project
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d0e1d635 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d0e1d635 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d0e1d635 Branch: refs/heads/master Commit: d0e1d635d9e6a4ef157275099e2e787b3b18c0ed Parents: 62938c1 Author: Stephan Ewen <se...@apache.org> Authored: Wed Dec 23 20:17:56 2015 +0100 Committer: Robert Metzger <rmetz...@apache.org> Committed: Thu Jan 14 11:18:58 2016 +0100 ---------------------------------------------------------------------- docs/apis/cli.md | 18 +- docs/apis/examples.md | 4 +- docs/setup/gce_setup.md | 2 +- docs/setup/yarn_setup.md | 6 +- flink-contrib/flink-storm-examples/pom.xml | 6 +- flink-dist/pom.xml | 8 +- flink-dist/src/main/assemblies/bin.xml | 32 +- flink-examples/flink-examples-batch/pom.xml | 400 +++++++++++++ .../flink/examples/java/clustering/KMeans.java | 343 ++++++++++++ .../java/clustering/util/KMeansData.java | 104 ++++ .../clustering/util/KMeansDataGenerator.java | 189 +++++++ .../flink/examples/java/distcp/DistCp.java | 182 ++++++ .../examples/java/distcp/FileCopyTask.java | 59 ++ .../java/distcp/FileCopyTaskInputFormat.java | 116 ++++ .../java/distcp/FileCopyTaskInputSplit.java | 46 ++ .../java/graph/ConnectedComponents.java | 243 ++++++++ .../examples/java/graph/EnumTrianglesBasic.java | 231 ++++++++ .../examples/java/graph/EnumTrianglesOpt.java | 356 ++++++++++++ .../flink/examples/java/graph/PageRank.java | 286 ++++++++++ .../java/graph/TransitiveClosureNaive.java | 154 +++++ .../graph/util/ConnectedComponentsData.java | 72 +++ .../java/graph/util/EnumTrianglesData.java | 58 ++ .../java/graph/util/EnumTrianglesDataTypes.java | 116 ++++ .../examples/java/graph/util/PageRankData.java | 86 +++ .../java/misc/CollectionExecutionExample.java | 96 ++++ .../flink/examples/java/misc/PiEstimation.java | 99 ++++ .../examples/java/ml/LinearRegression.java | 316 +++++++++++ 
.../java/ml/util/LinearRegressionData.java | 71 +++ .../ml/util/LinearRegressionDataGenerator.java | 112 ++++ .../relational/EmptyFieldsCountAccumulator.java | 255 +++++++++ .../examples/java/relational/TPCHQuery10.java | 234 ++++++++ .../examples/java/relational/TPCHQuery3.java | 271 +++++++++ .../java/relational/WebLogAnalysis.java | 325 +++++++++++ .../java/relational/util/WebLogData.java | 427 ++++++++++++++ .../relational/util/WebLogDataGenerator.java | 210 +++++++ .../examples/java/wordcount/WordCount.java | 148 +++++ .../examples/java/wordcount/WordCountPojo.java | 173 ++++++ .../java/wordcount/util/WordCountData.java | 72 +++ .../src/main/resources/log4j-test.properties | 23 + .../src/main/resources/log4j.properties | 23 + .../src/main/resources/logback.xml | 29 + .../examples/scala/clustering/KMeans.scala | 255 +++++++++ .../scala/graph/ConnectedComponents.scala | 168 ++++++ .../examples/scala/graph/DeltaPageRank.scala | 104 ++++ .../scala/graph/EnumTrianglesBasic.scala | 185 ++++++ .../examples/scala/graph/EnumTrianglesOpt.scala | 253 +++++++++ .../examples/scala/graph/PageRankBasic.scala | 210 +++++++ .../scala/graph/TransitiveClosureNaive.scala | 116 ++++ .../examples/scala/misc/PiEstimation.scala | 52 ++ .../examples/scala/ml/LinearRegression.scala | 195 +++++++ .../examples/scala/relational/TPCHQuery10.scala | 184 ++++++ .../examples/scala/relational/TPCHQuery3.scala | 172 ++++++ .../scala/relational/WebLogAnalysis.scala | 211 +++++++ .../examples/scala/wordcount/WordCount.scala | 101 ++++ flink-examples/flink-examples-streaming/pom.xml | 559 +++++++++++++++++++ .../examples/iteration/IterateExample.java | 246 ++++++++ .../iteration/util/IterateExampleData.java | 32 ++ .../streaming/examples/join/WindowJoin.java | 296 ++++++++++ .../examples/join/util/WindowJoinData.java | 61 ++ .../ml/IncrementalLearningSkeleton.java | 254 +++++++++ .../util/IncrementalLearningSkeletonData.java | 32 ++ .../socket/SocketTextStreamWordCount.java | 105 ++++ 
.../examples/twitter/TwitterStream.java | 164 ++++++ .../twitter/util/TwitterStreamData.java | 32 ++ .../GroupedProcessingTimeWindowExample.java | 127 +++++ .../examples/windowing/SessionWindowing.java | 167 ++++++ .../examples/windowing/TopSpeedWindowing.java | 210 +++++++ .../examples/windowing/WindowWordCount.java | 132 +++++ .../windowing/util/SessionWindowingData.java | 27 + .../util/TopSpeedWindowingExampleData.java | 276 +++++++++ .../examples/wordcount/PojoExample.java | 186 ++++++ .../streaming/examples/wordcount/WordCount.java | 148 +++++ .../scala/examples/join/WindowJoin.scala | 156 ++++++ .../socket/SocketTextStreamWordCount.scala | 93 +++ .../examples/windowing/TopSpeedWindowing.scala | 150 +++++ .../iteration/IterateExampleITCase.java | 45 ++ .../join/WindowJoinITCase.java | 50 ++ .../ml/IncrementalLearningSkeletonITCase.java | 42 ++ .../socket/SocketTextStreamWordCountITCase.java | 30 + .../twitter/TwitterStreamITCase.java | 42 ++ .../windowing/SessionWindowingITCase.java | 42 ++ .../TopSpeedWindowingExampleITCase.java | 45 ++ .../windowing/WindowWordCountITCase.java | 50 ++ .../wordcount/PojoExampleITCase.java | 45 ++ .../wordcount/WordCountITCase.java | 45 ++ .../join/WindowJoinITCase.java | 50 ++ .../socket/SocketTextStreamWordCountITCase.java | 30 + .../TopSpeedWindowingExampleITCase.java | 45 ++ flink-examples/flink-java-examples/pom.xml | 330 ----------- .../flink/examples/java/clustering/KMeans.java | 344 ------------ .../java/clustering/util/KMeansData.java | 105 ---- .../clustering/util/KMeansDataGenerator.java | 189 ------- .../flink/examples/java/distcp/DistCp.java | 182 ------ .../examples/java/distcp/FileCopyTask.java | 59 -- .../java/distcp/FileCopyTaskInputFormat.java | 116 ---- .../java/distcp/FileCopyTaskInputSplit.java | 46 -- .../java/graph/ConnectedComponents.java | 243 -------- .../examples/java/graph/EnumTrianglesBasic.java | 231 -------- .../examples/java/graph/EnumTrianglesOpt.java | 358 ------------ 
.../examples/java/graph/PageRankBasic.java | 288 ---------- .../java/graph/TransitiveClosureNaive.java | 155 ----- .../graph/util/ConnectedComponentsData.java | 73 --- .../java/graph/util/EnumTrianglesData.java | 59 -- .../java/graph/util/EnumTrianglesDataTypes.java | 117 ---- .../examples/java/graph/util/PageRankData.java | 87 --- .../java/misc/CollectionExecutionExample.java | 96 ---- .../flink/examples/java/misc/PiEstimation.java | 99 ---- .../examples/java/ml/LinearRegression.java | 317 ----------- .../java/ml/util/LinearRegressionData.java | 72 --- .../ml/util/LinearRegressionDataGenerator.java | 113 ---- .../relational/EmptyFieldsCountAccumulator.java | 254 --------- .../examples/java/relational/TPCHQuery10.java | 234 -------- .../examples/java/relational/TPCHQuery3.java | 272 --------- .../java/relational/WebLogAnalysis.java | 327 ----------- .../java/relational/util/WebLogData.java | 428 -------------- .../relational/util/WebLogDataGenerator.java | 211 ------- .../examples/java/wordcount/PojoExample.java | 171 ------ .../examples/java/wordcount/WordCount.java | 148 ----- .../examples/java/wordcount/WordCountMeta.java | 54 -- .../java/wordcount/util/WordCountData.java | 72 --- .../src/main/resources/log4j-test.properties | 23 - .../src/main/resources/log4j.properties | 23 - .../src/main/resources/logback.xml | 29 - flink-examples/flink-scala-examples/pom.xml | 487 ---------------- .../org/apache/flink/examples/scala/Dummy.java | 27 - .../examples/scala/clustering/KMeans.scala | 255 --------- .../scala/graph/ConnectedComponents.scala | 167 ------ .../examples/scala/graph/DeltaPageRank.scala | 104 ---- .../scala/graph/EnumTrianglesBasic.scala | 185 ------ .../examples/scala/graph/EnumTrianglesOpt.scala | 253 --------- .../examples/scala/graph/PageRankBasic.scala | 210 ------- .../scala/graph/TransitiveClosureNaive.scala | 115 ---- .../examples/scala/misc/PiEstimation.scala | 52 -- .../examples/scala/ml/LinearRegression.scala | 195 ------- 
.../examples/scala/relational/TPCHQuery10.scala | 184 ------ .../examples/scala/relational/TPCHQuery3.scala | 172 ------ .../scala/relational/WebLogAnalysis.scala | 210 ------- .../examples/scala/wordcount/WordCount.scala | 100 ---- flink-examples/pom.xml | 4 +- flink-staging/flink-fs-tests/pom.xml | 2 +- flink-staging/flink-table/pom.xml | 2 +- flink-staging/flink-tez/pom.xml | 2 +- flink-streaming-examples/pom.xml | 535 ------------------ .../examples/iteration/IterateExample.java | 246 -------- .../iteration/util/IterateExampleData.java | 32 -- .../streaming/examples/join/WindowJoin.java | 296 ---------- .../examples/join/util/WindowJoinData.java | 61 -- .../ml/IncrementalLearningSkeleton.java | 254 --------- .../util/IncrementalLearningSkeletonData.java | 32 -- .../socket/SocketTextStreamWordCount.java | 105 ---- .../examples/twitter/TwitterStream.java | 164 ------ .../twitter/util/TwitterStreamData.java | 32 -- .../GroupedProcessingTimeWindowExample.java | 127 ----- .../examples/windowing/SessionWindowing.java | 167 ------ .../examples/windowing/TopSpeedWindowing.java | 210 ------- .../examples/windowing/WindowWordCount.java | 132 ----- .../windowing/util/SessionWindowingData.java | 27 - .../util/TopSpeedWindowingExampleData.java | 276 --------- .../examples/wordcount/PojoExample.java | 186 ------ .../streaming/examples/wordcount/WordCount.java | 148 ----- .../scala/examples/join/WindowJoin.scala | 156 ------ .../socket/SocketTextStreamWordCount.scala | 93 --- .../examples/windowing/TopSpeedWindowing.scala | 150 ----- .../iteration/IterateExampleITCase.java | 45 -- .../join/WindowJoinITCase.java | 50 -- .../ml/IncrementalLearningSkeletonITCase.java | 42 -- .../socket/SocketTextStreamWordCountITCase.java | 30 - .../twitter/TwitterStreamITCase.java | 42 -- .../windowing/SessionWindowingITCase.java | 42 -- .../TopSpeedWindowingExampleITCase.java | 45 -- .../windowing/WindowWordCountITCase.java | 50 -- .../wordcount/PojoExampleITCase.java | 45 -- 
.../wordcount/WordCountITCase.java | 45 -- .../join/WindowJoinITCase.java | 50 -- .../socket/SocketTextStreamWordCountITCase.java | 30 - .../TopSpeedWindowingExampleITCase.java | 45 -- flink-tests/pom.xml | 12 +- .../exampleJavaPrograms/PageRankITCase.java | 8 +- .../iterations/PageRankCompilerTest.java | 10 +- .../jsonplan/DumpCompiledPlanTest.java | 4 +- .../optimizer/jsonplan/PreviewPlanDumpTest.java | 4 +- .../flink/yarn/YARNSessionFIFOITCase.java | 13 +- pom.xml | 1 - 183 files changed, 12215 insertions(+), 12699 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/docs/apis/cli.md ---------------------------------------------------------------------- diff --git a/docs/apis/cli.md b/docs/apis/cli.md index 78ea4b6..1150123 100644 --- a/docs/apis/cli.md +++ b/docs/apis/cli.md @@ -46,47 +46,47 @@ The command line can be used to - Run example program with no arguments. - ./bin/flink run ./examples/WordCount.jar + ./bin/flink run ./examples/batch/WordCount.jar - Run example program with arguments for input and result files - ./bin/flink run ./examples/WordCount.jar \ + ./bin/flink run ./examples/batch/WordCount.jar \ file:///home/user/hamlet.txt file:///home/user/wordcount_out - Run example program with parallelism 16 and arguments for input and result files - ./bin/flink run -p 16 ./examples/WordCount.jar \ + ./bin/flink run -p 16 ./examples/batch/WordCount.jar \ file:///home/user/hamlet.txt file:///home/user/wordcount_out - Run example program with flink log output disabled - ./bin/flink run -q ./examples/WordCount.jar + ./bin/flink run -q ./examples/batch/WordCount.jar - Run example program in detached mode - ./bin/flink run -d ./examples/WordCount.jar + ./bin/flink run -d ./examples/batch/WordCount.jar - Run example program on a specific JobManager: ./bin/flink run -m myJMHost:6123 \ - ./examples/WordCount.jar \ + ./examples/batch/WordCount.jar \ 
file:///home/user/hamlet.txt file:///home/user/wordcount_out - Run example program with a specific class as an entry point: ./bin/flink run -c org.apache.flink.examples.java.wordcount.WordCount \ - ./examples/WordCount.jar \ + ./examples/batch/WordCount.jar \ file:///home/user/hamlet.txt file:///home/user/wordcount_out - Run example program using a [per-job YARN cluster]({{site.baseurl}}/setup/yarn_setup.html#run-a-single-flink-job-on-hadoop-yarn) with 2 TaskManagers: ./bin/flink run -m yarn-cluster -yn 2 \ - ./examples/WordCount.jar \ + ./examples/batch/WordCount.jar \ hdfs:///user/hamlet.txt hdfs:///user/wordcount_out - Display the optimized execution plan for the WordCount example program as JSON: - ./bin/flink info ./examples/WordCount.jar \ + ./bin/flink info ./examples/batch/WordCount.jar \ file:///home/user/hamlet.txt file:///home/user/wordcount_out - List scheduled and running jobs (including their JobIDs): http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/docs/apis/examples.md ---------------------------------------------------------------------- diff --git a/docs/apis/examples.md b/docs/apis/examples.md index d22b436..a11ed9c 100644 --- a/docs/apis/examples.md +++ b/docs/apis/examples.md @@ -42,7 +42,7 @@ Each binary release of Flink contains an `examples` directory with jar files for To run the WordCount example, issue the following command: ~~~bash -./bin/flink run ./examples/WordCount.jar +./bin/flink run ./examples/batch/WordCount.jar ~~~ The other examples can be started in a similar way. @@ -50,7 +50,7 @@ The other examples can be started in a similar way. Note that many examples run without passing any arguments for them, by using build-in data. 
To run WordCount with real data, you have to pass the path to the data: ~~~bash -./bin/flink run ./examples/WordCount.jar /path/to/some/text/data /path/to/result +./bin/flink run ./examples/batch/WordCount.jar /path/to/some/text/data /path/to/result ~~~ Note that non-local file systems require a schema prefix, such as `hdfs://`. http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/docs/setup/gce_setup.md ---------------------------------------------------------------------- diff --git a/docs/setup/gce_setup.md b/docs/setup/gce_setup.md index f6499dc..4f3996a 100644 --- a/docs/setup/gce_setup.md +++ b/docs/setup/gce_setup.md @@ -95,7 +95,7 @@ To bring up the Flink cluster on Google Compute Engine, execute: ./bdutil shell cd /home/hadoop/flink-install/bin - ./flink run ../examples/WordCount.jar gs://dataflow-samples/shakespeare/othello.txt gs://<bucket_name>/output + ./flink run ../examples/batch/WordCount.jar gs://dataflow-samples/shakespeare/othello.txt gs://<bucket_name>/output ## Shut down your cluster http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/docs/setup/yarn_setup.md ---------------------------------------------------------------------- diff --git a/docs/setup/yarn_setup.md b/docs/setup/yarn_setup.md index a7309e4..7a00af5 100644 --- a/docs/setup/yarn_setup.md +++ b/docs/setup/yarn_setup.md @@ -48,7 +48,7 @@ Once the session has been started, you can submit jobs to the cluster using the curl -O <flink_hadoop2_download_url> tar xvzf flink-{{ site.version }}-bin-hadoop2.tgz cd flink-{{ site.version }}/ -./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 ./examples/WordCount.jar +./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 ./examples/batch/WordCount.jar ~~~ ## Apache Flink on Hadoop YARN using a YARN Session @@ -179,7 +179,7 @@ Use the *run* action to submit a job to YARN. 
The client is able to determine th ~~~bash wget -O LICENSE-2.0.txt http://www.apache.org/licenses/LICENSE-2.0.txt hadoop fs -copyFromLocal LICENSE-2.0.txt hdfs:/// ... -./bin/flink run ./examples/WordCount.jar \ +./bin/flink run ./examples/batch/WordCount.jar \ hdfs:///..../LICENSE-2.0.txt hdfs:///.../wordcount-result.txt ~~~ @@ -205,7 +205,7 @@ Please note that the client then expects the `-yn` value to be set (number of Ta ***Example:*** ~~~bash -./bin/flink run -m yarn-cluster -yn 2 ./examples/WordCount.jar +./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar ~~~ The command line options of the YARN session are also available with the `./bin/flink` tool. http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-contrib/flink-storm-examples/pom.xml ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/pom.xml b/flink-contrib/flink-storm-examples/pom.xml index 6f3a050..7e093ff 100644 --- a/flink-contrib/flink-storm-examples/pom.xml +++ b/flink-contrib/flink-storm-examples/pom.xml @@ -43,7 +43,7 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-java-examples</artifactId> + <artifactId>flink-examples-batch</artifactId> <version>${project.version}</version> </dependency> @@ -73,7 +73,7 @@ under the License. <build> <plugins> - <!-- get default data from flink-java-examples package --> + <!-- get default data from flink-example-batch package --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-dependency-plugin</artifactId> @@ -89,7 +89,7 @@ under the License. 
<artifactItems> <artifactItem> <groupId>org.apache.flink</groupId> - <artifactId>flink-java-examples</artifactId> + <artifactId>flink-examples-batch</artifactId> <version>${project.version}</version> <type>jar</type> <overWrite>false</overWrite> http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-dist/pom.xml ---------------------------------------------------------------------- diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml index e6b2fe0..543652f 100644 --- a/flink-dist/pom.xml +++ b/flink-dist/pom.xml @@ -85,13 +85,7 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-java-examples</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-scala-examples</artifactId> + <artifactId>flink-examples-batch</artifactId> <version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-dist/src/main/assemblies/bin.xml ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/assemblies/bin.xml b/flink-dist/src/main/assemblies/bin.xml index 602af68..b067280 100644 --- a/flink-dist/src/main/assemblies/bin.xml +++ b/flink-dist/src/main/assemblies/bin.xml @@ -103,7 +103,7 @@ under the License. <!-- copy *.txt files --> <fileSet> <directory>src/main/flink-bin/</directory> - <outputDirectory></outputDirectory> + <outputDirectory/> <fileMode>0644</fileMode> <includes> <include>*.txt</include> @@ -113,7 +113,7 @@ under the License. <!-- copy LICENSE/NOTICE files --> <fileSet> <directory>../</directory> - <outputDirectory></outputDirectory> + <outputDirectory/> <fileMode>0644</fileMode> <includes> <include>LICENSE*</include> @@ -150,21 +150,31 @@ under the License. 
</excludes> </fileSet> - <!-- copy jar files of java examples --> + <!-- copy jar files of the batch examples --> <fileSet> - <directory>../flink-examples/flink-java-examples/target</directory> - <outputDirectory>examples</outputDirectory> + <directory>../flink-examples/flink-examples-batch/target</directory> + <outputDirectory>examples/batch</outputDirectory> <fileMode>0644</fileMode> <includes> <include>*.jar</include> </includes> <excludes> - <exclude>flink-java-examples*-${project.version}.jar</exclude> - <exclude>original-flink-java-examples*-${project.version}.jar</exclude> - <exclude>flink-java-examples*-${project.version}-sources.jar</exclude> - <exclude>flink-java-examples*-${project.version}-tests.jar</exclude> - <exclude>flink-java-examples*-${project.version}-javadoc.jar</exclude> - <exclude>flink-java-examples*-${project.version}-*.jar</exclude> + <exclude>flink-examples-batch*.jar</exclude> + <exclude>original-flink-examples-batch*.jar</exclude> + </excludes> + </fileSet> + + <!-- copy jar files of the streaming examples --> + <fileSet> + <directory>../flink-examples/flink-examples-streaming/target</directory> + <outputDirectory>examples/streaming</outputDirectory> + <fileMode>0644</fileMode> + <includes> + <include>*.jar</include> + </includes> + <excludes> + <exclude>flink-examples-streaming*.jar</exclude> + <exclude>original-flink-examples-streaming*.jar</exclude> </excludes> </fileSet> http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/pom.xml ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-batch/pom.xml b/flink-examples/flink-examples-batch/pom.xml new file mode 100644 index 0000000..a989ef5 --- /dev/null +++ b/flink-examples/flink-examples-batch/pom.xml @@ -0,0 +1,400 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. 
See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-examples</artifactId> + <version>1.0-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-examples-batch</artifactId> + <name>flink-examples-batch</name> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-scala</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + + + <build> + <plugins> + + <!-- Scala Compiler --> + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + <version>3.1.4</version> + <executions> + <!-- Run scala compiler in the process-resources phase, so that dependencies on + scala classes can be resolved later in the (Java) compile phase --> + <execution> + <id>scala-compile-first</id> + <phase>process-resources</phase> + <goals> + <goal>compile</goal> + 
</goals> + </execution> + </executions> + <configuration> + <jvmArgs> + <jvmArg>-Xms128m</jvmArg> + <jvmArg>-Xmx512m</jvmArg> + </jvmArgs> + </configuration> + </plugin> + + <!-- Eclipse Integration --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-eclipse-plugin</artifactId> + <version>2.8</version> + <configuration> + <downloadSources>true</downloadSources> + <projectnatures> + <projectnature>org.scala-ide.sdt.core.scalanature</projectnature> + <projectnature>org.eclipse.jdt.core.javanature</projectnature> + </projectnatures> + <buildcommands> + <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand> + </buildcommands> + <classpathContainers> + <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer> + <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer> + </classpathContainers> + <excludes> + <exclude>org.scala-lang:scala-library</exclude> + <exclude>org.scala-lang:scala-compiler</exclude> + </excludes> + <sourceIncludes> + <sourceInclude>**/*.scala</sourceInclude> + <sourceInclude>**/*.java</sourceInclude> + </sourceIncludes> + </configuration> + </plugin> + + <!-- Adding scala source directories to build path --> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>1.7</version> + <executions> + <!-- Add src/main/scala to eclipse build path --> + <execution> + <id>add-source</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>src/main/scala</source> + </sources> + </configuration> + </execution> + <!-- Add src/test/scala to eclipse build path --> + <execution> + <id>add-test-source</id> + <phase>generate-test-sources</phase> + <goals> + <goal>add-test-source</goal> + </goals> + <configuration> + <sources> + <source>src/test/scala</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + + <!-- Scala Code 
Style --> + <plugin> + <groupId>org.scalastyle</groupId> + <artifactId>scalastyle-maven-plugin</artifactId> + <version>0.5.0</version> + <executions> + <execution> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + <configuration> + <verbose>false</verbose> + <failOnViolation>true</failOnViolation> + <includeTestSourceDirectory>true</includeTestSourceDirectory> + <failOnWarning>false</failOnWarning> + <sourceDirectory>${basedir}/src/main/scala</sourceDirectory> + <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory> + <configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation> + <outputFile>${project.basedir}/scalastyle-output.xml</outputFile> + <outputEncoding>UTF-8</outputEncoding> + </configuration> + </plugin> + + <!-- create the exampe JAR files --> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + + <!-- KMeans --> + <execution> + <id>KMeans</id> + <phase>package</phase> + <goals> + <goal>jar</goal> + </goals> + + <configuration> + <classifier>KMeans</classifier> + + <archive> + <manifestEntries> + <program-class>org.apache.flink.examples.java.clustering.KMeans</program-class> + </manifestEntries> + </archive> + + <includes> + <include>**/java/clustering/KMeans.class</include> + <include>**/java/clustering/KMeans$*.class</include> + <include>**/java/clustering/util/KMeansDataGenerator.class</include> + <include>**/java/clustering/util/KMeansData.class</include> + </includes> + </configuration> + </execution> + + <!-- Transitive Closure --> + <execution> + <id>TransitiveClosure</id> + <phase>package</phase> + <goals> + <goal>jar</goal> + </goals> + <configuration> + <classifier>TransitiveClosure</classifier> + + <archive> + <manifestEntries> + <program-class>org.apache.flink.examples.java.graph.TransitiveClosureNaive</program-class> + </manifestEntries> + </archive> + + <includes> + 
<include>**/java/graph/TransitiveClosureNaive.class</include> + <include>**/java/graph/TransitiveClosureNaive$*.class</include> + <include>**/java/graph/util/ConnectedComponentsData.class</include> + </includes> + </configuration> + </execution> + + <!-- Connected Components --> + <execution> + <id>ConnectedComponents</id> + <phase>package</phase> + <goals> + <goal>jar</goal> + </goals> + <configuration> + <classifier>ConnectedComponents</classifier> + + <archive> + <manifestEntries> + <program-class>org.apache.flink.examples.java.graph.ConnectedComponents</program-class> + </manifestEntries> + </archive> + + <includes> + <include>**/java/graph/ConnectedComponents.class</include> + <include>**/java/graph/ConnectedComponents$*.class</include> + <include>**/java/graph/util/ConnectedComponentsData.class</include> + </includes> + </configuration> + </execution> + + <!-- EnumTriangles Basic --> + <execution> + <id>EnumerateGraphTriangles</id> + <phase>package</phase> + <goals> + <goal>jar</goal> + </goals> + <configuration> + <classifier>EnumerateGraphTriangles</classifier> + + <archive> + <manifestEntries> + <program-class>org.apache.flink.examples.java.graph.EnumTrianglesBasic</program-class> + </manifestEntries> + </archive> + + <includes> + <include>**/java/graph/EnumTrianglesBasic.class</include> + <include>**/java/graph/EnumTrianglesBasic$*.class</include> + <include>**/java/graph/util/EnumTrianglesDataTypes.class</include> + <include>**/java/graph/util/EnumTrianglesDataTypes$*.class</include> + <include>**/java/graph/util/EnumTrianglesData.class</include> + </includes> + </configuration> + </execution> + + <!-- PageRank --> + <execution> + <id>PageRank</id> + <phase>package</phase> + <goals> + <goal>jar</goal> + </goals> + <configuration> + <classifier>PageRank</classifier> + + <archive> + <manifestEntries> + <program-class>org.apache.flink.examples.java.graph.PageRank</program-class> + </manifestEntries> + </archive> + + <includes> + 
<include>**/java/graph/PageRank.class</include> + <include>**/java/graph/PageRank$*.class</include> + <include>**/java/graph/util/PageRankData.class</include> + </includes> + </configuration> + </execution> + + <!-- WebLogAnalysis --> + <execution> + <id>WebLogAnalysis</id> + <phase>package</phase> + <goals> + <goal>jar</goal> + </goals> + <configuration> + <classifier>WebLogAnalysis</classifier> + + <archive> + <manifestEntries> + <program-class>org.apache.flink.examples.java.relational.WebLogAnalysis</program-class> + </manifestEntries> + </archive> + + <includes> + <include>**/java/relational/WebLogAnalysis.class</include> + <include>**/java/relational/WebLogAnalysis$*.class</include> + <include>**/java/relational/util/WebLogData.class</include> + <include>**/java/relational/util/WebLogDataGenerator.class</include> + </includes> + </configuration> + </execution> + + <!-- WordCount --> + <execution> + <id>WordCount</id> + <phase>package</phase> + <goals> + <goal>jar</goal> + </goals> + <configuration> + <classifier>WordCount</classifier> + + <archive> + <manifestEntries> + <program-class>org.apache.flink.examples.java.wordcount.WordCount</program-class> + </manifestEntries> + </archive> + + <includes> + <include>**/java/wordcount/WordCount.class</include> + <include>**/java/wordcount/WordCount$*.class</include> + <include>**/java/wordcount/util/WordCountData.class</include> + </includes> + </configuration> + </execution> + + <!-- Distributed Copy --> + <execution> + <id>DistCp</id> + <phase>package</phase> + <goals> + <goal>jar</goal> + </goals> + <configuration> + <classifier>DistCp</classifier> + + <archive> + <manifestEntries> + <program-class>org.apache.flink.examples.java.distcp.DistCp</program-class> + </manifestEntries> + </archive> + + <includes> + <include>**/java/distcp/*</include> + </includes> + </configuration> + </execution> + </executions> + </plugin> + + <!--simplify the name of example JARs for build-target/examples --> + <plugin> + 
<groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-antrun-plugin</artifactId> + <version>1.7</version> + <executions> + <execution> + <id>rename</id> + <phase>package</phase> + <goals> + <goal>run</goal> + </goals> + <configuration> + <target> + <copy file="${project.basedir}/target/flink-examples-batch-${project.version}-KMeans.jar" tofile="${project.basedir}/target/KMeans.jar" /> + <copy file="${project.basedir}/target/flink-examples-batch-${project.version}-ConnectedComponents.jar" tofile="${project.basedir}/target/ConnectedComponents.jar" /> + <copy file="${project.basedir}/target/flink-examples-batch-${project.version}-EnumerateGraphTriangles.jar" tofile="${project.basedir}/target/EnumerateGraphTriangles.jar" /> + <copy file="${project.basedir}/target/flink-examples-batch-${project.version}-PageRank.jar" tofile="${project.basedir}/target/PageRank.jar" /> + <copy file="${project.basedir}/target/flink-examples-batch-${project.version}-TransitiveClosure.jar" tofile="${project.basedir}/target/TransitiveClosure.jar" /> + <copy file="${project.basedir}/target/flink-examples-batch-${project.version}-WebLogAnalysis.jar" tofile="${project.basedir}/target/WebLogAnalysis.jar" /> + <copy file="${project.basedir}/target/flink-examples-batch-${project.version}-WordCount.jar" tofile="${project.basedir}/target/WordCount.jar" /> + <copy file="${project.basedir}/target/flink-examples-batch-${project.version}-DistCp.jar" tofile="${project.basedir}/target/DistCp.jar" /> + </target> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java 
b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java new file mode 100644 index 0000000..1730e2a --- /dev/null +++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.examples.java.clustering; + +import java.io.Serializable; +import java.util.Collection; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.examples.java.clustering.util.KMeansData; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.operators.IterativeDataSet; + +/** + * This example implements a basic K-Means clustering algorithm. 
+ * + * <p> + * K-Means is an iterative clustering algorithm and works as follows:<br> + * K-Means is given a set of data points to be clustered and an initial set of <i>K</i> cluster centers. + * In each iteration, the algorithm computes the distance of each data point to each cluster center. + * Each point is assigned to the cluster center which is closest to it. + * Subsequently, each cluster center is moved to the center (<i>mean</i>) of all points that have been assigned to it. + * The moved cluster centers are fed into the next iteration. + * The algorithm terminates after a fixed number of iterations (as in this implementation) + * or if cluster centers do not (significantly) move in an iteration.<br> + * This is the Wikipedia entry for the <a href="http://en.wikipedia.org/wiki/K-means_clustering">K-Means Clustering algorithm</a>. + * + * <p> + * This implementation works on two-dimensional data points. <br> + * It computes an assignment of data points to cluster centers, i.e., + * each data point is annotated with the id of the final cluster (center) it belongs to. + * + * <p> + * Input files are plain text files and must be formatted as follows: + * <ul> + * <li>Data points are represented as two double values separated by a blank character. + * Data points are separated by newline characters.<br> + * For example <code>"1.2 2.3\n5.3 7.2\n"</code> gives two data points (x=1.2, y=2.3) and (x=5.3, y=7.2). + * <li>Cluster centers are represented by an integer id and a point value.<br> + * For example <code>"1 6.2 3.2\n2 2.9 5.7\n"</code> gives two centers (id=1, x=6.2, y=3.2) and (id=2, x=2.9, y=5.7). + * </ul> + * + * <p> + * Usage: <code>KMeans <points path> <centers path> <result path> <num iterations></code><br> + * If no parameters are provided, the program is run with default data from {@link org.apache.flink.examples.java.clustering.util.KMeansData} and 10 iterations. 
+ * + * <p> + * This example shows how to use: + * <ul> + * <li>Bulk iterations + * <li>Broadcast variables in bulk iterations + * <li>Custom Java objects (PoJos) + * </ul> + */ +@SuppressWarnings("serial") +public class KMeans { + + // ************************************************************************* + // PROGRAM + // ************************************************************************* + + public static void main(String[] args) throws Exception { + + if(!parseParameters(args)) { + return; + } + + // set up execution environment + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + // get input data + DataSet<Point> points = getPointDataSet(env); + DataSet<Centroid> centroids = getCentroidDataSet(env); + + // set number of bulk iterations for KMeans algorithm + IterativeDataSet<Centroid> loop = centroids.iterate(numIterations); + + DataSet<Centroid> newCentroids = points + // compute closest centroid for each point + .map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids") + // count and sum point coordinates for each centroid + .map(new CountAppender()) + .groupBy(0).reduce(new CentroidAccumulator()) + // compute new centroids from point counts and coordinate sums + .map(new CentroidAverager()); + + // feed new centroids back into next iteration + DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids); + + DataSet<Tuple2<Integer, Point>> clusteredPoints = points + // assign points to final clusters + .map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids"); + + // emit result + if (fileOutput) { + clusteredPoints.writeAsCsv(outputPath, "\n", " "); + + // since file sinks are lazy, we trigger the execution explicitly + env.execute("KMeans Example"); + } + else { + clusteredPoints.print(); + } + } + + // ************************************************************************* + // DATA TYPES + // ************************************************************************* + + /** + * A 
simple two-dimensional point. + */ + public static class Point implements Serializable { + + public double x, y; + + public Point() {} + + public Point(double x, double y) { + this.x = x; + this.y = y; + } + + public Point add(Point other) { + x += other.x; + y += other.y; + return this; + } + + public Point div(long val) { + x /= val; + y /= val; + return this; + } + + public double euclideanDistance(Point other) { + return Math.sqrt((x-other.x)*(x-other.x) + (y-other.y)*(y-other.y)); + } + + public void clear() { + x = y = 0.0; + } + + @Override + public String toString() { + return x + " " + y; + } + } + + /** + * A simple two-dimensional centroid, basically a point with an ID. + */ + public static class Centroid extends Point { + + public int id; + + public Centroid() {} + + public Centroid(int id, double x, double y) { + super(x,y); + this.id = id; + } + + public Centroid(int id, Point p) { + super(p.x, p.y); + this.id = id; + } + + @Override + public String toString() { + return id + " " + super.toString(); + } + } + + // ************************************************************************* + // USER FUNCTIONS + // ************************************************************************* + + /** Converts a {@code Tuple2<Double,Double>} into a Point. */ + @ForwardedFields("0->x; 1->y") + public static final class TuplePointConverter implements MapFunction<Tuple2<Double, Double>, Point> { + + @Override + public Point map(Tuple2<Double, Double> t) throws Exception { + return new Point(t.f0, t.f1); + } + } + + /** Converts a {@code Tuple3<Integer, Double,Double>} into a Centroid. */ + @ForwardedFields("0->id; 1->x; 2->y") + public static final class TupleCentroidConverter implements MapFunction<Tuple3<Integer, Double, Double>, Centroid> { + + @Override + public Centroid map(Tuple3<Integer, Double, Double> t) throws Exception { + return new Centroid(t.f0, t.f1, t.f2); + } + } + + /** Determines the closest cluster center for a data point. 
*/ + @ForwardedFields("*->1") + public static final class SelectNearestCenter extends RichMapFunction<Point, Tuple2<Integer, Point>> { + private Collection<Centroid> centroids; + + /** Reads the centroid values from a broadcast variable into a collection. */ + @Override + public void open(Configuration parameters) throws Exception { + this.centroids = getRuntimeContext().getBroadcastVariable("centroids"); + } + + @Override + public Tuple2<Integer, Point> map(Point p) throws Exception { + + double minDistance = Double.MAX_VALUE; + int closestCentroidId = -1; + + // check all cluster centers + for (Centroid centroid : centroids) { + // compute distance + double distance = p.euclideanDistance(centroid); + + // update nearest cluster if necessary + if (distance < minDistance) { + minDistance = distance; + closestCentroidId = centroid.id; + } + } + + // emit a new record with the center id and the data point. + return new Tuple2<Integer, Point>(closestCentroidId, p); + } + } + + /** Appends a count variable to the tuple. */ + @ForwardedFields("f0;f1") + public static final class CountAppender implements MapFunction<Tuple2<Integer, Point>, Tuple3<Integer, Point, Long>> { + + @Override + public Tuple3<Integer, Point, Long> map(Tuple2<Integer, Point> t) { + return new Tuple3<Integer, Point, Long>(t.f0, t.f1, 1L); + } + } + + /** Sums and counts point coordinates. */ + @ForwardedFields("0") + public static final class CentroidAccumulator implements ReduceFunction<Tuple3<Integer, Point, Long>> { + + @Override + public Tuple3<Integer, Point, Long> reduce(Tuple3<Integer, Point, Long> val1, Tuple3<Integer, Point, Long> val2) { + return new Tuple3<Integer, Point, Long>(val1.f0, val1.f1.add(val2.f1), val1.f2 + val2.f2); + } + } + + /** Computes new centroid from coordinate sum and count of points. 
*/ + @ForwardedFields("0->id") + public static final class CentroidAverager implements MapFunction<Tuple3<Integer, Point, Long>, Centroid> { + + @Override + public Centroid map(Tuple3<Integer, Point, Long> value) { + return new Centroid(value.f0, value.f1.div(value.f2)); + } + } + + // ************************************************************************* + // UTIL METHODS + // ************************************************************************* + + private static boolean fileOutput = false; + private static String pointsPath = null; + private static String centersPath = null; + private static String outputPath = null; + private static int numIterations = 10; + + private static boolean parseParameters(String[] programArguments) { + + if(programArguments.length > 0) { + // parse input arguments + fileOutput = true; + if(programArguments.length == 4) { + pointsPath = programArguments[0]; + centersPath = programArguments[1]; + outputPath = programArguments[2]; + numIterations = Integer.parseInt(programArguments[3]); + } else { + System.err.println("Usage: KMeans <points path> <centers path> <result path> <num iterations>"); + return false; + } + } else { + System.out.println("Executing K-Means example with default parameters and built-in default data."); + System.out.println(" Provide parameters to read input data from files."); + System.out.println(" See the documentation for the correct format of input files."); + System.out.println(" We provide a data generator to create synthetic input files for this program."); + System.out.println(" Usage: KMeans <points path> <centers path> <result path> <num iterations>"); + } + return true; + } + + private static DataSet<Point> getPointDataSet(ExecutionEnvironment env) { + if(fileOutput) { + // read points from CSV file + return env.readCsvFile(pointsPath) + .fieldDelimiter(" ") + .includeFields(true, true) + .types(Double.class, Double.class) + .map(new TuplePointConverter()); + } else { + return 
KMeansData.getDefaultPointDataSet(env); + } + } + + private static DataSet<Centroid> getCentroidDataSet(ExecutionEnvironment env) { + if(fileOutput) { + return env.readCsvFile(centersPath) + .fieldDelimiter(" ") + .includeFields(true, true, true) + .types(Integer.class, Double.class, Double.class) + .map(new TupleCentroidConverter()); + } else { + return KMeansData.getDefaultCentroidDataSet(env); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansData.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansData.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansData.java new file mode 100644 index 0000000..e165612 --- /dev/null +++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansData.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.examples.java.clustering.util; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.examples.java.clustering.KMeans.Centroid; +import org.apache.flink.examples.java.clustering.KMeans.Point; + +import java.util.LinkedList; +import java.util.List; + +/** + * Provides the default data sets used for the K-Means example program. + * The default data sets are used, if no parameters are given to the program. + * + */ +public class KMeansData { + + // We have the data as object arrays so that we can also generate Scala Data Sources from it. + public static final Object[][] CENTROIDS = new Object[][] { + new Object[] {1, -31.85, -44.77}, + new Object[]{2, 35.16, 17.46}, + new Object[]{3, -5.16, 21.93}, + new Object[]{4, -24.06, 6.81} + }; + + public static final Object[][] POINTS = new Object[][] { + new Object[] {-14.22, -48.01}, + new Object[] {-22.78, 37.10}, + new Object[] {56.18, -42.99}, + new Object[] {35.04, 50.29}, + new Object[] {-9.53, -46.26}, + new Object[] {-34.35, 48.25}, + new Object[] {55.82, -57.49}, + new Object[] {21.03, 54.64}, + new Object[] {-13.63, -42.26}, + new Object[] {-36.57, 32.63}, + new Object[] {50.65, -52.40}, + new Object[] {24.48, 34.04}, + new Object[] {-2.69, -36.02}, + new Object[] {-38.80, 36.58}, + new Object[] {24.00, -53.74}, + new Object[] {32.41, 24.96}, + new Object[] {-4.32, -56.92}, + new Object[] {-22.68, 29.42}, + new Object[] {59.02, -39.56}, + new Object[] {24.47, 45.07}, + new Object[] {5.23, -41.20}, + new Object[] {-23.00, 38.15}, + new Object[] {44.55, -51.50}, + new Object[] {14.62, 59.06}, + new Object[] {7.41, -56.05}, + new Object[] {-26.63, 28.97}, + new Object[] {47.37, -44.72}, + new Object[] {29.07, 51.06}, + new Object[] {0.59, -31.89}, + new Object[] {-39.09, 20.78}, + new Object[] {42.97, -48.98}, + new Object[] {34.36, 49.08}, + new Object[] {-21.91, -49.01}, + new Object[] {-46.68, 46.04}, + new 
Object[] {48.52, -43.67}, + new Object[] {30.05, 49.25}, + new Object[] {4.03, -43.56}, + new Object[] {-37.85, 41.72}, + new Object[] {38.24, -48.32}, + new Object[] {20.83, 57.85} + }; + + public static DataSet<Centroid> getDefaultCentroidDataSet(ExecutionEnvironment env) { + List<Centroid> centroidList = new LinkedList<Centroid>(); + for (Object[] centroid : CENTROIDS) { + centroidList.add( + new Centroid((Integer) centroid[0], (Double) centroid[1], (Double) centroid[2])); + } + return env.fromCollection(centroidList); + } + + public static DataSet<Point> getDefaultPointDataSet(ExecutionEnvironment env) { + List<Point> pointList = new LinkedList<Point>(); + for (Object[] point : POINTS) { + pointList.add(new Point((Double) point[0], (Double) point[1])); + } + return env.fromCollection(pointList); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java new file mode 100644 index 0000000..8f48d0a --- /dev/null +++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.examples.java.clustering.util; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.text.DecimalFormat; +import java.util.Locale; +import java.util.Random; + +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.examples.java.clustering.KMeans; + +/** + * Generates data for the {@link KMeans} example program. + */ +public class KMeansDataGenerator { + + static { + Locale.setDefault(Locale.US); + } + + private static final String CENTERS_FILE = "centers"; + private static final String POINTS_FILE = "points"; + private static final long DEFAULT_SEED = 4650285087650871364L; + private static final double DEFAULT_VALUE_RANGE = 100.0; + private static final double RELATIVE_STDDEV = 0.08; + private static final int DIMENSIONALITY = 2; + private static final DecimalFormat FORMAT = new DecimalFormat("#0.00"); + private static final char DELIMITER = ' '; + + /** + * Main method to generate data for the {@link KMeans} example program. 
+ * <p> + * The generator creates to files: + * <ul> + * <li><code>< output-path >/points</code> for the data points + * <li><code>< output-path >/centers</code> for the cluster centers + * </ul> + * + * @param args + * <ol> + * <li>Int: Number of data points + * <li>Int: Number of cluster centers + * <li><b>Optional</b> String: Output path, default value is {tmp.dir} + * <li><b>Optional</b> Double: Standard deviation of data points + * <li><b>Optional</b> Double: Value range of cluster centers + * <li><b>Optional</b> Long: Random seed + * </ol> + * + * @throws IOException + */ + public static void main(String[] args) throws IOException { + + // check parameter count + if (args.length < 2) { + System.out.println("KMeansDataGenerator -points <num> -k <num clusters> [-output <output-path>] [-stddev <relative stddev>] [-range <centroid range>] [-seed <seed>]"); + System.exit(1); + } + + // parse parameters + + final ParameterTool params = ParameterTool.fromArgs(args); + final int numDataPoints = params.getInt("points"); + final int k = params.getInt("k"); + final String outDir = params.get("output", System.getProperty("java.io.tmpdir")); + final double stddev = params.getDouble("stddev", RELATIVE_STDDEV); + final double range = params.getDouble("range", DEFAULT_VALUE_RANGE); + final long firstSeed = params.getLong("seed", DEFAULT_SEED); + + + final double absoluteStdDev = stddev * range; + final Random random = new Random(firstSeed); + + // the means around which data points are distributed + final double[][] means = uniformRandomCenters(random, k, DIMENSIONALITY, range); + + // write the points out + BufferedWriter pointsOut = null; + try { + pointsOut = new BufferedWriter(new FileWriter(new File(outDir+"/"+POINTS_FILE))); + StringBuilder buffer = new StringBuilder(); + + double[] point = new double[DIMENSIONALITY]; + int nextCentroid = 0; + + for (int i = 1; i <= numDataPoints; i++) { + // generate a point for the current centroid + double[] centroid = 
means[nextCentroid]; + for (int d = 0; d < DIMENSIONALITY; d++) { + point[d] = (random.nextGaussian() * absoluteStdDev) + centroid[d]; + } + writePoint(point, buffer, pointsOut); + nextCentroid = (nextCentroid + 1) % k; + } + } + finally { + if (pointsOut != null) { + pointsOut.close(); + } + } + + // write the uniformly distributed centers to a file + BufferedWriter centersOut = null; + try { + centersOut = new BufferedWriter(new FileWriter(new File(outDir+"/"+CENTERS_FILE))); + StringBuilder buffer = new StringBuilder(); + + double[][] centers = uniformRandomCenters(random, k, DIMENSIONALITY, range); + + for (int i = 0; i < k; i++) { + writeCenter(i + 1, centers[i], buffer, centersOut); + } + } + finally { + if (centersOut != null) { + centersOut.close(); + } + } + + System.out.println("Wrote "+numDataPoints+" data points to "+outDir+"/"+POINTS_FILE); + System.out.println("Wrote "+k+" cluster centers to "+outDir+"/"+CENTERS_FILE); + } + + private static double[][] uniformRandomCenters(Random rnd, int num, int dimensionality, double range) { + final double halfRange = range / 2; + final double[][] points = new double[num][dimensionality]; + + for (int i = 0; i < num; i++) { + for (int dim = 0; dim < dimensionality; dim ++) { + points[i][dim] = (rnd.nextDouble() * range) - halfRange; + } + } + return points; + } + + private static void writePoint(double[] coordinates, StringBuilder buffer, BufferedWriter out) throws IOException { + buffer.setLength(0); + + // write coordinates + for (int j = 0; j < coordinates.length; j++) { + buffer.append(FORMAT.format(coordinates[j])); + if(j < coordinates.length - 1) { + buffer.append(DELIMITER); + } + } + + out.write(buffer.toString()); + out.newLine(); + } + + private static void writeCenter(long id, double[] coordinates, StringBuilder buffer, BufferedWriter out) throws IOException { + buffer.setLength(0); + + // write id + buffer.append(id); + buffer.append(DELIMITER); + + // write coordinates + for (int j = 0; j < 
coordinates.length; j++) { + buffer.append(FORMAT.format(coordinates[j])); + if(j < coordinates.length - 1) { + buffer.append(DELIMITER); + } + } + + out.write(buffer.toString()); + out.newLine(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java new file mode 100644 index 0000000..8e87892 --- /dev/null +++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.examples.java.distcp; + +import org.apache.commons.io.IOUtils; + +import org.apache.flink.api.common.accumulators.LongCounter; +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.LocalEnvironment; +import org.apache.flink.api.java.operators.DataSource; +import org.apache.flink.api.java.operators.FlatMapOperator; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.Collector; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * A main class of the Flink distcp utility. + * It's a simple reimplementation of Hadoop distcp + * (see <a href="http://hadoop.apache.org/docs/r1.2.1/distcp.html">http://hadoop.apache.org/docs/r1.2.1/distcp.html</a>) + * with a dynamic input format + * Note that this tool does not deal with retriability. Additionally, empty directories are not copied over. + * <p> + * When running locally, local file systems paths can be used. + * However, in a distributed environment HDFS paths must be provided both as input and output. 
+ */ +public class DistCp { + + private static final Logger LOGGER = LoggerFactory.getLogger(DistCp.class); + public static final String BYTES_COPIED_CNT_NAME = "BYTES_COPIED"; + public static final String FILES_COPIED_CNT_NAME = "FILES_COPIED"; + + public static void main(String[] args) throws Exception { + if (args.length != 3) { + printHelp(); + return; + } + + final Path sourcePath = new Path(args[0]); + final Path targetPath = new Path(args[1]); + int parallelism = Integer.valueOf(args[2], 10); + + // set up the execution environment + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + checkInputParams(env, sourcePath, targetPath, parallelism); + env.setParallelism(parallelism); + + long startTime = System.currentTimeMillis(); + LOGGER.info("Initializing copy tasks"); + List<FileCopyTask> tasks = getCopyTasks(sourcePath); + LOGGER.info("Copy task initialization took " + (System.currentTimeMillis() - startTime) + "ms"); + + DataSet<FileCopyTask> inputTasks = new DataSource<>(env, + new FileCopyTaskInputFormat(tasks), + new GenericTypeInfo<>(FileCopyTask.class), "fileCopyTasks"); + + + FlatMapOperator<FileCopyTask, Object> res = inputTasks.flatMap(new RichFlatMapFunction<FileCopyTask, Object>() { + + private LongCounter fileCounter; + private LongCounter bytesCounter; + + @Override + public void open(Configuration parameters) throws Exception { + bytesCounter = getRuntimeContext().getLongCounter(BYTES_COPIED_CNT_NAME); + fileCounter = getRuntimeContext().getLongCounter(FILES_COPIED_CNT_NAME); + } + + @Override + public void flatMap(FileCopyTask task, Collector<Object> out) throws Exception { + LOGGER.info("Processing task: " + task); + Path outPath = new Path(targetPath, task.getRelativePath()); + + FileSystem targetFs = targetPath.getFileSystem(); + // creating parent folders in case of a local FS + if (!targetFs.isDistributedFS()) { + //dealing with cases like file:///tmp or just /tmp + File outFile = 
outPath.toUri().isAbsolute() ? new File(outPath.toUri()) : new File(outPath.toString()); + File parentFile = outFile.getParentFile(); + if (!parentFile.mkdirs() && !parentFile.exists()) { + throw new RuntimeException("Cannot create local file system directories: " + parentFile); + } + } + FSDataOutputStream outputStream = null; + FSDataInputStream inputStream = null; + try { + outputStream = targetFs.create(outPath, true); + inputStream = task.getPath().getFileSystem().open(task.getPath()); + int bytes = IOUtils.copy(inputStream, outputStream); + bytesCounter.add(bytes); + } finally { + IOUtils.closeQuietly(inputStream); + IOUtils.closeQuietly(outputStream); + } + fileCounter.add(1l); + } + }); + + // no data sinks are needed, therefore just printing an empty result + res.print(); + + Map<String, Object> accumulators = env.getLastJobExecutionResult().getAllAccumulatorResults(); + LOGGER.info("== COUNTERS =="); + for (Map.Entry<String, Object> e : accumulators.entrySet()) { + LOGGER.info(e.getKey() + ": " + e.getValue()); + } + } + + + // ----------------------------------------------------------------------------------------- + // HELPER METHODS + // ----------------------------------------------------------------------------------------- + + private static void checkInputParams(ExecutionEnvironment env, Path sourcePath, Path targetPath, int parallelism) throws IOException { + if (parallelism <= 0) { + throw new IllegalArgumentException("Parallelism should be greater than 0"); + } + + boolean isLocal = env instanceof LocalEnvironment; + if (!isLocal && + !(sourcePath.getFileSystem().isDistributedFS() && targetPath.getFileSystem().isDistributedFS())) { + throw new IllegalArgumentException("In a distributed mode only HDFS input/output paths are supported"); + } + } + + private static void printHelp() { + System.err.println("Usage: <input_path> <output_path> <level_of_parallelism>"); + } + + private static List<FileCopyTask> getCopyTasks(Path sourcePath) throws 
IOException { + List<FileCopyTask> tasks = new ArrayList<>(); + getCopyTasks(sourcePath, "", tasks); + return tasks; + } + + private static void getCopyTasks(Path p, String rel, List<FileCopyTask> tasks) throws IOException { + FileStatus[] res = p.getFileSystem().listStatus(p); + if (res == null) { + return; + } + for (FileStatus fs : res) { + if (fs.isDir()) { + getCopyTasks(fs.getPath(), rel + fs.getPath().getName() + "/", tasks); + } else { + Path cp = fs.getPath(); + tasks.add(new FileCopyTask(cp, rel + cp.getName())); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java new file mode 100644 index 0000000..7f38a8b --- /dev/null +++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.examples.java.distcp; + +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.core.fs.Path; + +import java.io.Serializable; + +/** + * A Java POJO that represents a task for copying a single file + */ +public class FileCopyTask implements Serializable { + + private static final long serialVersionUID = -8760082278978316032L; + + private final Path path; + private final String relativePath; + + public FileCopyTask(Path path, String relativePath) { + if (StringUtils.isEmpty(relativePath)) { + throw new IllegalArgumentException("Relative path should not be empty for: " + path); + } + this.path = path; + this.relativePath = relativePath; + } + + public Path getPath() { + return path; + } + + public String getRelativePath() { + return relativePath; + } + + @Override + public String toString() { + return "FileCopyTask{" + + "path=" + path + + ", relativePath='" + relativePath + '\'' + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java new file mode 100644 index 0000000..d6e6713 --- /dev/null +++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
package org.apache.flink.examples.java.distcp;

import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

/**
 * An input format that dynamically assigns {@code FileCopyTask}s to the mappers
 * that have finished their previously assigned tasks: each split carries exactly
 * one copy task, and splits are handed out on demand by the assigner.
 */
public class FileCopyTaskInputFormat implements InputFormat<FileCopyTask, FileCopyTaskInputSplit> {

	private static final long serialVersionUID = -644394866425221151L;

	private static final Logger LOGGER = LoggerFactory.getLogger(FileCopyTaskInputFormat.class);

	/** The copy tasks to distribute, one per input split. */
	private final List<FileCopyTask> tasks;

	/** The split currently opened by this parallel instance; null once its task was returned. */
	private FileCopyTaskInputSplit curInputSplit = null;

	public FileCopyTaskInputFormat(List<FileCopyTask> tasks) {
		this.tasks = tasks;
	}

	/**
	 * Hands out splits lazily, so fast workers automatically pick up more copy tasks.
	 * Static nested class: it uses no state of the enclosing input format, so holding
	 * a hidden reference to it would only hinder serialization and leak memory.
	 */
	private static class FileCopyTaskAssigner implements InputSplitAssigner {

		private final Queue<FileCopyTaskInputSplit> splits;

		public FileCopyTaskAssigner(FileCopyTaskInputSplit[] inputSplits) {
			this.splits = new LinkedList<>(Arrays.asList(inputSplits));
		}

		@Override
		public InputSplit getNextInputSplit(String host, int taskId) {
			// parameterized logging avoids eager string concatenation
			LOGGER.info("Getting copy task for task: {}", taskId);
			return splits.poll();
		}
	}

	@Override
	public void configure(Configuration parameters) {
		//no op
	}

	@Override
	public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
		// statistics are unknown; null is the documented "no statistics" answer
		return null;
	}

	@Override
	public FileCopyTaskInputSplit[] createInputSplits(int minNumSplits) throws IOException {
		// one split per task, regardless of minNumSplits — the assigner balances the load
		FileCopyTaskInputSplit[] splits = new FileCopyTaskInputSplit[tasks.size()];
		int i = 0;
		for (FileCopyTask t : tasks) {
			splits[i] = new FileCopyTaskInputSplit(t, i);
			i++;
		}
		return splits;
	}

	@Override
	public InputSplitAssigner getInputSplitAssigner(FileCopyTaskInputSplit[] inputSplits) {
		return new FileCopyTaskAssigner(inputSplits);
	}

	@Override
	public void open(FileCopyTaskInputSplit split) throws IOException {
		curInputSplit = split;
	}

	@Override
	public boolean reachedEnd() throws IOException {
		// a split holds exactly one record; the end is reached once it was consumed
		return curInputSplit == null;
	}

	@Override
	public FileCopyTask nextRecord(FileCopyTask reuse) throws IOException {
		FileCopyTask toReturn = curInputSplit.getTask();
		curInputSplit = null;
		return toReturn;
	}

	@Override
	public void close() throws IOException {
		//no op
	}
}
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.examples.java.distcp; + +import org.apache.flink.core.io.InputSplit; + +/** + * Implementation of {@code InputSplit} for copying files + */ +public class FileCopyTaskInputSplit implements InputSplit { + + private static final long serialVersionUID = -7621656017747660450L; + + private final FileCopyTask task; + private final int splitNumber; + + public FileCopyTaskInputSplit(FileCopyTask task, int splitNumber) { + this.task = task; + this.splitNumber = splitNumber; + } + + public FileCopyTask getTask() { + return task; + } + + @Override + public int getSplitNumber() { + return splitNumber; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java new file mode 100644 index 0000000..ce36504 --- /dev/null +++ 
b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.examples.java.graph; + +import org.apache.flink.api.common.ProgramDescription; +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.JoinFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.aggregation.Aggregations; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.examples.java.graph.util.ConnectedComponentsData; +import org.apache.flink.util.Collector; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.operators.DeltaIteration; +import org.apache.flink.api.java.ExecutionEnvironment; + +/** + * An 
implementation of the connected components algorithm, using a delta iteration. + * + * <p> + * Initially, the algorithm assigns each vertex an unique ID. In each step, a vertex picks the minimum of its own ID and its + * neighbors' IDs, as its new ID and tells its neighbors about its new ID. After the algorithm has completed, all vertices in the + * same component will have the same ID. + * + * <p> + * A vertex whose component ID did not change needs not propagate its information in the next step. Because of that, + * the algorithm is easily expressible via a delta iteration. We here model the solution set as the vertices with + * their current component ids, and the workset as the changed vertices. Because we see all vertices initially as + * changed, the initial workset and the initial solution set are identical. Also, the delta to the solution set + * is consequently also the next workset.<br> + * + * <p> + * Input files are plain text files and must be formatted as follows: + * <ul> + * <li>Vertices represented as IDs and separated by new-line characters.<br> + * For example <code>"1\n2\n12\n42\n63"</code> gives five vertices (1), (2), (12), (42), and (63). + * <li>Edges are represented as pairs for vertex IDs which are separated by space + * characters. Edges are separated by new-line characters.<br> + * For example <code>"1 2\n2 12\n1 12\n42 63"</code> gives four (undirected) edges (1)-(2), (2)-(12), (1)-(12), and (42)-(63). + * </ul> + * + * <p> + * Usage: <code>ConnectedComponents <vertices path> <edges path> <result path> <max number of iterations></code><br> + * If no parameters are provided, the program is run with default data from {@link org.apache.flink.examples.java.graph.util.ConnectedComponentsData} and 10 iterations. 
+ * + * <p> + * This example shows how to use: + * <ul> + * <li>Delta Iterations + * <li>Generic-typed Functions + * </ul> + */ +@SuppressWarnings("serial") +public class ConnectedComponents implements ProgramDescription { + + // ************************************************************************* + // PROGRAM + // ************************************************************************* + + public static void main(String... args) throws Exception { + + if(!parseParameters(args)) { + return; + } + + // set up execution environment + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + // read vertex and edge data + DataSet<Long> vertices = getVertexDataSet(env); + DataSet<Tuple2<Long, Long>> edges = getEdgeDataSet(env).flatMap(new UndirectEdge()); + + // assign the initial components (equal to the vertex id) + DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new DuplicateValue<Long>()); + + // open a delta iteration + DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = + verticesWithInitialId.iterateDelta(verticesWithInitialId, maxIterations, 0); + + // apply the step logic: join with the edges, select the minimum neighbor, update if the component of the candidate is smaller + DataSet<Tuple2<Long, Long>> changes = iteration.getWorkset().join(edges).where(0).equalTo(0).with(new NeighborWithComponentIDJoin()) + .groupBy(0).aggregate(Aggregations.MIN, 1) + .join(iteration.getSolutionSet()).where(0).equalTo(0) + .with(new ComponentIdFilter()); + + // close the delta iteration (delta and new workset are identical) + DataSet<Tuple2<Long, Long>> result = iteration.closeWith(changes, changes); + + // emit result + if (fileOutput) { + result.writeAsCsv(outputPath, "\n", " "); + // execute program + env.execute("Connected Components Example"); + } else { + result.print(); + } + } + + // ************************************************************************* + // USER FUNCTIONS + // 
************************************************************************* + + /** + * Function that turns a value into a 2-tuple where both fields are that value. + */ + @ForwardedFields("*->f0") + public static final class DuplicateValue<T> implements MapFunction<T, Tuple2<T, T>> { + + @Override + public Tuple2<T, T> map(T vertex) { + return new Tuple2<T, T>(vertex, vertex); + } + } + + /** + * Undirected edges by emitting for each input edge the input edges itself and an inverted version. + */ + public static final class UndirectEdge implements FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> { + Tuple2<Long, Long> invertedEdge = new Tuple2<Long, Long>(); + + @Override + public void flatMap(Tuple2<Long, Long> edge, Collector<Tuple2<Long, Long>> out) { + invertedEdge.f0 = edge.f1; + invertedEdge.f1 = edge.f0; + out.collect(edge); + out.collect(invertedEdge); + } + } + + /** + * UDF that joins a (Vertex-ID, Component-ID) pair that represents the current component that + * a vertex is associated with, with a (Source-Vertex-ID, Target-VertexID) edge. The function + * produces a (Target-vertex-ID, Component-ID) pair. 
+ */ + @ForwardedFieldsFirst("f1->f1") + @ForwardedFieldsSecond("f1->f0") + public static final class NeighborWithComponentIDJoin implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> { + + @Override + public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) { + return new Tuple2<Long, Long>(edge.f1, vertexWithComponent.f1); + } + } + + + + @ForwardedFieldsFirst("*") + public static final class ComponentIdFilter implements FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> { + + @Override + public void join(Tuple2<Long, Long> candidate, Tuple2<Long, Long> old, Collector<Tuple2<Long, Long>> out) { + if (candidate.f1 < old.f1) { + out.collect(candidate); + } + } + } + + + + @Override + public String getDescription() { + return "Parameters: <vertices-path> <edges-path> <result-path> <max-number-of-iterations>"; + } + + // ************************************************************************* + // UTIL METHODS + // ************************************************************************* + + private static boolean fileOutput = false; + private static String verticesPath = null; + private static String edgesPath = null; + private static String outputPath = null; + private static int maxIterations = 10; + + private static boolean parseParameters(String[] programArguments) { + + if(programArguments.length > 0) { + // parse input arguments + fileOutput = true; + if(programArguments.length == 4) { + verticesPath = programArguments[0]; + edgesPath = programArguments[1]; + outputPath = programArguments[2]; + maxIterations = Integer.parseInt(programArguments[3]); + } else { + System.err.println("Usage: ConnectedComponents <vertices path> <edges path> <result path> <max number of iterations>"); + return false; + } + } else { + System.out.println("Executing Connected Components example with default parameters and built-in default data."); + System.out.println(" Provide parameters to 
read input data from files."); + System.out.println(" See the documentation for the correct format of input files."); + System.out.println(" Usage: ConnectedComponents <vertices path> <edges path> <result path> <max number of iterations>"); + } + return true; + } + + private static DataSet<Long> getVertexDataSet(ExecutionEnvironment env) { + + if(fileOutput) { + return env.readCsvFile(verticesPath).types(Long.class) + .map( + new MapFunction<Tuple1<Long>, Long>() { + public Long map(Tuple1<Long> value) { return value.f0; } + }); + } else { + return ConnectedComponentsData.getDefaultVertexDataSet(env); + } + } + + private static DataSet<Tuple2<Long, Long>> getEdgeDataSet(ExecutionEnvironment env) { + + if(fileOutput) { + return env.readCsvFile(edgesPath).fieldDelimiter(" ").types(Long.class, Long.class); + } else { + return ConnectedComponentsData.getDefaultEdgeDataSet(env); + } + } + + +}