http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala deleted file mode 100644 index 1b1d5c5..0000000 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala +++ /dev/null @@ -1,184 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.examples.scala.relational - -import org.apache.flink.api.scala._ -import org.apache.flink.util.Collector - -import org.apache.flink.api.java.aggregation.Aggregations - -/** - * This program implements a modified version of the TPC-H query 10. - * - * The original query can be found at - * [http://www.tpc.org/tpch/spec/tpch2.16.0.pdf](http://www.tpc.org/tpch/spec/tpch2.16.0.pdf) - * (page 45). - * - * This program implements the following SQL equivalent: - * - * {{{ - * SELECT - * c_custkey, - * c_name, - * c_address, - * n_name, - * c_acctbal, - * SUM(l_extendedprice * (1 - l_discount)) AS revenue - * FROM - * customer, - * orders, - * lineitem, - * nation - * WHERE - * c_custkey = o_custkey - * AND l_orderkey = o_orderkey - * AND YEAR(o_orderdate) > '1990' - * AND l_returnflag = 'R' - * AND c_nationkey = n_nationkey - * GROUP BY - * c_custkey, - * c_name, - * c_acctbal, - * n_name, - * c_address - * }}} - * - * Compared to the original TPC-H query this version does not print - * c_phone and c_comment, only filters by years greater than 1990 instead of - * a period of 3 months, and does not sort the result by revenue. - * - * Input files are plain text CSV files using the pipe character ('|') as field separator - * as generated by the TPC-H data generator which is available at - * [http://www.tpc.org/tpch/](http://www.tpc.org/tpch/). 
- * - * Usage: - * {{{ - * TPCHQuery10 <customer-csv path> <orders-csv path> <lineitem-csv path> <nation-csv path> <result path> - * }}} - * - * This example shows how to use: - * - tuple data types - * - built-in aggregation functions - * - join with size hints - * - */ -object TPCHQuery10 { - - def main(args: Array[String]) { - if (!parseParameters(args)) { - return - } - - // get execution environment - val env = ExecutionEnvironment.getExecutionEnvironment - - // get customer data set: (custkey, name, address, nationkey, acctbal) - val customers = getCustomerDataSet(env) - // get orders data set: (orderkey, custkey, orderdate) - val orders = getOrdersDataSet(env) - // get lineitem data set: (orderkey, extendedprice, discount, returnflag) - val lineitems = getLineitemDataSet(env) - // get nation data set: (nationkey, name) - val nations = getNationDataSet(env) - - // filter orders by years - val orders1990 = orders.filter( o => o._3.substring(0,4).toInt > 1990) - .map( o => (o._1, o._2)) - - // filter lineitems by return status - val lineitemsReturn = lineitems.filter( l => l._4.equals("R")) - .map( l => (l._1, l._2 * (1 - l._3)) ) - - // compute revenue by customer - val revenueByCustomer = orders1990.joinWithHuge(lineitemsReturn).where(0).equalTo(0) - .apply( (o,l) => (o._2, l._2) ) - .groupBy(0) - .aggregate(Aggregations.SUM, 1) - - // compute final result by joining customer and nation information with revenue - val result = customers.joinWithTiny(nations).where(3).equalTo(0) - .apply( (c, n) => (c._1, c._2, c._3, n._2, c._5) ) - .join(revenueByCustomer).where(0).equalTo(0) - .apply( (c, r) => (c._1, c._2, c._3, c._4, c._5, r._2) ) - // emit result - result.writeAsCsv(outputPath, "\n", "|") - - // execute program - env.execute("Scala TPCH Query 10 Example") - } - - - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - - private var customerPath: String = null - private var ordersPath: String = null - private var lineitemPath: String = null - private var nationPath: String = null - private var outputPath: String = null - - private def parseParameters(args: Array[String]): Boolean = { - if (args.length == 5) { - customerPath = args(0) - ordersPath = args(1) - lineitemPath = args(2) - nationPath = args(3) - outputPath = args(4) - true - } else { - System.err.println("This program expects data from the TPC-H benchmark as input data.\n" + - " Due to legal restrictions, we cannot ship generated data.\n" + - " You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" + - " Usage: TPCHQuery10 <customer-csv path> <orders-csv path> " + - "<lineitem-csv path> <nation-csv path> <result path>") - false - } - } - - private def getCustomerDataSet(env: ExecutionEnvironment): - DataSet[Tuple5[Int, String, String, Int, Double]] = { - env.readCsvFile[Tuple5[Int, String, String, Int, Double]]( - customerPath, - fieldDelimiter = "|", - includedFields = Array(0,1,2,3,5) ) - } - - private def getOrdersDataSet(env: ExecutionEnvironment): DataSet[Tuple3[Int, Int, String]] = { - env.readCsvFile[Tuple3[Int, Int, String]]( - ordersPath, - fieldDelimiter = "|", - includedFields = Array(0, 1, 4) ) - } - - private def getLineitemDataSet(env: ExecutionEnvironment): - DataSet[Tuple4[Int, Double, Double, String]] = { - env.readCsvFile[Tuple4[Int, Double, Double, String]]( - lineitemPath, - fieldDelimiter = "|", - includedFields = Array(0, 5, 6, 8) ) - } - - private def
getNationDataSet(env: ExecutionEnvironment): DataSet[Tuple2[Int, String]] = { - env.readCsvFile[Tuple2[Int, String]]( - nationPath, - fieldDelimiter = "|", - includedFields = Array(0, 1) ) - } - -}
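The deleted TPCHQuery10 example relies on the DataSet API's join size hints: joinWithHuge declares the right-hand input as the (much) larger side, and joinWithTiny declares it small enough to broadcast, letting the optimizer pick a strategy that avoids shipping the big input. Below is a minimal, self-contained sketch of the same pattern, assuming the same-era DataSet Scala API; the object name and in-memory data are purely illustrative and not from this commit.

import org.apache.flink.api.scala._

object JoinHintSketch {
  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // illustrative stand-ins: (orderkey, custkey) and (orderkey, revenue)
    val orders    = env.fromElements((1, 101), (2, 102), (3, 101))
    val lineitems = env.fromElements((1, 10.0), (1, 5.0), (3, 2.5))

    // hint that the right-hand input (lineitems) is the larger side
    val revenueByCustomer = orders.joinWithHuge(lineitems).where(0).equalTo(0)
      .apply( (o, l) => (o._2, l._2) )
      .groupBy(0)
      .sum(1)

    revenueByCustomer.print()
  }
}

The same call chain with joinWithTiny is the mirror hint, used above for the small nation table.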
http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala deleted file mode 100644 index 0e0ecd3..0000000 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.examples.scala.relational - -import org.apache.flink.api.scala._ -import org.apache.flink.core.fs.FileSystem.WriteMode - -import org.apache.flink.api.java.aggregation.Aggregations - -/** - * This program implements a modified version of the TPC-H query 3. The - * example demonstrates how to assign names to fields by using case classes. - * The original query can be found at - * [http://www.tpc.org/tpch/spec/tpch2.16.0.pdf](http://www.tpc.org/tpch/spec/tpch2.16.0.pdf) - * (page 29). - * - * This program implements the following SQL equivalent: - * - * {{{ - * SELECT - * l_orderkey, - * SUM(l_extendedprice*(1-l_discount)) AS revenue, - * o_orderdate, - * o_shippriority - * FROM customer, - * orders, - * lineitem - * WHERE - * c_mktsegment = '[SEGMENT]' - * AND c_custkey = o_custkey - * AND l_orderkey = o_orderkey - * AND o_orderdate < date '[DATE]' - * AND l_shipdate > date '[DATE]' - * GROUP BY - * l_orderkey, - * o_orderdate, - * o_shippriority; - * }}} - * - * Compared to the original TPC-H query this version does not sort the result by revenue - * and orderdate. - * - * Input files are plain text CSV files using the pipe character ('|') as field separator - * as generated by the TPC-H data generator which is available at - * [http://www.tpc.org/tpch/](http://www.tpc.org/tpch/). 
- * - * Usage: - * {{{ - * TPCHQuery3 <lineitem-csv path> <customer-csv path> <orders-csv path> <result path> - * }}} - * - * This example shows how to use: - * - case classes and case class field addressing - * - built-in aggregation functions - * - */ -object TPCHQuery3 { - - def main(args: Array[String]) { - if (!parseParameters(args)) { - return - } - - // set filter date - val dateFormat = new java.text.SimpleDateFormat("yyyy-MM-dd") - val date = dateFormat.parse("1995-03-12") - - // get execution environment - val env = ExecutionEnvironment.getExecutionEnvironment - - // read and filter lineitems by shipDate - val lineitems = getLineitemDataSet(env).filter( l => dateFormat.parse(l.shipDate).after(date) ) - // read and filter customers by market segment - val customers = getCustomerDataSet(env).filter( c => c.mktSegment.equals("AUTOMOBILE")) - // read orders - val orders = getOrdersDataSet(env) - - // filter orders by order date - val items = orders.filter( o => dateFormat.parse(o.orderDate).before(date) ) - // filter orders by joining with customers - .join(customers).where("custId").equalTo("custId").apply( (o,c) => o ) - // join with lineitems - .join(lineitems).where("orderId").equalTo("orderId") - .apply( (o,l) => - new ShippedItem( o.orderId, - l.extdPrice * (1.0 - l.discount), - o.orderDate, - o.shipPrio ) ) - - // group by order and aggregate revenue - val result = items.groupBy("orderId", "orderDate", "shipPrio") - .aggregate(Aggregations.SUM, "revenue") - - // emit result - result.writeAsCsv(outputPath, "\n", "|") - - // execute program - env.execute("Scala TPCH Query 3 Example") - } - - // ************************************************************************* - // USER DATA TYPES - // ************************************************************************* - - case class Lineitem(orderId: Long, extdPrice: Double, discount: Double, shipDate: String) - case class Order(orderId: Long, custId: Long, orderDate: String, shipPrio: Long) - case class Customer(custId: Long, mktSegment: String) - case class ShippedItem(orderId: Long, revenue: Double, orderDate: String, shipPrio: Long) - - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - - private var lineitemPath: String = null - private var customerPath: String = null - private var ordersPath: String = null - private var outputPath: String = null - - private def parseParameters(args: Array[String]): Boolean = { - if (args.length == 4) { - lineitemPath = args(0) - customerPath = args(1) - ordersPath = args(2) - outputPath = args(3) - true - } else { - System.err.println("This program expects data from the TPC-H benchmark as input data.\n" + - " Due to legal restrictions, we cannot ship generated data.\n" + - " You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" + - " Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path> " + - "<orders-csv path> <result path>") - false - } - } - - private def getLineitemDataSet(env: ExecutionEnvironment): DataSet[Lineitem] = { - env.readCsvFile[Lineitem]( - lineitemPath, - fieldDelimiter = "|", - includedFields = Array(0, 5, 6, 10) ) - } - - private def getCustomerDataSet(env: ExecutionEnvironment): DataSet[Customer] = { - env.readCsvFile[Customer]( - customerPath, - fieldDelimiter = "|", - includedFields = Array(0, 6) ) - } - - private def getOrdersDataSet(env: ExecutionEnvironment): DataSet[Order] = { - env.readCsvFile[Order]( - ordersPath, - 
fieldDelimiter = "|", - includedFields = Array(0, 1, 4, 7) ) - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala deleted file mode 100644 index 5392594..0000000 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.examples.scala.relational - -import org.apache.flink.api.scala._ -import org.apache.flink.examples.java.relational.util.WebLogData -import org.apache.flink.util.Collector - -/** - * This program processes web logs and relational data. - * It implements the following relational query: - * - * {{{ - * SELECT - * r.pageURL, - * r.pageRank, - * r.avgDuration - * FROM documents d JOIN rankings r - * ON d.url = r.url - * WHERE CONTAINS(d.text, [keywords]) - * AND r.rank > [rank] - * AND NOT EXISTS - * ( - * SELECT * FROM Visits v - * WHERE v.destUrl = d.url - * AND v.visitDate < [date] - * ); - * }}} - * - * - * Input files are plain text CSV files using the pipe character ('|') as field separator. - * The tables referenced in the query can be generated using the - * [[org.apache.flink.examples.java.relational.util.WebLogDataGenerator]] and - * have the following schemas: - * - * {{{ - * CREATE TABLE Documents ( - * url VARCHAR(100) PRIMARY KEY, - * contents TEXT ); - * - * CREATE TABLE Rankings ( - * pageRank INT, - * pageURL VARCHAR(100) PRIMARY KEY, - * avgDuration INT ); - * - * CREATE TABLE Visits ( - * sourceIP VARCHAR(16), - * destURL VARCHAR(100), - * visitDate DATE, - * adRevenue FLOAT, - * userAgent VARCHAR(64), - * countryCode VARCHAR(3), - * languageCode VARCHAR(6), - * searchWord VARCHAR(32), - * duration INT ); - * }}} - * - * - * Usage: - * {{{ - * WebLogAnalysis <documents path> <ranks path> <visits path> <result path> - * }}} - * - * If no parameters are provided, the program is run with default data from - * [[org.apache.flink.examples.java.relational.util.WebLogData]]. 
- * - * This example shows how to use: - * - * - tuple data types - * - projection and join projection - * - the CoGroup transformation for an anti-join - * - */ -object WebLogAnalysis { - - def main(args: Array[String]) { - if (!parseParameters(args)) { - return - } - - val env = ExecutionEnvironment.getExecutionEnvironment - - val documents = getDocumentsDataSet(env) - val ranks = getRanksDataSet(env) - val visits = getVisitsDataSet(env) - - val filteredDocs = documents - .filter(doc => doc._2.contains(" editors ") && doc._2.contains(" oscillations ")) - - val filteredRanks = ranks - .filter(rank => rank._1 > 40) - - val filteredVisits = visits - .filter(visit => visit._2.substring(0, 4).toInt == 2007) - - val joinDocsRanks = filteredDocs.join(filteredRanks).where(0).equalTo(1) { - (doc, rank) => rank - }.withForwardedFieldsSecond("*") - - val result = joinDocsRanks.coGroup(filteredVisits).where(1).equalTo(0) { - (ranks, visits, out: Collector[(Int, String, Int)]) => - if (visits.isEmpty) for (rank <- ranks) out.collect(rank) - }.withForwardedFieldsFirst("*") - - - - - // emit result - if (fileOutput) { - result.writeAsCsv(outputPath, "\n", "|") - env.execute("Scala WebLogAnalysis Example") - } else { - result.print() - } - - } - - private var fileOutput: Boolean = false - private var documentsPath: String = null - private var ranksPath: String = null - private var visitsPath: String = null - private var outputPath: String = null - - private def parseParameters(args: Array[String]): Boolean = { - if (args.length > 0) { - fileOutput = true - if (args.length == 4) { - documentsPath = args(0) - ranksPath = args(1) - visitsPath = args(2) - outputPath = args(3) - } - else { - System.err.println("Usage: WebLogAnalysis <documents path> <ranks path> <visits path> " + - "<result path>") - return false - } - } - else { - System.out.println("Executing WebLog Analysis example with built-in default data.") - System.out.println(" Provide parameters to read input data from files.") - System.out.println(" See the documentation for the correct format of input files.") - System.out.println(" We provide a data generator to create synthetic input files for this " + - "program.") - System.out.println(" Usage: WebLogAnalysis <documents path> <ranks path> <visits path> " + - "<result path>") - } - true - } - - private def getDocumentsDataSet(env: ExecutionEnvironment): DataSet[(String, String)] = { - if (fileOutput) { - env.readCsvFile[(String, String)]( - documentsPath, - fieldDelimiter = "|", - includedFields = Array(0, 1)) - } - else { - val documents = WebLogData.DOCUMENTS map { - case Array(x, y) => (x.asInstanceOf[String], y.asInstanceOf[String]) - } - env.fromCollection(documents) - } - } - - private def getRanksDataSet(env: ExecutionEnvironment): DataSet[(Int, String, Int)] = { - if (fileOutput) { - env.readCsvFile[(Int, String, Int)]( - ranksPath, - fieldDelimiter = "|", - includedFields = Array(0, 1, 2)) - } - else { - val ranks = WebLogData.RANKS map { - case Array(x, y, z) => (x.asInstanceOf[Int], y.asInstanceOf[String], z.asInstanceOf[Int]) - } - env.fromCollection(ranks) - } - } - - private def getVisitsDataSet(env: ExecutionEnvironment): DataSet[(String, String)] = { - if (fileOutput) { - env.readCsvFile[(String, String)]( - visitsPath, - fieldDelimiter = "|", - includedFields = Array(1, 2)) - } - else { - val visits = WebLogData.VISITS map { - case Array(x, y) => (x.asInstanceOf[String], y.asInstanceOf[String]) - } - env.fromCollection(visits) - } - } -} 
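The coGroup at the heart of WebLogAnalysis implements an anti-join (the NOT EXISTS subquery above): both inputs are grouped by key, and the left group is emitted only when the right group is empty. Here is a minimal sketch of that pattern under the same DataSet Scala API; the object name and in-memory data are illustrative and not from this commit.

import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

object AntiJoinSketch {
  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // illustrative stand-ins: (rank, url) and (url, visitDate)
    val ranks  = env.fromElements((50, "url_1"), (30, "url_2"))
    val visits = env.fromElements(("url_2", "2007-12-24"))

    // keep a ranked page only if no visit shares its URL
    val unvisited = ranks.coGroup(visits).where(1).equalTo(0) {
      (rankGroup, visitGroup, out: Collector[(Int, String)]) =>
        if (visitGroup.isEmpty) for (rank <- rankGroup) out.collect(rank)
    }

    unvisited.print()
  }
}

Only (50, "url_1") survives, because url_2 has a matching visit; a plain join would instead keep exactly the matched rows.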
http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala deleted file mode 100644 index 7d5db7e..0000000 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.examples.scala.wordcount - -import org.apache.flink.api.scala._ -import org.apache.flink.examples.java.wordcount.util.WordCountData - -/** - * Implements the "WordCount" program that computes a simple word occurrence histogram - * over text files. - * - * The input is a plain text file with lines separated by newline characters. - * - * Usage: - * {{{ - * WordCount <text path> <result path> - * }}} - * - * If no parameters are provided, the program is run with default data from - * [[org.apache.flink.examples.java.wordcount.util.WordCountData]] - * - * This example shows how to: - * - * - write a simple Flink program. - * - use Tuple data types. - * - write and use user-defined functions. 
- * - */ -object WordCount { - def main(args: Array[String]) { - if (!parseParameters(args)) { - return - } - - val env = ExecutionEnvironment.getExecutionEnvironment - val text = getTextDataSet(env) - - val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } } - .map { (_, 1) } - .groupBy(0) - .sum(1) - - if (fileOutput) { - counts.writeAsCsv(outputPath, "\n", " ") - env.execute("Scala WordCount Example") - } else { - counts.print() - } - - } - - private def parseParameters(args: Array[String]): Boolean = { - if (args.length > 0) { - fileOutput = true - if (args.length == 2) { - textPath = args(0) - outputPath = args(1) - true - } else { - System.err.println("Usage: WordCount <text path> <result path>") - false - } - } else { - System.out.println("Executing WordCount example with built-in default data.") - System.out.println(" Provide parameters to read input data from a file.") - System.out.println(" Usage: WordCount <text path> <result path>") - true - } - } - - private def getTextDataSet(env: ExecutionEnvironment): DataSet[String] = { - if (fileOutput) { - env.readTextFile(textPath) - } - else { - env.fromCollection(WordCountData.WORDS) - } - } - - private var fileOutput: Boolean = false - private var textPath: String = null - private var outputPath: String = null -} - - http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/pom.xml ---------------------------------------------------------------------- diff --git a/flink-examples/pom.xml b/flink-examples/pom.xml index e4376f4..ace7ec2 100644 --- a/flink-examples/pom.xml +++ b/flink-examples/pom.xml @@ -47,7 +47,7 @@ under the License. </dependencies> <modules> - <module>flink-java-examples</module> - <module>flink-scala-examples</module> + <module>flink-examples-batch</module> + <module>flink-examples-streaming</module> </modules> </project> http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-staging/flink-fs-tests/pom.xml ---------------------------------------------------------------------- diff --git a/flink-staging/flink-fs-tests/pom.xml b/flink-staging/flink-fs-tests/pom.xml index 9492d55..69c5f30 100644 --- a/flink-staging/flink-fs-tests/pom.xml +++ b/flink-staging/flink-fs-tests/pom.xml @@ -59,7 +59,7 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-java-examples</artifactId> + <artifactId>flink-examples-batch</artifactId> <version>${project.version}</version> <scope>test</scope> </dependency> http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-staging/flink-table/pom.xml ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/pom.xml b/flink-staging/flink-table/pom.xml index bdd1b58..f13f422 100644 --- a/flink-staging/flink-table/pom.xml +++ b/flink-staging/flink-table/pom.xml @@ -54,7 +54,7 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-scala-examples</artifactId> + <artifactId>flink-examples-batch</artifactId> <version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-staging/flink-tez/pom.xml ---------------------------------------------------------------------- diff --git a/flink-staging/flink-tez/pom.xml b/flink-staging/flink-tez/pom.xml index 0062349..6083f7f 100644 --- a/flink-staging/flink-tez/pom.xml +++ b/flink-staging/flink-tez/pom.xml @@ -85,7 +85,7 @@ under the License. 
<dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-java-examples</artifactId> + <artifactId>flink-examples-batch</artifactId> <version>${project.version}</version> <exclusions> <exclusion> http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-streaming-examples/pom.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-examples/pom.xml b/flink-streaming-examples/pom.xml deleted file mode 100644 index c935ac1..0000000 --- a/flink-streaming-examples/pom.xml +++ /dev/null @@ -1,535 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-parent</artifactId> - <version>1.0-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-streaming-examples</artifactId> - <name>flink-streaming-examples</name> - - <packaging>jar</packaging> - - <dependencies> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-scala</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-java-examples</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-connector-twitter</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java</artifactId> - <version>${project.version}</version> - <scope>test</scope> - <type>test-jar</type> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-tests</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-test-utils</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - - </dependencies> - - <build> - <plugins> - <!-- get default data from flink-java-examples package --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-dependency-plugin</artifactId> - <version>2.9</version><!--$NO-MVN-MAN-VER$--> - <executions> - <execution> - <id>unpack</id> - <phase>prepare-package</phase> - <goals> - <goal>unpack</goal> - </goals> - <configuration> - 
<artifactItems> - <!-- For WordCount example data --> - <artifactItem> - <groupId>org.apache.flink</groupId> - <artifactId>flink-java-examples</artifactId> - <version>${project.version}</version> - <type>jar</type> - <overWrite>false</overWrite> - <outputDirectory>${project.build.directory}/classes</outputDirectory> - <includes>org/apache/flink/examples/java/wordcount/util/WordCountData.class</includes> - </artifactItem> - <!-- For JSON utilities --> - <artifactItem> - <groupId>org.apache.flink</groupId> - <artifactId>flink-connector-twitter</artifactId> - <version>${project.version}</version> - <type>jar</type> - <overWrite>false</overWrite> - <outputDirectory>${project.build.directory}/classes</outputDirectory> - <includes>org/apache/flink/streaming/connectors/json/*</includes> - </artifactItem> - </artifactItems> - </configuration> - </execution> - </executions> - </plugin> - - <!-- self-contained jars for each example --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <version>2.4</version><!--$NO-MVN-MAN-VER$--> - <executions> - <!-- Default Execution --> - <execution> - <id>default</id> - <phase>package</phase> - <goals> - <goal>test-jar</goal> - </goals> - </execution> - - <!-- Iteration --> - <execution> - <id>Iteration</id> - <phase>package</phase> - <goals> - <goal>jar</goal> - </goals> - <configuration> - <classifier>Iteration</classifier> - - <archive> - <manifestEntries> - <program-class>org.apache.flink.streaming.examples.iteration.IterateExample</program-class> - </manifestEntries> - </archive> - - <includes> - <include>org/apache/flink/streaming/examples/iteration/*.class</include> - </includes> - </configuration> - </execution> - - <!-- IncrementalLearning --> - <execution> - <id>IncrementalLearning</id> - <phase>package</phase> - <goals> - <goal>jar</goal> - </goals> - <configuration> - <classifier>IncrementalLearning</classifier> - - <archive> - <manifestEntries> - <program-class>org.apache.flink.streaming.examples.ml.IncrementalLearningSkeleton</program-class> - </manifestEntries> - </archive> - - <includes> - <include>org/apache/flink/streaming/examples/ml/*.class</include> - </includes> - </configuration> - </execution> - - <!-- Twitter --> - <execution> - <id>Twitter</id> - <phase>package</phase> - <goals> - <goal>jar</goal> - </goals> - <configuration> - <classifier>Twitter</classifier> - - <archive> - <manifestEntries> - <program-class>org.apache.flink.streaming.examples.twitter.TwitterStream</program-class> - </manifestEntries> - </archive> - - <includes> - <include>org/apache/flink/streaming/examples/twitter/*.class</include> - <include>org/apache/flink/streaming/examples/twitter/util/*.class</include> - <include>org/apache/flink/streaming/connectors/json/*.class</include> - </includes> - </configuration> - </execution> - - <!-- WindowJoin --> - <execution> - <id>WindowJoin</id> - <phase>package</phase> - <goals> - <goal>jar</goal> - </goals> - <configuration> - <classifier>WindowJoin</classifier> - - <archive> - <manifestEntries> - <program-class>org.apache.flink.streaming.examples.join.WindowJoin</program-class> - </manifestEntries> - </archive> - - <includes> - <include>org/apache/flink/streaming/examples/join/*.class</include> - </includes> - </configuration> - </execution> - - <!-- WordCountPOJO --> - <execution> - <id>WordCountPOJO</id> - <phase>package</phase> - <goals> - <goal>jar</goal> - </goals> - <configuration> - <classifier>WordCountPOJO</classifier> - - <archive> - <manifestEntries> - 
<program-class>org.apache.flink.streaming.examples.wordcount.PojoExample</program-class> - </manifestEntries> - </archive> - - <includes> - <include>org/apache/flink/streaming/examples/wordcount/PojoExample.class</include> - <include>org/apache/flink/streaming/examples/wordcount/PojoExample$*.class</include> - <include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include> - </includes> - </configuration> - </execution> - - <!-- WordCount --> - <execution> - <id>WordCount</id> - <phase>package</phase> - <goals> - <goal>jar</goal> - </goals> - <configuration> - <classifier>WordCount</classifier> - - <archive> - <manifestEntries> - <program-class>org.apache.flink.streaming.examples.wordcount.WordCount</program-class> - </manifestEntries> - </archive> - - <includes> - <include>org/apache/flink/streaming/examples/wordcount/WordCount.class</include> - <include>org/apache/flink/streaming/examples/wordcount/WordCount$*.class</include> - <include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include> - </includes> - </configuration> - </execution> - - <!-- WindowWordCount --> - <execution> - <id>WindowWordCount</id> - <phase>package</phase> - <goals> - <goal>jar</goal> - </goals> - <configuration> - <classifier>WindowWordCount</classifier> - - <archive> - <manifestEntries> - <program-class>org.apache.flink.streaming.examples.windowing.WindowWordCount</program-class> - </manifestEntries> - </archive> - - <includes> - <include>org/apache/flink/streaming/examples/windowing/WindowWordCount.class</include> - <include>org/apache/flink/streaming/examples/wordcount/WordCount.class</include> - <include>org/apache/flink/streaming/examples/wordcount/WordCount$*.class</include> - <include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include> - </includes> - </configuration> - </execution> - - <!-- SocketTextStreamWordCount --> - <execution> - <id>SocketTextStreamWordCount</id> - <phase>package</phase> - <goals> - <goal>jar</goal> - </goals> - <configuration> - <classifier>SocketTextStreamWordCount</classifier> - - <archive> - <manifestEntries> - <program-class>org.apache.flink.streaming.examples.socket.SocketTextStreamWordCount</program-class> - </manifestEntries> - </archive> - - <includes> - <include>org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.class</include> - <include>org/apache/flink/streaming/examples/wordcount/WordCount.class</include> - <include>org/apache/flink/streaming/examples/wordcount/WordCount$*.class</include> - </includes> - </configuration> - </execution> - - <!-- TopSpeedWindowing --> - <execution> - <id>TopSpeedWindowing</id> - <phase>package</phase> - <goals> - <goal>jar</goal> - </goals> - <configuration> - <classifier>TopSpeedWindowing</classifier> - - <archive> - <manifestEntries> - <program-class>org.apache.flink.streaming.examples.windowing.TopSpeedWindowing</program-class> - </manifestEntries> - </archive> - - <includes> - <include>org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.class</include> - <include>org/apache/flink/streaming/examples/windowing/TopSpeedWindowing$*.class</include> - </includes> - </configuration> - </execution> - - <!-- SessionWindowing --> - <execution> - <id>SessionWindowing</id> - <phase>package</phase> - <goals> - <goal>jar</goal> - </goals> - <configuration> - <classifier>SessionWindowing</classifier> - - <archive> - <manifestEntries> - <program-class>org.apache.flink.streaming.examples.windowing.SessionWindowing</program-class> - </manifestEntries> - 
</archive> - - <includes> - <include>org/apache/flink/streaming/examples/windowing/SessionWindowing.class</include> - <include>org/apache/flink/streaming/examples/windowing/SessionWindowing$*.class</include> - </includes> - </configuration> - </execution> - - </executions> - </plugin> - - - <!-- Scala Compiler --> - <plugin> - <groupId>net.alchim31.maven</groupId> - <artifactId>scala-maven-plugin</artifactId> - <version>3.1.4</version> - <executions> - <!-- Run scala compiler in the process-resources phase, so that dependencies on - scala classes can be resolved later in the (Java) compile phase --> - <execution> - <id>scala-compile-first</id> - <phase>process-resources</phase> - <goals> - <goal>compile</goal> - </goals> - </execution> - - <!-- Run scala compiler in the process-test-resources phase, so that dependencies on - scala classes can be resolved later in the (Java) test-compile phase --> - <execution> - <id>scala-test-compile</id> - <phase>process-test-resources</phase> - <goals> - <goal>testCompile</goal> - </goals> - </execution> - </executions> - <configuration> - <jvmArgs> - <jvmArg>-Xms128m</jvmArg> - <jvmArg>-Xmx512m</jvmArg> - </jvmArgs> - </configuration> - </plugin> - - <!-- Eclipse Integration --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-eclipse-plugin</artifactId> - <version>2.8</version> - <configuration> - <downloadSources>true</downloadSources> - <projectnatures> - <projectnature>org.scala-ide.sdt.core.scalanature</projectnature> - <projectnature>org.eclipse.jdt.core.javanature</projectnature> - </projectnatures> - <buildcommands> - <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand> - </buildcommands> - <classpathContainers> - <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer> - <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer> - </classpathContainers> - <excludes> - <exclude>org.scala-lang:scala-library</exclude> - <exclude>org.scala-lang:scala-compiler</exclude> - </excludes> - <sourceIncludes> - <sourceInclude>**/*.scala</sourceInclude> - <sourceInclude>**/*.java</sourceInclude> - </sourceIncludes> - </configuration> - </plugin> - - <!-- Adding scala source directories to build path --> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>build-helper-maven-plugin</artifactId> - <version>1.7</version> - <executions> - <!-- Add src/main/scala to eclipse build path --> - <execution> - <id>add-source</id> - <phase>generate-sources</phase> - <goals> - <goal>add-source</goal> - </goals> - <configuration> - <sources> - <source>src/main/scala</source> - </sources> - </configuration> - </execution> - <!-- Add src/test/scala to eclipse build path --> - <execution> - <id>add-test-source</id> - <phase>generate-test-sources</phase> - <goals> - <goal>add-test-source</goal> - </goals> - <configuration> - <sources> - <source>src/test/scala</source> - </sources> - </configuration> - </execution> - </executions> - </plugin> - - <plugin> - <groupId>org.scalastyle</groupId> - <artifactId>scalastyle-maven-plugin</artifactId> - <version>0.5.0</version> - <executions> - <execution> - <goals> - <goal>check</goal> - </goals> - </execution> - </executions> - <configuration> - <verbose>false</verbose> - <failOnViolation>true</failOnViolation> - <includeTestSourceDirectory>true</includeTestSourceDirectory> - <failOnWarning>false</failOnWarning> - <sourceDirectory>${basedir}/src/main/scala</sourceDirectory> - 
<testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory> - <configLocation>${project.basedir}/../tools/maven/scalastyle-config.xml</configLocation> - <outputFile>${project.basedir}/scalastyle-output.xml</outputFile> - <outputEncoding>UTF-8</outputEncoding> - </configuration> - </plugin> - </plugins> - - <pluginManagement> - <plugins> - <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.--> - <plugin> - <groupId>org.eclipse.m2e</groupId> - <artifactId>lifecycle-mapping</artifactId> - <version>1.0.0</version> - <configuration> - <lifecycleMappingMetadata> - <pluginExecutions> - <pluginExecution> - <pluginExecutionFilter> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-dependency-plugin</artifactId> - <versionRange>[2.9,)</versionRange> - <goals> - <goal>unpack</goal> - </goals> - </pluginExecutionFilter> - <action> - <ignore/> - </action> - </pluginExecution> - </pluginExecutions> - </lifecycleMappingMetadata> - </configuration> - </plugin> - </plugins> - </pluginManagement> - - </build> - -</project> http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java ---------------------------------------------------------------------- diff --git a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java deleted file mode 100644 index b6e1a61..0000000 --- a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java +++ /dev/null @@ -1,246 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.examples.iteration; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple5; -import org.apache.flink.streaming.api.collector.selector.OutputSelector; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.IterativeStream; -import org.apache.flink.streaming.api.datastream.SplitStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.SourceFunction; - -import java.util.ArrayList; -import java.util.List; -import java.util.Random; - -/** - * Example illustrating iterations in Flink streaming. - * <p> The program sums up random numbers and counts additions - * it performs to reach a specific threshold in an iterative streaming fashion. 
</p> - * - * <p> - * This example shows how to use: <ul> <li>streaming iterations, <li>buffer timeout to reduce latency, <li>directed - * outputs. </ul> - */ -public class IterateExample { - - private static final int BOUND = 100; - - // ************************************************************************* - // PROGRAM - // ************************************************************************* - - public static void main(String[] args) throws Exception { - - if (!parseParameters(args)) { - return; - } - - // set up input for the stream of integer pairs - - // obtain execution environment and set the buffer timeout to 1 to enable - // continuous flushing of the output buffers (lowest latency) - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment() - .setBufferTimeout(1); - - // create input stream of integer pairs - DataStream<Tuple2<Integer, Integer>> inputStream; - if (fileInput) { - inputStream = env.readTextFile(inputPath).map(new FibonacciInputMap()); - } else { - inputStream = env.addSource(new RandomFibonacciSource()); - } - - // create an iterative data stream from the input with 5 second timeout - IterativeStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> it = inputStream.map(new InputMap()) - .iterate(5000); - - // apply the step function to get the next Fibonacci number - // increment the counter and split the output with the output selector - SplitStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> step = it.map(new Step()) - .split(new MySelector()); - - // close the iteration by selecting the tuples that were directed to the - // 'iterate' channel in the output selector - it.closeWith(step.select("iterate")); - - // to produce the final output, select the tuples directed to the - // 'output' channel and map them back to the original input pair - // together with their iteration counter - DataStream<Tuple2<Tuple2<Integer, Integer>, Integer>> numbers = step.select("output") - .map(new OutputMap()); - - // emit results - if (fileOutput) { - numbers.writeAsText(outputPath, 1); - } else { - numbers.print(); - } - - // execute the program - env.execute("Streaming Iteration Example"); - } - - // ************************************************************************* - // USER FUNCTIONS - // ************************************************************************* - - /** - * Generate BOUND random integer pairs from the range 1 to BOUND/2 - */ - private static class RandomFibonacciSource implements SourceFunction<Tuple2<Integer, Integer>> { - private static final long serialVersionUID = 1L; - - private Random rnd = new Random(); - - private volatile boolean isRunning = true; - private int counter = 0; - - @Override - public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception { - - while (isRunning && counter < BOUND) { - int first = rnd.nextInt(BOUND / 2 - 1) + 1; - int second = rnd.nextInt(BOUND / 2 - 1) + 1; - - ctx.collect(new Tuple2<>(first, second)); - counter++; - Thread.sleep(50L); - } - } - - @Override - public void cancel() { - isRunning = false; - } - } - - /** - * Parse the integer pairs read from the text file input - */ - private static class FibonacciInputMap implements MapFunction<String, Tuple2<Integer, Integer>> { - private static final long serialVersionUID = 1L; - - @Override - public Tuple2<Integer, Integer> map(String value) throws Exception { - String record = value.substring(1, value.length() - 1); - String[] splitted = record.split(","); - return new 
Tuple2<>(Integer.parseInt(splitted[0]), Integer.parseInt(splitted[1])); - } - } - - /** - * Map the inputs so that the next Fibonacci numbers can be calculated while preserving the original input tuple. A - * counter is attached to the tuple and incremented in every iteration step. - */ - public static class InputMap implements MapFunction<Tuple2<Integer, Integer>, Tuple5<Integer, Integer, Integer, - Integer, Integer>> { - private static final long serialVersionUID = 1L; - - @Override - public Tuple5<Integer, Integer, Integer, Integer, Integer> map(Tuple2<Integer, Integer> value) throws - Exception { - return new Tuple5<>(value.f0, value.f1, value.f0, value.f1, 0); - } - } - - /** - * Iteration step function that calculates the next Fibonacci number - */ - public static class Step implements - MapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>, Tuple5<Integer, Integer, Integer, - Integer, Integer>> { - private static final long serialVersionUID = 1L; - - @Override - public Tuple5<Integer, Integer, Integer, Integer, Integer> map(Tuple5<Integer, Integer, Integer, Integer, - Integer> value) throws Exception { - return new Tuple5<>(value.f0, value.f1, value.f3, value.f2 + value.f3, ++value.f4); - } - } - - /** - * OutputSelector deciding which tuples need to be iterated again. - */ - public static class MySelector implements OutputSelector<Tuple5<Integer, Integer, Integer, Integer, Integer>> { - private static final long serialVersionUID = 1L; - - @Override - public Iterable<String> select(Tuple5<Integer, Integer, Integer, Integer, Integer> value) { - List<String> output = new ArrayList<>(); - if (value.f2 < BOUND && value.f3 < BOUND) { - output.add("iterate"); - } else { - output.add("output"); - } - return output; - } - } - - /** - * Returns the input pair and the counter. - */ - public static class OutputMap implements MapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>, - Tuple2<Tuple2<Integer, Integer>, Integer>> { - private static final long serialVersionUID = 1L; - - @Override - public Tuple2<Tuple2<Integer, Integer>, Integer> map(Tuple5<Integer, Integer, Integer, Integer, Integer> - value) throws - Exception { - return new Tuple2<>(new Tuple2<>(value.f0, value.f1), value.f4); - } - } - - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - - private static boolean fileInput = false; - private static boolean fileOutput = false; - private static String inputPath; - private static String outputPath; - - private static boolean parseParameters(String[] args) { - - if (args.length > 0) { - // parse input arguments - if (args.length == 1) { - fileOutput = true; - outputPath = args[0]; - } else if (args.length == 2) { - fileInput = true; - inputPath = args[0]; - fileOutput = true; - outputPath = args[1]; - } else { - System.err.println("Usage: IterateExample <result path> or IterateExample <input path> <result path>"); - return false; - } - } else { - System.out.println("Executing IterateExample with generated data."); - System.out.println(" Provide parameter to write to file."); - System.out.println(" Usage: IterateExample <result path>"); - } - return true; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/util/IterateExampleData.java ---------------------------------------------------------------------- diff --git 
a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/util/IterateExampleData.java b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/util/IterateExampleData.java deleted file mode 100644 index 0077459..0000000 --- a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/util/IterateExampleData.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.examples.iteration.util; - -public class IterateExampleData { - public static final String INPUT_PAIRS = "(1,40)\n" + "(29,38)\n" + "(11,15)\n" + "(17,39)\n" + "(24,41)\n" + - "(7,33)\n" + "(20,2)\n" + "(11,5)\n" + "(3,16)\n" + "(23,36)\n" + "(15,23)\n" + "(28,13)\n" + "(1,1)\n" + - "(10,6)\n" + "(21,5)\n" + "(14,36)\n" + "(17,15)\n" + "(7,9)"; - - public static final String RESULTS = "((1,40),3)\n" + "((24,41),2)\n" + "((3,16),5)\n" + "((1,1),10)\n" + - "((17,15),4)\n" + "((29,38),2)\n" + "((7,33),3)\n" + "((23,36),3)\n" + "((10,6),6)\n" + "((7,9),5)\n" + - "((11,15),4)\n" + "((20,2),5)\n" + "((15,23),4)\n" + "((21,5),5)\n" + - "((17,39),3)\n" + "((11,5),6)\n" + "((28,13),4)\n" + "((14,36),3)"; - - private IterateExampleData() { - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java ---------------------------------------------------------------------- diff --git a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java deleted file mode 100644 index 3355f1c..0000000 --- a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java +++ /dev/null @@ -1,296 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.examples.join; - -import org.apache.flink.api.common.functions.JoinFunction; -import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.TimeCharacteristic; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.TimestampExtractor; -import org.apache.flink.streaming.api.functions.source.RichSourceFunction; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows; -import org.apache.flink.streaming.api.windowing.time.Time; - -import java.util.Random; -import java.util.concurrent.TimeUnit; - -/** - * Example illustrating a windowed join of two streams in Flink. - * - * <p> - * This example joins two streams over tumbling event-time windows. One stream emits grades and the other - * emits salaries of people. The input format for both sources has an additional timestamp - * as field 0. This is used for event-time windowing. The timestamps must be - * monotonically increasing. - * - * This example shows how to: - * <ul> - * <li>do windowed joins, - * <li>use tuple data types, - * <li>write a simple streaming program. - * </ul> - */ -public class WindowJoin { - - // ************************************************************************* - // PROGRAM - // ************************************************************************* - - public static void main(String[] args) throws Exception { - - if (!parseParameters(args)) { - return; - } - - // obtain execution environment - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - - // connect to the data sources for grades and salaries - Tuple2<DataStream<Tuple3<Long, String, Integer>>, DataStream<Tuple3<Long, String, Integer>>> input = getInputStreams(env); - DataStream<Tuple3<Long, String, Integer>> grades = input.f0; - DataStream<Tuple3<Long, String, Integer>> salaries = input.f1; - - // extract the timestamps - grades = grades.assignTimestamps(new MyTimestampExtractor()); - salaries = salaries.assignTimestamps(new MyTimestampExtractor()); - - // apply a windowed join over the two streams, keyed by name, over - // tumbling windows of 5 milliseconds - DataStream<Tuple3<String, Integer, Integer>> joinedStream = grades - .join(salaries) - .where(new NameKeySelector()) - .equalTo(new NameKeySelector()) - .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.MILLISECONDS))) - .apply(new MyJoinFunction()); - - // emit result - if (fileOutput) { - joinedStream.writeAsText(outputPath, 1); - } else { - joinedStream.print(); - } - - // execute program - env.execute("Windowed Join Example"); - } - - // ************************************************************************* - // USER FUNCTIONS - // ************************************************************************* - - private final static String[] names = {"tom", "jerry", "alice", "bob", "john", "grace"}; - private final static int GRADE_COUNT = 5; - private final static int SALARY_MAX = 10000; - private final static int SLEEP_TIME = 10; - - /** - * Continuously emit tuples with random names and integers 
-	public static class GradeSource implements SourceFunction<Tuple3<Long, String, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		private Random rand;
-		private Tuple3<Long, String, Integer> outTuple;
-		private volatile boolean isRunning = true;
-		private int counter;
-
-		public GradeSource() {
-			rand = new Random();
-			outTuple = new Tuple3<>();
-		}
-
-		@Override
-		public void run(SourceContext<Tuple3<Long, String, Integer>> ctx) throws Exception {
-			while (isRunning && counter < 100) {
-				outTuple.f0 = System.currentTimeMillis();
-				outTuple.f1 = names[rand.nextInt(names.length)];
-				outTuple.f2 = rand.nextInt(GRADE_COUNT) + 1;
-				Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
-				counter++;
-				ctx.collect(outTuple);
-			}
-		}
-
-		@Override
-		public void cancel() {
-			isRunning = false;
-		}
-	}
-
-	/**
-	 * Continuously emits tuples with random names and integers (salaries).
-	 */
-	public static class SalarySource extends RichSourceFunction<Tuple3<Long, String, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		private transient Random rand;
-		private transient Tuple3<Long, String, Integer> outTuple;
-		private volatile boolean isRunning;
-		private int counter;
-
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			super.open(parameters);
-			rand = new Random();
-			outTuple = new Tuple3<Long, String, Integer>();
-			isRunning = true;
-		}
-
-		@Override
-		public void run(SourceContext<Tuple3<Long, String, Integer>> ctx) throws Exception {
-			while (isRunning && counter < 100) {
-				outTuple.f0 = System.currentTimeMillis();
-				outTuple.f1 = names[rand.nextInt(names.length)];
-				outTuple.f2 = rand.nextInt(SALARY_MAX) + 1;
-				Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
-				counter++;
-				ctx.collect(outTuple);
-			}
-		}
-
-		@Override
-		public void cancel() {
-			isRunning = false;
-		}
-	}
-
-	public static class MySourceMap extends RichMapFunction<String, Tuple3<Long, String, Integer>> {
-
-		private static final long serialVersionUID = 1L;
-
-		private String[] record;
-
-		public MySourceMap() {
-			record = new String[2];
-		}
-
-		@Override
-		public Tuple3<Long, String, Integer> map(String line) throws Exception {
-			record = line.substring(1, line.length() - 1).split(",");
-			return new Tuple3<>(Long.parseLong(record[0]), record[1], Integer.parseInt(record[2]));
-		}
-	}
-
-	public static class MyJoinFunction
-			implements
-			JoinFunction<Tuple3<Long, String, Integer>, Tuple3<Long, String, Integer>, Tuple3<String, Integer, Integer>> {
-
-		private static final long serialVersionUID = 1L;
-
-		private Tuple3<String, Integer, Integer> joined = new Tuple3<>();
-
-		@Override
-		public Tuple3<String, Integer, Integer> join(Tuple3<Long, String, Integer> first,
-				Tuple3<Long, String, Integer> second) throws Exception {
-			joined.f0 = first.f1;
-			joined.f1 = first.f2;
-			joined.f2 = second.f2;
-			return joined;
-		}
-	}
-
-	private static class MyTimestampExtractor implements TimestampExtractor<Tuple3<Long, String, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public long extractTimestamp(Tuple3<Long, String, Integer> element, long currentTimestamp) {
-			return element.f0;
-		}
-
-		@Override
-		public long extractWatermark(Tuple3<Long, String, Integer> element, long currentTimestamp) {
-			return element.f0 - 1;
-		}
-
-		@Override
-		public long getCurrentWatermark() {
-			return Long.MIN_VALUE;
-		}
-	}
-
-	private static class NameKeySelector implements KeySelector<Tuple3<Long, String, Integer>, String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public String getKey(Tuple3<Long, String, Integer> value) throws Exception {
-			return value.f1;
-		}
-	}
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileInput = false;
-	private static boolean fileOutput = false;
-
-	private static String gradesPath;
-	private static String salariesPath;
-	private static String outputPath;
-
-	private static boolean parseParameters(String[] args) {
-
-		if (args.length > 0) {
-			// parse input arguments
-			if (args.length == 1) {
-				fileOutput = true;
-				outputPath = args[0];
-			} else if (args.length == 3) {
-				fileInput = true;
-				fileOutput = true;
-				gradesPath = args[0];
-				salariesPath = args[1];
-				outputPath = args[2];
-			} else {
-				System.err.println("Usage: WindowJoin <result path> or WindowJoin <input path 1> <input path 2> " +
-						"<result path>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing WindowJoin with generated data.");
-			System.out.println("  Provide a parameter to write to a file.");
-			System.out.println("  Usage: WindowJoin <result path>");
-		}
-		return true;
-	}
-
-	private static Tuple2<DataStream<Tuple3<Long, String, Integer>>, DataStream<Tuple3<Long, String, Integer>>> getInputStreams(
-			StreamExecutionEnvironment env) {
-
-		DataStream<Tuple3<Long, String, Integer>> grades;
-		DataStream<Tuple3<Long, String, Integer>> salaries;
-
-		if (fileInput) {
-			grades = env.readTextFile(gradesPath).map(new MySourceMap());
-			salaries = env.readTextFile(salariesPath).map(new MySourceMap());
-		} else {
-			grades = env.addSource(new GradeSource());
-			salaries = env.addSource(new SalarySource());
-		}
-
-		return Tuple2.of(grades, salaries);
-	}
-
-}
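For reference, here is a condensed, self-contained variant of the windowed-join pattern above, using fixed in-memory data via fromElements() instead of the generated sources. It is written against the same DataStream API as the deleted file; the class and helper names (WindowJoinSketch, NameKey, F0Timestamp) are illustrative only, and end-of-stream window firing can differ between Flink versions, so treat this as a sketch of the join plumbing rather than a verified replacement.

    import org.apache.flink.api.common.functions.JoinFunction;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.TimestampExtractor;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    import java.util.concurrent.TimeUnit;

    // Hypothetical class name; not part of the Flink examples.
    public class WindowJoinSketch {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

            // (timestamp, name, grade) and (timestamp, name, salary)
            DataStream<Tuple3<Long, String, Integer>> grades = env.fromElements(
                    Tuple3.of(0L, "tom", 5), Tuple3.of(1L, "alice", 3), Tuple3.of(2L, "tom", 4));
            DataStream<Tuple3<Long, String, Integer>> salaries = env.fromElements(
                    Tuple3.of(0L, "tom", 6469), Tuple3.of(1L, "alice", 1468));

            // use field 0 as the event timestamp, as in the example above
            grades = grades.assignTimestamps(new F0Timestamp());
            salaries = salaries.assignTimestamps(new F0Timestamp());

            // join on the name field within 5 ms tumbling event-time windows
            grades.join(salaries)
                    .where(new NameKey())
                    .equalTo(new NameKey())
                    .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.MILLISECONDS)))
                    .apply(new JoinFunction<Tuple3<Long, String, Integer>, Tuple3<Long, String, Integer>, Tuple2<Integer, Integer>>() {
                        @Override
                        public Tuple2<Integer, Integer> join(Tuple3<Long, String, Integer> grade,
                                Tuple3<Long, String, Integer> salary) {
                            // emit (grade, salary) for every matching pair in the window
                            return Tuple2.of(grade.f2, salary.f2);
                        }
                    })
                    .print();

            env.execute("Window Join Sketch");
        }

        // key both streams by the name field
        private static class NameKey implements KeySelector<Tuple3<Long, String, Integer>, String> {
            @Override
            public String getKey(Tuple3<Long, String, Integer> value) {
                return value.f1;
            }
        }

        // timestamps come from field 0; the watermark trails the last timestamp by one
        private static class F0Timestamp implements TimestampExtractor<Tuple3<Long, String, Integer>> {
            @Override
            public long extractTimestamp(Tuple3<Long, String, Integer> element, long currentTimestamp) {
                return element.f0;
            }

            @Override
            public long extractWatermark(Tuple3<Long, String, Integer> element, long currentTimestamp) {
                return element.f0 - 1;
            }

            @Override
            public long getCurrentWatermark() {
                return Long.MIN_VALUE;
            }
        }
    }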
http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java
deleted file mode 100644
index 15c1280..0000000
--- a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.join.util;
-
-public class WindowJoinData {
-
-	public static final String GRADES_INPUT = "(0,john,5)\n" + "(0,tom,3)\n" + "(0,alice,1)\n" + "(0,grace,5)\n" +
-			"(1,john,4)\n" + "(1,bob,1)\n" + "(1,alice,2)\n" + "(1,alice,3)\n" + "(1,bob,5)\n" + "(1,alice,3)\n" + "(1,tom,5)\n" +
-			"(2,john,2)\n" + "(2,john,1)\n" + "(2,grace,2)\n" + "(2,jerry,2)\n" + "(2,tom,4)\n" + "(2,bob,4)\n" + "(2,bob,2)\n" +
-			"(3,tom,2)\n" + "(3,alice,5)\n" + "(3,grace,5)\n" + "(3,grace,1)\n" + "(3,alice,1)\n" + "(3,grace,3)\n" + "(3,tom,1)\n" +
-			"(4,jerry,5)\n" + "(4,john,3)\n" + "(4,john,4)\n" + "(4,john,1)\n" + "(4,jerry,3)\n" + "(4,grace,3)\n" + "(4,bob,3)\n" +
-			"(5,john,3)\n" + "(5,jerry,4)\n" + "(5,tom,5)\n" + "(5,tom,4)\n" + "(5,john,2)\n" + "(5,jerry,1)\n" + "(5,bob,1)\n" +
-			"(6,john,5)\n" + "(6,grace,4)\n" + "(6,tom,5)\n" + "(6,john,4)\n" + "(6,tom,1)\n" + "(6,grace,1)\n" + "(6,john,2)\n" +
-			"(7,jerry,3)\n" + "(7,jerry,5)\n" + "(7,tom,2)\n" + "(7,tom,2)\n" + "(7,alice,4)\n" + "(7,tom,4)\n" + "(7,jerry,4)\n" +
-			"(8,john,3)\n" + "(8,grace,4)\n" + "(8,tom,3)\n" + "(8,jerry,4)\n" + "(8,john,5)\n" + "(8,john,4)\n" + "(8,jerry,1)\n" +
-			"(9,john,5)\n" + "(9,alice,2)\n" + "(9,tom,1)\n" + "(9,alice,5)\n" + "(9,grace,4)\n" + "(9,bob,4)\n" + "(9,jerry,1)\n" +
-			"(10,john,5)\n" + "(10,tom,4)\n" + "(10,tom,5)\n" + "(10,jerry,5)\n" + "(10,tom,1)\n" + "(10,grace,3)\n" + "(10,bob,5)\n" +
-			"(11,john,1)\n" + "(11,alice,1)\n" + "(11,grace,3)\n" + "(11,grace,1)\n" + "(11,jerry,1)\n" + "(11,jerry,4)\n" +
-			"(12,bob,4)\n" + "(12,alice,3)\n" + "(12,tom,5)\n" + "(12,alice,4)\n" + "(12,alice,4)\n" + "(12,grace,4)\n" + "(12,john,5)\n" +
-			"(13,john,5)\n" + "(13,grace,4)\n" + "(13,tom,4)\n" + "(13,john,4)\n" + "(13,john,5)\n" + "(13,alice,5)\n" + "(13,jerry,5)\n" +
-			"(14,john,3)\n" + "(14,tom,5)\n" + "(14,jerry,4)\n" + "(14,grace,4)\n" + "(14,john,3)\n" + "(14,bob,2)";
-
-	public static final String SALARIES_INPUT = "(0,john,6469)\n" + "(0,jerry,6760)\n" + "(0,jerry,8069)\n" +
-			"(1,tom,3662)\n" + "(1,grace,8427)\n" + "(1,john,9425)\n" + "(1,bob,9018)\n" + "(1,john,352)\n" + "(1,tom,3770)\n" +
-			"(2,grace,7622)\n" + "(2,jerry,7441)\n" + "(2,alice,1468)\n" + "(2,bob,5472)\n" + "(2,grace,898)\n" +
-			"(3,tom,3849)\n" + "(3,grace,1865)\n" + "(3,alice,5582)\n" + "(3,john,9511)\n" + "(3,alice,1541)\n" +
-			"(4,john,2477)\n" + "(4,grace,3561)\n" + "(4,john,1670)\n" + "(4,grace,7290)\n" + "(4,grace,6565)\n" +
-			"(5,tom,6179)\n" + "(5,tom,1601)\n" + "(5,john,2940)\n" + "(5,bob,4685)\n" + "(5,bob,710)\n" + "(5,bob,5936)\n" +
-			"(6,jerry,1412)\n" + "(6,grace,6515)\n" + "(6,grace,3321)\n" + "(6,tom,8088)\n" + "(6,john,2876)\n" +
-			"(7,bob,9896)\n" + "(7,grace,7368)\n" + "(7,grace,9749)\n" + "(7,bob,2048)\n" + "(7,alice,4782)\n" +
-			"(8,alice,3375)\n" + "(8,tom,5841)\n" + "(8,bob,958)\n" + "(8,bob,5258)\n" + "(8,tom,3935)\n" + "(8,jerry,4394)\n" +
-			"(9,alice,102)\n" + "(9,alice,4931)\n" + "(9,alice,5240)\n" + "(9,jerry,7951)\n" + "(9,john,5675)\n" +
-			"(10,bob,609)\n" + "(10,alice,5997)\n" + "(10,jerry,9651)\n" + "(10,alice,1328)\n" + "(10,bob,1022)\n" +
-			"(11,grace,2578)\n" + "(11,jerry,9704)\n" + "(11,tom,4476)\n" + "(11,grace,3784)\n" + "(11,alice,6144)\n" +
-			"(12,bob,6213)\n" + "(12,alice,7525)\n" + "(12,jerry,2908)\n" + "(12,grace,8464)\n" + "(12,jerry,9920)\n" +
-			"(13,bob,3720)\n" + "(13,bob,7612)\n" + "(13,alice,7211)\n" + "(13,jerry,6484)\n" + "(13,alice,1711)\n" +
-			"(14,jerry,5994)\n" + "(14,grace,928)\n" + "(14,jerry,2492)\n" + "(14,grace,9080)\n" + "(14,tom,4330)\n" +
"(15,bob,8302)\n" + "(15,john,4981)\n" + "(15,tom,1781)\n" + "(15,grace,1379)\n" + "(15,jerry,3700)\n" + - "(16,jerry,3584)\n" + "(16,jerry,2038)\n" + "(16,jerry,3902)\n" + "(16,tom,1336)\n" + "(16,jerry,7500)\n" + - "(17,tom,3648)\n" + "(17,alice,2533)\n" + "(17,tom,8685)\n" + "(17,bob,3968)\n" + "(17,tom,3241)\n" + "(17,bob,7461)\n" + - "(18,jerry,2138)\n" + "(18,alice,7503)\n" + "(18,alice,6424)\n" + "(18,tom,140)\n" + "(18,john,9802)\n" + - "(19,grace,2977)\n" + "(19,grace,889)\n" + "(19,john,1338)"; - - private WindowJoinData() { - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java ---------------------------------------------------------------------- diff --git a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java deleted file mode 100644 index 32cf430..0000000 --- a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java +++ /dev/null @@ -1,254 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.examples.ml; - -import org.apache.flink.streaming.api.TimeCharacteristic; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.TimestampExtractor; -import org.apache.flink.streaming.api.functions.co.CoMapFunction; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; -import org.apache.flink.streaming.api.windowing.time.Time; -import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.util.Collector; - -import java.util.concurrent.TimeUnit; - -/** - * Skeleton for incremental machine learning algorithm consisting of a - * pre-computed model, which gets updated for the new inputs and new input data - * for which the job provides predictions. - * - * <p> - * This may serve as a base of a number of algorithms, e.g. updating an - * incremental Alternating Least Squares model while also providing the - * predictions. 
http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
deleted file mode 100644
index 32cf430..0000000
--- a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.ml;
-
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.TimestampExtractor;
-import org.apache.flink.streaming.api.functions.co.CoMapFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.util.Collector;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * Skeleton for an incremental machine learning algorithm consisting of a
- * pre-computed model, which gets updated for the new inputs, and new input
- * data for which the job provides predictions.
- *
- * <p>
- * This may serve as a base for a number of algorithms, e.g. updating an
- * incremental Alternating Least Squares model while also providing the
- * predictions.
- *
- * <p>
- * This example shows how to use:
- * <ul>
- * <li>Connected streams
- * <li>CoFunctions
- * <li>Tuple data types
- * </ul>
- */
-public class IncrementalLearningSkeleton {
-
-	private static DataStream<Integer> trainingData = null;
-	private static DataStream<Integer> newData = null;
-
-	// *************************************************************************
-	// PROGRAM
-	// *************************************************************************
-
-	public static void main(String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-
-		trainingData = env.addSource(new FiniteTrainingDataSource());
-		newData = env.addSource(new FiniteNewDataSource());
-
-		// build a new model on every 5 seconds (event time) of training data
-		DataStream<Double[]> model = trainingData
-				.assignTimestamps(new LinearTimestamp())
-				.timeWindowAll(Time.of(5000, TimeUnit.MILLISECONDS))
-				.apply(new PartialModelBuilder());
-
-		// use the partial model to provide predictions for the new data
-		DataStream<Integer> prediction = newData.connect(model).map(new Predictor());
-
-		// emit result
-		if (fileOutput) {
-			prediction.writeAsText(outputPath, 1);
-		} else {
-			prediction.print();
-		}
-
-		// execute program
-		env.execute("Streaming Incremental Learning");
-	}
-
-	// *************************************************************************
-	// USER FUNCTIONS
-	// *************************************************************************
-
-	/**
-	 * Feeds the new data for the predictions. By default it is implemented as
-	 * constantly emitting the Integer 1 in a loop.
-	 */
-	public static class FiniteNewDataSource implements SourceFunction<Integer> {
-		private static final long serialVersionUID = 1L;
-		private int counter;
-
-		@Override
-		public void run(SourceContext<Integer> ctx) throws Exception {
-			Thread.sleep(15);
-			while (counter < 50) {
-				ctx.collect(getNewData());
-			}
-		}
-
-		@Override
-		public void cancel() {
-			// No cleanup needed
-		}
-
-		private Integer getNewData() throws InterruptedException {
-			Thread.sleep(5);
-			counter++;
-			return 1;
-		}
-	}
-
-	/**
-	 * Feeds new training data for the partial model builder. By default it is
-	 * implemented as constantly emitting the Integer 1 in a loop.
-	 */
-	public static class FiniteTrainingDataSource implements SourceFunction<Integer> {
-		private static final long serialVersionUID = 1L;
-		private int counter = 0;
-
-		@Override
-		public void run(SourceContext<Integer> collector) throws Exception {
-			while (counter < 8200) {
-				collector.collect(getTrainingData());
-			}
-		}
-
-		@Override
-		public void cancel() {
-			// No cleanup needed
-		}
-
-		private Integer getTrainingData() throws InterruptedException {
-			counter++;
-			return 1;
-		}
-	}
-
-	public static class LinearTimestamp implements TimestampExtractor<Integer> {
-		private static final long serialVersionUID = 1L;
-
-		private long counter = 0L;
-
-		@Override
-		public long extractTimestamp(Integer element, long currentTimestamp) {
-			return counter += 10L;
-		}
-
-		@Override
-		public long extractWatermark(Integer element, long currentTimestamp) {
-			return counter - 1;
-		}
-
-		@Override
-		public long getCurrentWatermark() {
-			return Long.MIN_VALUE;
-		}
-	}
-
-	/**
-	 * Builds up-to-date partial models on new training data.
-	 */
-	public static class PartialModelBuilder implements AllWindowFunction<Integer, Double[], TimeWindow> {
-		private static final long serialVersionUID = 1L;
-
-		protected Double[] buildPartialModel(Iterable<Integer> values) {
-			return new Double[]{1.};
-		}
-
-		@Override
-		public void apply(TimeWindow window, Iterable<Integer> values, Collector<Double[]> out) throws Exception {
-			out.collect(buildPartialModel(values));
-		}
-	}
-
-	/**
-	 * Makes predictions for the new data using the model produced in batch
-	 * processing and the up-to-date partial model.
-	 * <p>
-	 * By default it emits the Integer 0 for every prediction and the Integer 1
-	 * for every model update.
-	 * </p>
-	 */
-	public static class Predictor implements CoMapFunction<Integer, Double[], Integer> {
-		private static final long serialVersionUID = 1L;
-
-		Double[] batchModel = null;
-		Double[] partialModel = null;
-
-		@Override
-		public Integer map1(Integer value) {
-			// return a prediction for the new data point
-			return predict(value);
-		}
-
-		@Override
-		public Integer map2(Double[] value) {
-			// update the model
-			partialModel = value;
-			batchModel = getBatchModel();
-			return 1;
-		}
-
-		// pulls the model built by the batch job on the old training data
-		protected Double[] getBatchModel() {
-			return new Double[]{0.};
-		}
-
-		// makes a prediction using the two models
-		protected Integer predict(Integer inTuple) {
-			return 0;
-		}
-
-	}
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String outputPath;
-
-	private static boolean parseParameters(String[] args) {
-
-		if (args.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-			if (args.length == 1) {
-				outputPath = args[0];
-			} else {
-				System.err.println("Usage: IncrementalLearningSkeleton <result path>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing IncrementalLearningSkeleton with generated data.");
-			System.out.println("  Provide a parameter to write to a file.");
-			System.out.println("  Usage: IncrementalLearningSkeleton <result path>");
-		}
-		return true;
-	}
-
-}
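The core pattern in the skeleton above is connecting the prediction stream with the model stream and processing both in a single CoMapFunction, whose map1/map2 methods share the operator's state. A minimal, self-contained sketch of that pattern with toy data (the class name CoMapSketch and the string "model" placeholders are illustrative):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.CoMapFunction;

    // Hypothetical class name; not part of the Flink examples.
    public class CoMapSketch {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream<Integer> newData = env.fromElements(1, 2, 3);   // stands in for the prediction input
            DataStream<String> model = env.fromElements("model-v1");   // stands in for the model updates

            // map1 handles elements of the first stream, map2 elements of the second;
            // both run in the same operator and see the same fields
            DataStream<String> out = newData.connect(model)
                    .map(new CoMapFunction<Integer, String, String>() {
                        private String currentModel = "none";          // last model seen on the second input

                        @Override
                        public String map1(Integer value) {            // called for each element of newData
                            return "predicted " + value + " with " + currentModel;
                        }

                        @Override
                        public String map2(String value) {             // called for each model update
                            currentModel = value;
                            return "updated model to " + value;
                        }
                    });

            out.print();
            env.execute("CoMap Sketch");
        }
    }

Note that, as in the skeleton, there is no ordering guarantee between the two inputs: a prediction may be made before the first model update arrives, which is why both the sketch and Predictor keep a usable default.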
http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java
deleted file mode 100644
index 8a6cd88..0000000
--- a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.ml.util;
-
-public class IncrementalLearningSkeletonData {
-
-	public static final String RESULTS = "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" +
-			"1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "0\n" + "0\n" +
-			"0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" +
-			"0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" +
-			"0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" +
-			"0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" +
-			"0\n" + "0\n" + "0\n" + "0\n";
-
-	private IncrementalLearningSkeletonData() {
-	}
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
deleted file mode 100644
index cecabdd..0000000
--- a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.socket;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.examples.wordcount.WordCount.Tokenizer;
-
-/**
- * This example shows an implementation of WordCount with data from a text
- * socket. To run the example make sure that the service providing the text data
- * is already up and running.
- * <p>
- * To start an example socket text stream on your local machine run netcat from
- * a command line: <code>nc -lk 9999</code>, where the parameter specifies the
- * port number.
- * </p>
- * <p>
- * Usage:
- * <code>SocketTextStreamWordCount <hostname> <port> [<result path>]</code>
- * </p>
- * <p>
- * This example shows how to:
- * <ul>
- * <li>use StreamExecutionEnvironment.socketTextStream,
- * <li>write a simple Flink program,
- * <li>write and use user-defined functions.
- * </ul>
- *
- * @see <a href="http://www.openbsd.org/cgi-bin/man.cgi?query=nc">netcat</a>
- */
-public class SocketTextStreamWordCount {
-
-	public static void main(String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		// set up the execution environment
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment
-				.getExecutionEnvironment();
-
-		// get input data
-		DataStream<String> text = env.socketTextStream(hostName, port, '\n', 0);
-
-		DataStream<Tuple2<String, Integer>> counts =
-				// split up the lines in pairs (2-tuples) containing: (word,1)
-				text.flatMap(new Tokenizer())
-						// group by the tuple field "0" and sum up tuple field "1"
-						.keyBy(0)
-						.sum(1);
-
-		if (fileOutput) {
-			counts.writeAsText(outputPath, 1);
-		} else {
-			counts.print();
-		}
-
-		// execute program
-		env.execute("WordCount from SocketTextStream Example");
-	}
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String hostName;
-	private static int port;
-	private static String outputPath;
-
-	private static boolean parseParameters(String[] args) {
-
-		// parse input arguments
-		if (args.length == 3) {
-			fileOutput = true;
-			hostName = args[0];
-			port = Integer.valueOf(args[1]);
-			outputPath = args[2];
-		} else if (args.length == 2) {
-			hostName = args[0];
-			port = Integer.valueOf(args[1]);
-		} else {
-			System.err.println("Usage: SocketTextStreamWordCount <hostname> <port> [<output path>]");
-			return false;
-		}
-		return true;
-	}
-}
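The Tokenizer reused by this example comes from the WordCount example, which is not part of this diff. For reference, a sketch of what such a tokenizer typically looks like, assuming the standard FlatMapFunction contract (the class name TokenizerSketch and the splitting regex are assumptions, not the deleted file's exact code):

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    // Splits lines into lowercase words and emits (word, 1) pairs,
    // which keyBy(0).sum(1) then aggregates into per-word counts.
    public class TokenizerSketch implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line into words on non-word characters
            for (String token : value.toLowerCase().split("\\W+")) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));  // emit (word, 1)
                }
            }
        }
    }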