diff --git 
deleted file mode 100644
index 1b1d5c5..0000000
+++ /dev/null
@@ -1,184 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.examples.scala.relational
-import org.apache.flink.api.scala._
-import org.apache.flink.util.Collector
- * This program implements a modified version of the TPC-H query 10. 
- * 
- * The original query can be found at
- * 
- * (page 45).
- *
- * This program implements the following SQL equivalent:
- *
- * {{{
- *        c_custkey,
- *        c_name, 
- *        c_address,
- *        n_name, 
- *        c_acctbal
- *        SUM(l_extendedprice * (1 - l_discount)) AS revenue,  
- * FROM   
- *        customer, 
- *        orders, 
- *        lineitem, 
- *        nation 
- * WHERE 
- *        c_custkey = o_custkey 
- *        AND l_orderkey = o_orderkey 
- *        AND YEAR(o_orderdate) > '1990' 
- *        AND l_returnflag = 'R' 
- *        AND c_nationkey = n_nationkey 
- *        c_custkey, 
- *        c_name, 
- *        c_acctbal, 
- *        n_name, 
- *        c_address
- * }}}
- *
- * Compared to the original TPC-H query this version does not print 
- * c_phone and c_comment, only filters by years greater than 1990 instead of
- * a period of 3 months, and does not sort the result by revenue..
- *
- * Input files are plain text CSV files using the pipe character ('|') as field separator 
- * as generated by the TPC-H data generator which is available at 
- * [](a href="
- *
- * Usage: 
- * {{{
- *TPCHQuery10 <customer-csv path> <orders-csv path> <lineitem-csv path> <nation path> <result path>
- * }}}
- *  
- * This example shows how to use:
- *  - tuple data types
- *  - build-in aggregation functions
- *  - join with size hints
- *  
- */
-object TPCHQuery10 {
-  def main(args: Array[String]) {
-    if (!parseParameters(args)) {
-      return
-    }
-    // get execution environment
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    // get customer data set: (custkey, name, address, nationkey, acctbal) 
-    val customers = getCustomerDataSet(env)
-    // get orders data set: (orderkey, custkey, orderdate)
-    val orders = getOrdersDataSet(env)
-    // get lineitem data set: (orderkey, extendedprice, discount, returnflag)
-    val lineitems = getLineitemDataSet(env)
-    // get nation data set: (nationkey, name)    
-    val nations = getNationDataSet(env)
-    // filter orders by years
-    val orders1990 = orders.filter( o => o._3.substring(0,4).toInt > 1990)
-                           .map( o => (o._1, o._2))
-    // filter lineitems by return status
-    val lineitemsReturn = lineitems.filter( l => l._4.equals("R"))
-                                   .map( l => (l._1, l._2 * (1 - l._3)) )
-    // compute revenue by customer
-    val revenueByCustomer = 
-                                        .apply( (o,l) => (o._2, l._2) )
-                                      .groupBy(0)
-                                      .aggregate(Aggregations.SUM, 1)
-    // compute final result by joining customer and nation information with 
-    val result = customers.joinWithTiny(nations).where(3).equalTo(0)
-                            .apply( (c, n) => (c._1, c._2, c._3, n._2, c._5) )
-                          .join(revenueByCustomer).where(0).equalTo(0)
-                            .apply( (c, r) => (c._1, c._2, c._3, c._4, c._5, r._2) )
-    // emit result
-    result.writeAsCsv(outputPath, "\n", "|")
-    // execute program
-    env.execute("Scala TPCH Query 10 Example")
-  }
-  // *************************************************************************
-  //     UTIL METHODS
-  // *************************************************************************
-  private var customerPath: String = null
-  private var ordersPath: String = null
-  private var lineitemPath: String = null
-  private var nationPath: String = null
-  private var outputPath: String = null
-  private def parseParameters(args: Array[String]): Boolean = {
-    if (args.length == 5) {
-      customerPath = args(0)
-      ordersPath = args(1)
-      lineitemPath = args(2)
-      nationPath = args(3)
-      outputPath = args(4)
-      true
-    } else {
-      System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
-          "  Due to legal restrictions, we can not ship generated data.\n" +
-          "  You can find the TPC-H data generator at\n"; +
-          "  Usage: TPCHQuery10 <customer-csv path> <orders-csv path> " + 
-                                "<lineitem-csv path> <nation-csv path> <result path>")
-      false
-    }
-  }
-  private def getCustomerDataSet(env: ExecutionEnvironment): 
-                         DataSet[Tuple5[Int, String, String, Int, Double]] = {
-    env.readCsvFile[Tuple5[Int, String, String, Int, Double]](
-        customerPath,
-        fieldDelimiter = "|",
-        includedFields = Array(0,1,2,3,5) )
-  }
-  private def getOrdersDataSet(env: ExecutionEnvironment): DataSet[Tuple3[Int, Int, String]] = {
-    env.readCsvFile[Tuple3[Int, Int, String]](
-        ordersPath,
-        fieldDelimiter = "|",
-        includedFields = Array(0, 1, 4) )
-  }
-  private def getLineitemDataSet(env: ExecutionEnvironment):
-                         DataSet[Tuple4[Int, Double, Double, String]] = {
-    env.readCsvFile[Tuple4[Int, Double, Double, String]](
-        lineitemPath,
-        fieldDelimiter = "|",
-        includedFields = Array(0, 5, 6, 8) )
-  }
-  private def getNationDataSet(env: ExecutionEnvironment): DataSet[Tuple2[Int, String]] = {
-    env.readCsvFile[Tuple2[Int, String]](
-        nationPath,
-        fieldDelimiter = "|",
-        includedFields = Array(0, 1) )
-  }
diff --git 
deleted file mode 100644
index 0e0ecd3..0000000
+++ /dev/null
@@ -1,172 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.examples.scala.relational
-import org.apache.flink.api.scala._
-import org.apache.flink.core.fs.FileSystem.WriteMode
- * This program implements a modified version of the TPC-H query 3. The
- * example demonstrates how to assign names to fields by extending the Tuple 
- * The original query can be found at
- * 
- * (page 29).
- *
- * This program implements the following SQL equivalent:
- *
- * {{{
- *      l_orderkey, 
- *      SUM(l_extendedprice*(1-l_discount)) AS revenue,
- *      o_orderdate, 
- *      o_shippriority 
- * FROM customer, 
- *      orders, 
- *      lineitem 
- *      c_mktsegment = '[SEGMENT]' 
- *      AND c_custkey = o_custkey
- *      AND l_orderkey = o_orderkey
- *      AND o_orderdate < date '[DATE]'
- *      AND l_shipdate > date '[DATE]'
- *      l_orderkey, 
- *      o_orderdate, 
- *      o_shippriority;
- * }}}
- *
- * Compared to the original TPC-H query this version does not sort the result by revenue
- * and orderdate.
- *
- * Input files are plain text CSV files using the pipe character ('|') as field separator 
- * as generated by the TPC-H data generator which is available at 
- * [](a href="
- *
- * Usage: 
- * {{{
- * TPCHQuery3 <lineitem-csv path> <customer-csv path> <orders-csv path> <result path>
- * }}}
- *  
- * This example shows how to use:
- *  - case classes and case class field addressing
- *  - build-in aggregation functions
- * 
- */
-object TPCHQuery3 {
-  def main(args: Array[String]) {
-    if (!parseParameters(args)) {
-      return
-    }
-    // set filter date
-    val dateFormat = new java.text.SimpleDateFormat("yyyy-MM-dd")
-    val date = dateFormat.parse("1995-03-12")
-    // get execution environment
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    // read and filter lineitems by shipDate
-    val lineitems = getLineitemDataSet(env).filter( l => dateFormat.parse(l.shipDate).after(date) )
-    // read and filter customers by market segment
-    val customers = getCustomerDataSet(env).filter( c => 
-    // read orders
-    val orders = getOrdersDataSet(env)
-                      // filter orders by order date
-    val items = orders.filter( o => dateFormat.parse(o.orderDate).before(date) )
-                      // filter orders by joining with customers
-                      .join(customers).where("custId").equalTo("custId").apply( (o,c) => o )
-                      // join with lineitems 
-                      .join(lineitems).where("orderId").equalTo("orderId")
-                                      .apply( (o,l) => 
-                                        new ShippedItem( o.orderId,
-                                                         l.extdPrice * (1.0 - l.discount),
-                                                         o.orderDate,
-                                                         o.shipPrio ) )
-    // group by order and aggregate revenue
-    val result = items.groupBy("orderId", "orderDate", "shipPrio")
-                      .aggregate(Aggregations.SUM, "revenue")
-    // emit result
-    result.writeAsCsv(outputPath, "\n", "|")
-    // execute program
-    env.execute("Scala TPCH Query 3 Example")
-  }
-  // *************************************************************************
-  // *************************************************************************
-  case class Lineitem(orderId: Long, extdPrice: Double, discount: Double, shipDate: String)
-  case class Order(orderId: Long, custId: Long, orderDate: String, shipPrio: Long)
-  case class Customer(custId: Long, mktSegment: String)
-  case class ShippedItem(orderId: Long, revenue: Double, orderDate: String, shipPrio: Long)
-  // *************************************************************************
-  //     UTIL METHODS
-  // *************************************************************************
-  private var lineitemPath: String = null
-  private var customerPath: String = null
-  private var ordersPath: String = null
-  private var outputPath: String = null
-  private def parseParameters(args: Array[String]): Boolean = {
-    if (args.length == 4) {
-      lineitemPath = args(0)
-      customerPath = args(1)
-      ordersPath = args(2)
-      outputPath = args(3)
-      true
-    } else {
-      System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
-          " Due to legal restrictions, we can not ship generated data.\n" +
-          " You can find the TPC-H data generator at\n"; +
-          " Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path>" + 
-                             "<orders-csv path> <result path>")
-      false
-    }
-  }
-  private def getLineitemDataSet(env: ExecutionEnvironment): DataSet[Lineitem] = {
-    env.readCsvFile[Lineitem](
-        lineitemPath,
-        fieldDelimiter = "|",
-        includedFields = Array(0, 5, 6, 10) )
-  }
-  private def getCustomerDataSet(env: ExecutionEnvironment): DataSet[Customer] = {
-    env.readCsvFile[Customer](
-        customerPath,
-        fieldDelimiter = "|",
-        includedFields = Array(0, 6) )
-  }
-  private def getOrdersDataSet(env: ExecutionEnvironment): DataSet[Order] = {
-    env.readCsvFile[Order](
-        ordersPath,
-        fieldDelimiter = "|",
-        includedFields = Array(0, 1, 4, 7) )
-  }
diff --git 
deleted file mode 100644
index 5392594..0000000
+++ /dev/null
@@ -1,210 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.examples.scala.relational
-import org.apache.flink.api.scala._
-import org.apache.flink.util.Collector
- * This program processes web logs and relational data.
- * It implements the following relational query:
- *
- * {{{
- *       r.pageURL,
- *       r.pageRank,
- *       r.avgDuration
- * FROM documents d JOIN rankings r
- *                  ON d.url = r.url
- * WHERE CONTAINS(d.text, [keywords])
- *       AND r.rank > [rank]
- *       AND NOT EXISTS
- *           (
- *              SELECT * FROM Visits v
- *              WHERE v.destUrl = d.url
- *                    AND v.visitDate < [date]
- *           );
- * }}}
- *
- *
- * Input files are plain text CSV files using the pipe character ('|') as field separator.
- * The tables referenced in the query can be generated using the
- * []] and
- * have the following schemas
- *
- * {{{
- * CREATE TABLE Documents (
- *                url VARCHAR(100) PRIMARY KEY,
- *                contents TEXT );
- *
- * CREATE TABLE Rankings (
- *                pageRank INT,
- *                pageURL VARCHAR(100) PRIMARY KEY,
- *                avgDuration INT );
- *
- * CREATE TABLE Visits (
- *                sourceIP VARCHAR(16),
- *                destURL VARCHAR(100),
- *                visitDate DATE,
- *                adRevenue FLOAT,
- *                userAgent VARCHAR(64),
- *                countryCode VARCHAR(3),
- *                languageCode VARCHAR(6),
- *                searchWord VARCHAR(32),
- *                duration INT );
- * }}}
- *
- *
- * Usage
- * {{{
- *   WebLogAnalysis <documents path> <ranks path> <visits path> <result path>
- * }}}
- *
- * If no parameters are provided, the program is run with default data from
- * [[]].
- *
- * This example shows how to use:
- *
- *  - tuple data types
- *  - projection and join projection
- *  - the CoGroup transformation for an anti-join
- *
- */
-object WebLogAnalysis {
-  def main(args: Array[String]) {
-    if (!parseParameters(args)) {
-      return
-    }
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val documents = getDocumentsDataSet(env)
-    val ranks = getRanksDataSet(env)
-    val visits = getVisitsDataSet(env)
-    val filteredDocs = documents
-      .filter(doc => doc._2.contains(" editors ") && doc._2.contains(" oscillations "))
-    val filteredRanks = ranks
-      .filter(rank => rank._1 > 40)
-    val filteredVisits = visits
-      .filter(visit => visit._2.substring(0, 4).toInt == 2007)
-    val joinDocsRanks = filteredDocs.join(filteredRanks).where(0).equalTo(1) {
-      (doc, rank) => rank
-    }.withForwardedFieldsSecond("*")
-    val result = joinDocsRanks.coGroup(filteredVisits).where(1).equalTo(0) {
-      (ranks, visits, out: Collector[(Int, String, Int)]) =>
-        if (visits.isEmpty) for (rank <- ranks) out.collect(rank)
-    }.withForwardedFieldsFirst("*")
-    // emit result
-    if (fileOutput) {
-      result.writeAsCsv(outputPath, "\n", "|")
-      env.execute("Scala WebLogAnalysis Example")
-    } else {
-      result.print()
-    }
-  }
-  private var fileOutput: Boolean = false
-  private var documentsPath: String = null
-  private var ranksPath: String = null
-  private var visitsPath: String = null
-  private var outputPath: String = null
-  private def parseParameters(args: Array[String]): Boolean = {
-    if (args.length > 0) {
-      fileOutput = true
-      if (args.length == 4) {
-        documentsPath = args(0)
-        ranksPath = args(1)
-        visitsPath = args(2)
-        outputPath = args(3)
-      }
-      else {
-        System.err.println("Usage: WebLogAnalysis <documents path> <ranks path> <visits path> " +
-          "<result path>")
-        return false
-      }
-    }
-    else {
-      System.out.println("Executing WebLog Analysis example with built-in default data.")
-      System.out.println("  Provide parameters to read input data from files.")
-      System.out.println("  See the documentation for the correct format of input files.")
-      System.out.println("  We provide a data generator to create synthetic input files for this " +
-        "program.")
-      System.out.println("  Usage: WebLogAnalysis <documents path> <ranks path> <visits path> " +
-        "<result path>")
-    }
-    true
-  }
-  private def getDocumentsDataSet(env: ExecutionEnvironment): DataSet[(String, String)] = {
-    if (fileOutput) {
-      env.readCsvFile[(String, String)](
-        documentsPath,
-        fieldDelimiter = "|",
-        includedFields = Array(0, 1))
-    }
-    else {
-      val documents = WebLogData.DOCUMENTS map {
-        case Array(x, y) => (x.asInstanceOf[String], y.asInstanceOf[String])
-      }
-      env.fromCollection(documents)
-    }
-  }
-  private def getRanksDataSet(env: ExecutionEnvironment): DataSet[(Int, String, Int)] = {
-    if (fileOutput) {
-      env.readCsvFile[(Int, String, Int)](
-        ranksPath,
-        fieldDelimiter = "|",
-        includedFields = Array(0, 1, 2))
-    }
-    else {
-      val ranks = WebLogData.RANKS map {
-        case Array(x, y, z) => (x.asInstanceOf[Int], y.asInstanceOf[String], z.asInstanceOf[Int])
-      }
-      env.fromCollection(ranks)
-    }
-  }
-  private def getVisitsDataSet(env: ExecutionEnvironment): DataSet[(String, String)] = {
-    if (fileOutput) {
-      env.readCsvFile[(String, String)](
-        visitsPath,
-        fieldDelimiter = "|",
-        includedFields = Array(1, 2))
-    }
-    else {
-      val visits = WebLogData.VISITS map {
-        case Array(x, y) => (x.asInstanceOf[String], y.asInstanceOf[String])
-      }
-      env.fromCollection(visits)
-    }
-  }
diff --git 
deleted file mode 100644
index 7d5db7e..0000000
+++ /dev/null
@@ -1,100 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.examples.scala.wordcount
-import org.apache.flink.api.scala._
- * Implements the "WordCount" program that computes a simple word occurrence 
- * over text files. 
- *
- * The input is a plain text file with lines separated by newline characters.
- *
- * Usage:
- * {{{
- *   WordCount <text path> <result path>>
- * }}}
- *
- * If no parameters are provided, the program is run with default data from
- * [[]]
- *
- * This example shows how to:
- *
- *   - write a simple Flink program.
- *   - use Tuple data types.
- *   - write and use user-defined functions.
- *
- */
-object WordCount {
-  def main(args: Array[String]) {
-    if (!parseParameters(args)) {
-      return
-    }
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val text = getTextDataSet(env)
-    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
-      .map { (_, 1) }
-      .groupBy(0)
-      .sum(1)
-    if (fileOutput) {
-      counts.writeAsCsv(outputPath, "\n", " ")
-      env.execute("Scala WordCount Example")
-    } else {
-      counts.print()
-    }
-  }
-  private def parseParameters(args: Array[String]): Boolean = {
-    if (args.length > 0) {
-      fileOutput = true
-      if (args.length == 2) {
-        textPath = args(0)
-        outputPath = args(1)
-        true
-      } else {
-        System.err.println("Usage: WordCount <text path> <result path>")
-        false
-      }
-    } else {
-      System.out.println("Executing WordCount example with built-in default 
-      System.out.println("  Provide parameters to read input data from a 
-      System.out.println("  Usage: WordCount <text path> <result path>")
-      true
-    }
-  }
-  private def getTextDataSet(env: ExecutionEnvironment): DataSet[String] = {
-    if (fileOutput) {
-      env.readTextFile(textPath)
-    }
-    else {
-      env.fromCollection(WordCountData.WORDS)
-    }
-  }
-  private var fileOutput: Boolean = false
-  private var textPath: String = null
-  private var outputPath: String = null
diff --git a/flink-examples/pom.xml b/flink-examples/pom.xml
index e4376f4..ace7ec2 100644
--- a/flink-examples/pom.xml
+++ b/flink-examples/pom.xml
@@ -47,7 +47,7 @@ under the License.
-               <module>flink-java-examples</module>
-               <module>flink-scala-examples</module>
+               <module>flink-examples-batch</module>
+               <module>flink-examples-streaming</module>
diff --git a/flink-staging/flink-fs-tests/pom.xml 
index 9492d55..69c5f30 100644
--- a/flink-staging/flink-fs-tests/pom.xml
+++ b/flink-staging/flink-fs-tests/pom.xml
@@ -59,7 +59,7 @@ under the License.
-                       <artifactId>flink-java-examples</artifactId>
+                       <artifactId>flink-examples-batch</artifactId>
diff --git a/flink-staging/flink-table/pom.xml 
index bdd1b58..f13f422 100644
--- a/flink-staging/flink-table/pom.xml
+++ b/flink-staging/flink-table/pom.xml
@@ -54,7 +54,7 @@ under the License.
-                       <artifactId>flink-scala-examples</artifactId>
+                       <artifactId>flink-examples-batch</artifactId>
diff --git a/flink-staging/flink-tez/pom.xml b/flink-staging/flink-tez/pom.xml
index 0062349..6083f7f 100644
--- a/flink-staging/flink-tez/pom.xml
+++ b/flink-staging/flink-tez/pom.xml
@@ -85,7 +85,7 @@ under the License.
-            <artifactId>flink-java-examples</artifactId>
+            <artifactId>flink-examples-batch</artifactId>
diff --git a/flink-streaming-examples/pom.xml b/flink-streaming-examples/pom.xml
deleted file mode 100644
index c935ac1..0000000
--- a/flink-streaming-examples/pom.xml
+++ /dev/null
@@ -1,535 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-<project xmlns=""; 
-       xsi:schemaLocation="";>
-       <modelVersion>4.0.0</modelVersion>
-       <parent>
-               <groupId>org.apache.flink</groupId>
-               <artifactId>flink-parent</artifactId>
-               <version>1.0-SNAPSHOT</version>
-               <relativePath>..</relativePath>
-       </parent>
-       <artifactId>flink-streaming-examples</artifactId>
-       <name>flink-streaming-examples</name>
-       <packaging>jar</packaging>
-       <dependencies>
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-streaming-java</artifactId>
-                       <version>${project.version}</version>
-               </dependency>
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-streaming-scala</artifactId>
-                       <version>${project.version}</version>
-               </dependency>
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-java-examples</artifactId>
-                       <version>${project.version}</version>
-               </dependency>
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-connector-twitter</artifactId>
-                       <version>${project.version}</version>
-               </dependency>
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-streaming-java</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-                       <type>test-jar</type>
-               </dependency>
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-tests</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-               </dependency>
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-test-utils</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-               </dependency>
-       </dependencies>
-       <build>
-               <plugins>
-                       <!-- get default data from flink-java-examples package -->
-                       <plugin>
-                               <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-dependency-plugin</artifactId>
-                               <version>2.9</version><!--$NO-MVN-MAN-VER$-->
-                               <executions>
-                                       <execution>
-                                               <id>unpack</id>
-                                               <phase>prepare-package</phase>
-                                               <goals>
-                                                       <goal>unpack</goal>
-                                               </goals>
-                                               <configuration>
-                                                       <artifactItems>
-                                                               <!-- For WordCount example data -->
-                                                               <artifactItem>
-                                                               </artifactItem>
-                                                               <!-- For JSON utilities -->
-                                                               <artifactItem>
-                                                               </artifactItem>
-                                                       </artifactItems>
-                                               </configuration>
-                                       </execution>
-                               </executions>
-                       </plugin>
-                       <!-- self-contained jars for each example -->
-                       <plugin>
-                               <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-jar-plugin</artifactId>
-                               <version>2.4</version><!--$NO-MVN-MAN-VER$-->
-                               <executions>
-                                       <!-- Default Execution -->
-                                       <execution>
-                                               <id>default</id>
-                                               <phase>package</phase>
-                                               <goals>
-                                                       <goal>test-jar</goal>
-                                               </goals>
-                                       </execution>
-                                       <!-- Iteration -->
-                                       <execution>
-                                               <id>Iteration</id>
-                                               <phase>package</phase>
-                                               <goals>
-                                                       <goal>jar</goal>
-                                               </goals>
-                                               <configuration>
-                                                       <archive>
-                                                       </archive>
-                                                       <includes>
-                                                       </includes>
-                                               </configuration>
-                                       </execution>
-                                       <!-- IncrementalLearning -->
-                                       <execution>
-                                               <id>IncrementalLearning</id>
-                                               <phase>package</phase>
-                                               <goals>
-                                                       <goal>jar</goal>
-                                               </goals>
-                                               <configuration>
-                                                       <archive>
-                                                       </archive>
-                                                       <includes>
-                                                       </includes>
-                                               </configuration>
-                                       </execution>
-                                       <!-- Twitter -->
-                                       <execution>
-                                               <id>Twitter</id>
-                                               <phase>package</phase>
-                                               <goals>
-                                                       <goal>jar</goal>
-                                               </goals>
-                                               <configuration>
-                                                       <archive>
-                                                       </archive>
-                                                       <includes>
-                                                       </includes>
-                                               </configuration>
-                                       </execution>
-                                       <!-- WindowJoin -->
-                                       <execution>
-                                               <id>WindowJoin</id>
-                                               <phase>package</phase>
-                                               <goals>
-                                                       <goal>jar</goal>
-                                               </goals>
-                                               <configuration>
-                                                       <archive>
-                                                       </archive>
-                                                       <includes>
-                                                       </includes>
-                                               </configuration>
-                                       </execution>
-                                       <!-- WordCountPOJO -->
-                                       <execution>
-                                               <id>WordCountPOJO</id>
-                                               <phase>package</phase>
-                                               <goals>
-                                                       <goal>jar</goal>
-                                               </goals>
-                                               <configuration>
-                                                       <archive>
-                                                       </archive>
-                                                       <includes>
-                                                       </includes>
-                                               </configuration>
-                                       </execution>
-                                       <!-- WordCount -->
-                                       <execution>
-                                               <id>WordCount</id>
-                                               <phase>package</phase>
-                                               <goals>
-                                                       <goal>jar</goal>
-                                               </goals>
-                                               <configuration>
-                                                       <archive>
-                                                       </archive>
-                                                       <includes>
-                                                       </includes>
-                                               </configuration>
-                                       </execution>
-                                       <!-- WindowWordCount -->
-                                       <execution>
-                                               <id>WindowWordCount</id>
-                                               <phase>package</phase>
-                                               <goals>
-                                                       <goal>jar</goal>
-                                               </goals>
-                                               <configuration>
-                                                       <archive>
-                                                       </archive>
-                                                       <includes>
-                                                       </includes>
-                                               </configuration>
-                                       </execution>
-                                       <!-- SocketTextStreamWordCount -->
-                                       <execution>
-                                               <phase>package</phase>
-                                               <goals>
-                                                       <goal>jar</goal>
-                                               </goals>
-                                               <configuration>
-                                                       <archive>
-                                                       </archive>
-                                                       <includes>
-                                                       </includes>
-                                               </configuration>
-                                       </execution>
-                                       <!-- TopSpeedWindowing -->
-                                       <execution>
-                                               <id>TopSpeedWindowing</id>
-                                               <phase>package</phase>
-                                               <goals>
-                                                       <goal>jar</goal>
-                                               </goals>
-                                               <configuration>
-                                                       <archive>
-                                                       </archive>
-                                                       <includes>
-                                                       </includes>
-                                               </configuration>
-                                       </execution>
-                                       <!-- SessionWindowing -->
-                                       <execution>
-                                               <id>SessionWindowing</id>
-                                               <phase>package</phase>
-                                               <goals>
-                                                       <goal>jar</goal>
-                                               </goals>
-                                               <configuration>
-                                                       <archive>
-                                                       </archive>
-                                                       <includes>
-                                                       </includes>
-                                               </configuration>
-                                       </execution>
-                               </executions>
-                       </plugin>
-                       <!-- Scala Compiler -->
-                       <plugin>
-                               <groupId>net.alchim31.maven</groupId>
-                               <artifactId>scala-maven-plugin</artifactId>
-                               <version>3.1.4</version>
-                               <executions>
-                                       <!-- Run scala compiler in the 
process-resources phase, so that dependencies on
-                                               scala classes can be resolved 
later in the (Java) compile phase -->
-                                       <execution>
-                                               <id>scala-compile-first</id>
-                                               <phase>process-resources</phase>
-                                               <goals>
-                                                       <goal>compile</goal>
-                                               </goals>
-                                       </execution>
-                                       <!-- Run scala compiler in the 
process-test-resources phase, so that dependencies on
-                                                scala classes can be resolved 
later in the (Java) test-compile phase -->
-                                       <execution>
-                                               <id>scala-test-compile</id>
-                                               <goals>
-                                                       <goal>testCompile</goal>
-                                               </goals>
-                                       </execution>
-                               </executions>
-                               <configuration>
-                                       <jvmArgs>
-                                               <jvmArg>-Xms128m</jvmArg>
-                                               <jvmArg>-Xmx512m</jvmArg>
-                                       </jvmArgs>
-                               </configuration>
-                       </plugin>
-                       <!-- Eclipse Integration -->
-                       <plugin>
-                               <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-eclipse-plugin</artifactId>
-                               <version>2.8</version>
-                               <configuration>
-                                       <downloadSources>true</downloadSources>
-                                       <projectnatures>
-                                       </projectnatures>
-                                       <buildcommands>
-                                       </buildcommands>
-                                       <classpathContainers>
-                                       </classpathContainers>
-                                       <excludes>
-                                       </excludes>
-                                       <sourceIncludes>
-                                       </sourceIncludes>
-                               </configuration>
-                       </plugin>
-                       <!-- Adding scala source directories to build path -->
-                       <plugin>
-                               <groupId>org.codehaus.mojo</groupId>
-                               <version>1.7</version>
-                               <executions>
-                                       <!-- Add src/main/scala to eclipse 
build path -->
-                                       <execution>
-                                               <id>add-source</id>
-                                               <phase>generate-sources</phase>
-                                               <goals>
-                                                       <goal>add-source</goal>
-                                               </goals>
-                                               <configuration>
-                                                       <sources>
-                                                       </sources>
-                                               </configuration>
-                                       </execution>
-                                       <!-- Add src/test/scala to eclipse 
build path -->
-                                       <execution>
-                                               <id>add-test-source</id>
-                                               <goals>
-                                               </goals>
-                                               <configuration>
-                                                       <sources>
-                                                       </sources>
-                                               </configuration>
-                                       </execution>
-                               </executions>
-                       </plugin>
-                       <plugin>
-                               <groupId>org.scalastyle</groupId>
-                               <artifactId>scalastyle-maven-plugin</artifactId>
-                               <version>0.5.0</version>
-                               <executions>
-                                       <execution>
-                                               <goals>
-                                                       <goal>check</goal>
-                                               </goals>
-                                       </execution>
-                               </executions>
-                               <configuration>
-                                       <verbose>false</verbose>
-                                       <failOnViolation>true</failOnViolation>
-                                       <failOnWarning>false</failOnWarning>
-                                       <outputEncoding>UTF-8</outputEncoding>
-                               </configuration>
-                       </plugin>
-               </plugins>
-               <pluginManagement>
-                       <plugins>
-                               <!-- This plugin's configuration is used to store Eclipse m2e settings only.
-                                    It has no influence on the Maven build itself. -->
-                               <plugin>
-                                       <groupId>org.eclipse.m2e</groupId>
-                                       <version>1.0.0</version>
-                                       <configuration>
-                                               <lifecycleMappingMetadata>
-                                                       <pluginExecutions>
-                                                                       <action>
-                                                       </pluginExecutions>
-                                               </lifecycleMappingMetadata>
-                                       </configuration>
-                               </plugin>
-                       </plugins>
-               </pluginManagement>
-       </build>
diff --git 
deleted file mode 100644
index b6e1a61..0000000
+++ /dev/null
@@ -1,246 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.examples.iteration;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.IterativeStream;
-import org.apache.flink.streaming.api.datastream.SplitStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
- * Example illustrating iterations in Flink streaming.
- * <p> The program sums up random numbers and counts the additions
- * it performs to reach a specific threshold in an iterative streaming fashion. </p>
- *
- * <p>
- * This example shows how to use:
- * <ul>
- *   <li>streaming iterations,
- *   <li>buffer timeout to reduce latency,
- *   <li>directed outputs.
- * </ul>
- */
-public class IterateExample {
-       private static final int BOUND = 100;
-       // 
-       // PROGRAM
-       // 
-       public static void main(String[] args) throws Exception {
-               if (!parseParameters(args)) {
-                       return;
-               }
-               // set up input for the stream of integer pairs
-               // obtain execution environment and set setBufferTimeout to 1 
to enable
-               // continuous flushing of the output buffers (lowest latency)
-               StreamExecutionEnvironment env = 
-                               .setBufferTimeout(1);
-               // create input stream of integer pairs
-               DataStream<Tuple2<Integer, Integer>> inputStream;
-               if (fileInput) {
-                       inputStream = env.readTextFile(inputPath).map(new 
-               } else {
-                       inputStream = env.addSource(new 
-               }
-               // create an iterative data stream from the input with 5 second 
-               IterativeStream<Tuple5<Integer, Integer, Integer, Integer, 
Integer>> it = InputMap())
-                               .iterate(5000);
-               // apply the step function to get the next Fibonacci number
-               // increment the counter and split the output with the output 
-               SplitStream<Tuple5<Integer, Integer, Integer, Integer, 
Integer>> step = Step())
-                               .split(new MySelector());
-               // close the iteration by selecting the tuples that were 
directed to the
-               // 'iterate' channel in the output selector
-               it.closeWith("iterate"));
-               // to produce the final output select the tuples directed to the
-               // 'output' channel then get the input pairs that have the 
greatest iteration counter
-               // on a 1 second sliding window
-               DataStream<Tuple2<Tuple2<Integer, Integer>, Integer>> numbers ="output")
-                               .map(new OutputMap());
-               // emit results
-               if (fileOutput) {
-                       numbers.writeAsText(outputPath, 1);
-               } else {
-                       numbers.print();
-               }
-               // execute the program
-               env.execute("Streaming Iteration Example");
-       }
-       // 
-       // 
-       /**
-        * Generate up to BOUND random integer pairs (fewer if the source is cancelled);
-        * each component lies in the range from 1 to BOUND/2 - 1, inclusive.
-        */
-       private static class RandomFibonacciSource implements 
SourceFunction<Tuple2<Integer, Integer>> {
-               private static final long serialVersionUID = 1L;
-               private Random rnd = new Random();
-               private volatile boolean isRunning = true;
-               private int counter = 0;
-               @Override
-               public void run(SourceContext<Tuple2<Integer, Integer>> ctx) 
throws Exception {
-                       while (isRunning && counter < BOUND) {
-                               int first = rnd.nextInt(BOUND / 2 - 1) + 1;
-                               int second = rnd.nextInt(BOUND / 2 - 1) + 1;
-                               ctx.collect(new Tuple2<>(first, second));
-                               counter++;
-                               Thread.sleep(50L);
-                       }
-               }
-               @Override
-               public void cancel() {
-                       isRunning = false;
-               }
-       }
-       /**
-        * Parse a text line of the form "(first,second)" into a pair of integers.
-        */
-       private static class FibonacciInputMap implements MapFunction<String, 
Tuple2<Integer, Integer>> {
-               private static final long serialVersionUID = 1L;
-               @Override
-               public Tuple2<Integer, Integer> map(String value) throws 
Exception {
-                       String record = value.substring(1, value.length() - 1);
-                       String[] splitted = record.split(",");
-                       return new Tuple2<>(Integer.parseInt(splitted[0]), 
-               }
-       }
-       /**
-        * Map the inputs so that the next Fibonacci numbers can be calculated while preserving the original
-        * input tuple. A counter, initialized to 0, is attached to the tuple and incremented in every iteration step.
-        */
-       public static class InputMap implements MapFunction<Tuple2<Integer, 
Integer>, Tuple5<Integer, Integer, Integer,
-                       Integer, Integer>> {
-               private static final long serialVersionUID = 1L;
-               @Override
-               public Tuple5<Integer, Integer, Integer, Integer, Integer> 
map(Tuple2<Integer, Integer> value) throws
-                               Exception {
-                       return new Tuple5<>(value.f0, value.f1, value.f0, 
value.f1, 0);
-               }
-       }
-       /**
-        * Iteration step function that calculates the next Fibonacci number
-        */
-       public static class Step implements
-                       MapFunction<Tuple5<Integer, Integer, Integer, Integer, 
Integer>, Tuple5<Integer, Integer, Integer,
-                                       Integer, Integer>> {
-               private static final long serialVersionUID = 1L;
-               @Override
-               public Tuple5<Integer, Integer, Integer, Integer, Integer> 
map(Tuple5<Integer, Integer, Integer, Integer,
-                               Integer> value) throws Exception {
-                       return new Tuple5<>(value.f0, value.f1, value.f3, 
value.f2 + value.f3, ++value.f4);
-               }
-       }
-       /**
-        * OutputSelector testing which tuple needs to be iterated again.
-        */
-       public static class MySelector implements 
OutputSelector<Tuple5<Integer, Integer, Integer, Integer, Integer>> {
-               private static final long serialVersionUID = 1L;
-               @Override
-               public Iterable<String> select(Tuple5<Integer, Integer, 
Integer, Integer, Integer> value) {
-                       List<String> output = new ArrayList<>();
-                       if (value.f2 < BOUND && value.f3 < BOUND) {
-                               output.add("iterate");
-                       } else {
-                               output.add("output");
-                       }
-                       return output;
-               }
-       }
-       /**
-        * Giving back the input pair and the counter
-        */
-       public static class OutputMap implements MapFunction<Tuple5<Integer, 
Integer, Integer, Integer, Integer>,
-                       Tuple2<Tuple2<Integer, Integer>, Integer>> {
-               private static final long serialVersionUID = 1L;
-               @Override
-               public Tuple2<Tuple2<Integer, Integer>, Integer> 
map(Tuple5<Integer, Integer, Integer, Integer, Integer>
-                               value) throws
-                               Exception {
-                       return new Tuple2<>(new Tuple2<>(value.f0, value.f1), 
-               }
-       }
-       // 
-       // UTIL METHODS
-       // 
-       private static boolean fileInput = false;
-       private static boolean fileOutput = false;
-       private static String inputPath;
-       private static String outputPath;
-       private static boolean parseParameters(String[] args) {
-               if (args.length > 0) {
-                       // parse input arguments
-                       if (args.length == 1) {
-                               fileOutput = true;
-                               outputPath = args[0];
-                       } else if (args.length == 2) {
-                               fileInput = true;
-                               inputPath = args[0];
-                               fileOutput = true;
-                               outputPath = args[1];
-                       } else {
-                               System.err.println("Usage: IterateExample 
<result path>");
-                               return false;
-                       }
-               } else {
-                       System.out.println("Executing IterateExample with 
generated data.");
-                       System.out.println("  Provide parameter to write to 
-                       System.out.println("  Usage: IterateExample <result 
-               }
-               return true;
-       }
diff --git 
deleted file mode 100644
index 0077459..0000000
+++ /dev/null
@@ -1,32 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.examples.iteration.util;
-public class IterateExampleData {
-       public static final String INPUT_PAIRS = "(1,40)\n" + "(29,38)\n" + 
"(11,15)\n" + "(17,39)\n" + "(24,41)\n" +
-                       "(7,33)\n" + "(20,2)\n" + "(11,5)\n" + "(3,16)\n" + 
"(23,36)\n" + "(15,23)\n" + "(28,13)\n" + "(1,1)\n" +
-                       "(10,6)\n" + "(21,5)\n" + "(14,36)\n" + "(17,15)\n" + 
-       public static final String RESULTS = "((1,40),3)\n" + "((24,41),2)\n" + 
"((3,16),5)\n" + "((1,1),10)\n" +
-                       "((17,15),4)\n" + "((29,38),2)\n" + "((7,33),3)\n" + 
"((23,36),3)\n" + "((10,6),6)\n" + "((7,9),5)\n" +
-                       "((11,15),4)\n" + "((20,2),5)\n" + "((15,23),4)\n" + 
"((21,5),5)\n" +
-                       "((17,39),3)\n" + "((11,5),6)\n" + "((28,13),4)\n" + 
-       private IterateExampleData() {
-       }
diff --git 
deleted file mode 100644
index 3355f1c..0000000
+++ /dev/null
@@ -1,296 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.examples.join;
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.TimestampExtractor;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
- * Example illustrating a windowed join of two streams in Flink.
- *
- * <p>
- * This example joins two streams over tumbling time windows: one stream emits grades
- * and the other emits salaries of people. The input format for both sources has an additional timestamp
- * as field 0. This is used to do event-time windowing. The timestamps must be
- * monotonically increasing.
- *
- * This example shows how to:
- * <ul>
- *   <li>do windowed joins,
- *   <li>use tuple data types,
- *   <li>write a simple streaming program.
- * </ul>
- */
-public class WindowJoin {
-       // 
-       // PROGRAM
-       // 
-       public static void main(String[] args) throws Exception {
-               if (!parseParameters(args)) {
-                       return;
-               }
-               // obtain execution environment
-               StreamExecutionEnvironment env = 
-               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-               // connect to the data sources for grades and salaries
-               Tuple2<DataStream<Tuple3<Long, String, Integer>>, 
DataStream<Tuple3<Long, String, Integer>>> input = getInputStreams(env);
-               DataStream<Tuple3<Long, String, Integer>> grades = input.f0;
-               DataStream<Tuple3<Long, String, Integer>> salaries = input.f1;
-               // extract the timestamps
-               grades = grades.assignTimestamps(new MyTimestampExtractor());
-               salaries = salaries.assignTimestamps(new 
-               // apply a temporal join over the two stream based on the names 
over one
-               // second windows
-               DataStream<Tuple3<String, Integer, Integer>> joinedStream = 
-                               .join(salaries)
-                               .where(new NameKeySelector())
-                               .equalTo(new NameKeySelector())
-                               .window(TumblingTimeWindows.of(Time.of(5, 
-                               .apply(new MyJoinFunction());
-               // emit result
-               if (fileOutput) {
-                       joinedStream.writeAsText(outputPath, 1);
-               } else {
-                       joinedStream.print();
-               }
-               // execute program
-               env.execute("Windowed Join Example");
-       }
-       // 
-       // 
-       private final static String[] names = {"tom", "jerry", "alice", "bob", 
"john", "grace"};
-       private final static int GRADE_COUNT = 5;
-       private final static int SALARY_MAX = 10000;
-       private final static int SLEEP_TIME = 10;
-       /**
-        * Continuously emit tuples with random names and integers (grades).
-        */
-       public static class GradeSource implements SourceFunction<Tuple3<Long, 
String, Integer>> {
-               private static final long serialVersionUID = 1L;
-               private Random rand;
-               private Tuple3<Long, String, Integer> outTuple;
-               private volatile boolean isRunning = true;
-               private int counter;
-               public GradeSource() {
-                       rand = new Random();
-                       outTuple = new Tuple3<>();
-               }
-               @Override
-               public void run(SourceContext<Tuple3<Long, String, Integer>> 
ctx) throws Exception {
-                       while (isRunning && counter < 100) {
-                               outTuple.f0 = System.currentTimeMillis();
-                               outTuple.f1 = names[rand.nextInt(names.length)];
-                               outTuple.f2 = rand.nextInt(GRADE_COUNT) + 1;
-                               Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
-                               counter++;
-                               ctx.collect(outTuple);
-                       }
-               }
-               @Override
-               public void cancel() {
-                       isRunning = false;
-               }
-       }
-       /**
-        * Continuously emit tuples with random names and integers (salaries).
-        */
-       public static class SalarySource extends 
RichSourceFunction<Tuple3<Long, String, Integer>> {
-               private static final long serialVersionUID = 1L;
-               private transient Random rand;
-               private transient Tuple3<Long, String, Integer> outTuple;
-               private volatile boolean isRunning;
-               private int counter;
-               public void open(Configuration parameters) throws Exception {
-             ;
-                       rand = new Random();
-                       outTuple = new Tuple3<Long, String, Integer>();
-                       isRunning = true;
-               }
-               @Override
-               public void run(SourceContext<Tuple3<Long, String, Integer>> 
ctx) throws Exception {
-                       while (isRunning && counter < 100) {
-                               outTuple.f0 = System.currentTimeMillis();
-                               outTuple.f1 = names[rand.nextInt(names.length)];
-                               outTuple.f2 = rand.nextInt(SALARY_MAX) + 1;
-                               Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
-                               counter++;
-                               ctx.collect(outTuple);
-                       }
-               }
-               @Override
-               public void cancel() {
-                       isRunning = false;
-               }
-       }
-       public static class MySourceMap extends RichMapFunction<String, 
Tuple3<Long, String, Integer>> {
-               private static final long serialVersionUID = 1L;
-               private String[] record;
-               public MySourceMap() {
-                       record = new String[2];
-               }
-               @Override
-               public Tuple3<Long, String, Integer> map(String line) throws 
Exception {
-                       record = line.substring(1, line.length() - 
-                       return new Tuple3<>(Long.parseLong(record[0]), 
record[1], Integer.parseInt(record[2]));
-               }
-       }
-       public static class MyJoinFunction
-                       implements
-                       JoinFunction<Tuple3<Long, String, Integer>, 
Tuple3<Long, String, Integer>, Tuple3<String, Integer, Integer>> {
-               private static final long serialVersionUID = 1L;
-               private Tuple3<String, Integer, Integer> joined = new 
-               @Override
-               public Tuple3<String, Integer, Integer> join(Tuple3<Long, 
String, Integer> first,
-                               Tuple3<Long, String, Integer> second) throws 
Exception {
-                       joined.f0 = first.f1;
-                       joined.f1 = first.f2;
-                       joined.f2 = second.f2;
-                       return joined;
-               }
-       }
-       private static class MyTimestampExtractor implements 
TimestampExtractor<Tuple3<Long, String, Integer>> {
-               private static final long serialVersionUID = 1L;
-               @Override
-               public long extractTimestamp(Tuple3<Long, String, Integer> 
element, long currentTimestamp) {
-                       return element.f0;
-               }
-               @Override
-               public long extractWatermark(Tuple3<Long, String, Integer> 
element, long currentTimestamp) {
-                       return element.f0 - 1;
-               }
-               @Override
-               public long getCurrentWatermark() {
-                       return Long.MIN_VALUE;
-               }
-       }
-       private static class NameKeySelector implements 
KeySelector<Tuple3<Long, String, Integer>, String> {
-               private static final long serialVersionUID = 1L;
-               @Override
-               public String getKey(Tuple3<Long, String, Integer> value) 
throws Exception {
-                       return value.f1;
-               }
-       }
-       // 
-       // UTIL METHODS
-       // 
-       private static boolean fileInput = false;
-       private static boolean fileOutput = false;
-       private static String gradesPath;
-       private static String salariesPath;
-       private static String outputPath;
-       private static boolean parseParameters(String[] args) {
-               if (args.length > 0) {
-                       // parse input arguments
-                       if (args.length == 1) {
-                               fileOutput = true;
-                               outputPath = args[0];
-                       } else if (args.length == 3) {
-                               fileInput = true;
-                               fileOutput = true;
-                               gradesPath = args[0];
-                               salariesPath = args[1];
-                               outputPath = args[2];
-                       } else {
-                               System.err.println("Usage: WindowJoin <result 
path> or WindowJoin <input path 1> <input path 2> " +
-                                               "<result path>");
-                               return false;
-                       }
-               } else {
-                       System.out.println("Executing WindowJoin with generated 
-                       System.out.println("  Provide parameter to write to 
-                       System.out.println("  Usage: WindowJoin <result path>");
-               }
-               return true;
-       }
-       private static Tuple2<DataStream<Tuple3<Long, String, Integer>>, 
DataStream<Tuple3<Long, String, Integer>>> getInputStreams(
-                       StreamExecutionEnvironment env) {
-               DataStream<Tuple3<Long, String, Integer>> grades;
-               DataStream<Tuple3<Long, String, Integer>> salaries;
-               if (fileInput) {
-                       grades = env.readTextFile(gradesPath).map(new 
-                       salaries = env.readTextFile(salariesPath).map(new 
-               } else {
-                       grades = env.addSource(new GradeSource());
-                       salaries = env.addSource(new SalarySource());
-               }
-               return Tuple2.of(grades, salaries);
-       }
diff --git 
deleted file mode 100644
index 15c1280..0000000
+++ /dev/null
@@ -1,61 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.examples.join.util;
-public class WindowJoinData {
-       public static final String GRADES_INPUT = "(0,john,5)\n" + 
"(0,tom,3)\n" + "(0,alice,1)\n" + "(0,grace,5)\n" +
-                       "(1,john,4)\n" + "(1,bob,1)\n" + "(1,alice,2)\n" + 
"(1,alice,3)\n" + "(1,bob,5)\n" + "(1,alice,3)\n" + "(1,tom,5)\n" +
-                       "(2,john,2)\n" + "(2,john,1)\n" + "(2,grace,2)\n" + 
"(2,jerry,2)\n" + "(2,tom,4)\n" + "(2,bob,4)\n" + "(2,bob,2)\n" +
-                       "(3, tom,2)\n" + "(3,alice,5)\n" + "(3,grace,5)\n" + 
"(3,grace,1)\n" + "(3,alice,1)\n" + "(3,grace,3)\n" + "(3,tom,1)\n" +
-                       "(4,jerry,5)\n" + "(4,john,3)\n" + "(4,john,4)\n" + 
"(4,john,1)\n" + "(4,jerry,3)\n" + "(4,grace,3)\n" + "(4,bob,3)\n" +
-                       "(5,john,3)\n" + "(5,jerry,4)\n" + "(5,tom,5)\n" + 
"(5,tom,4)\n" + "(5,john,2)\n" + "(5,jerry,1)\n" + "(5,bob,1)\n" +
-                       "(6,john,5)\n" + "(6,grace,4)\n" + "(6,tom,5)\n" + 
"(6,john,4)\n" + "(6,tom,1)\n" + "(6,grace,1)\n" + "(6,john,2)\n" +
-                       "(7,jerry,3)\n" + "(7,jerry,5)\n" + "(7,tom,2)\n" + 
"(7,tom,2)\n" + "(7,alice,4)\n" + "(7,tom,4)\n" + "(7,jerry,4)\n" +
-                       "(8,john,3)\n" + "(8,grace,4)\n" + "(8,tom,3)\n" + 
"(8,jerry,4)\n" + "(8,john,5)\n" + "(8,john,4)\n" + "(8,jerry,1)\n" +
-                       "(9,john,5)\n" + "(9,alice,2)\n" + "(9,tom,1)\n" + 
"(9,alice,5)\n" + "(9,grace,4)\n" + "(9,bob,4)\n" + "(9,jerry,1)\n" +
-                       "(10,john,5)\n" + "(10,tom,4)\n" + "(10,tom,5)\n" + 
"(10,jerry,5)\n" + "(10,tom,1)\n" + "(10,grace,3)\n" + "(10,bob,5)\n" +
-                       "(11,john,1)\n" + "(11,alice,1)\n" + "(11,grace,3)\n" + 
"(11,grace,1)\n" + "(11,jerry,1)\n" + "(11,jerry,4)\n" +
-                       "(12,bob,4)\n" + "(12,alice,3)\n" + "(12,tom,5)\n" + 
"(12,alice,4)\n" + "(12,alice,4)\n" + "(12,grace,4)\n" + "(12,john,5)\n" +
-                       "(13,john,5)\n" + "(13,grace,4)\n" + "(13,tom,4)\n" + 
"(13,john,4)\n" + "(13,john,5)\n" + "(13,alice,5)\n" + "(13,jerry,5)\n" +
-                       "(14,john,3)\n" + "(14,tom,5)\n" + "(14,jerry,4)\n" + 
"(14,grace,4)\n" + "(14,john,3)\n" + "(14,bob,2)";
-       public static final String SALARIES_INPUT = "(0,john,6469)\n" + 
"(0,jerry,6760)\n" + "(0,jerry,8069)\n" +
-                       "(1,tom,3662)\n" + "(1,grace,8427)\n" + 
"(1,john,9425)\n" + "(1,bob,9018)\n" + "(1,john,352)\n" + "(1,tom,3770)\n" +
-                       "(2,grace,7622)\n" + "(2,jerry,7441)\n" + 
"(2,alice,1468)\n" + "(2,bob,5472)\n" + "(2,grace,898)\n" +
-                       "(3,tom,3849)\n" + "(3,grace,1865)\n" + 
"(3,alice,5582)\n" + "(3,john,9511)\n" + "(3,alice,1541)\n" +
-                       "(4,john,2477)\n" + "(4,grace,3561)\n" + 
"(4,john,1670)\n" + "(4,grace,7290)\n" + "(4,grace,6565)\n" +
-                       "(5,tom,6179)\n" + "(5,tom,1601)\n" + "(5,john,2940)\n" 
+ "(5,bob,4685)\n" + "(5,bob,710)\n" + "(5,bob,5936)\n" +
-                       "(6,jerry,1412)\n" + "(6,grace,6515)\n" + 
"(6,grace,3321)\n" + "(6,tom,8088)\n" + "(6,john,2876)\n" +
-                       "(7,bob,9896)\n" + "(7,grace,7368)\n" + 
"(7,grace,9749)\n" + "(7,bob,2048)\n" + "(7,alice,4782)\n" +
-                       "(8,alice,3375)\n" + "(8,tom,5841)\n" + "(8,bob,958)\n" 
+ "(8,bob,5258)\n" + "(8,tom,3935)\n" + "(8,jerry,4394)\n" +
-                       "(9,alice,102)\n" + "(9,alice,4931)\n" + 
"(9,alice,5240)\n" + "(9,jerry,7951)\n" + "(9,john,5675)\n" +
-                       "(10,bob,609)\n" + "(10,alice,5997)\n" + 
"(10,jerry,9651)\n" + "(10,alice,1328)\n" + "(10,bob,1022)\n" +
-                       "(11,grace,2578)\n" + "(11,jerry,9704)\n" + 
"(11,tom,4476)\n" + "(11,grace,3784)\n" + "(11,alice,6144)\n" +
-                       "(12,bob,6213)\n" + "(12,alice,7525)\n" + 
"(12,jerry,2908)\n" + "(12,grace,8464)\n" + "(12,jerry,9920)\n" +
-                       "(13,bob,3720)\n" + "(13,bob,7612)\n" + 
"(13,alice,7211)\n" + "(13,jerry,6484)\n" + "(13,alice,1711)\n" +
-                       "(14,jerry,5994)\n" + "(14,grace,928)\n" + 
"(14,jerry,2492)\n" + "(14,grace,9080)\n" + "(14,tom,4330)\n" +
-                       "(15,bob,8302)\n" + "(15,john,4981)\n" + 
"(15,tom,1781)\n" + "(15,grace,1379)\n" + "(15,jerry,3700)\n" +
-                       "(16,jerry,3584)\n" + "(16,jerry,2038)\n" + 
"(16,jerry,3902)\n" + "(16,tom,1336)\n" + "(16,jerry,7500)\n" +
-                       "(17,tom,3648)\n" + "(17,alice,2533)\n" + 
"(17,tom,8685)\n" + "(17,bob,3968)\n" + "(17,tom,3241)\n" + "(17,bob,7461)\n" +
-                       "(18,jerry,2138)\n" + "(18,alice,7503)\n" + 
"(18,alice,6424)\n" + "(18,tom,140)\n" + "(18,john,9802)\n" +
-                       "(19,grace,2977)\n" + "(19,grace,889)\n" + 
-       private WindowJoinData() {
-       }
diff --git 
deleted file mode 100644
index 32cf430..0000000
+++ /dev/null
@@ -1,254 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.TimestampExtractor;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.util.Collector;
-import java.util.concurrent.TimeUnit;
- * Skeleton for incremental machine learning algorithm consisting of a
- * pre-computed model, which gets updated for the new inputs and new input data
- * for which the job provides predictions.
- *
- * <p>
- * This may serve as a base of a number of algorithms, e.g. updating an
- * incremental Alternating Least Squares model while also providing the
- * predictions.
- *
- * <p>
- * This example shows how to use:
- * <ul>
- *   <li>Connected streams
- *   <li>CoFunctions
- *   <li>Tuple data types
- * </ul>
- */
-public class IncrementalLearningSkeleton {
-       private static DataStream<Integer> trainingData = null;
-       private static DataStream<Integer> newData = null;
-       // 
-       // PROGRAM
-       // 
-       public static void main(String[] args) throws Exception {
-               if (!parseParameters(args)) {
-                       return;
-               }
-               StreamExecutionEnvironment env = 
-               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-               trainingData = env.addSource(new FiniteTrainingDataSource());
-               newData = env.addSource(new FiniteNewDataSource());
-               // build new model on every second of new data
-               DataStream<Double[]> model = trainingData
-                               .assignTimestamps(new LinearTimestamp())
-                               .timeWindowAll(Time.of(5000, 
-                               .apply(new PartialModelBuilder());
-               // use partial model for newData
-               DataStream<Integer> prediction = newData.connect(model).map(new 
-               // emit result
-               if (fileOutput) {
-                       prediction.writeAsText(outputPath, 1);
-               } else {
-                       prediction.print();
-               }
-               // execute program
-               env.execute("Streaming Incremental Learning");
-       }
-       // 
-       // 
-       /**
-        * Feeds new data for newData. By default it is implemented as 
-        * emitting the Integer 1 in a loop.
-        */
-       public static class FiniteNewDataSource implements 
SourceFunction<Integer> {
-               private static final long serialVersionUID = 1L;
-               private int counter;
-               @Override
-               public void run(SourceContext<Integer> ctx) throws Exception {
-                       Thread.sleep(15);
-                       while (counter < 50) {
-                               ctx.collect(getNewData());
-                       }
-               }
-               @Override
-               public void cancel() {
-                       // No cleanup needed
-               }
-               private Integer getNewData() throws InterruptedException {
-                       Thread.sleep(5);
-                       counter++;
-                       return 1;
-               }
-       }
-       /**
-        * Feeds new training data for the partial model builder. By default it 
-        * implemented as constantly emitting the Integer 1 in a loop.
-        */
-       public static class FiniteTrainingDataSource implements 
SourceFunction<Integer> {
-               private static final long serialVersionUID = 1L;
-               private int counter = 0;
-               @Override
-               public void run(SourceContext<Integer> collector) throws 
Exception {
-                       while (counter < 8200) {
-                               collector.collect(getTrainingData());
-                       }
-               }
-               @Override
-               public void cancel() {
-                       // No cleanup needed
-               }
-               private Integer getTrainingData() throws InterruptedException {
-                       counter++;
-                       return 1;
-               }
-       }
-       public static class LinearTimestamp implements 
TimestampExtractor<Integer> {
-               private static final long serialVersionUID = 1L;
-               private long counter = 0L;
-               @Override
-               public long extractTimestamp(Integer element, long 
currentTimestamp) {
-                       return counter += 10L;
-               }
-               @Override
-               public long extractWatermark(Integer element, long 
currentTimestamp) {
-                       return counter - 1;
-               }
-               @Override
-               public long getCurrentWatermark() {
-                       return Long.MIN_VALUE;
-               }
-       }
-       /**
-        * Builds up-to-date partial models on new training data.
-        */
-       public static class PartialModelBuilder implements 
AllWindowFunction<Integer, Double[], TimeWindow> {
-               private static final long serialVersionUID = 1L;
-               protected Double[] buildPartialModel(Iterable<Integer> values) {
-                       return new Double[]{1.};
-               }
-               @Override
-               public void apply(TimeWindow window, Iterable<Integer> values, 
Collector<Double[]> out) throws Exception {
-                       out.collect(buildPartialModel(values));
-               }
-       }
-       /**
-        * Creates newData using the model produced in batch-processing and the
-        * up-to-date partial model.
-        * <p>
-        * By defaults emits the Integer 0 for every newData and the Integer 1
-        * for every model update.
-        * </p>
-        */
-       public static class Predictor implements CoMapFunction<Integer, 
Double[], Integer> {
-               private static final long serialVersionUID = 1L;
-               Double[] batchModel = null;
-               Double[] partialModel = null;
-               @Override
-               public Integer map1(Integer value) {
-                       // Return newData
-                       return predict(value);
-               }
-               @Override
-               public Integer map2(Double[] value) {
-                       // Update model
-                       partialModel = value;
-                       batchModel = getBatchModel();
-                       return 1;
-               }
-               // pulls model built with batch-job on the old training data
-               protected Double[] getBatchModel() {
-                       return new Double[]{0.};
-               }
-               // performs newData using the two models
-               protected Integer predict(Integer inTuple) {
-                       return 0;
-               }
-       }
-       // 
-       // UTIL METHODS
-       // 
-       private static boolean fileOutput = false;
-       private static String outputPath;
-       private static boolean parseParameters(String[] args) {
-               if (args.length > 0) {
-                       // parse input arguments
-                       fileOutput = true;
-                       if (args.length == 1) {
-                               outputPath = args[0];
-                       } else {
-                               System.err.println("Usage: 
IncrementalLearningSkeleton <result path>");
-                               return false;
-                       }
-               } else {
-                       System.out.println("Executing 
IncrementalLearningSkeleton with generated data.");
-                       System.out.println("  Provide parameter to write to 
-                       System.out.println("  Usage: 
IncrementalLearningSkeleton <result path>");
-               }
-               return true;
-       }
diff --git 
deleted file mode 100644
index 8a6cd88..0000000
+++ /dev/null
@@ -1,32 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-public class IncrementalLearningSkeletonData {
-       public static final String RESULTS = "1\n" + "1\n" + "1\n" + "1\n" + 
"1\n" + "1\n" + "1\n" + "1\n" +
-                       "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + 
"1\n" + "1\n" + "0\n" + "0\n" +
-                       "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + 
"0\n" + "0\n" + "0\n" + "0\n" +
-                       "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + 
"0\n" + "0\n" + "0\n" + "0\n" +
-                       "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + 
"0\n" + "0\n" + "0\n" + "0\n" +
-                       "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + 
"0\n" + "0\n" + "0\n" + "0\n" +
-                       "0\n" + "0\n" + "0\n" + "0\n";
-       private IncrementalLearningSkeletonData() {
-       }
diff --git 
deleted file mode 100644
index cecabdd..0000000
+++ /dev/null
@@ -1,105 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.examples.socket;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.examples.wordcount.WordCount.Tokenizer;
- * This example shows an implementation of WordCount with data from a text
- * socket. To run the example make sure that the service providing the text 
- * is already up and running.
- * <p>
- * To start an example socket text stream on your local machine run netcat from
- * a command line: <code>nc -lk 9999</code>, where the parameter specifies the
- * port number.
- * </p>
- * <p>
- * Usage:
- * <code>SocketTextStreamWordCount &lt;hostname&gt; &lt;port&gt; &lt;result 
- * </p>
- * <p>
- * This example shows how to:
- * <ul>
- * <li>use StreamExecutionEnvironment.socketTextStream
- * <li>write a simple Flink program,
- * <li>write and use user-defined functions.
- * </ul>
- *
- * @see <a href="">netcat</a>
- */
-public class SocketTextStreamWordCount {
-       public static void main(String[] args) throws Exception {
-               if (!parseParameters(args)) {
-                       return;
-               }
-               // set up the execution environment
-               final StreamExecutionEnvironment env = 
-                               .getExecutionEnvironment();
-               // get input data
-               DataStream<String> text = env.socketTextStream(hostName, port, 
'\n', 0);
-               DataStream<Tuple2<String, Integer>> counts =
-                               // split up the lines in pairs (2-tuples) 
containing: (word,1)
-                               text.flatMap(new Tokenizer())
-                                               // group by the tuple field "0" 
and sum up tuple field "1"
-                                               .keyBy(0)
-                                               .sum(1);
-               if (fileOutput) {
-                       counts.writeAsText(outputPath, 1);
-               } else {
-                       counts.print();
-               }
-               // execute program
-               env.execute("WordCount from SocketTextStream Example");
-       }
-       // 
-       // UTIL METHODS
-       // 
-       private static boolean fileOutput = false;
-       private static String hostName;
-       private static int port;
-       private static String outputPath;
-       private static boolean parseParameters(String[] args) {
-               // parse input arguments
-               if (args.length == 3) {
-                       fileOutput = true;
-                       hostName = args[0];
-                       port = Integer.valueOf(args[1]);
-                       outputPath = args[2];
-               } else if (args.length == 2) {
-                       hostName = args[0];
-                       port = Integer.valueOf(args[1]);
-               } else {
-                       System.err.println("Usage: SocketTextStreamWordCount 
<hostname> <port> [<output path>]");
-                       return false;
-               }
-               return true;
-       }

Reply via email to