http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/pom.xml b/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
deleted file mode 100644
index 3661726..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
+++ /dev/null
@@ -1,518 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-       <modelVersion>4.0.0</modelVersion>
-
-       <parent>
-               <groupId>org.apache.flink</groupId>
-               <artifactId>flink-streaming-parent</artifactId>
-               <version>0.9-SNAPSHOT</version>
-               <relativePath>..</relativePath>
-       </parent>
-
-       <artifactId>flink-streaming-examples</artifactId>
-       <name>flink-streaming-examples</name>
-
-       <packaging>jar</packaging>
-
-       <dependencies>
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-streaming-core</artifactId>
-                       <version>${project.version}</version>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-streaming-scala</artifactId>
-                       <version>${project.version}</version>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-java-examples</artifactId>
-                       <version>${project.version}</version>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-streaming-connectors</artifactId>
-                       <version>${project.version}</version>
-               </dependency>
-       </dependencies>
-
-       <build>
-               <plugins>
-                       <!-- get default data from flink-java-examples package -->
-                       <plugin>
-                               <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-dependency-plugin</artifactId>
-                               <version>2.9</version><!--$NO-MVN-MAN-VER$-->
-                               <executions>
-                                       <execution>
-                                               <id>unpack</id>
-                                               <phase>prepare-package</phase>
-                                               <goals>
-                                                       <goal>unpack</goal>
-                                               </goals>
-                                               <configuration>
-                                                       <artifactItems>
-                                                               <artifactItem>
-                                                                       
<groupId>org.apache.flink</groupId>
-                                                                       
<artifactId>flink-java-examples</artifactId>
-                                                                       
<version>${project.version}</version>
-                                                                       
<type>jar</type>
-                                                                       
<overWrite>false</overWrite>
-                                                                       
<outputDirectory>${project.build.directory}/classes</outputDirectory>
-                                                                       
<includes>org/apache/flink/examples/java/wordcount/util/WordCountData.class</includes>
-                                                               </artifactItem>
-                                                       </artifactItems>
-                                               </configuration>
-                                       </execution>
-                               </executions>
-                       </plugin>
-
-                       <!-- self-contained jars for each example -->
-                       <plugin>
-                               <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-jar-plugin</artifactId>
-                               
-                               <executions>
-
-                                       <!-- Iteration -->
-                                       <execution>
-                                               <id>Iteration</id>
-                                               <phase>package</phase>
-                                               <goals>
-                                                       <goal>jar</goal>
-                                               </goals>
-                                               <configuration>
-                                                       
<classifier>Iteration</classifier>
-
-                                                       <archive>
-                                                               
<manifestEntries>
-                                                                       
<program-class>org.apache.flink.streaming.examples.iteration.IterateExample</program-class>
-                                                               
</manifestEntries>
-                                                       </archive>
-
-                                                       <includes>
-                                                               
<include>org/apache/flink/streaming/examples/iteration/*.class</include>        
                
-                                                       </includes>
-                                               </configuration>
-                                       </execution>
-
-                                       <!-- IncrementalLearning -->
-                                       <execution>
-                                               <id>IncrementalLearning</id>
-                                               <phase>package</phase>
-                                               <goals>
-                                                       <goal>jar</goal>
-                                               </goals>
-                                               <configuration>
-                                                       
<classifier>IncrementalLearning</classifier>
-
-                                                       <archive>
-                                                               
<manifestEntries>
-                                                                       
<program-class>org.apache.flink.streaming.examples.ml.IncrementalLearningSkeleton</program-class>
-                                                               
</manifestEntries>
-                                                       </archive>
-
-                                                       <includes>
-                                                               
<include>org/apache/flink/streaming/examples/ml/*.class</include>               
        
-                                                       </includes>
-                                               </configuration>
-                                       </execution>
-
-                                       <!-- Twitter -->
-                                       <execution>
-                                               <id>Twitter</id>
-                                               <phase>package</phase>
-                                               <goals>
-                                                       <goal>jar</goal>
-                                               </goals>
-                                               <configuration>
-                                                       
<classifier>Twitter</classifier>
-
-                                                       <archive>
-                                                               
<manifestEntries>
-                                                                       
<program-class>org.apache.flink.streaming.examples.twitter.TwitterStream</program-class>
-                                                               
</manifestEntries>
-                                                       </archive>
-
-                                                       <includes>
-                                                               
<include>org/apache/flink/streaming/examples/twitter/*.class</include>
-                                                               
<include>org/apache/flink/streaming/examples/twitter/util/*.class</include>     
                                        
-                                                       </includes>
-                                               </configuration>
-                                       </execution>
-
-                                       <!-- WindowJoin -->
-                                       <execution>
-                                               <id>WindowJoin</id>
-                                               <phase>package</phase>
-                                               <goals>
-                                                       <goal>jar</goal>
-                                               </goals>
-                                               <configuration>
-                                                       
<classifier>WindowJoin</classifier>
-
-                                                       <archive>
-                                                               
<manifestEntries>
-                                                                       
<program-class>org.apache.flink.streaming.examples.join.WindowJoin</program-class>
-                                                               
</manifestEntries>
-                                                       </archive>
-
-                                                       <includes>
-                                                               
<include>org/apache/flink/streaming/examples/join/*.class</include>             
        
-                                                       </includes>
-                                               </configuration>
-                                       </execution>
-
-                                       <!-- WordCountPOJO -->
-                                       <execution>
-                                               <id>WordCountPOJO</id>
-                                               <phase>package</phase>
-                                               <goals>
-                                                       <goal>jar</goal>
-                                               </goals>
-                                               <configuration>
-                                                       
<classifier>WordCountPOJO</classifier>
-
-                                                       <archive>
-                                                               
<manifestEntries>
-                                                                       
<program-class>org.apache.flink.streaming.examples.wordcount.PojoExample</program-class>
-                                                               
</manifestEntries>
-                                                       </archive>
-
-                                                       <includes>
-                                                               
<include>org/apache/flink/streaming/examples/wordcount/PojoExample.class</include>
-                                                               
<include>org/apache/flink/streaming/examples/wordcount/PojoExample$*.class</include>
-                                                               
<include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
                    
-                                                       </includes>
-                                               </configuration>
-                                       </execution>
-
-                                       <!-- WordCount -->
-                                       <execution>
-                                               <id>WordCount</id>
-                                               <phase>package</phase>
-                                               <goals>
-                                                       <goal>jar</goal>
-                                               </goals>
-                                               <configuration>
-                                                       
<classifier>WordCount</classifier>
-
-                                                       <archive>
-                                                               
<manifestEntries>
-                                                                       
<program-class>org.apache.flink.streaming.examples.wordcount.WordCount</program-class>
-                                                               
</manifestEntries>
-                                                       </archive>
-
-                                                       <includes>
-                                                               
<include>org/apache/flink/streaming/examples/wordcount/WordCount.class</include>
-                                                               
<include>org/apache/flink/streaming/examples/wordcount/WordCount$*.class</include>
-                                                               
<include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
                            
-                                                       </includes>
-                                               </configuration>
-                                       </execution>
-
-                                       <!-- SocketTextStreamWordCount -->
-                                       <execution>
-                                               
<id>SocketTextStreamWordCount</id>
-                                               <phase>package</phase>
-                                               <goals>
-                                                       <goal>jar</goal>
-                                               </goals>
-                                               <configuration>
-                                                       
<classifier>SocketTextStreamWordCount</classifier>
-
-                                                       <archive>
-                                                               
<manifestEntries>
-                                                                       
<program-class>org.apache.flink.streaming.examples.socket.SocketTextStreamWordCount</program-class>
-                                                               
</manifestEntries>
-                                                       </archive>
-
-                                                       <includes>
-                                                               
<include>org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.class</include>
-                                                               
<include>org/apache/flink/streaming/examples/wordcount/WordCount$Tokenizer.class</include>
-                                                       </includes>
-                                               </configuration>
-                                       </execution>
-
-                                       <!-- DeltaExract -->
-                                       <execution>
-                                               <id>DeltaExract</id>
-                                               <phase>package</phase>
-                                               <goals>
-                                                       <goal>jar</goal>
-                                               </goals>
-                                               <configuration>
-                                                       
<classifier>DeltaExract</classifier>
-
-                                                       <archive>
-                                                               
<manifestEntries>
-                                                                       
<program-class>org.apache.flink.streaming.examples.windowing.DeltaExtractExample</program-class>
-                                                               
</manifestEntries>
-                                                       </archive>
-
-                                                       <includes>
-                                                               
<include>org/apache/flink/streaming/examples/windowing/DeltaExtractExample.class</include>
-                                                               
<include>org/apache/flink/streaming/examples/windowing/DeltaExtractExample$*.class</include>
-                                                       </includes>
-                                               </configuration>
-                                       </execution>
-
-                                       <!-- MultiplePolicies -->
-                                       <execution>
-                                               <id>MultiplePolicies</id>
-                                               <phase>package</phase>
-                                               <goals>
-                                                       <goal>jar</goal>
-                                               </goals>
-                                               <configuration>
-                                                       
<classifier>MultiplePolicies</classifier>
-
-                                                       <archive>
-                                                               
<manifestEntries>
-                                                                       
<program-class>org.apache.flink.streaming.examples.windowing.MultiplePoliciesExample</program-class>
-                                                               
</manifestEntries>
-                                                       </archive>
-
-                                                       <includes>
-                                                               
<include>org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.class</include>
-                                                               
<include>org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample$*.class</include>
-                                                       </includes>
-                                               </configuration>
-                                       </execution>
-
-                                       <!-- SlidingExample -->
-                                       <execution>
-                                               <id>SlidingExample</id>
-                                               <phase>package</phase>
-                                               <goals>
-                                                       <goal>jar</goal>
-                                               </goals>
-                                               <configuration>
-                                                       
<classifier>SlidingExample</classifier>
-
-                                                       <archive>
-                                                               
<manifestEntries>
-                                                                       
<program-class>org.apache.flink.streaming.examples.windowing.SlidingExample</program-class>
-                                                               
</manifestEntries>
-                                                       </archive>
-
-                                                       <includes>
-                                                               
<include>org/apache/flink/streaming/examples/windowing/SlidingExample.class</include>
-                                                               
<include>org/apache/flink/streaming/examples/windowing/SlidingExample$*.class</include>
-                                                       </includes>
-                                               </configuration>
-                                       </execution>
-
-                                       <!-- TimeWindowing -->
-                                       <execution>
-                                               <id>TimeWindowing</id>
-                                               <phase>package</phase>
-                                               <goals>
-                                                       <goal>jar</goal>
-                                               </goals>
-                                               <configuration>
-                                                       
<classifier>TimeWindowing</classifier>
-
-                                                       <archive>
-                                                               
<manifestEntries>
-                                                                       
<program-class>org.apache.flink.streaming.examples.windowing.TimeWindowingExample</program-class>
-                                                               
</manifestEntries>
-                                                       </archive>
-
-                                                       <includes>
-                                                               
<include>org/apache/flink/streaming/examples/windowing/TimeWindowingExample.class</include>
-                                                               
<include>org/apache/flink/streaming/examples/windowing/TimeWindowingExample$*.class</include>
-                                                       </includes>
-                                               </configuration>
-                                       </execution>
-                               </executions>
-                       </plugin>
-
-<!-- Scala Compiler -->
-                       <plugin>
-                               <groupId>net.alchim31.maven</groupId>
-                               <artifactId>scala-maven-plugin</artifactId>
-                               <version>3.1.4</version>
-                               <executions>
-                                       <!-- Run scala compiler in the process-resources phase, so that dependencies on
-                                               scala classes can be resolved later in the (Java) compile phase -->
-                                       <execution>
-                                               <id>scala-compile-first</id>
-                                               <phase>process-resources</phase>
-                                               <goals>
-                                                       <goal>compile</goal>
-                                               </goals>
-                                       </execution>
- 
-                                       <!-- Run scala compiler in the process-test-resources phase, so that dependencies on
-                                                scala classes can be resolved later in the (Java) test-compile phase -->
-                                       <execution>
-                                               <id>scala-test-compile</id>
-                                               
<phase>process-test-resources</phase>
-                                               <goals>
-                                                       <goal>testCompile</goal>
-                                               </goals>
-                                       </execution>
-                               </executions>
-                               <configuration>
-                                       <jvmArgs>
-                                               <jvmArg>-Xms128m</jvmArg>
-                                               <jvmArg>-Xmx512m</jvmArg>
-                                       </jvmArgs>
-                                       <compilerPlugins>
-                                          <compilerPlugin>
-                                                  
<groupId>org.scalamacros</groupId>
-                                                  
<artifactId>paradise_${scala.version}</artifactId>
-                                                  
<version>${scala.macros.version}</version>
-                                          </compilerPlugin>
-                                  </compilerPlugins>
-                               </configuration>
-                       </plugin>
-                       
-                       <!-- Eclipse Integration -->
-                       <plugin>
-                               <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-eclipse-plugin</artifactId>
-                               <version>2.8</version>
-                               <configuration>
-                                       <downloadSources>true</downloadSources>
-                                       <projectnatures>
-                                               
<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
-                                               
<projectnature>org.eclipse.jdt.core.javanature</projectnature>
-                                       </projectnatures>
-                                       <buildcommands>
-                                               
<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
-                                       </buildcommands>
-                                       <classpathContainers>
-                                               
<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
-                                               
<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
-                                       </classpathContainers>
-                                       <excludes>
-                                               
<exclude>org.scala-lang:scala-library</exclude>
-                                               
<exclude>org.scala-lang:scala-compiler</exclude>
-                                       </excludes>
-                                       <sourceIncludes>
-                                               
<sourceInclude>**/*.scala</sourceInclude>
-                                               
<sourceInclude>**/*.java</sourceInclude>
-                                       </sourceIncludes>
-                               </configuration>
-                       </plugin>
-
-                       <!-- Adding scala source directories to build path -->
-                       <plugin>
-                               <groupId>org.codehaus.mojo</groupId>
-                               
<artifactId>build-helper-maven-plugin</artifactId>
-                               <version>1.7</version>
-                               <executions>
-                                       <!-- Add src/main/scala to eclipse build path -->
-                                       <execution>
-                                               <id>add-source</id>
-                                               <phase>generate-sources</phase>
-                                               <goals>
-                                                       <goal>add-source</goal>
-                                               </goals>
-                                               <configuration>
-                                                       <sources>
-                                                               
<source>src/main/scala</source>
-                                                       </sources>
-                                               </configuration>
-                                       </execution>
-                                       <!-- Add src/test/scala to eclipse build path -->
-                                       <execution>
-                                               <id>add-test-source</id>
-                                               
<phase>generate-test-sources</phase>
-                                               <goals>
-                                                       
<goal>add-test-source</goal>
-                                               </goals>
-                                               <configuration>
-                                                       <sources>
-                                                               
<source>src/test/scala</source>
-                                                       </sources>
-                                               </configuration>
-                                       </execution>
-                               </executions>
-                       </plugin>
-
-                       <plugin>
-                               <groupId>org.scalastyle</groupId>
-                               <artifactId>scalastyle-maven-plugin</artifactId>
-                               <version>0.5.0</version>
-                               <executions>
-                                       <execution>
-                                               <goals>
-                                                       <goal>check</goal>
-                                               </goals>
-                                       </execution>
-                               </executions>
-                               <configuration>
-                                       <verbose>false</verbose>
-                                       <failOnViolation>true</failOnViolation>
-                                       
<includeTestSourceDirectory>true</includeTestSourceDirectory>
-                                       <failOnWarning>false</failOnWarning>
-                                       
<sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
-                                       
<testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
-                                       
<configLocation>${project.basedir}/../../../tools/maven/scalastyle-config.xml</configLocation>
-                                       
<outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
-                                       <outputEncoding>UTF-8</outputEncoding>
-                               </configuration>
-                       </plugin>
-
-               </plugins>
-               
-               <pluginManagement>
-                       <plugins>
-                               <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
-                               <plugin>
-                                       <groupId>org.eclipse.m2e</groupId>
-                                       
<artifactId>lifecycle-mapping</artifactId>
-                                       <version>1.0.0</version>
-                                       <configuration>
-                                               <lifecycleMappingMetadata>
-                                                       <pluginExecutions>
-                                                               
<pluginExecution>
-                                                                       
<pluginExecutionFilter>
-                                                                               
<groupId>org.apache.maven.plugins</groupId>
-                                                                               
<artifactId>maven-dependency-plugin</artifactId>
-                                                                               
<versionRange>[2.9,)</versionRange>
-                                                                               
<goals>
-                                                                               
        <goal>unpack</goal>
-                                                                               
</goals>
-                                                                       
</pluginExecutionFilter>
-                                                                       <action>
-                                                                               
<ignore/>
-                                                                       
</action>
-                                                               
</pluginExecution>
-                                                       </pluginExecutions>
-                                               </lifecycleMappingMetadata>
-                                       </configuration>
-                               </plugin>
-                       </plugins>
-               </pluginManagement>
-               
-       </build>
-
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
deleted file mode 100644
index 998e818..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.iteration;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.collector.OutputSelector;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.IterativeDataStream;
-import org.apache.flink.streaming.api.datastream.SplitDataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-/**
- * Example illustrating iterations in Flink streaming.
- * 
- * <p>
- * The program sums up random numbers and counts additions it performs to reach
- * a specific threshold in an iterative streaming fashion.
- * </p>
- * 
- * <p>
- * This example shows how to use:
- * <ul>
- * <li>streaming iterations,
- * <li>buffer timeout to enhance latency,
- * <li>directed outputs.
- * </ul>
- */
-public class IterateExample {
-
-       // 
*************************************************************************
-       // PROGRAM
-       // 
*************************************************************************
-
-       public static void main(String[] args) throws Exception {
-
-               if (!parseParameters(args)) {
-                       return;
-               }
-
-               // set up input for the stream of (0,0) pairs
-               List<Tuple2<Double, Integer>> input = new 
ArrayList<Tuple2<Double, Integer>>();
-               for (int i = 0; i < 1000; i++) {
-                       input.add(new Tuple2<Double, Integer>(0., 0));
-               }
-
-               // obtain execution environment and set setBufferTimeout(1) to 
enable
-               // frequent flushing of the output buffers (low latency)
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment()
-                               .setBufferTimeout(1);
-
-               // create an iterative data stream from the input with 5 second 
timeout
-               IterativeDataStream<Tuple2<Double, Integer>> it = 
env.fromCollection(input).shuffle()
-                               .iterate(5000);
-
-               // apply the step function to add new random value to the tuple 
and to
-               // increment the counter and split the output with the output 
selector
-               SplitDataStream<Tuple2<Double, Integer>> step = it.map(new 
Step()).split(new MySelector());
-
-               // close the iteration by selecting the tuples that were 
directed to the
-               // 'iterate' channel in the output selector
-               it.closeWith(step.select("iterate"));
-
-               // to produce the final output select the tuples directed to the
-               // 'output' channel then project it to the desired second field
-
-               DataStream<Tuple1<Integer>> numbers = 
step.select("output").project(1).types(Integer.class);
-
-               // emit result
-               if (fileOutput) {
-                       numbers.writeAsText(outputPath, 1);
-               } else {
-                       numbers.print();
-               }
-
-               // execute the program
-               env.execute("Streaming Iteration Example");
-       }
-
-       // 
*************************************************************************
-       // USER FUNCTIONS
-       // 
*************************************************************************
-
-       /**
-        * Iteration step function which takes an input (Double , Integer) and
-        * produces an output (Double + random, Integer + 1).
-        */
-       public static class Step extends
-                       RichMapFunction<Tuple2<Double, Integer>, Tuple2<Double, 
Integer>> {
-               private static final long serialVersionUID = 1L;
-               private transient Random rnd;
-
-               public void open(Configuration parameters) {
-                       rnd = new Random();
-               }
-
-               @Override
-               public Tuple2<Double, Integer> map(Tuple2<Double, Integer> 
value) throws Exception {
-                       return new Tuple2<Double, Integer>(value.f0 + 
rnd.nextDouble(), value.f1 + 1);
-               }
-       }
-
-       /**
-        * OutputSelector testing which tuple needs to be iterated again.
-        */
-       public static class MySelector implements OutputSelector<Tuple2<Double, 
Integer>> {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public Iterable<String> select(Tuple2<Double, Integer> value) {
-                       List<String> output = new ArrayList<String>();
-                       if (value.f0 > 100) {
-                               output.add("output");
-                       } else {
-                               output.add("iterate");
-                       }
-                       return output;
-               }
-
-       }
-
-       // 
*************************************************************************
-       // UTIL METHODS
-       // 
*************************************************************************
-
-       private static boolean fileOutput = false;
-       private static String outputPath;
-
-       private static boolean parseParameters(String[] args) {
-
-               if (args.length > 0) {
-                       // parse input arguments
-                       fileOutput = true;
-                       if (args.length == 1) {
-                               outputPath = args[0];
-                       } else {
-                               System.err.println("Usage: IterateExample 
<result path>");
-                               return false;
-                       }
-               } else {
-                       System.out.println("Executing IterateExample with 
generated data.");
-                       System.out.println("  Provide parameter to write to 
file.");
-                       System.out.println("  Usage: IterateExample <result 
path>");
-               }
-               return true;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
 
b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
deleted file mode 100644
index dcfed50..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.join;
-
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.source.RichSourceFunction;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.util.Collector;
-
-/**
- * Example illustrating join over sliding windows of streams in Flink.
- * 
- * <p>
- * This example will join two streams with a sliding window. One which emits
- * grades and one which emits salaries of people.
- * </p>
- * 
- * <p>
- * This example shows how to:
- * <ul>
- * <li>do windowed joins,
- * <li>use tuple data types,
- * <li>write a simple streaming program.
- */
-public class WindowJoin {
-
-       // 
*************************************************************************
-       // PROGRAM
-       // 
*************************************************************************
-
-       public static void main(String[] args) throws Exception {
-
-               if (!parseParameters(args)) {
-                       return;
-               }
-
-               // obtain execution environment
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-               // connect to the data sources for grades and salaries
-               DataStream<Tuple2<String, Integer>> grades = env.addSource(new 
GradeSource());
-               DataStream<Tuple2<String, Integer>> salaries = 
env.addSource(new SalarySource());
-
-               // apply a temporal join over the two streams based on the names 
over one
-               // second windows
-               DataStream<Tuple3<String, Integer, Integer>> joinedStream = 
grades
-                                               .join(salaries)
-                                               .onWindow(1, TimeUnit.SECONDS)
-                                               .where(0)
-                                               .equalTo(0)
-                                               .with(new MyJoinFunction());
-               
-               // emit result
-               if (fileOutput) {
-                       joinedStream.writeAsText(outputPath, 1);
-               } else {
-                       joinedStream.print();
-               }
-
-               // execute program
-               env.execute("Windowed Join Example");
-       }
-
-       // 
*************************************************************************
-       // USER FUNCTIONS
-       // 
*************************************************************************
-
-       private final static String[] names = { "tom", "jerry", "alice", "bob", 
"john", "grace" };
-       private final static int GRADE_COUNT = 5;
-       private final static int SALARY_MAX = 10000;
-       private final static int SLEEP_TIME = 10;
-
-       /**
-        * Continuously emit tuples with random names and integers (grades).
-        */
-       public static class GradeSource implements 
SourceFunction<Tuple2<String, Integer>> {
-               private static final long serialVersionUID = 1L;
-
-               private Random rand;
-               private Tuple2<String, Integer> outTuple;
-
-               public GradeSource() {
-                       rand = new Random();
-                       outTuple = new Tuple2<String, Integer>();
-               }
-
-               @Override
-               public void invoke(Collector<Tuple2<String, Integer>> out) 
throws Exception {
-                       while (true) {
-                               outTuple.f0 = names[rand.nextInt(names.length)];
-                               outTuple.f1 = rand.nextInt(GRADE_COUNT) + 1;
-                               out.collect(outTuple);
-                               Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
-                       }
-               }
-       }
-
-       /**
-        * Continuously emit tuples with random names and integers (salaries).
-        */
-       public static class SalarySource extends 
RichSourceFunction<Tuple2<String, Integer>> {
-               private static final long serialVersionUID = 1L;
-
-               private transient Random rand;
-               private transient Tuple2<String, Integer> outTuple;
-
-               public void open(Configuration parameters) throws Exception {
-                       super.open(parameters);
-                       rand = new Random();
-                       outTuple = new Tuple2<String, Integer>();
-               }
-
-               @Override
-               public void invoke(Collector<Tuple2<String, Integer>> out) 
throws Exception {
-                       while (true) {
-                               outTuple.f0 = names[rand.nextInt(names.length)];
-                               outTuple.f1 = rand.nextInt(SALARY_MAX) + 1;
-                               out.collect(outTuple);
-                               Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
-                       }
-               }
-       }
-
-       public static class MyJoinFunction
-                       implements
-                       JoinFunction<Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple3<String, Integer, Integer>> {
-
-               private static final long serialVersionUID = 1L;
-
-               private Tuple3<String, Integer, Integer> joined = new 
Tuple3<String, Integer, Integer>();
-
-               @Override
-               public Tuple3<String, Integer, Integer> join(Tuple2<String, 
Integer> first,
-                               Tuple2<String, Integer> second) throws 
Exception {
-                       joined.f0 = first.f0;
-                       joined.f1 = first.f1;
-                       joined.f2 = second.f1;
-                       return joined;
-               }
-       }
-
-       // 
*************************************************************************
-       // UTIL METHODS
-       // 
*************************************************************************
-
-       private static boolean fileOutput = false;
-       private static String outputPath;
-
-       private static boolean parseParameters(String[] args) {
-
-               if (args.length > 0) {
-                       // parse input arguments
-                       fileOutput = true;
-                       if (args.length == 1) {
-                               outputPath = args[0];
-                       } else {
-                               System.err.println("Usage: WindowJoin <result 
path>");
-                               return false;
-                       }
-               } else {
-                       System.out.println("Executing WindowJoin with generated 
data.");
-                       System.out.println("  Provide parameter to write to 
file.");
-                       System.out.println("  Usage: WindowJoin <result path>");
-               }
-               return true;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
 
b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
deleted file mode 100755
index 375c86d..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.ml;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.co.CoMapFunction;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.streaming.api.windowing.helper.Time;
-import org.apache.flink.util.Collector;
-
-/**
- * Skeleton for incremental machine learning algorithm consisting of a
- * pre-computed model, which gets updated for the new inputs and new input data
- * for which the job provides predictions.
- * 
- * <p>
- * This may serve as a base of a number of algorithms, e.g. updating an
- * incremental Alternating Least Squares model while also providing the
- * predictions.
- * </p>
- * 
- * <p>
- * This example shows how to use:
- * <ul>
- * <li>Connected streams
- * <li>CoFunctions
- * <li>Tuple data types
- * </ul>
- */
-public class IncrementalLearningSkeleton {
-
-       // 
*************************************************************************
-       // PROGRAM
-       // 
*************************************************************************
-
-       public static void main(String[] args) throws Exception {
-
-               if (!parseParameters(args)) {
-                       return;
-               }
-
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-               // build a new model on every 5 seconds of new data
-               DataStream<Double[]> model = env.addSource(new 
TrainingDataSource())
-                               .window(Time.of(5000, TimeUnit.MILLISECONDS))
-                               .reduceGroup(new PartialModelBuilder());
-
-               // use partial model for prediction
-               DataStream<Integer> prediction = env.addSource(new 
NewDataSource()).connect(model)
-                               .map(new Predictor());
-
-               // emit result
-               if (fileOutput) {
-                       prediction.writeAsText(outputPath, 1);
-               } else {
-                       prediction.print();
-               }
-
-               // execute program
-               env.execute("Streaming Incremental Learning");
-       }
-
-       // 
*************************************************************************
-       // USER FUNCTIONS
-       // 
*************************************************************************
-
-       /**
-        * Feeds new data for prediction. By default it is implemented as 
constantly
-        * emitting the Integer 1 in a loop.
-        */
-       public static class NewDataSource implements SourceFunction<Integer> {
-               private static final long serialVersionUID = 1L;
-               private static final int NEW_DATA_SLEEP_TIME = 1000;
-
-               @Override
-               public void invoke(Collector<Integer> collector) throws 
Exception {
-                       while (true) {
-                               collector.collect(getNewData());
-                       }
-               }
-
-               private Integer getNewData() throws InterruptedException {
-                       Thread.sleep(NEW_DATA_SLEEP_TIME);
-                       return 1;
-               }
-       }
-
-       /**
-        * Feeds new training data for the partial model builder. By default it 
is
-        * implemented as constantly emitting the Integer 1 in a loop.
-        */
-       public static class TrainingDataSource implements 
SourceFunction<Integer> {
-               private static final long serialVersionUID = 1L;
-               private static final int TRAINING_DATA_SLEEP_TIME = 10;
-
-               @Override
-               public void invoke(Collector<Integer> collector) throws 
Exception {
-                       while (true) {
-                               collector.collect(getTrainingData());
-                       }
-
-               }
-
-               private Integer getTrainingData() throws InterruptedException {
-                       Thread.sleep(TRAINING_DATA_SLEEP_TIME);
-                       return 1;
-
-               }
-       }
-
-       /**
-        * Builds up-to-date partial models on new training data.
-        */
-       public static class PartialModelBuilder implements 
GroupReduceFunction<Integer, Double[]> {
-               private static final long serialVersionUID = 1L;
-
-               protected Double[] buildPartialModel(Iterable<Integer> values) {
-                       return new Double[] { 1. };
-               }
-
-               @Override
-               public void reduce(Iterable<Integer> values, 
Collector<Double[]> out) throws Exception {
-                       out.collect(buildPartialModel(values));
-               }
-       }
-
-       /**
-        * Creates prediction using the model produced in batch-processing and 
the
-        * up-to-date partial model.
-        * 
-        * <p>
-        * By default emits the Integer 0 for every prediction and the Integer 
1
-        * for every model update.
-        * </p>
-        */
-       public static class Predictor implements CoMapFunction<Integer, 
Double[], Integer> {
-               private static final long serialVersionUID = 1L;
-
-               Double[] batchModel = null;
-               Double[] partialModel = null;
-
-               @Override
-               public Integer map1(Integer value) {
-                       // Return prediction
-                       return predict(value);
-               }
-
-               @Override
-               public Integer map2(Double[] value) {
-                       // Update model
-                       partialModel = value;
-                       batchModel = getBatchModel();
-                       return 1;
-               }
-
-               // pulls model built with batch-job on the old training data
-               protected Double[] getBatchModel() {
-                       return new Double[] { 0. };
-               }
-
-               // performs prediction using the two models
-               protected Integer predict(Integer inTuple) {
-                       return 0;
-               }
-
-       }
-
-       // 
*************************************************************************
-       // UTIL METHODS
-       // 
*************************************************************************
-
-       private static boolean fileOutput = false;
-       private static String outputPath;
-
-       private static boolean parseParameters(String[] args) {
-
-               if (args.length > 0) {
-                       // parse input arguments
-                       fileOutput = true;
-                       if (args.length == 1) {
-                               outputPath = args[0];
-                       } else {
-                               System.err.println("Usage: 
IncrementalLearningSkeleton <result path>");
-                               return false;
-                       }
-               } else {
-                       System.out.println("Executing 
IncrementalLearningSkeleton with generated data.");
-                       System.out.println("  Provide parameter to write to 
file.");
-                       System.out.println("  Usage: 
IncrementalLearningSkeleton <result path>");
-               }
-               return true;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
 
b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
deleted file mode 100644
index e9b60f4..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.socket;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.examples.wordcount.WordCount.Tokenizer;
-
-/**
- * This example shows an implementation of WordCount with data from a text
- * socket. To run the example make sure that the service providing the text 
data
- * is already up and running.
- * 
- * <p>
- * To start an example socket text stream on your local machine run netcat from
- * a command line: <code>nc -lk 9999</code>, where the parameter specifies the
- * port number.
- * 
- * 
- * <p>
- * Usage:
- * <code>SocketTextStreamWordCount &lt;hostname&gt; &lt;port&gt; &lt;result 
path&gt;</code>
- * <br>
- * 
- * <p>
- * This example shows how to:
- * <ul>
- * <li>use StreamExecutionEnvironment.socketTextStream
- * <li>write a simple Flink program,
- * <li>write and use user-defined functions.
- * </ul>
- * 
- * @see <a href="http://www.openbsd.org/cgi-bin/man.cgi?query=nc">netcat</a>
- */
-public class SocketTextStreamWordCount {
-       public static void main(String[] args) throws Exception {
-
-               if (!parseParameters(args)) {
-                       return;
-               }
-
-               // set up the execution environment
-               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment
-                               .getExecutionEnvironment();
-
-               // get input data
-               DataStream<String> text = env.socketTextStream(hostName, port);
-
-               DataStream<Tuple2<String, Integer>> counts =
-               // split up the lines in pairs (2-tuples) containing: (word,1)
-               text.flatMap(new Tokenizer())
-               // group by the tuple field "0" and sum up tuple field "1"
-                               .groupBy(0)
-                               .sum(1);
-
-               if (fileOutput) {
-                       counts.writeAsText(outputPath, 1);
-               } else {
-                       counts.print();
-               }
-
-               // execute program
-               env.execute("WordCount from SocketTextStream Example");
-       }
-
-       // 
*************************************************************************
-       // UTIL METHODS
-       // 
*************************************************************************
-
-       private static boolean fileOutput = false;
-       private static String hostName;
-       private static int port;
-       private static String outputPath;
-
-       private static boolean parseParameters(String[] args) {
-
-               // parse input arguments
-               if (args.length == 3) {
-                       fileOutput = true;
-                       hostName = args[0];
-                       port = Integer.valueOf(args[1]);
-                       outputPath = args[2];
-               } else if (args.length == 2) {
-                       hostName = args[0];
-                       port = Integer.valueOf(args[1]);
-               } else {
-                       System.err.println("Usage: SocketTextStreamWordCount 
<hostname> <port> [<output path>]");
-                       return false;
-               }
-               return true;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
 
b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
deleted file mode 100644
index 1901475..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.twitter;
-
-import java.util.StringTokenizer;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
-import org.apache.flink.streaming.examples.twitter.util.TwitterStreamData;
-import org.apache.flink.util.Collector;
-import org.apache.sling.commons.json.JSONException;
-
-/**
- * Implements the "TwitterStream" program that continuously reports the most
- * frequently used word in a stream of JSON encoded tweets.
- *
- * <p>
- * The input is a JSON text file with lines separated by newline characters.
- *
- * <p>
- * Usage: <code>TwitterStream &lt;text path&gt; &lt;result path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from
- * {@link TwitterStreamData} and the result is printed.
- *
- * <p>
- * This example shows how to:
- * <ul>
- * <li>acquire external data,
- * <li>use in-line defined functions,
- * <li>handle flattened stream inputs.
- * </ul>
- *
- */
-public class TwitterStream {
-
-       // *************************************************************************
-       // PROGRAM
-       // *************************************************************************
-
-       public static void main(String[] args) throws Exception {
-               if (!parseParameters(args)) {
-                       return;
-               }
-
-               // set up the execution environment
-               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-               env.setBufferTimeout(1000);
-
-               // get input data
-               DataStream<String> streamSource = getTextDataStream(env);
-
-               DataStream<Tuple2<String, Integer>> tweets = streamSource
-                               // select English tweets and split them into words
-                               .flatMap(new SelectEnglishAndTokenizeFlatMap())
-                               // turn every word into (word, 1)
-                               .map(new MapFunction<String, Tuple2<String, Integer>>() {
-                                       private static final long serialVersionUID = 1L;
-
-                                       @Override
-                                       public Tuple2<String, Integer> map(String value) throws Exception {
-                                               return new Tuple2<String, Integer>(value, 1);
-                                       }
-                               })
-                               // group by word and sum the occurrences
-                               .groupBy(0).sum(1)
-                               // keep only words that reach the maximum occurrence seen so far
-                               .flatMap(new SelectMaxOccurence());
-
-               // emit result
-               if (fileOutput) {
-                       tweets.writeAsText(outputPath, 1);
-               } else {
-                       tweets.print();
-               }
-
-               // execute program
-               env.execute("Twitter Streaming Example");
-       }
-
-       // *************************************************************************
-       // USER FUNCTIONS
-       // *************************************************************************
-
-       /**
-        * Extracts the words of English tweets.
-        *
-        * <p>
-        * Reads the "lang" field of each incoming JSON record and, when it equals
-        * "en", tokenizes the "text" field and emits every non-empty token as a
-        * separate String.
-        * </p>
-        */
-       public static class SelectEnglishAndTokenizeFlatMap extends JSONParseFlatMap<String, String> {
-               private static final long serialVersionUID = 1L;
-
-               /**
-                * Emits the words of the tweet if its language is English.
-                *
-                * @param value
-                *            one JSON encoded record
-                * @param out
-                *            collector receiving the individual words
-                */
-               @Override
-               public void flatMap(String value, Collector<String> out) throws Exception {
-                       try {
-                               // constant first, so a missing language value cannot cause an NPE
-                               if ("en".equals(getString(value, "lang"))) {
-                                       // message of tweet
-                                       StringTokenizer tokenizer = new StringTokenizer(getString(value, "text"));
-
-                                       // split the message
-                                       while (tokenizer.hasMoreTokens()) {
-                                               // strip whitespace the tokenizer does not treat as a delimiter
-                                               String result = tokenizer.nextToken().replaceAll("\\s+", "");
-
-                                               if (!result.isEmpty()) {
-                                                       out.collect(result);
-                                               }
-                                       }
-                               }
-                       } catch (JSONException e) {
-                               // Best effort: a record whose "lang" or "text" field cannot be
-                               // read is skipped instead of failing the whole job.
-                       }
-               }
-       }
-
-       /**
-        * Implements a user-defined FlatMapFunction that forwards a (word, count)
-        * pair whenever its count is at least as high as the maximum count seen so
-        * far, and then records that count as the new maximum.
-        */
-       public static class SelectMaxOccurence implements
-                       FlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
-               private static final long serialVersionUID = 1L;
-
-               /** Highest count forwarded so far by this instance. */
-               private int maximum;
-
-               public SelectMaxOccurence() {
-                       this.maximum = 0;
-               }
-
-               @Override
-               public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out)
-                               throws Exception {
-                       // typed field access instead of the unchecked getField(1) cast
-                       final int count = value.f1;
-                       if (count >= maximum) {
-                               out.collect(value);
-                               maximum = count;
-                       }
-               }
-       }
-
-       // *************************************************************************
-       // UTIL METHODS
-       // *************************************************************************
-
-       /** True when input and output paths were supplied on the command line. */
-       private static boolean fileOutput = false;
-       private static String textPath;
-       private static String outputPath;
-
-       /**
-        * Parses the command line arguments.
-        *
-        * <p>
-        * No arguments selects the built-in default data; exactly two arguments
-        * select file input and file output. Any other count is rejected.
-        *
-        * @param args
-        *            the raw program arguments
-        * @return true if the program should run, false if the usage message was
-        *         printed because of invalid arguments
-        */
-       private static boolean parseParameters(String[] args) {
-               if (args.length == 0) {
-                       System.out.println("Executing TwitterStream example with built-in default data.");
-                       System.out.println("  Provide parameters to read input data from a file.");
-                       System.out.println("  USAGE: TwitterStream <text path> <result path>");
-                       return true;
-               }
-
-               if (args.length != 2) {
-                       System.err.println("USAGE:\nTwitterStream <text path> <result path>");
-                       return false;
-               }
-
-               // switch to file mode only once the arguments are known to be valid
-               textPath = args[0];
-               outputPath = args[1];
-               fileOutput = true;
-               return true;
-       }
-
-       /**
-        * Creates the source stream: the text file at the configured input path
-        * when paths were given, otherwise the built-in sample data.
-        *
-        * @param env
-        *            the environment used to create the source
-        * @return stream of raw input lines
-        */
-       private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
-               if (fileOutput) {
-                       // read the text file from given input path
-                       return env.readTextFile(textPath);
-               } else {
-                       // get default test text data
-                       return env.fromElements(TwitterStreamData.TEXTS);
-               }
-       }
-}

Reply via email to