[FLINK-3195] [examples] Consolidate batch examples into one project, unify batch and streaming examples under one parent project


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d0e1d635
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d0e1d635
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d0e1d635

Branch: refs/heads/master
Commit: d0e1d635d9e6a4ef157275099e2e787b3b18c0ed
Parents: 62938c1
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Dec 23 20:17:56 2015 +0100
Committer: Robert Metzger <rmetz...@apache.org>
Committed: Thu Jan 14 11:18:58 2016 +0100

----------------------------------------------------------------------
 docs/apis/cli.md                                |  18 +-
 docs/apis/examples.md                           |   4 +-
 docs/setup/gce_setup.md                         |   2 +-
 docs/setup/yarn_setup.md                        |   6 +-
 flink-contrib/flink-storm-examples/pom.xml      |   6 +-
 flink-dist/pom.xml                              |   8 +-
 flink-dist/src/main/assemblies/bin.xml          |  32 +-
 flink-examples/flink-examples-batch/pom.xml     | 400 +++++++++++++
 .../flink/examples/java/clustering/KMeans.java  | 343 ++++++++++++
 .../java/clustering/util/KMeansData.java        | 104 ++++
 .../clustering/util/KMeansDataGenerator.java    | 189 +++++++
 .../flink/examples/java/distcp/DistCp.java      | 182 ++++++
 .../examples/java/distcp/FileCopyTask.java      |  59 ++
 .../java/distcp/FileCopyTaskInputFormat.java    | 116 ++++
 .../java/distcp/FileCopyTaskInputSplit.java     |  46 ++
 .../java/graph/ConnectedComponents.java         | 243 ++++++++
 .../examples/java/graph/EnumTrianglesBasic.java | 231 ++++++++
 .../examples/java/graph/EnumTrianglesOpt.java   | 356 ++++++++++++
 .../flink/examples/java/graph/PageRank.java     | 286 ++++++++++
 .../java/graph/TransitiveClosureNaive.java      | 154 +++++
 .../graph/util/ConnectedComponentsData.java     |  72 +++
 .../java/graph/util/EnumTrianglesData.java      |  58 ++
 .../java/graph/util/EnumTrianglesDataTypes.java | 116 ++++
 .../examples/java/graph/util/PageRankData.java  |  86 +++
 .../java/misc/CollectionExecutionExample.java   |  96 ++++
 .../flink/examples/java/misc/PiEstimation.java  |  99 ++++
 .../examples/java/ml/LinearRegression.java      | 316 +++++++++++
 .../java/ml/util/LinearRegressionData.java      |  71 +++
 .../ml/util/LinearRegressionDataGenerator.java  | 112 ++++
 .../relational/EmptyFieldsCountAccumulator.java | 255 +++++++++
 .../examples/java/relational/TPCHQuery10.java   | 234 ++++++++
 .../examples/java/relational/TPCHQuery3.java    | 271 +++++++++
 .../java/relational/WebLogAnalysis.java         | 325 +++++++++++
 .../java/relational/util/WebLogData.java        | 427 ++++++++++++++
 .../relational/util/WebLogDataGenerator.java    | 210 +++++++
 .../examples/java/wordcount/WordCount.java      | 148 +++++
 .../examples/java/wordcount/WordCountPojo.java  | 173 ++++++
 .../java/wordcount/util/WordCountData.java      |  72 +++
 .../src/main/resources/log4j-test.properties    |  23 +
 .../src/main/resources/log4j.properties         |  23 +
 .../src/main/resources/logback.xml              |  29 +
 .../examples/scala/clustering/KMeans.scala      | 255 +++++++++
 .../scala/graph/ConnectedComponents.scala       | 168 ++++++
 .../examples/scala/graph/DeltaPageRank.scala    | 104 ++++
 .../scala/graph/EnumTrianglesBasic.scala        | 185 ++++++
 .../examples/scala/graph/EnumTrianglesOpt.scala | 253 +++++++++
 .../examples/scala/graph/PageRankBasic.scala    | 210 +++++++
 .../scala/graph/TransitiveClosureNaive.scala    | 116 ++++
 .../examples/scala/misc/PiEstimation.scala      |  52 ++
 .../examples/scala/ml/LinearRegression.scala    | 195 +++++++
 .../examples/scala/relational/TPCHQuery10.scala | 184 ++++++
 .../examples/scala/relational/TPCHQuery3.scala  | 172 ++++++
 .../scala/relational/WebLogAnalysis.scala       | 211 +++++++
 .../examples/scala/wordcount/WordCount.scala    | 101 ++++
 flink-examples/flink-examples-streaming/pom.xml | 559 +++++++++++++++++++
 .../examples/iteration/IterateExample.java      | 246 ++++++++
 .../iteration/util/IterateExampleData.java      |  32 ++
 .../streaming/examples/join/WindowJoin.java     | 296 ++++++++++
 .../examples/join/util/WindowJoinData.java      |  61 ++
 .../ml/IncrementalLearningSkeleton.java         | 254 +++++++++
 .../util/IncrementalLearningSkeletonData.java   |  32 ++
 .../socket/SocketTextStreamWordCount.java       | 105 ++++
 .../examples/twitter/TwitterStream.java         | 164 ++++++
 .../twitter/util/TwitterStreamData.java         |  32 ++
 .../GroupedProcessingTimeWindowExample.java     | 127 +++++
 .../examples/windowing/SessionWindowing.java    | 167 ++++++
 .../examples/windowing/TopSpeedWindowing.java   | 210 +++++++
 .../examples/windowing/WindowWordCount.java     | 132 +++++
 .../windowing/util/SessionWindowingData.java    |  27 +
 .../util/TopSpeedWindowingExampleData.java      | 276 +++++++++
 .../examples/wordcount/PojoExample.java         | 186 ++++++
 .../streaming/examples/wordcount/WordCount.java | 148 +++++
 .../scala/examples/join/WindowJoin.scala        | 156 ++++++
 .../socket/SocketTextStreamWordCount.scala      |  93 +++
 .../examples/windowing/TopSpeedWindowing.scala  | 150 +++++
 .../iteration/IterateExampleITCase.java         |  45 ++
 .../join/WindowJoinITCase.java                  |  50 ++
 .../ml/IncrementalLearningSkeletonITCase.java   |  42 ++
 .../socket/SocketTextStreamWordCountITCase.java |  30 +
 .../twitter/TwitterStreamITCase.java            |  42 ++
 .../windowing/SessionWindowingITCase.java       |  42 ++
 .../TopSpeedWindowingExampleITCase.java         |  45 ++
 .../windowing/WindowWordCountITCase.java        |  50 ++
 .../wordcount/PojoExampleITCase.java            |  45 ++
 .../wordcount/WordCountITCase.java              |  45 ++
 .../join/WindowJoinITCase.java                  |  50 ++
 .../socket/SocketTextStreamWordCountITCase.java |  30 +
 .../TopSpeedWindowingExampleITCase.java         |  45 ++
 flink-examples/flink-java-examples/pom.xml      | 330 -----------
 .../flink/examples/java/clustering/KMeans.java  | 344 ------------
 .../java/clustering/util/KMeansData.java        | 105 ----
 .../clustering/util/KMeansDataGenerator.java    | 189 -------
 .../flink/examples/java/distcp/DistCp.java      | 182 ------
 .../examples/java/distcp/FileCopyTask.java      |  59 --
 .../java/distcp/FileCopyTaskInputFormat.java    | 116 ----
 .../java/distcp/FileCopyTaskInputSplit.java     |  46 --
 .../java/graph/ConnectedComponents.java         | 243 --------
 .../examples/java/graph/EnumTrianglesBasic.java | 231 --------
 .../examples/java/graph/EnumTrianglesOpt.java   | 358 ------------
 .../examples/java/graph/PageRankBasic.java      | 288 ----------
 .../java/graph/TransitiveClosureNaive.java      | 155 -----
 .../graph/util/ConnectedComponentsData.java     |  73 ---
 .../java/graph/util/EnumTrianglesData.java      |  59 --
 .../java/graph/util/EnumTrianglesDataTypes.java | 117 ----
 .../examples/java/graph/util/PageRankData.java  |  87 ---
 .../java/misc/CollectionExecutionExample.java   |  96 ----
 .../flink/examples/java/misc/PiEstimation.java  |  99 ----
 .../examples/java/ml/LinearRegression.java      | 317 -----------
 .../java/ml/util/LinearRegressionData.java      |  72 ---
 .../ml/util/LinearRegressionDataGenerator.java  | 113 ----
 .../relational/EmptyFieldsCountAccumulator.java | 254 ---------
 .../examples/java/relational/TPCHQuery10.java   | 234 --------
 .../examples/java/relational/TPCHQuery3.java    | 272 ---------
 .../java/relational/WebLogAnalysis.java         | 327 -----------
 .../java/relational/util/WebLogData.java        | 428 --------------
 .../relational/util/WebLogDataGenerator.java    | 211 -------
 .../examples/java/wordcount/PojoExample.java    | 171 ------
 .../examples/java/wordcount/WordCount.java      | 148 -----
 .../examples/java/wordcount/WordCountMeta.java  |  54 --
 .../java/wordcount/util/WordCountData.java      |  72 ---
 .../src/main/resources/log4j-test.properties    |  23 -
 .../src/main/resources/log4j.properties         |  23 -
 .../src/main/resources/logback.xml              |  29 -
 flink-examples/flink-scala-examples/pom.xml     | 487 ----------------
 .../org/apache/flink/examples/scala/Dummy.java  |  27 -
 .../examples/scala/clustering/KMeans.scala      | 255 ---------
 .../scala/graph/ConnectedComponents.scala       | 167 ------
 .../examples/scala/graph/DeltaPageRank.scala    | 104 ----
 .../scala/graph/EnumTrianglesBasic.scala        | 185 ------
 .../examples/scala/graph/EnumTrianglesOpt.scala | 253 ---------
 .../examples/scala/graph/PageRankBasic.scala    | 210 -------
 .../scala/graph/TransitiveClosureNaive.scala    | 115 ----
 .../examples/scala/misc/PiEstimation.scala      |  52 --
 .../examples/scala/ml/LinearRegression.scala    | 195 -------
 .../examples/scala/relational/TPCHQuery10.scala | 184 ------
 .../examples/scala/relational/TPCHQuery3.scala  | 172 ------
 .../scala/relational/WebLogAnalysis.scala       | 210 -------
 .../examples/scala/wordcount/WordCount.scala    | 100 ----
 flink-examples/pom.xml                          |   4 +-
 flink-staging/flink-fs-tests/pom.xml            |   2 +-
 flink-staging/flink-table/pom.xml               |   2 +-
 flink-staging/flink-tez/pom.xml                 |   2 +-
 flink-streaming-examples/pom.xml                | 535 ------------------
 .../examples/iteration/IterateExample.java      | 246 --------
 .../iteration/util/IterateExampleData.java      |  32 --
 .../streaming/examples/join/WindowJoin.java     | 296 ----------
 .../examples/join/util/WindowJoinData.java      |  61 --
 .../ml/IncrementalLearningSkeleton.java         | 254 ---------
 .../util/IncrementalLearningSkeletonData.java   |  32 --
 .../socket/SocketTextStreamWordCount.java       | 105 ----
 .../examples/twitter/TwitterStream.java         | 164 ------
 .../twitter/util/TwitterStreamData.java         |  32 --
 .../GroupedProcessingTimeWindowExample.java     | 127 -----
 .../examples/windowing/SessionWindowing.java    | 167 ------
 .../examples/windowing/TopSpeedWindowing.java   | 210 -------
 .../examples/windowing/WindowWordCount.java     | 132 -----
 .../windowing/util/SessionWindowingData.java    |  27 -
 .../util/TopSpeedWindowingExampleData.java      | 276 ---------
 .../examples/wordcount/PojoExample.java         | 186 ------
 .../streaming/examples/wordcount/WordCount.java | 148 -----
 .../scala/examples/join/WindowJoin.scala        | 156 ------
 .../socket/SocketTextStreamWordCount.scala      |  93 ---
 .../examples/windowing/TopSpeedWindowing.scala  | 150 -----
 .../iteration/IterateExampleITCase.java         |  45 --
 .../join/WindowJoinITCase.java                  |  50 --
 .../ml/IncrementalLearningSkeletonITCase.java   |  42 --
 .../socket/SocketTextStreamWordCountITCase.java |  30 -
 .../twitter/TwitterStreamITCase.java            |  42 --
 .../windowing/SessionWindowingITCase.java       |  42 --
 .../TopSpeedWindowingExampleITCase.java         |  45 --
 .../windowing/WindowWordCountITCase.java        |  50 --
 .../wordcount/PojoExampleITCase.java            |  45 --
 .../wordcount/WordCountITCase.java              |  45 --
 .../join/WindowJoinITCase.java                  |  50 --
 .../socket/SocketTextStreamWordCountITCase.java |  30 -
 .../TopSpeedWindowingExampleITCase.java         |  45 --
 flink-tests/pom.xml                             |  12 +-
 .../exampleJavaPrograms/PageRankITCase.java     |   8 +-
 .../iterations/PageRankCompilerTest.java        |  10 +-
 .../jsonplan/DumpCompiledPlanTest.java          |   4 +-
 .../optimizer/jsonplan/PreviewPlanDumpTest.java |   4 +-
 .../flink/yarn/YARNSessionFIFOITCase.java       |  13 +-
 pom.xml                                         |   1 -
 183 files changed, 12215 insertions(+), 12699 deletions(-)
----------------------------------------------------------------------
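A note on the resulting layout: the assembly changes below split the distribution's
examples directory by API, so the batch example JARs (renamed to short names by the
maven-antrun-plugin step in flink-examples-batch/pom.xml) land in examples/batch/,
and the streaming example JARs in examples/streaming/. A quick smoke test of a built
distribution might look like this (paths assumed from the assembly and docs changes
in this commit):

    # batch examples, renamed to short JAR names by the antrun step
    ./bin/flink run ./examples/batch/WordCount.jar
    ./bin/flink run ./examples/batch/KMeans.jar

    # streaming example JARs are copied alongside by the same assembly
    ls ./examples/streaming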


http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/docs/apis/cli.md
----------------------------------------------------------------------
diff --git a/docs/apis/cli.md b/docs/apis/cli.md
index 78ea4b6..1150123 100644
--- a/docs/apis/cli.md
+++ b/docs/apis/cli.md
@@ -46,47 +46,47 @@ The command line can be used to
 
 -   Run example program with no arguments.
 
-        ./bin/flink run ./examples/WordCount.jar
+        ./bin/flink run ./examples/batch/WordCount.jar
 
 -   Run example program with arguments for input and result files
 
-        ./bin/flink run ./examples/WordCount.jar \
+        ./bin/flink run ./examples/batch/WordCount.jar \
                               file:///home/user/hamlet.txt file:///home/user/wordcount_out
 
 -   Run example program with parallelism 16 and arguments for input and result files
 
-        ./bin/flink run -p 16 ./examples/WordCount.jar \
+        ./bin/flink run -p 16 ./examples/batch/WordCount.jar \
                                file:///home/user/hamlet.txt file:///home/user/wordcount_out
 
 -   Run example program with flink log output disabled
 
-            ./bin/flink run -q ./examples/WordCount.jar
+            ./bin/flink run -q ./examples/batch/WordCount.jar
 
 -   Run example program in detached mode
 
-            ./bin/flink run -d ./examples/WordCount.jar
+            ./bin/flink run -d ./examples/batch/WordCount.jar
 
 -   Run example program on a specific JobManager:
 
         ./bin/flink run -m myJMHost:6123 \
-                               ./examples/WordCount.jar \
+                               ./examples/batch/WordCount.jar \
                               file:///home/user/hamlet.txt file:///home/user/wordcount_out
 
 -   Run example program with a specific class as an entry point:
 
         ./bin/flink run -c org.apache.flink.examples.java.wordcount.WordCount \
-                               ./examples/WordCount.jar \
+                               ./examples/batch/WordCount.jar \
                               file:///home/user/hamlet.txt file:///home/user/wordcount_out
 
 -   Run example program using a [per-job YARN cluster]({{site.baseurl}}/setup/yarn_setup.html#run-a-single-flink-job-on-hadoop-yarn) with 2 TaskManagers:
 
         ./bin/flink run -m yarn-cluster -yn 2 \
-                               ./examples/WordCount.jar \
+                               ./examples/batch/WordCount.jar \
                               hdfs:///user/hamlet.txt hdfs:///user/wordcount_out
 
 -   Display the optimized execution plan for the WordCount example program as JSON:
 
-        ./bin/flink info ./examples/WordCount.jar \
+        ./bin/flink info ./examples/batch/WordCount.jar \
                                file:///home/user/hamlet.txt file:///home/user/wordcount_out
 
 -   List scheduled and running jobs (including their JobIDs):

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/docs/apis/examples.md
----------------------------------------------------------------------
diff --git a/docs/apis/examples.md b/docs/apis/examples.md
index d22b436..a11ed9c 100644
--- a/docs/apis/examples.md
+++ b/docs/apis/examples.md
@@ -42,7 +42,7 @@ Each binary release of Flink contains an `examples` directory with jar files for
 To run the WordCount example, issue the following command:
 
 ~~~bash
-./bin/flink run ./examples/WordCount.jar
+./bin/flink run ./examples/batch/WordCount.jar
 ~~~
 
 The other examples can be started in a similar way.
@@ -50,7 +50,7 @@ The other examples can be started in a similar way.
 Note that many examples run without passing any arguments for them, by using build-in data. To run WordCount with real data, you have to pass the path to the data:
 
 ~~~bash
-./bin/flink run ./examples/WordCount.jar /path/to/some/text/data /path/to/result
+./bin/flink run ./examples/batch/WordCount.jar /path/to/some/text/data /path/to/result
 ~~~
 
 Note that non-local file systems require a schema prefix, such as `hdfs://`.
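
For a non-local input that means a command of the following shape (paths illustrative
only, mirroring the CLI docs above):

    ./bin/flink run ./examples/batch/WordCount.jar \
        hdfs:///user/hamlet.txt hdfs:///user/wordcount_out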

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/docs/setup/gce_setup.md
----------------------------------------------------------------------
diff --git a/docs/setup/gce_setup.md b/docs/setup/gce_setup.md
index f6499dc..4f3996a 100644
--- a/docs/setup/gce_setup.md
+++ b/docs/setup/gce_setup.md
@@ -95,7 +95,7 @@ To bring up the Flink cluster on Google Compute Engine, execute:
 
     ./bdutil shell
     cd /home/hadoop/flink-install/bin
-    ./flink run ../examples/WordCount.jar gs://dataflow-samples/shakespeare/othello.txt gs://<bucket_name>/output
+    ./flink run ../examples/batch/WordCount.jar gs://dataflow-samples/shakespeare/othello.txt gs://<bucket_name>/output
 
 ## Shut down your cluster
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/docs/setup/yarn_setup.md
----------------------------------------------------------------------
diff --git a/docs/setup/yarn_setup.md b/docs/setup/yarn_setup.md
index a7309e4..7a00af5 100644
--- a/docs/setup/yarn_setup.md
+++ b/docs/setup/yarn_setup.md
@@ -48,7 +48,7 @@ Once the session has been started, you can submit jobs to the cluster using the
 curl -O <flink_hadoop2_download_url>
 tar xvzf flink-{{ site.version }}-bin-hadoop2.tgz
 cd flink-{{ site.version }}/
-./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 ./examples/WordCount.jar
+./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 ./examples/batch/WordCount.jar
 ~~~
 
 ## Apache Flink on Hadoop YARN using a YARN Session
@@ -179,7 +179,7 @@ Use the *run* action to submit a job to YARN. The client is able to determine th
 ~~~bash
 wget -O LICENSE-2.0.txt http://www.apache.org/licenses/LICENSE-2.0.txt
 hadoop fs -copyFromLocal LICENSE-2.0.txt hdfs:/// ...
-./bin/flink run ./examples/WordCount.jar \
+./bin/flink run ./examples/batch/WordCount.jar \
         hdfs:///..../LICENSE-2.0.txt hdfs:///.../wordcount-result.txt
 ~~~
 
@@ -205,7 +205,7 @@ Please note that the client then expects the `-yn` value to be set (number of Ta
 ***Example:***
 
 ~~~bash
-./bin/flink run -m yarn-cluster -yn 2 ./examples/WordCount.jar
+./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar
 ~~~
 
 The command line options of the YARN session are also available with the `./bin/flink` tool.

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-contrib/flink-storm-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/pom.xml b/flink-contrib/flink-storm-examples/pom.xml
index 6f3a050..7e093ff 100644
--- a/flink-contrib/flink-storm-examples/pom.xml
+++ b/flink-contrib/flink-storm-examples/pom.xml
@@ -43,7 +43,7 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-java-examples</artifactId>
+                       <artifactId>flink-examples-batch</artifactId>
                        <version>${project.version}</version>
                </dependency>
 
@@ -73,7 +73,7 @@ under the License.
 
        <build>
                <plugins>
-                       <!-- get default data from flink-java-examples package -->
+                       <!-- get default data from flink-examples-batch package -->
                        <plugin>
                                <groupId>org.apache.maven.plugins</groupId>
                                <artifactId>maven-dependency-plugin</artifactId>
@@ -89,7 +89,7 @@ under the License.
                                                        <artifactItems>
                                                                <artifactItem>
                                                                        <groupId>org.apache.flink</groupId>
-                                                                       <artifactId>flink-java-examples</artifactId>
+                                                                       <artifactId>flink-examples-batch</artifactId>
                                                                        <version>${project.version}</version>
                                                                        <type>jar</type>
                                                                        <overWrite>false</overWrite>

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index e6b2fe0..543652f 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -85,13 +85,7 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-java-examples</artifactId>
-                       <version>${project.version}</version>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-scala-examples</artifactId>
+                       <artifactId>flink-examples-batch</artifactId>
                        <version>${project.version}</version>
                </dependency>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-dist/src/main/assemblies/bin.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/assemblies/bin.xml b/flink-dist/src/main/assemblies/bin.xml
index 602af68..b067280 100644
--- a/flink-dist/src/main/assemblies/bin.xml
+++ b/flink-dist/src/main/assemblies/bin.xml
@@ -103,7 +103,7 @@ under the License.
                <!-- copy *.txt files -->
                <fileSet>
                        <directory>src/main/flink-bin/</directory>
-                       <outputDirectory></outputDirectory>
+                       <outputDirectory/>
                        <fileMode>0644</fileMode>
                        <includes>
                                <include>*.txt</include>
@@ -113,7 +113,7 @@ under the License.
                <!-- copy LICENSE/NOTICE files -->
                <fileSet>
                        <directory>../</directory>
-                       <outputDirectory></outputDirectory>
+                       <outputDirectory/>
                        <fileMode>0644</fileMode>
                        <includes>
                                <include>LICENSE*</include>
@@ -150,21 +150,31 @@ under the License.
                        </excludes>
                </fileSet>
 
-               <!-- copy jar files of java examples -->
+               <!-- copy jar files of the batch examples -->
                <fileSet>
-                       <directory>../flink-examples/flink-java-examples/target</directory>
-                       <outputDirectory>examples</outputDirectory>
+                       <directory>../flink-examples/flink-examples-batch/target</directory>
+                       <outputDirectory>examples/batch</outputDirectory>
                        <fileMode>0644</fileMode>
                        <includes>
                                <include>*.jar</include>
                        </includes>
                        <excludes>
-                               <exclude>flink-java-examples*-${project.version}.jar</exclude>
-                               <exclude>original-flink-java-examples*-${project.version}.jar</exclude>
-                               <exclude>flink-java-examples*-${project.version}-sources.jar</exclude>
-                               <exclude>flink-java-examples*-${project.version}-tests.jar</exclude>
-                               <exclude>flink-java-examples*-${project.version}-javadoc.jar</exclude>
-                               <exclude>flink-java-examples*-${project.version}-*.jar</exclude>
+                               <exclude>flink-examples-batch*.jar</exclude>
+                               <exclude>original-flink-examples-batch*.jar</exclude>
+                       </excludes>
+               </fileSet>
+
+               <!-- copy jar files of the streaming examples -->
+               <fileSet>
+                       <directory>../flink-examples/flink-examples-streaming/target</directory>
+                       <outputDirectory>examples/streaming</outputDirectory>
+                       <fileMode>0644</fileMode>
+                       <includes>
+                               <include>*.jar</include>
+                       </includes>
+                       <excludes>
+                               <exclude>flink-examples-streaming*.jar</exclude>
+                               <exclude>original-flink-examples-streaming*.jar</exclude>
                        </excludes>
                </fileSet>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/pom.xml
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/pom.xml b/flink-examples/flink-examples-batch/pom.xml
new file mode 100644
index 0000000..a989ef5
--- /dev/null
+++ b/flink-examples/flink-examples-batch/pom.xml
@@ -0,0 +1,400 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-examples</artifactId>
+               <version>1.0-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-examples-batch</artifactId>
+       <name>flink-examples-batch</name>
+       <packaging>jar</packaging>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-java</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-scala</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+       </dependencies>
+       
+       
+       <build>
+               <plugins>
+
+                       <!-- Scala Compiler -->
+                       <plugin>
+                               <groupId>net.alchim31.maven</groupId>
+                               <artifactId>scala-maven-plugin</artifactId>
+                               <version>3.1.4</version>
+                               <executions>
+                                       <!-- Run scala compiler in the process-resources phase, so that dependencies on
+                                               scala classes can be resolved later in the (Java) compile phase -->
+                                       <execution>
+                                               <id>scala-compile-first</id>
+                                               <phase>process-resources</phase>
+                                               <goals>
+                                                       <goal>compile</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                               <configuration>
+                                       <jvmArgs>
+                                               <jvmArg>-Xms128m</jvmArg>
+                                               <jvmArg>-Xmx512m</jvmArg>
+                                       </jvmArgs>
+                               </configuration>
+                       </plugin>
+
+                       <!-- Eclipse Integration -->
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-eclipse-plugin</artifactId>
+                               <version>2.8</version>
+                               <configuration>
+                                       <downloadSources>true</downloadSources>
+                                       <projectnatures>
+                                               <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+                                               <projectnature>org.eclipse.jdt.core.javanature</projectnature>
+                                       </projectnatures>
+                                       <buildcommands>
+                                               <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+                                       </buildcommands>
+                                       <classpathContainers>
+                                               <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+                                               <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+                                       </classpathContainers>
+                                       <excludes>
+                                               <exclude>org.scala-lang:scala-library</exclude>
+                                               <exclude>org.scala-lang:scala-compiler</exclude>
+                                       </excludes>
+                                       <sourceIncludes>
+                                               <sourceInclude>**/*.scala</sourceInclude>
+                                               <sourceInclude>**/*.java</sourceInclude>
+                                       </sourceIncludes>
+                               </configuration>
+                       </plugin>
+
+                       <!-- Adding scala source directories to build path -->
+                       <plugin>
+                               <groupId>org.codehaus.mojo</groupId>
+                               <artifactId>build-helper-maven-plugin</artifactId>
+                               <version>1.7</version>
+                               <executions>
+                                       <!-- Add src/main/scala to eclipse build path -->
+                                       <execution>
+                                               <id>add-source</id>
+                                               <phase>generate-sources</phase>
+                                               <goals>
+                                                       <goal>add-source</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <sources>
+                                                               <source>src/main/scala</source>
+                                                       </sources>
+                                               </configuration>
+                                       </execution>
+                                       <!-- Add src/test/scala to eclipse build path -->
+                                       <execution>
+                                               <id>add-test-source</id>
+                                               <phase>generate-test-sources</phase>
+                                               <goals>
+                                                       <goal>add-test-source</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <sources>
+                                                               <source>src/test/scala</source>
+                                                       </sources>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+
+                       <!-- Scala Code Style -->
+                       <plugin>
+                               <groupId>org.scalastyle</groupId>
+                               <artifactId>scalastyle-maven-plugin</artifactId>
+                               <version>0.5.0</version>
+                               <executions>
+                                       <execution>
+                                               <goals>
+                                                       <goal>check</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                               <configuration>
+                                       <verbose>false</verbose>
+                                       <failOnViolation>true</failOnViolation>
+                                       <includeTestSourceDirectory>true</includeTestSourceDirectory>
+                                       <failOnWarning>false</failOnWarning>
+                                       <sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
+                                       <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
+                                       <configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
+                                       <outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
+                                       <outputEncoding>UTF-8</outputEncoding>
+                               </configuration>
+                       </plugin>
+                       
+                       <!-- create the example JAR files -->
+                       
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-jar-plugin</artifactId>
+                               <executions>
+                               
+                                       <!-- KMeans -->
+                                       <execution>
+                                               <id>KMeans</id>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>jar</goal>
+                                               </goals>
+
+                                               <configuration>
+                                                       <classifier>KMeans</classifier>
+
+                                                       <archive>
+                                                               <manifestEntries>
+                                                                       <program-class>org.apache.flink.examples.java.clustering.KMeans</program-class>
+                                                               </manifestEntries>
+                                                       </archive>
+
+                                                       <includes>
+                                                               <include>**/java/clustering/KMeans.class</include>
+                                                               <include>**/java/clustering/KMeans$*.class</include>
+                                                               <include>**/java/clustering/util/KMeansDataGenerator.class</include>
+                                                               <include>**/java/clustering/util/KMeansData.class</include>
+                                                       </includes>
+                                               </configuration>
+                                       </execution>
+
+                                       <!-- Transitive Closure -->
+                                       <execution>
+                                               <id>TransitiveClosure</id>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>jar</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <classifier>TransitiveClosure</classifier>
+                               
+                                                       <archive>
+                                                               <manifestEntries>
+                                                                       <program-class>org.apache.flink.examples.java.graph.TransitiveClosureNaive</program-class>
+                                                               </manifestEntries>
+                                                       </archive>
+                               
+                                                       <includes>
+                                                               <include>**/java/graph/TransitiveClosureNaive.class</include>
+                                                               <include>**/java/graph/TransitiveClosureNaive$*.class</include>
+                                                               <include>**/java/graph/util/ConnectedComponentsData.class</include>
+                                                       </includes>
+                                               </configuration>
+                                       </execution>
+
+                                       <!-- Connected Components -->
+                                       <execution>
+                                               <id>ConnectedComponents</id>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>jar</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <classifier>ConnectedComponents</classifier>
+
+                                                       <archive>
+                                                               <manifestEntries>
+                                                                       <program-class>org.apache.flink.examples.java.graph.ConnectedComponents</program-class>
+                                                               </manifestEntries>
+                                                       </archive>
+
+                                                       <includes>
+                                                               <include>**/java/graph/ConnectedComponents.class</include>
+                                                               <include>**/java/graph/ConnectedComponents$*.class</include>
+                                                               <include>**/java/graph/util/ConnectedComponentsData.class</include>
+                                                       </includes>
+                                               </configuration>
+                                       </execution>
+                                       
+                                       <!-- EnumTriangles Basic -->
+                                       <execution>
+                                               <id>EnumerateGraphTriangles</id>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>jar</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <classifier>EnumerateGraphTriangles</classifier>
+
+                                                       <archive>
+                                                               <manifestEntries>
+                                                                       <program-class>org.apache.flink.examples.java.graph.EnumTrianglesBasic</program-class>
+                                                               </manifestEntries>
+                                                       </archive>
+
+                                                       <includes>
+                                                               <include>**/java/graph/EnumTrianglesBasic.class</include>
+                                                               <include>**/java/graph/EnumTrianglesBasic$*.class</include>
+                                                               <include>**/java/graph/util/EnumTrianglesDataTypes.class</include>
+                                                               <include>**/java/graph/util/EnumTrianglesDataTypes$*.class</include>
+                                                               <include>**/java/graph/util/EnumTrianglesData.class</include>
+                                                       </includes>
+                                               </configuration>
+                                       </execution>
+                                       
+                                       <!-- PageRank -->
+                                       <execution>
+                                               <id>PageRank</id>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>jar</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <classifier>PageRank</classifier>
+
+                                                       <archive>
+                                                               <manifestEntries>
+                                                                       <program-class>org.apache.flink.examples.java.graph.PageRank</program-class>
+                                                               </manifestEntries>
+                                                       </archive>
+
+                                                       <includes>
+                                                               <include>**/java/graph/PageRank.class</include>
+                                                               <include>**/java/graph/PageRank$*.class</include>
+                                                               <include>**/java/graph/util/PageRankData.class</include>
+                                                       </includes>
+                                               </configuration>
+                                       </execution>
+
+                                       <!-- WebLogAnalysis -->
+                                       <execution>
+                                               <id>WebLogAnalysis</id>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>jar</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <classifier>WebLogAnalysis</classifier>
+
+                                                       <archive>
+                                                               <manifestEntries>
+                                                                       <program-class>org.apache.flink.examples.java.relational.WebLogAnalysis</program-class>
+                                                               </manifestEntries>
+                                                       </archive>
+
+                                                       <includes>
+                                                               <include>**/java/relational/WebLogAnalysis.class</include>
+                                                               <include>**/java/relational/WebLogAnalysis$*.class</include>
+                                                               <include>**/java/relational/util/WebLogData.class</include>
+                                                               <include>**/java/relational/util/WebLogDataGenerator.class</include>
+                                                       </includes>
+                                               </configuration>
+                                       </execution>
+
+                                       <!-- WordCount -->
+                                       <execution>
+                                               <id>WordCount</id>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>jar</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <classifier>WordCount</classifier>
+
+                                                       <archive>
+                                                               <manifestEntries>
+                                                                       <program-class>org.apache.flink.examples.java.wordcount.WordCount</program-class>
+                                                               </manifestEntries>
+                                                       </archive>
+
+                                                       <includes>
+                                                               <include>**/java/wordcount/WordCount.class</include>
+                                                               <include>**/java/wordcount/WordCount$*.class</include>
+                                                               <include>**/java/wordcount/util/WordCountData.class</include>
+                                                       </includes>
+                                               </configuration>
+                                       </execution>
+
+                                       <!-- Distributed Copy -->
+                                       <execution>
+                                               <id>DistCp</id>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>jar</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <classifier>DistCp</classifier>
+
+                                                       <archive>
+                                                               <manifestEntries>
+                                                                       <program-class>org.apache.flink.examples.java.distcp.DistCp</program-class>
+                                                               </manifestEntries>
+                                                       </archive>
+
+                                                       <includes>
+                                                               <include>**/java/distcp/*</include>
+                                                       </includes>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+
+                       <!-- simplify the name of example JARs for build-target/examples -->
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-antrun-plugin</artifactId>
+                               <version>1.7</version>
+                               <executions>
+                                       <execution>
+                                               <id>rename</id>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>run</goal>
+                                               </goals>
+                                               <configuration> 
+                                                       <target>
+                                                               <copy file="${project.basedir}/target/flink-examples-batch-${project.version}-KMeans.jar" tofile="${project.basedir}/target/KMeans.jar" />
+                                                               <copy file="${project.basedir}/target/flink-examples-batch-${project.version}-ConnectedComponents.jar" tofile="${project.basedir}/target/ConnectedComponents.jar" />
+                                                               <copy file="${project.basedir}/target/flink-examples-batch-${project.version}-EnumerateGraphTriangles.jar" tofile="${project.basedir}/target/EnumerateGraphTriangles.jar" />
+                                                               <copy file="${project.basedir}/target/flink-examples-batch-${project.version}-PageRank.jar" tofile="${project.basedir}/target/PageRank.jar" />
+                                                               <copy file="${project.basedir}/target/flink-examples-batch-${project.version}-TransitiveClosure.jar" tofile="${project.basedir}/target/TransitiveClosure.jar" />
+                                                               <copy file="${project.basedir}/target/flink-examples-batch-${project.version}-WebLogAnalysis.jar" tofile="${project.basedir}/target/WebLogAnalysis.jar" />
+                                                               <copy file="${project.basedir}/target/flink-examples-batch-${project.version}-WordCount.jar" tofile="${project.basedir}/target/WordCount.jar" />
+                                                               <copy file="${project.basedir}/target/flink-examples-batch-${project.version}-DistCp.jar" tofile="${project.basedir}/target/DistCp.jar" />
+                                                       </target>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+       </build>
+
+</project>
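
Each per-example JAR built above carries a program-class manifest entry, which the
Flink client reads to pick the entry point, so the renamed JARs can be submitted
without an explicit -c option. A quick way to inspect the manifest of a built JAR
(a sketch, assuming a local build of flink-examples-batch):

    unzip -p flink-examples/flink-examples-batch/target/KMeans.jar META-INF/MANIFEST.MF | grep -i program-class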

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
new file mode 100644
index 0000000..1730e2a
--- /dev/null
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.java.clustering;
+
+import java.io.Serializable;
+import java.util.Collection;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.examples.java.clustering.util.KMeansData;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+
+/**
+ * This example implements a basic K-Means clustering algorithm.
+ * 
+ * <p>
+ * K-Means is an iterative clustering algorithm and works as follows:<br>
+ * K-Means is given a set of data points to be clustered and an initial set of <i>K</i> cluster centers.
+ * In each iteration, the algorithm computes the distance of each data point to each cluster center.
+ * Each point is assigned to the cluster center which is closest to it.
+ * Subsequently, each cluster center is moved to the center (<i>mean</i>) of all points that have been assigned to it.
+ * The moved cluster centers are fed into the next iteration. 
+ * The algorithm terminates after a fixed number of iterations (as in this implementation) 
+ * or if cluster centers do not (significantly) move in an iteration.<br>
+ * This is the Wikipedia entry for the <a href="http://en.wikipedia.org/wiki/K-means_clustering">K-Means Clustering algorithm</a>.
+ * 
+ * <p>
+ * This implementation works on two-dimensional data points. <br>
+ * It computes an assignment of data points to cluster centers, i.e., 
+ * each data point is annotated with the id of the final cluster (center) it belongs to.
+ * 
+ * <p>
+ * Input files are plain text files and must be formatted as follows:
+ * <ul>
+ * <li>Data points are represented as two double values separated by a blank character.
+ * Data points are separated by newline characters.<br>
+ * For example <code>"1.2 2.3\n5.3 7.2\n"</code> gives two data points (x=1.2, y=2.3) and (x=5.3, y=7.2).
+ * <li>Cluster centers are represented by an integer id and a point value.<br>
+ * For example <code>"1 6.2 3.2\n2 2.9 5.7\n"</code> gives two centers (id=1, x=6.2, y=3.2) and (id=2, x=2.9, y=5.7).
+ * </ul>
+ * 
+ * <p>
+ * Usage: <code>KMeans &lt;points path&gt; &lt;centers path&gt; &lt;result path&gt; &lt;num iterations&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from {@link org.apache.flink.examples.java.clustering.util.KMeansData} and 10 iterations. 
+ * 
+ * <p>
+ * This example shows how to use:
+ * <ul>
+ * <li>Bulk iterations
+ * <li>Broadcast variables in bulk iterations
+ * <li>Custom Java objects (PoJos)
+ * </ul>
+ */
+@SuppressWarnings("serial")
+public class KMeans {
+       
+       // *************************************************************************
+       //     PROGRAM
+       // *************************************************************************
+       
+       public static void main(String[] args) throws Exception {
+               
+               if(!parseParameters(args)) {
+                       return;
+               }
+       
+               // set up execution environment
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               
+               // get input data
+               DataSet<Point> points = getPointDataSet(env);
+               DataSet<Centroid> centroids = getCentroidDataSet(env);
+               
+               // set number of bulk iterations for KMeans algorithm
+               IterativeDataSet<Centroid> loop = 
centroids.iterate(numIterations);
+               
+               DataSet<Centroid> newCentroids = points
+                       // compute closest centroid for each point
+                       .map(new SelectNearestCenter()).withBroadcastSet(loop, 
"centroids")
+                       // count and sum point coordinates for each centroid
+                       .map(new CountAppender())
+                       .groupBy(0).reduce(new CentroidAccumulator())
+                       // compute new centroids from point counts and 
coordinate sums
+                       .map(new CentroidAverager());
+               
+               // feed new centroids back into next iteration
+               DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids);
+               
+               DataSet<Tuple2<Integer, Point>> clusteredPoints = points
+                               // assign points to final clusters
+                               .map(new 
SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
+               
+               // emit result
+               if (fileOutput) {
+                       clusteredPoints.writeAsCsv(outputPath, "\n", " ");
+
+                       // since file sinks are lazy, we trigger the execution 
explicitly
+                       env.execute("KMeans Example");
+               }
+               else {
+                       clusteredPoints.print();
+               }
+       }
+       
+       // 
*************************************************************************
+       //     DATA TYPES
+       // 
*************************************************************************
+       
+       /**
+        * A simple two-dimensional point.
+        */
+       public static class Point implements Serializable {
+               
+               public double x, y;
+               
+               public Point() {}
+
+               public Point(double x, double y) {
+                       this.x = x;
+                       this.y = y;
+               }
+               
+               public Point add(Point other) {
+                       x += other.x;
+                       y += other.y;
+                       return this;
+               }
+               
+               public Point div(long val) {
+                       x /= val;
+                       y /= val;
+                       return this;
+               }
+               
+               public double euclideanDistance(Point other) {
+                       return Math.sqrt((x-other.x)*(x-other.x) + 
(y-other.y)*(y-other.y));
+               }
+               
+               public void clear() {
+                       x = y = 0.0;
+               }
+               
+               @Override
+               public String toString() {
+                       return x + " " + y;
+               }
+       }
+       
+       /**
+        * A simple two-dimensional centroid, basically a point with an ID. 
+        */
+       public static class Centroid extends Point {
+               
+               public int id;
+               
+               public Centroid() {}
+               
+               public Centroid(int id, double x, double y) {
+                       super(x,y);
+                       this.id = id;
+               }
+               
+               public Centroid(int id, Point p) {
+                       super(p.x, p.y);
+                       this.id = id;
+               }
+               
+               @Override
+               public String toString() {
+                       return id + " " + super.toString();
+               }
+       }
+       
+       // 
*************************************************************************
+       //     USER FUNCTIONS
+       // 
*************************************************************************
+       
+       /** Converts a {@code Tuple2<Double, Double>} into a Point. */
+       @ForwardedFields("0->x; 1->y")
+       public static final class TuplePointConverter implements 
MapFunction<Tuple2<Double, Double>, Point> {
+
+               @Override
+               public Point map(Tuple2<Double, Double> t) throws Exception {
+                       return new Point(t.f0, t.f1);
+               }
+       }
+       
+       /** Converts a {@code Tuple3<Integer, Double, Double>} into a Centroid. */
+       @ForwardedFields("0->id; 1->x; 2->y")
+       public static final class TupleCentroidConverter implements 
MapFunction<Tuple3<Integer, Double, Double>, Centroid> {
+
+               @Override
+               public Centroid map(Tuple3<Integer, Double, Double> t) throws 
Exception {
+                       return new Centroid(t.f0, t.f1, t.f2);
+               }
+       }
+       
+       /** Determines the closest cluster center for a data point. */
+       @ForwardedFields("*->1")
+       public static final class SelectNearestCenter extends 
RichMapFunction<Point, Tuple2<Integer, Point>> {
+               private Collection<Centroid> centroids;
+
+               /** Reads the centroid values from a broadcast variable into a 
collection. */
+               @Override
+               public void open(Configuration parameters) throws Exception {
+                       this.centroids = 
getRuntimeContext().getBroadcastVariable("centroids");
+               }
+               
+               @Override
+               public Tuple2<Integer, Point> map(Point p) throws Exception {
+                       
+                       double minDistance = Double.MAX_VALUE;
+                       int closestCentroidId = -1;
+                       
+                       // check all cluster centers
+                       for (Centroid centroid : centroids) {
+                               // compute distance
+                               double distance = p.euclideanDistance(centroid);
+                               
+                               // update nearest cluster if necessary 
+                               if (distance < minDistance) {
+                                       minDistance = distance;
+                                       closestCentroidId = centroid.id;
+                               }
+                       }
+
+                       // emit a new record with the center id and the data 
point.
+                       return new Tuple2<Integer, Point>(closestCentroidId, p);
+               }
+       }
+       
+       /** Appends a count variable to the tuple. */
+       @ForwardedFields("f0;f1")
+       public static final class CountAppender implements 
MapFunction<Tuple2<Integer, Point>, Tuple3<Integer, Point, Long>> {
+
+               @Override
+               public Tuple3<Integer, Point, Long> map(Tuple2<Integer, Point> 
t) {
+                       return new Tuple3<Integer, Point, Long>(t.f0, t.f1, 1L);
+               } 
+       }
+       
+       /** Sums and counts point coordinates. */
+       @ForwardedFields("0")
+       public static final class CentroidAccumulator implements 
ReduceFunction<Tuple3<Integer, Point, Long>> {
+
+               @Override
+               public Tuple3<Integer, Point, Long> reduce(Tuple3<Integer, 
Point, Long> val1, Tuple3<Integer, Point, Long> val2) {
+                       return new Tuple3<Integer, Point, Long>(val1.f0, 
val1.f1.add(val2.f1), val1.f2 + val2.f2);
+               }
+       }
+       
+       /** Computes new centroid from coordinate sum and count of points. */
+       @ForwardedFields("0->id")
+       public static final class CentroidAverager implements 
MapFunction<Tuple3<Integer, Point, Long>, Centroid> {
+
+               @Override
+               public Centroid map(Tuple3<Integer, Point, Long> value) {
+                       return new Centroid(value.f0, value.f1.div(value.f2));
+               }
+       }
+       
+       // 
*************************************************************************
+       //     UTIL METHODS
+       // 
*************************************************************************
+       
+       private static boolean fileOutput = false;
+       private static String pointsPath = null;
+       private static String centersPath = null;
+       private static String outputPath = null;
+       private static int numIterations = 10;
+       
+       private static boolean parseParameters(String[] programArguments) {
+               
+               if(programArguments.length > 0) {
+                       // parse input arguments
+                       fileOutput = true;
+                       if(programArguments.length == 4) {
+                               pointsPath = programArguments[0];
+                               centersPath = programArguments[1];
+                               outputPath = programArguments[2];
+                               numIterations = 
Integer.parseInt(programArguments[3]);
+                       } else {
+                               System.err.println("Usage: KMeans <points path> 
<centers path> <result path> <num iterations>");
+                               return false;
+                       }
+               } else {
+                       System.out.println("Executing K-Means example with 
default parameters and built-in default data.");
+                       System.out.println("  Provide parameters to read input 
data from files.");
+                       System.out.println("  See the documentation for the 
correct format of input files.");
+                       System.out.println("  We provide a data generator to 
create synthetic input files for this program.");
+                       System.out.println("  Usage: KMeans <points path> 
<centers path> <result path> <num iterations>");
+               }
+               return true;
+       }
+       
+       private static DataSet<Point> getPointDataSet(ExecutionEnvironment env) 
{
+               if(fileOutput) {
+                       // read points from CSV file
+                       return env.readCsvFile(pointsPath)
+                                               .fieldDelimiter(" ")
+                                               .includeFields(true, true)
+                                               .types(Double.class, 
Double.class)
+                                               .map(new TuplePointConverter());
+               } else {
+                       return KMeansData.getDefaultPointDataSet(env);
+               }
+       }
+       
+       private static DataSet<Centroid> 
getCentroidDataSet(ExecutionEnvironment env) {
+               if(fileOutput) {
+                       return env.readCsvFile(centersPath)
+                                               .fieldDelimiter(" ")
+                                               .includeFields(true, true, true)
+                                               .types(Integer.class, 
Double.class, Double.class)
+                                               .map(new 
TupleCentroidConverter());
+               } else {
+                       return KMeansData.getDefaultCentroidDataSet(env);
+               }
+       }
+}
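
As a side note for readers new to the DataSet iteration API: the iterate()/closeWith() pattern with a per-superstep broadcast variable, as used above, also works in isolation. The following sketch is illustrative only and not part of this commit (the class name BulkIterationSketch, the toy data, and the damping factor 0.5 are made up); it refines a single one-dimensional "centroid" towards the mean of the data, mirroring the CountAppender/CentroidAccumulator/CentroidAverager chain:

    import java.util.Collection;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.IterativeDataSet;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;

    // Illustrative sketch, not part of this commit.
    public class BulkIterationSketch {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<Double> points = env.fromElements(1.0, 2.0, 6.0, 7.0);

            // the "centroid": a single value refined in every superstep
            IterativeDataSet<Double> loop = env.fromElements(0.0).iterate(10);

            DataSet<Double> newCenter = points
                // append a count (compare CountAppender)
                .map(new MapFunction<Double, Tuple2<Double, Long>>() {
                    @Override
                    public Tuple2<Double, Long> map(Double p) {
                        return new Tuple2<Double, Long>(p, 1L);
                    }
                })
                // sum values and counts (compare CentroidAccumulator)
                .reduce(new ReduceFunction<Tuple2<Double, Long>>() {
                    @Override
                    public Tuple2<Double, Long> reduce(Tuple2<Double, Long> a, Tuple2<Double, Long> b) {
                        return new Tuple2<Double, Long>(a.f0 + b.f0, a.f1 + b.f1);
                    }
                })
                // move the broadcast center halfway towards the mean (compare CentroidAverager)
                .map(new RichMapFunction<Tuple2<Double, Long>, Double>() {
                    private double center;

                    @Override
                    public void open(Configuration parameters) {
                        Collection<Double> bc = getRuntimeContext().getBroadcastVariable("center");
                        center = bc.iterator().next();
                    }

                    @Override
                    public Double map(Tuple2<Double, Long> sumAndCount) {
                        return center + 0.5 * (sumAndCount.f0 / sumAndCount.f1 - center);
                    }
                }).withBroadcastSet(loop, "center");

            // feed the refined center back into the next superstep; print() triggers execution
            loop.closeWith(newCenter).print();
        }
    }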

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansData.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansData.java
 
b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansData.java
new file mode 100644
index 0000000..e165612
--- /dev/null
+++ 
b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansData.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.java.clustering.util;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.examples.java.clustering.KMeans.Centroid;
+import org.apache.flink.examples.java.clustering.KMeans.Point;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Provides the default data sets used for the K-Means example program.
+ * The default data sets are used if no parameters are given to the program.
+ *
+ */
+public class KMeansData {
+
+       // We have the data as object arrays so that we can also generate Scala 
Data Sources from it.
+       public static final Object[][] CENTROIDS = new Object[][] {
+               new Object[] {1, -31.85, -44.77},
+               new Object[]{2, 35.16, 17.46},
+               new Object[]{3, -5.16, 21.93},
+               new Object[]{4, -24.06, 6.81}
+       };
+
+       public static final Object[][] POINTS = new Object[][] {
+               new Object[] {-14.22, -48.01},
+               new Object[] {-22.78, 37.10},
+               new Object[] {56.18, -42.99},
+               new Object[] {35.04, 50.29},
+               new Object[] {-9.53, -46.26},
+               new Object[] {-34.35, 48.25},
+               new Object[] {55.82, -57.49},
+               new Object[] {21.03, 54.64},
+               new Object[] {-13.63, -42.26},
+               new Object[] {-36.57, 32.63},
+               new Object[] {50.65, -52.40},
+               new Object[] {24.48, 34.04},
+               new Object[] {-2.69, -36.02},
+               new Object[] {-38.80, 36.58},
+               new Object[] {24.00, -53.74},
+               new Object[] {32.41, 24.96},
+               new Object[] {-4.32, -56.92},
+               new Object[] {-22.68, 29.42},
+               new Object[] {59.02, -39.56},
+               new Object[] {24.47, 45.07},
+               new Object[] {5.23, -41.20},
+               new Object[] {-23.00, 38.15},
+               new Object[] {44.55, -51.50},
+               new Object[] {14.62, 59.06},
+               new Object[] {7.41, -56.05},
+               new Object[] {-26.63, 28.97},
+               new Object[] {47.37, -44.72},
+               new Object[] {29.07, 51.06},
+               new Object[] {0.59, -31.89},
+               new Object[] {-39.09, 20.78},
+               new Object[] {42.97, -48.98},
+               new Object[] {34.36, 49.08},
+               new Object[] {-21.91, -49.01},
+               new Object[] {-46.68, 46.04},
+               new Object[] {48.52, -43.67},
+               new Object[] {30.05, 49.25},
+               new Object[] {4.03, -43.56},
+               new Object[] {-37.85, 41.72},
+               new Object[] {38.24, -48.32},
+               new Object[] {20.83, 57.85}
+       };
+
+       public static DataSet<Centroid> 
getDefaultCentroidDataSet(ExecutionEnvironment env) {
+               List<Centroid> centroidList = new LinkedList<Centroid>();
+               for (Object[] centroid : CENTROIDS) {
+                       centroidList.add(
+                                       new Centroid((Integer) centroid[0], 
(Double) centroid[1], (Double) centroid[2]));
+               }
+               return env.fromCollection(centroidList);
+       }
+       
+       public static DataSet<Point> 
getDefaultPointDataSet(ExecutionEnvironment env) {
+               List<Point> pointList = new LinkedList<Point>();
+               for (Object[] point : POINTS) {
+                       pointList.add(new Point((Double) point[0], (Double) 
point[1]));
+               }
+               return env.fromCollection(pointList);
+       }
+       
+}
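
To sanity-check the built-in data in an IDE, the default data sets can be counted directly; count() is an eager operation, so no explicit execute() call is needed. A minimal sketch (the class name is made up, not part of this commit):

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.examples.java.clustering.util.KMeansData;

    // Illustrative sketch, not part of this commit.
    public class KMeansDataSketch {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // count() is eager and runs a job internally
            System.out.println("centroids: " + KMeansData.getDefaultCentroidDataSet(env).count());
            System.out.println("points:    " + KMeansData.getDefaultPointDataSet(env).count());
        }
    }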

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java
 
b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java
new file mode 100644
index 0000000..8f48d0a
--- /dev/null
+++ 
b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.examples.java.clustering.util;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.text.DecimalFormat;
+import java.util.Locale;
+import java.util.Random;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.examples.java.clustering.KMeans;
+
+/**
+ * Generates data for the {@link KMeans} example program.
+ */
+public class KMeansDataGenerator {
+       
+       static {
+               Locale.setDefault(Locale.US);
+       }
+       
+       private static final String CENTERS_FILE = "centers";
+       private static final String POINTS_FILE = "points";
+       private static final long DEFAULT_SEED = 4650285087650871364L;
+       private static final double DEFAULT_VALUE_RANGE = 100.0;
+       private static final double RELATIVE_STDDEV = 0.08;
+       private static final int DIMENSIONALITY = 2;
+       private static final DecimalFormat FORMAT = new DecimalFormat("#0.00");
+       private static final char DELIMITER = ' ';
+
+       /**
+        * Main method to generate data for the {@link KMeans} example program.
+        * <p>
+        * The generator creates two files:
+        * <ul>
+        * <li><code>&lt; output-path &gt;/points</code> for the data points
+        * <li><code>&lt; output-path &gt;/centers</code> for the cluster 
centers
+        * </ul> 
+        * 
+        * @param args the following named options, parsed with {@code ParameterTool}:
+        * <ol>
+        * <li><code>-points</code> Int: Number of data points
+        * <li><code>-k</code> Int: Number of cluster centers
+        * <li><b>Optional</b> <code>-output</code> String: Output path, default value is {tmp.dir}
+        * <li><b>Optional</b> <code>-stddev</code> Double: Standard deviation of data points
+        * <li><b>Optional</b> <code>-range</code> Double: Value range of cluster centers
+        * <li><b>Optional</b> <code>-seed</code> Long: Random seed
+        * </ol>
+        *
+        * @throws IOException
+        */
+       public static void main(String[] args) throws IOException {
+
+               // check parameter count
+               if (args.length < 2) {
+                       System.out.println("KMeansDataGenerator -points <num> 
-k <num clusters> [-output <output-path>] [-stddev <relative stddev>] [-range 
<centroid range>] [-seed <seed>]");
+                       System.exit(1);
+               }
+
+               // parse parameters
+
+               final ParameterTool params = ParameterTool.fromArgs(args);
+               final int numDataPoints = params.getInt("points");
+               final int k = params.getInt("k");
+               final String outDir = params.get("output", 
System.getProperty("java.io.tmpdir"));
+               final double stddev = params.getDouble("stddev", 
RELATIVE_STDDEV);
+               final double range = params.getDouble("range", 
DEFAULT_VALUE_RANGE);
+               final long firstSeed = params.getLong("seed", DEFAULT_SEED);
+
+               
+               final double absoluteStdDev = stddev * range;
+               final Random random = new Random(firstSeed);
+               
+               // the means around which data points are distributed
+               final double[][] means = uniformRandomCenters(random, k, 
DIMENSIONALITY, range);
+               
+               // write the points out
+               BufferedWriter pointsOut = null;
+               try {
+                       pointsOut = new BufferedWriter(new FileWriter(new 
File(outDir+"/"+POINTS_FILE)));
+                       StringBuilder buffer = new StringBuilder();
+                       
+                       double[] point = new double[DIMENSIONALITY];
+                       int nextCentroid = 0;
+                       
+                       for (int i = 1; i <= numDataPoints; i++) {
+                               // generate a point for the current centroid
+                               double[] centroid = means[nextCentroid];
+                               for (int d = 0; d < DIMENSIONALITY; d++) {
+                                       point[d] = (random.nextGaussian() * 
absoluteStdDev) + centroid[d];
+                               }
+                               writePoint(point, buffer, pointsOut);
+                               nextCentroid = (nextCentroid + 1) % k;
+                       }
+               }
+               finally {
+                       if (pointsOut != null) {
+                               pointsOut.close();
+                       }
+               }
+               
+               // write the uniformly distributed centers to a file
+               BufferedWriter centersOut = null;
+               try {
+                       centersOut = new BufferedWriter(new FileWriter(new 
File(outDir+"/"+CENTERS_FILE)));
+                       StringBuilder buffer = new StringBuilder();
+                       
+                       double[][] centers = uniformRandomCenters(random, k, 
DIMENSIONALITY, range);
+                       
+                       for (int i = 0; i < k; i++) {
+                               writeCenter(i + 1, centers[i], buffer, 
centersOut);
+                       }
+               }
+               finally {
+                       if (centersOut != null) {
+                               centersOut.close();
+                       }
+               }
+               
+               System.out.println("Wrote "+numDataPoints+" data points to 
"+outDir+"/"+POINTS_FILE);
+               System.out.println("Wrote "+k+" cluster centers to 
"+outDir+"/"+CENTERS_FILE);
+       }
+       
+       private static double[][] uniformRandomCenters(Random rnd, int num, int 
dimensionality, double range) {
+               final double halfRange = range / 2;
+               final double[][] points = new double[num][dimensionality];
+               
+               for (int i = 0; i < num; i++) {
+                       for (int dim = 0; dim < dimensionality; dim ++) {
+                               points[i][dim] = (rnd.nextDouble() * range) - 
halfRange;
+                       }
+               }
+               return points;
+       }
+       
+       private static void writePoint(double[] coordinates, StringBuilder 
buffer, BufferedWriter out) throws IOException {
+               buffer.setLength(0);
+               
+               // write coordinates
+               for (int j = 0; j < coordinates.length; j++) {
+                       buffer.append(FORMAT.format(coordinates[j]));
+                       if(j < coordinates.length - 1) {
+                               buffer.append(DELIMITER);
+                       }
+               }
+               
+               out.write(buffer.toString());
+               out.newLine();
+       }
+       
+       private static void writeCenter(long id, double[] coordinates, 
StringBuilder buffer, BufferedWriter out) throws IOException {
+               buffer.setLength(0);
+               
+               // write id
+               buffer.append(id);
+               buffer.append(DELIMITER);
+
+               // write coordinates
+               for (int j = 0; j < coordinates.length; j++) {
+                       buffer.append(FORMAT.format(coordinates[j]));
+                       if(j < coordinates.length - 1) {
+                               buffer.append(DELIMITER);
+                       }
+               }
+               
+               out.write(buffer.toString());
+               out.newLine();
+       }
+}
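
Assuming both classes are on the classpath, a local end-to-end run could chain the generator and the KMeans job as follows. The directory, sizes, and iteration count below are illustrative, and the result path must not exist yet:

    // Illustrative driver, not part of this commit; paths and sizes are made up.
    public class KMeansDemoDriver {

        public static void main(String[] args) throws Exception {
            String dir = "/tmp/kmeans-demo";
            new java.io.File(dir).mkdirs();

            // writes <dir>/points and <dir>/centers (POINTS_FILE / CENTERS_FILE above)
            org.apache.flink.examples.java.clustering.util.KMeansDataGenerator.main(
                    new String[] { "-points", "1000", "-k", "4", "-output", dir });

            // <points path> <centers path> <result path> <num iterations>;
            // the result path must not exist yet
            org.apache.flink.examples.java.clustering.KMeans.main(
                    new String[] { dir + "/points", dir + "/centers", dir + "/result", "20" });
        }
    }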

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java
 
b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java
new file mode 100644
index 0000000..8e87892
--- /dev/null
+++ 
b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.java.distcp;
+
+import org.apache.commons.io.IOUtils;
+
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.LocalEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.operators.FlatMapOperator;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Collector;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The main class of the Flink distcp utility.
+ * It is a simple reimplementation of Hadoop distcp
+ * (see <a href="http://hadoop.apache.org/docs/r1.2.1/distcp.html">http://hadoop.apache.org/docs/r1.2.1/distcp.html</a>)
+ * with a dynamic input format.
+ * Note that this tool does not retry failed copies. Additionally, empty directories are not copied over.
+ * <p>
+ * When running locally, local file system paths can be used.
+ * However, in a distributed environment HDFS paths must be provided both as input and output.
+ */
+public class DistCp {
+       
+       private static final Logger LOGGER = 
LoggerFactory.getLogger(DistCp.class);
+       public static final String BYTES_COPIED_CNT_NAME = "BYTES_COPIED";
+       public static final String FILES_COPIED_CNT_NAME = "FILES_COPIED";
+
+       public static void main(String[] args) throws Exception {
+               if (args.length != 3) {
+                       printHelp();
+                       return;
+               }
+
+               final Path sourcePath = new Path(args[0]);
+               final Path targetPath = new Path(args[1]);
+               int parallelism = Integer.parseInt(args[2]);
+
+               // set up the execution environment
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               checkInputParams(env, sourcePath, targetPath, parallelism);
+               env.setParallelism(parallelism);
+
+               long startTime = System.currentTimeMillis();
+               LOGGER.info("Initializing copy tasks");
+               List<FileCopyTask> tasks = getCopyTasks(sourcePath);
+               LOGGER.info("Copy task initialization took " + 
(System.currentTimeMillis() - startTime) + "ms");
+
+               DataSet<FileCopyTask> inputTasks = new DataSource<>(env,
+                               new FileCopyTaskInputFormat(tasks),
+                               new GenericTypeInfo<>(FileCopyTask.class), 
"fileCopyTasks");
+
+
+               FlatMapOperator<FileCopyTask, Object> res = 
inputTasks.flatMap(new RichFlatMapFunction<FileCopyTask, Object>() {
+                       
+                       private LongCounter fileCounter;
+                       private LongCounter bytesCounter;
+
+                       @Override
+                       public void open(Configuration parameters) throws 
Exception {
+                               bytesCounter = 
getRuntimeContext().getLongCounter(BYTES_COPIED_CNT_NAME);
+                               fileCounter = 
getRuntimeContext().getLongCounter(FILES_COPIED_CNT_NAME);
+                       }
+
+                       @Override
+                       public void flatMap(FileCopyTask task, 
Collector<Object> out) throws Exception {
+                               LOGGER.info("Processing task: " + task);
+                               Path outPath = new Path(targetPath, 
task.getRelativePath());
+
+                               FileSystem targetFs = 
targetPath.getFileSystem();
+                               // creating parent folders in case of a local FS
+                               if (!targetFs.isDistributedFS()) {
+                                       //dealing with cases like file:///tmp 
or just /tmp
+                                       File outFile = 
outPath.toUri().isAbsolute() ? new File(outPath.toUri()) : new 
File(outPath.toString());
+                                       File parentFile = 
outFile.getParentFile();
+                                       if (!parentFile.mkdirs() && 
!parentFile.exists()) {
+                                               throw new 
RuntimeException("Cannot create local file system directories: " + parentFile);
+                                       }
+                               }
+                               FSDataOutputStream outputStream = null;
+                               FSDataInputStream inputStream = null;
+                               try {
+                                       outputStream = targetFs.create(outPath, 
true);
+                                       inputStream = 
task.getPath().getFileSystem().open(task.getPath());
+                                       int bytes = IOUtils.copy(inputStream, 
outputStream);
+                                       bytesCounter.add(bytes);
+                               } finally {
+                                       IOUtils.closeQuietly(inputStream);
+                                       IOUtils.closeQuietly(outputStream);
+                               }
+                               fileCounter.add(1L);
+                       }
+               });
+
+               // no data sinks are needed, therefore just printing an empty 
result
+               res.print();
+
+               Map<String, Object> accumulators = 
env.getLastJobExecutionResult().getAllAccumulatorResults();
+               LOGGER.info("== COUNTERS ==");
+               for (Map.Entry<String, Object> e : accumulators.entrySet()) {
+                       LOGGER.info(e.getKey() + ": " + e.getValue());
+               }
+       }
+
+
+       // 
-----------------------------------------------------------------------------------------
+       // HELPER METHODS
+       // 
-----------------------------------------------------------------------------------------
+
+       private static void checkInputParams(ExecutionEnvironment env, Path 
sourcePath, Path targetPath, int parallelism) throws IOException {
+               if (parallelism <= 0) {
+                       throw new IllegalArgumentException("Parallelism should 
be greater than 0");
+               }
+
+               boolean isLocal = env instanceof LocalEnvironment;
+               if (!isLocal &&
+                               !(sourcePath.getFileSystem().isDistributedFS() 
&& targetPath.getFileSystem().isDistributedFS())) {
+                       throw new IllegalArgumentException("In a distributed 
mode only HDFS input/output paths are supported");
+               }
+       }
+
+       private static void printHelp() {
+               System.err.println("Usage: <input_path> <output_path> 
<level_of_parallelism>");
+       }
+
+       private static List<FileCopyTask> getCopyTasks(Path sourcePath) throws 
IOException {
+               List<FileCopyTask> tasks = new ArrayList<>();
+               getCopyTasks(sourcePath, "", tasks);
+               return tasks;
+       }
+
+       private static void getCopyTasks(Path p, String rel, List<FileCopyTask> 
tasks) throws IOException {
+               FileStatus[] res = p.getFileSystem().listStatus(p);
+               if (res == null) {
+                       return;
+               }
+               for (FileStatus fs : res) {
+                       if (fs.isDir()) {
+                               getCopyTasks(fs.getPath(), rel + 
fs.getPath().getName() + "/", tasks);
+                       } else {
+                               Path cp = fs.getPath();
+                               tasks.add(new FileCopyTask(cp, rel + 
cp.getName()));
+                       }
+               }
+       }
+
+}
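
The counter handling above (registering LongCounters in open() and reading them via getLastJobExecutionResult() after the job) is independent of the copy logic. A minimal sketch of the same pattern on toy data follows; the class and counter names are made up, not part of this commit:

    import org.apache.flink.api.common.accumulators.LongCounter;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.configuration.Configuration;

    // Illustrative sketch, not part of this commit; the counter name is made up.
    public class AccumulatorSketch {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            env.fromElements("a", "bb", "ccc")
                .map(new RichMapFunction<String, String>() {
                    private LongCounter chars;

                    @Override
                    public void open(Configuration parameters) {
                        chars = getRuntimeContext().getLongCounter("CHARS_SEEN");
                    }

                    @Override
                    public String map(String value) {
                        chars.add(value.length());
                        return value;
                    }
                })
                .print(); // eager sink: the job has already run after this line

            long charsSeen = (Long) env.getLastJobExecutionResult()
                    .getAllAccumulatorResults().get("CHARS_SEEN");
            System.out.println("CHARS_SEEN: " + charsSeen); // 6
        }
    }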

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java
 
b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java
new file mode 100644
index 0000000..7f38a8b
--- /dev/null
+++ 
b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.java.distcp;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.core.fs.Path;
+
+import java.io.Serializable;
+
+/**
+ * A Java POJO that represents a task for copying a single file.
+ */
+public class FileCopyTask implements Serializable {
+       
+       private static final long serialVersionUID = -8760082278978316032L;
+       
+       private final Path path;
+       private final String relativePath;
+
+       public FileCopyTask(Path path, String relativePath) {
+               if (StringUtils.isEmpty(relativePath)) {
+                       throw new IllegalArgumentException("Relative path 
should not be empty for: " + path);
+               }
+               this.path = path;
+               this.relativePath = relativePath;
+       }
+
+       public Path getPath() {
+               return path;
+       }
+
+       public String getRelativePath() {
+               return relativePath;
+       }
+
+       @Override
+       public String toString() {
+               return "FileCopyTask{" +
+                               "path=" + path +
+                               ", relativePath='" + relativePath + '\'' +
+                               '}';
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java
 
b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java
new file mode 100644
index 0000000..d6e6713
--- /dev/null
+++ 
b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.java.distcp;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+/**
+ * An implementation of an input format that dynamically assigns {@code FileCopyTask} to the mappers
+ * that have finished previously assigned tasks.
+ */
+public class FileCopyTaskInputFormat implements InputFormat<FileCopyTask, 
FileCopyTaskInputSplit> {
+
+       private static final long serialVersionUID = -644394866425221151L;
+       
+       private static final Logger LOGGER = 
LoggerFactory.getLogger(FileCopyTaskInputFormat.class);
+       
+
+       private final List<FileCopyTask> tasks;
+
+       public FileCopyTaskInputFormat(List<FileCopyTask> tasks) {
+               this.tasks = tasks;
+       }
+
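+       /**
+        * Hands out the copy-task splits in FIFO order and ignores locality: whichever
+        * subtask asks next receives the next unassigned split, or null when none remain.
+        */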
+       private class FileCopyTaskAssigner implements InputSplitAssigner {
+               private Queue<FileCopyTaskInputSplit> splits;
+
+               public FileCopyTaskAssigner(FileCopyTaskInputSplit[] 
inputSplits) {
+                       splits = new LinkedList<>(Arrays.asList(inputSplits));
+               }
+
+               @Override
+               public InputSplit getNextInputSplit(String host, int taskId) {
+                       LOGGER.info("Getting copy task for task: " + taskId);
+                       return splits.poll();
+               }
+       }
+
+       @Override
+       public void configure(Configuration parameters) {
+               //no op
+       }
+
+       @Override
+       public BaseStatistics getStatistics(BaseStatistics cachedStatistics) 
throws IOException {
+               return null;
+       }
+
+       @Override
+       public FileCopyTaskInputSplit[] createInputSplits(int minNumSplits) 
throws IOException {
+               FileCopyTaskInputSplit[] splits = new 
FileCopyTaskInputSplit[tasks.size()];
+               int i = 0;
+               for (FileCopyTask t : tasks) {
+                       splits[i] = new FileCopyTaskInputSplit(t, i);
+                       i++;
+               }
+               return splits;
+       }
+
+       @Override
+       public InputSplitAssigner 
getInputSplitAssigner(FileCopyTaskInputSplit[] inputSplits) {
+               return new FileCopyTaskAssigner(inputSplits);
+       }
+
+       private FileCopyTaskInputSplit curInputSplit = null;
+
+       @Override
+       public void open(FileCopyTaskInputSplit split) throws IOException {
+               curInputSplit = split;
+       }
+
+       @Override
+       public boolean reachedEnd() throws IOException {
+               return curInputSplit == null;
+       }
+
+       @Override
+       public FileCopyTask nextRecord(FileCopyTask reuse) throws IOException {
+               FileCopyTask toReturn = curInputSplit.getTask();
+               curInputSplit = null;
+               return toReturn;
+       }
+
+       @Override
+       public void close() throws IOException {
+               //no op
+       }
+}
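
Because createInputSplits() and the assigner are plain methods, the hand-out behaviour can be observed without running a job. A sketch, not part of this commit (the file paths and host name are made up):

    import java.util.Arrays;

    import org.apache.flink.core.fs.Path;
    import org.apache.flink.core.io.InputSplitAssigner;
    import org.apache.flink.examples.java.distcp.FileCopyTask;
    import org.apache.flink.examples.java.distcp.FileCopyTaskInputFormat;
    import org.apache.flink.examples.java.distcp.FileCopyTaskInputSplit;

    // Illustrative sketch, not part of this commit; paths and host names are made up.
    public class SplitAssignmentSketch {

        public static void main(String[] args) throws Exception {
            FileCopyTaskInputFormat format = new FileCopyTaskInputFormat(Arrays.asList(
                    new FileCopyTask(new Path("file:///tmp/src/a.txt"), "a.txt"),
                    new FileCopyTask(new Path("file:///tmp/src/b.txt"), "b.txt")));

            FileCopyTaskInputSplit[] splits = format.createInputSplits(2);
            InputSplitAssigner assigner = format.getInputSplitAssigner(splits);

            // each call simulates an idle subtask asking for more work;
            // null signals that all copy tasks have been handed out
            FileCopyTaskInputSplit split;
            while ((split = (FileCopyTaskInputSplit) assigner.getNextInputSplit("some-host", 0)) != null) {
                System.out.println("assigned: " + split.getTask());
            }
        }
    }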

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputSplit.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputSplit.java
 
b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputSplit.java
new file mode 100644
index 0000000..33943b6
--- /dev/null
+++ 
b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputSplit.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.java.distcp;
+
+import org.apache.flink.core.io.InputSplit;
+
+/**
+ * Implementation of {@code InputSplit} for copying files.
+ */
+public class FileCopyTaskInputSplit implements InputSplit {
+       
+       private static final long serialVersionUID = -7621656017747660450L;
+       
+       private final FileCopyTask task;
+       private final int splitNumber;
+
+       public FileCopyTaskInputSplit(FileCopyTask task, int splitNumber) {
+               this.task = task;
+               this.splitNumber = splitNumber;
+       }
+
+       public FileCopyTask getTask() {
+               return task;
+       }
+
+       @Override
+       public int getSplitNumber() {
+               return splitNumber;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
 
b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
new file mode 100644
index 0000000..ce36504
--- /dev/null
+++ 
b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.examples.java.graph;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
+import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
+import org.apache.flink.util.Collector;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * An implementation of the connected components algorithm, using a delta 
iteration.
+ * 
+ * <p>
+ * Initially, the algorithm assigns each vertex a unique ID. In each step, a vertex picks the minimum of its own ID and its
+ * neighbors' IDs as its new ID and tells its neighbors about its new ID. After the algorithm has completed, all vertices in the
+ * same component will have the same ID.
+ * 
+ * <p>
+ * A vertex whose component ID did not change need not propagate its information in the next step. Because of that,
+ * the algorithm is easily expressible via a delta iteration. We here model 
the solution set as the vertices with
+ * their current component ids, and the workset as the changed vertices. 
Because we see all vertices initially as
+ * changed, the initial workset and the initial solution set are identical. 
Also, the delta to the solution set
+ * is consequently also the next workset.<br>
+ * 
+ * <p>
+ * Input files are plain text files and must be formatted as follows:
+ * <ul>
+ * <li>Vertices are represented as IDs and separated by new-line characters.<br>
+ * For example <code>"1\n2\n12\n42\n63"</code> gives five vertices (1), (2), (12), (42), and (63).
+ * <li>Edges are represented as pairs of vertex IDs which are separated by space
+ * characters. Edges are separated by new-line characters.<br>
+ * For example <code>"1 2\n2 12\n1 12\n42 63"</code> gives four (undirected) 
edges (1)-(2), (2)-(12), (1)-(12), and (42)-(63).
+ * </ul>
+ * 
+ * <p>
+ * Usage: <code>ConnectedComponents &lt;vertices path&gt; &lt;edges path&gt; 
&lt;result path&gt; &lt;max number of iterations&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from 
{@link org.apache.flink.examples.java.graph.util.ConnectedComponentsData} and 
10 iterations. 
+ * 
+ * <p>
+ * This example shows how to use:
+ * <ul>
+ * <li>Delta Iterations
+ * <li>Generic-typed Functions 
+ * </ul>
+ */
+@SuppressWarnings("serial")
+public class ConnectedComponents implements ProgramDescription {
+       
+       // 
*************************************************************************
+       //     PROGRAM
+       // 
*************************************************************************
+       
+       public static void main(String... args) throws Exception {
+               
+               if(!parseParameters(args)) {
+                       return;
+               }
+               
+               // set up execution environment
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               
+               // read vertex and edge data
+               DataSet<Long> vertices = getVertexDataSet(env);
+               DataSet<Tuple2<Long, Long>> edges = 
getEdgeDataSet(env).flatMap(new UndirectEdge());
+               
+               // assign the initial components (equal to the vertex id)
+               DataSet<Tuple2<Long, Long>> verticesWithInitialId = 
vertices.map(new DuplicateValue<Long>());
+                               
+               // open a delta iteration
+               DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> 
iteration =
+                               
verticesWithInitialId.iterateDelta(verticesWithInitialId, maxIterations, 0);
+               
+               // apply the step logic: join with the edges, select the 
minimum neighbor, update if the component of the candidate is smaller
+               DataSet<Tuple2<Long, Long>> changes = 
iteration.getWorkset().join(edges).where(0).equalTo(0).with(new 
NeighborWithComponentIDJoin())
+                               .groupBy(0).aggregate(Aggregations.MIN, 1)
+                               
.join(iteration.getSolutionSet()).where(0).equalTo(0)
+                               .with(new ComponentIdFilter());
+
+               // close the delta iteration (delta and new workset are 
identical)
+               DataSet<Tuple2<Long, Long>> result = 
iteration.closeWith(changes, changes);
+               
+               // emit result
+               if (fileOutput) {
+                       result.writeAsCsv(outputPath, "\n", " ");
+                       // execute program
+                       env.execute("Connected Components Example");
+               } else {
+                       result.print();
+               }
+       }
+       
+       // 
*************************************************************************
+       //     USER FUNCTIONS
+       // 
*************************************************************************
+       
+       /**
+        * Function that turns a value into a 2-tuple where both fields are 
that value.
+        */
+       @ForwardedFields("*->f0")
+       public static final class DuplicateValue<T> implements MapFunction<T, 
Tuple2<T, T>> {
+               
+               @Override
+               public Tuple2<T, T> map(T vertex) {
+                       return new Tuple2<T, T>(vertex, vertex);
+               }
+       }
+       
+       /**
+        * Makes the edge set undirected by emitting, for each input edge, the edge itself and an inverted copy.
+        */
+       public static final class UndirectEdge implements 
FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+               Tuple2<Long, Long> invertedEdge = new Tuple2<Long, Long>();
+               
+               @Override
+               public void flatMap(Tuple2<Long, Long> edge, 
Collector<Tuple2<Long, Long>> out) {
+                       invertedEdge.f0 = edge.f1;
+                       invertedEdge.f1 = edge.f0;
+                       out.collect(edge);
+                       out.collect(invertedEdge);
+               }
+       }
+       
+       /**
+        * UDF that joins a (Vertex-ID, Component-ID) pair that represents the 
current component that
+        * a vertex is associated with, with a (Source-Vertex-ID, 
Target-VertexID) edge. The function
+        * produces a (Target-vertex-ID, Component-ID) pair.
+        */
+       @ForwardedFieldsFirst("f1->f1")
+       @ForwardedFieldsSecond("f1->f0")
+       public static final class NeighborWithComponentIDJoin implements 
JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
+               @Override
+               public Tuple2<Long, Long> join(Tuple2<Long, Long> 
vertexWithComponent, Tuple2<Long, Long> edge) {
+                       return new Tuple2<Long, Long>(edge.f1, 
vertexWithComponent.f1);
+               }
+       }
+       
+
+
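+       /**
+        * Emits the candidate (Vertex-ID, Component-ID) pair if and only if the
+        * candidate component ID is smaller than the component ID currently stored
+        * in the solution set for that vertex.
+        */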
+       @ForwardedFieldsFirst("*")
+       public static final class ComponentIdFilter implements 
FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
+               @Override
+               public void join(Tuple2<Long, Long> candidate, Tuple2<Long, 
Long> old, Collector<Tuple2<Long, Long>> out) {
+                       if (candidate.f1 < old.f1) {
+                               out.collect(candidate);
+                       }
+               }
+       }
+
+
+
+       @Override
+       public String getDescription() {
+               return "Parameters: <vertices-path> <edges-path> <result-path> 
<max-number-of-iterations>";
+       }
+       
+       // 
*************************************************************************
+       //     UTIL METHODS
+       // 
*************************************************************************
+       
+       private static boolean fileOutput = false;
+       private static String verticesPath = null;
+       private static String edgesPath = null;
+       private static String outputPath = null;
+       private static int maxIterations = 10;
+       
+       private static boolean parseParameters(String[] programArguments) {
+               
+               if(programArguments.length > 0) {
+                       // parse input arguments
+                       fileOutput = true;
+                       if(programArguments.length == 4) {
+                               verticesPath = programArguments[0];
+                               edgesPath = programArguments[1];
+                               outputPath = programArguments[2];
+                               maxIterations = 
Integer.parseInt(programArguments[3]);
+                       } else {
+                               System.err.println("Usage: ConnectedComponents 
<vertices path> <edges path> <result path> <max number of iterations>");
+                               return false;
+                       }
+               } else {
+                       System.out.println("Executing Connected Components 
example with default parameters and built-in default data.");
+                       System.out.println("  Provide parameters to read input 
data from files.");
+                       System.out.println("  See the documentation for the 
correct format of input files.");
+                       System.out.println("  Usage: ConnectedComponents 
<vertices path> <edges path> <result path> <max number of iterations>");
+               }
+               return true;
+       }
+       
+       private static DataSet<Long> getVertexDataSet(ExecutionEnvironment env) 
{
+               
+               if(fileOutput) {
+                       return env.readCsvFile(verticesPath).types(Long.class)
+                                               .map(
+                                                               new 
MapFunction<Tuple1<Long>, Long>() {
+                                                                       public 
Long map(Tuple1<Long> value) { return value.f0; }
+                                                               });
+               } else {
+                       return 
ConnectedComponentsData.getDefaultVertexDataSet(env);
+               }
+       }
+       
+       private static DataSet<Tuple2<Long, Long>> 
getEdgeDataSet(ExecutionEnvironment env) {
+               
+               if(fileOutput) {
+                       return env.readCsvFile(edgesPath).fieldDelimiter(" 
").types(Long.class, Long.class);
+               } else {
+                       return 
ConnectedComponentsData.getDefaultEdgeDataSet(env);
+               }
+       }
+       
+       
+}
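
For reference, the delta-iteration skeleton used above (solution set, workset, closeWith(delta, nextWorkset)) can be exercised on a toy problem as well. In the following sketch, which is illustrative and not part of this commit (the class name, toy data, and halving step are made up), each superstep proposes a smaller value per key, and a ComponentIdFilter-style join keeps only genuine improvements; the loop ends when the workset becomes empty:

    import org.apache.flink.api.common.functions.FlatJoinFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DeltaIteration;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    // Illustrative sketch, not part of this commit.
    public class DeltaIterationSketch {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // (key, value) pairs whose values shrink until they reach zero
            DataSet<Tuple2<Long, Long>> initial = env.fromElements(
                    new Tuple2<Long, Long>(1L, 10L),
                    new Tuple2<Long, Long>(2L, 20L));

            // solution set and initial workset are identical, as in ConnectedComponents
            DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
                    initial.iterateDelta(initial, 10, 0);

            DataSet<Tuple2<Long, Long>> changes = iteration.getWorkset()
                // propose a smaller value for every key still in the workset
                .map(new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                    @Override
                    public Tuple2<Long, Long> map(Tuple2<Long, Long> t) {
                        return new Tuple2<Long, Long>(t.f0, t.f1 / 2);
                    }
                })
                // keep a proposal only if it improves on the solution set (compare ComponentIdFilter)
                .join(iteration.getSolutionSet()).where(0).equalTo(0)
                .with(new FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                    @Override
                    public void join(Tuple2<Long, Long> candidate, Tuple2<Long, Long> current,
                            Collector<Tuple2<Long, Long>> out) {
                        if (candidate.f1 < current.f1) {
                            out.collect(candidate);
                        }
                    }
                });

            // delta and next workset are identical; an empty workset ends the loop early
            iteration.closeWith(changes, changes).print();
        }
    }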
