[1/4] flink git commit: [FLINK-6614] [table] Fix translation of group auxiliary functions (e.g., TUMBLE_END).
Repository: flink Updated Branches: refs/heads/release-1.3 45923ffb8 -> 51fb7ed79 [FLINK-6614] [table] Fix translation of group auxiliary functions (e.g., TUMBLE_END). Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/51fb7ed7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/51fb7ed7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/51fb7ed7 Branch: refs/heads/release-1.3 Commit: 51fb7ed791bc8c5c1c35dffcd9855a2e5a8f3087 Parents: 0246ce5 Author: Fabian HueskeAuthored: Wed May 17 16:26:27 2017 +0200 Committer: Fabian Hueske Committed: Thu May 18 22:06:00 2017 +0200 -- .../common/WindowStartEndPropertiesRule.scala | 39 ++-- .../scala/stream/sql/WindowAggregateTest.scala | 27 ++ 2 files changed, 54 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/51fb7ed7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala index 7577deb..14e9b21 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala @@ -36,13 +36,21 @@ class WindowStartEndPropertiesRule override def matches(call: RelOptRuleCall): Boolean = { val project = call.rel(0).asInstanceOf[LogicalProject] // project includes at least on group auxiliary function -project.getProjects.exists { - case c: RexCall => c.getOperator.isGroupAuxiliary - case _ => false + +def hasGroupAuxiliaries(node: RexNode): Boolean = { + node match { +case c: RexCall if c.getOperator.isGroupAuxiliary => true +case c: 
RexCall => + c.operands.exists(hasGroupAuxiliaries) +case _ => false + } } + +project.getProjects.exists(hasGroupAuxiliaries) } override def onMatch(call: RelOptRuleCall): Unit = { + val project = call.rel(0).asInstanceOf[LogicalProject] val innerProject = call.rel(1).asInstanceOf[LogicalProject] val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate] @@ -62,20 +70,27 @@ class WindowStartEndPropertiesRule transformed.project( innerProject.getProjects ++ Seq(transformed.field("w$start"), transformed.field("w$end"))) -// replace window auxiliary function by access to window properties -transformed.project( - project.getProjects.map{ x => -if (WindowStartEndPropertiesRule.isWindowStart(x)) { +def replaceGroupAuxiliaries(node: RexNode): RexNode = { + node match { +case c: RexCall if WindowStartEndPropertiesRule.isWindowStart(c) => // replace expression by access to window start - rexBuilder.makeCast(x.getType, transformed.field("w$start"), false) -} else if (WindowStartEndPropertiesRule.isWindowEnd(x)) { + rexBuilder.makeCast(c.getType, transformed.field("w$start"), false) +case c: RexCall if WindowStartEndPropertiesRule.isWindowEnd(c) => // replace expression by access to window end - rexBuilder.makeCast(x.getType, transformed.field("w$end"), false) -} else { + rexBuilder.makeCast(c.getType, transformed.field("w$end"), false) +case c: RexCall => + // replace expressions in children + val newOps = c.getOperands.map(replaceGroupAuxiliaries) + c.clone(c.getType, newOps) +case x => // preserve expression x -} } +} + +// replace window auxiliary function by access to window properties +transformed.project( + project.getProjects.map(replaceGroupAuxiliaries) ) val res = transformed.build() call.transformTo(res) http://git-wip-us.apache.org/repos/asf/flink/blob/51fb7ed7/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala -- diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala index 2022db8..f95d0ab 100644 ---
[4/4] flink git commit: [FLINK-6618] [table] Fix translation of WindowProperties in Table API.
[FLINK-6618] [table] Fix translation of WindowProperties in Table API. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6583fb4c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6583fb4c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6583fb4c Branch: refs/heads/release-1.3 Commit: 6583fb4c1f85d5c4f57a81ec9a6294915ea165ac Parents: 45923ff Author: sunjincheng121Authored: Thu May 18 13:02:24 2017 +0800 Committer: Fabian Hueske Committed: Thu May 18 22:06:00 2017 +0200 -- .../flink/table/plan/ProjectionTranslator.scala | 20 ++-- .../GroupWindowStringExpressionTest.scala | 18 +++--- 2 files changed, 21 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/6583fb4c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala index 802768e..69b437a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala @@ -334,20 +334,20 @@ object ProjectionTranslator { val l = replaceAggFunctionCall(b.left, tableEnv) val r = replaceAggFunctionCall(b.right, tableEnv) b.makeCopy(Array(l, r)) - // Functions calls case c @ Call(name, args) => val function = tableEnv.getFunctionCatalog.lookupFunction(name, args) -if (function.isInstanceOf[AggFunctionCall] || function.isInstanceOf[Aggregation]) { - function -} else { - val newArgs = -args.map( -(exp: Expression) => - replaceAggFunctionCall(exp, tableEnv)) - c.makeCopy(Array(name, newArgs)) +function match { + case a: AggFunctionCall => a + case a: Aggregation => a + case p: AbstractWindowProperty => p + case _ => +val newArgs 
= + args.map( +(exp: Expression) => + replaceAggFunctionCall(exp, tableEnv)) +c.makeCopy(Array(name, newArgs)) } - // Scala functions case sfc @ ScalarFunctionCall(clazz, args) => val newArgs: Seq[Expression] = http://git-wip-us.apache.org/repos/asf/flink/blob/6583fb4c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala -- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala index d261e36..1cc156e 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala @@ -23,7 +23,7 @@ import org.apache.flink.table.api.java.{Slide => JSlide} import org.apache.flink.table.api.scala._ import org.apache.flink.table.functions.aggfunctions.CountAggFunction import org.apache.flink.table.utils.TableTestBase -import org.junit.{Assert, Test} +import org.junit.Test class GroupWindowStringExpressionTest extends TableTestBase { @@ -47,7 +47,9 @@ class GroupWindowStringExpressionTest extends TableTestBase { myCountFun('string), 'int.sum, weightAvgFun('long, 'int), -weightAvgFun('int, 'int) * 2) +weightAvgFun('int, 'int) * 2, +'w.start, +'w.end) // String / Java API val resJava = t @@ -55,11 +57,13 @@ class GroupWindowStringExpressionTest extends TableTestBase { .groupBy("w, string") .select( "string, " + - "myCountFun(string), " + - "int.sum, " + - "weightAvgFun(long, int), " + - "weightAvgFun(int, int) * 2") +"myCountFun(string), " + +"int.sum, " + +"weightAvgFun(long, int), " + +"weightAvgFun(int, int) * 2, " + +"start(w)," + +"end(w)") 
-Assert.assertEquals("Logical Plans do not match", resJava.logicalPlan, resJava.logicalPlan) +verifyTableEquals(resJava, resScala) } }
[3/4] flink git commit: [FLINK-6543] [table] Deprecate toDataStream and add toAppendStream.
[FLINK-6543] [table] Deprecate toDataStream and add toAppendStream. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0246ce51 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0246ce51 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0246ce51 Branch: refs/heads/release-1.3 Commit: 0246ce51aaa12ce6ce4ac95898e014916a99ccd2 Parents: c794d6e Author: twalthrAuthored: Wed May 17 11:31:33 2017 +0200 Committer: Fabian Hueske Committed: Thu May 18 22:06:00 2017 +0200 -- .../table/examples/scala/StreamSQLExample.scala | 2 +- .../examples/scala/StreamTableExample.scala | 2 +- .../flink/table/api/java/package-info.java | 5 +- .../table/api/java/StreamTableEnvironment.scala | 121 ++- .../api/scala/StreamTableEnvironment.scala | 55 - .../table/api/scala/TableConversions.scala | 96 +-- .../apache/flink/table/api/scala/package.scala | 2 +- .../table/api/java/stream/sql/SqlITCase.java| 8 +- .../api/scala/stream/TableSourceITCase.scala| 4 +- .../api/scala/stream/sql/OverWindowITCase.scala | 32 ++--- .../table/api/scala/stream/sql/SqlITCase.scala | 20 +-- .../api/scala/stream/table/CalcITCase.scala | 24 ++-- .../table/GroupWindowAggregationsITCase.scala | 10 +- .../scala/stream/table/OverWindowITCase.scala | 10 +- .../api/scala/stream/table/UnionITCase.scala| 8 +- .../datastream/DataStreamAggregateITCase.scala | 12 +- .../datastream/DataStreamCalcITCase.scala | 4 +- .../DataStreamUserDefinedFunctionITCase.scala | 12 +- .../datastream/TimeAttributesITCase.scala | 12 +- 19 files changed, 339 insertions(+), 100 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/0246ce51/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala -- diff --git a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala 
b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala index 2cdd8b8..665913e 100644 --- a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala +++ b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala @@ -62,7 +62,7 @@ object StreamSQLExample { "SELECT * FROM OrderA WHERE amount > 2 UNION ALL " + "SELECT * FROM OrderB WHERE amount < 2") -result.toDataStream[Order].print() +result.toAppendStream[Order].print() env.execute() } http://git-wip-us.apache.org/repos/asf/flink/blob/0246ce51/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala -- diff --git a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala index 5c5012b..1c0ffea 100644 --- a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala +++ b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala @@ -55,7 +55,7 @@ object StreamTableExample { val result: DataStream[Order] = orderA.unionAll(orderB) .select('user, 'product, 'amount) .where('amount > 2) - .toDataStream[Order] + .toAppendStream[Order] result.print() http://git-wip-us.apache.org/repos/asf/flink/blob/0246ce51/flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/java/package-info.java -- diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/java/package-info.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/java/package-info.java index 2409872..3dbf50f 100644 --- a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/java/package-info.java 
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/java/package-info.java @@ -57,8 +57,9 @@ * * As seen above, a {@link org.apache.flink.table.api.Table} can be converted back to the * underlying API representation using - * {@link org.apache.flink.table.api.java.BatchTableEnvironment#toDataSet(Table, java.lang.Class)} - * or {@link org.apache.flink.table.api.java.StreamTableEnvironment#toDataStream(Table, java.lang.Class)}}. + *
[2/4] flink git commit: [FLINK-6585] [table] Fix execution of Table examples in IDE.
[FLINK-6585] [table] Fix execution of Table examples in IDE. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c794d6e0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c794d6e0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c794d6e0 Branch: refs/heads/release-1.3 Commit: c794d6e0b2d5b0e26c2aa4e35ebe2f6c1e66e9c9 Parents: 6583fb4 Author: twalthr Authored: Mon May 15 13:29:50 2017 +0200 Committer: Fabian Hueske Committed: Thu May 18 22:06:00 2017 +0200 -- flink-examples/flink-examples-table/pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/c794d6e0/flink-examples/flink-examples-table/pom.xml -- diff --git a/flink-examples/flink-examples-table/pom.xml b/flink-examples/flink-examples-table/pom.xml index 684d883..98e7305 100644 --- a/flink-examples/flink-examples-table/pom.xml +++ b/flink-examples/flink-examples-table/pom.xml @@ -39,14 +39,14 @@ under the License. org.apache.flink flink-table_2.10 ${project.version} - provided + compile org.apache.flink flink-streaming-scala_2.10 ${project.version} - provided + compile
[2/2] flink git commit: [hotfix] [docs] Fix link to docker-compose.yml
[hotfix] [docs] Fix link to docker-compose.yml This closes #3887 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/54206876 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/54206876 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/54206876 Branch: refs/heads/release-1.3 Commit: 542068760901cba2fd6f2b3f3ff6a980aee0e23e Parents: 979228b Author: Patrick Lucas Authored: Fri May 12 18:26:28 2017 +0200 Committer: Greg Hogan Committed: Thu May 18 16:01:17 2017 -0400 -- docs/setup/docker.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/54206876/docs/setup/docker.md -- diff --git a/docs/setup/docker.md b/docs/setup/docker.md index 29e696f..c8da06b 100644 --- a/docs/setup/docker.md +++ b/docs/setup/docker.md @@ -65,8 +65,8 @@ For example: [Docker Compose](https://docs.docker.com/compose/) is a convenient way to run a group of Docker containers locally. -An [example config file](https://github.com/docker-flink/examples) is available -on GitHub. +An [example config file](https://github.com/docker-flink/examples/blob/master/docker-compose.yml) +is available on GitHub. ### Usage
[1/2] flink git commit: [FLINK-6616] [docs] Clarify provenance of official Docker images
Repository: flink Updated Branches: refs/heads/release-1.3 979228b3e -> 45923ffb8 [FLINK-6616] [docs] Clarify provenance of official Docker images Note that the official Docker images for Flink are community supported and not an official release of the Apache Flink PMC. This closes #3932 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/45923ffb Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/45923ffb Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/45923ffb Branch: refs/heads/release-1.3 Commit: 45923ffb8e5fcc98c569e55137e52a495cc7635f Parents: 5420687 Author: Greg HoganAuthored: Wed May 17 12:27:34 2017 -0400 Committer: Greg Hogan Committed: Thu May 18 16:01:17 2017 -0400 -- docs/setup/docker.md | 11 +++ 1 file changed, 7 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/45923ffb/docs/setup/docker.md -- diff --git a/docs/setup/docker.md b/docs/setup/docker.md index c8da06b..4986f2a 100644 --- a/docs/setup/docker.md +++ b/docs/setup/docker.md @@ -24,15 +24,15 @@ under the License. --> [Docker](https://www.docker.com) is a popular container runtime. There are -official Flink Docker images available on Docker Hub which can be used directly -or extended to better integrate into a production environment. +official Docker images for Apache Flink available on Docker Hub which can be +used directly or extended to better integrate into a production environment. * This will be replaced by the TOC {:toc} -## Official Flink Docker Images +## Official Docker Images -The [official Flink Docker repository](https://hub.docker.com/_/flink/) is +The [official Docker repository](https://hub.docker.com/_/flink/) is hosted on Docker Hub and serves images of Flink version 1.2.1 and later. 
Images for each supported combination of Hadoop and Scala are available, and @@ -60,6 +60,9 @@ For example: * `flink:1.2-scala_2.10-alpine` --> +**Note:** The docker images are provided as a community project by individuals +on a best-effort basis. They are not official releases by the Apache Flink PMC. + ## Flink with Docker Compose [Docker Compose](https://docs.docker.com/compose/) is a convenient way to run a
[1/2] flink git commit: [hotfix] [docs] Fix link to docker-compose.yml
Repository: flink Updated Branches: refs/heads/master 9fc42df68 -> 9a64d50f0 [hotfix] [docs] Fix link to docker-compose.yml This closes #3887 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/47cb7347 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/47cb7347 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/47cb7347 Branch: refs/heads/master Commit: 47cb7347d8e02195088b2dc1900ac53bd5d02b0d Parents: 9fc42df Author: Patrick LucasAuthored: Fri May 12 18:26:28 2017 +0200 Committer: Greg Hogan Committed: Thu May 18 15:54:09 2017 -0400 -- docs/setup/docker.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/47cb7347/docs/setup/docker.md -- diff --git a/docs/setup/docker.md b/docs/setup/docker.md index 29e696f..c8da06b 100644 --- a/docs/setup/docker.md +++ b/docs/setup/docker.md @@ -65,8 +65,8 @@ For example: [Docker Compose](https://docs.docker.com/compose/) is a convenient way to run a group of Docker containers locally. -An [example config file](https://github.com/docker-flink/examples) is available -on GitHub. +An [example config file](https://github.com/docker-flink/examples/blob/master/docker-compose.yml) +is available on GitHub. ### Usage
[2/2] flink git commit: [FLINK-6616] [docs] Clarify provenance of official Docker images
[FLINK-6616] [docs] Clarify provenance of official Docker images Note that the official Docker images for Flink are community supported and not an official release of the Apache Flink PMC. This closes #3932 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9a64d50f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9a64d50f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9a64d50f Branch: refs/heads/master Commit: 9a64d50f07c03102dfcb44656ec01852fdb56ac1 Parents: 47cb734 Author: Greg HoganAuthored: Wed May 17 12:27:34 2017 -0400 Committer: Greg Hogan Committed: Thu May 18 15:59:27 2017 -0400 -- docs/setup/docker.md | 11 +++ 1 file changed, 7 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/9a64d50f/docs/setup/docker.md -- diff --git a/docs/setup/docker.md b/docs/setup/docker.md index c8da06b..4986f2a 100644 --- a/docs/setup/docker.md +++ b/docs/setup/docker.md @@ -24,15 +24,15 @@ under the License. --> [Docker](https://www.docker.com) is a popular container runtime. There are -official Flink Docker images available on Docker Hub which can be used directly -or extended to better integrate into a production environment. +official Docker images for Apache Flink available on Docker Hub which can be +used directly or extended to better integrate into a production environment. * This will be replaced by the TOC {:toc} -## Official Flink Docker Images +## Official Docker Images -The [official Flink Docker repository](https://hub.docker.com/_/flink/) is +The [official Docker repository](https://hub.docker.com/_/flink/) is hosted on Docker Hub and serves images of Flink version 1.2.1 and later. Images for each supported combination of Hadoop and Scala are available, and @@ -60,6 +60,9 @@ For example: * `flink:1.2-scala_2.10-alpine` --> +**Note:** The docker images are provided as a community project by individuals +on a best-effort basis. 
They are not official releases by the Apache Flink PMC. + ## Flink with Docker Compose [Docker Compose](https://docs.docker.com/compose/) is a convenient way to run a
[1/4] flink git commit: [FLINK-6614] [table] Fix translation of group auxiliary functions (e.g., TUMBLE_END).
Repository: flink Updated Branches: refs/heads/master c995ebd29 -> 9fc42df68 [FLINK-6614] [table] Fix translation of group auxiliary functions (e.g., TUMBLE_END). This closes #3930. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9fc42df6 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9fc42df6 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9fc42df6 Branch: refs/heads/master Commit: 9fc42df68d746c633b0d3c8995e0031064bfd362 Parents: 3a65e5a Author: Fabian HueskeAuthored: Wed May 17 16:26:27 2017 +0200 Committer: Fabian Hueske Committed: Thu May 18 21:22:12 2017 +0200 -- .../common/WindowStartEndPropertiesRule.scala | 39 ++-- .../scala/stream/sql/WindowAggregateTest.scala | 27 ++ 2 files changed, 54 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/9fc42df6/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala index 7577deb..14e9b21 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala @@ -36,13 +36,21 @@ class WindowStartEndPropertiesRule override def matches(call: RelOptRuleCall): Boolean = { val project = call.rel(0).asInstanceOf[LogicalProject] // project includes at least on group auxiliary function -project.getProjects.exists { - case c: RexCall => c.getOperator.isGroupAuxiliary - case _ => false + +def hasGroupAuxiliaries(node: RexNode): Boolean = { + node match { +case c: RexCall if c.getOperator.isGroupAuxiliary => true 
+case c: RexCall => + c.operands.exists(hasGroupAuxiliaries) +case _ => false + } } + +project.getProjects.exists(hasGroupAuxiliaries) } override def onMatch(call: RelOptRuleCall): Unit = { + val project = call.rel(0).asInstanceOf[LogicalProject] val innerProject = call.rel(1).asInstanceOf[LogicalProject] val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate] @@ -62,20 +70,27 @@ class WindowStartEndPropertiesRule transformed.project( innerProject.getProjects ++ Seq(transformed.field("w$start"), transformed.field("w$end"))) -// replace window auxiliary function by access to window properties -transformed.project( - project.getProjects.map{ x => -if (WindowStartEndPropertiesRule.isWindowStart(x)) { +def replaceGroupAuxiliaries(node: RexNode): RexNode = { + node match { +case c: RexCall if WindowStartEndPropertiesRule.isWindowStart(c) => // replace expression by access to window start - rexBuilder.makeCast(x.getType, transformed.field("w$start"), false) -} else if (WindowStartEndPropertiesRule.isWindowEnd(x)) { + rexBuilder.makeCast(c.getType, transformed.field("w$start"), false) +case c: RexCall if WindowStartEndPropertiesRule.isWindowEnd(c) => // replace expression by access to window end - rexBuilder.makeCast(x.getType, transformed.field("w$end"), false) -} else { + rexBuilder.makeCast(c.getType, transformed.field("w$end"), false) +case c: RexCall => + // replace expressions in children + val newOps = c.getOperands.map(replaceGroupAuxiliaries) + c.clone(c.getType, newOps) +case x => // preserve expression x -} } +} + +// replace window auxiliary function by access to window properties +transformed.project( + project.getProjects.map(replaceGroupAuxiliaries) ) val res = transformed.build() call.transformTo(res) http://git-wip-us.apache.org/repos/asf/flink/blob/9fc42df6/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala -- diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala index 2022db8..f95d0ab 100644 ---
[2/4] flink git commit: [FLINK-6585] [table] Fix execution of Table examples in IDE.
[FLINK-6585] [table] Fix execution of Table examples in IDE. This closes #3905. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d5310ed0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d5310ed0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d5310ed0 Branch: refs/heads/master Commit: d5310ed02b30ef16f06be6efc52af5e183df26a7 Parents: 41aa98a Author: twalthr Authored: Mon May 15 13:29:50 2017 +0200 Committer: Fabian Hueske Committed: Thu May 18 21:22:12 2017 +0200 -- flink-examples/flink-examples-table/pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/d5310ed0/flink-examples/flink-examples-table/pom.xml -- diff --git a/flink-examples/flink-examples-table/pom.xml b/flink-examples/flink-examples-table/pom.xml index e59d8c6..8574650 100644 --- a/flink-examples/flink-examples-table/pom.xml +++ b/flink-examples/flink-examples-table/pom.xml @@ -39,14 +39,14 @@ under the License. org.apache.flink flink-table_${scala.binary.version} ${project.version} - provided + compile org.apache.flink flink-streaming-scala_${scala.binary.version} ${project.version} - provided + compile
[3/4] flink git commit: [FLINK-6543] [table] Deprecate toDataStream and add toAppendStream.
[FLINK-6543] [table] Deprecate toDataStream and add toAppendStream. This closes #3929. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3a65e5ac Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3a65e5ac Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3a65e5ac Branch: refs/heads/master Commit: 3a65e5acbcc29636b0ce1631815861089fc21dca Parents: d5310ed Author: twalthrAuthored: Wed May 17 11:31:33 2017 +0200 Committer: Fabian Hueske Committed: Thu May 18 21:22:12 2017 +0200 -- .../table/examples/scala/StreamSQLExample.scala | 2 +- .../examples/scala/StreamTableExample.scala | 2 +- .../flink/table/api/java/package-info.java | 5 +- .../table/api/java/StreamTableEnvironment.scala | 121 ++- .../api/scala/StreamTableEnvironment.scala | 55 - .../table/api/scala/TableConversions.scala | 96 +-- .../apache/flink/table/api/scala/package.scala | 2 +- .../table/api/java/stream/sql/SqlITCase.java| 8 +- .../api/scala/stream/TableSourceITCase.scala| 4 +- .../api/scala/stream/sql/OverWindowITCase.scala | 32 ++--- .../table/api/scala/stream/sql/SqlITCase.scala | 20 +-- .../api/scala/stream/table/CalcITCase.scala | 24 ++-- .../table/GroupWindowAggregationsITCase.scala | 10 +- .../scala/stream/table/OverWindowITCase.scala | 10 +- .../api/scala/stream/table/UnionITCase.scala| 8 +- .../datastream/DataStreamAggregateITCase.scala | 12 +- .../datastream/DataStreamCalcITCase.scala | 4 +- .../DataStreamUserDefinedFunctionITCase.scala | 12 +- .../datastream/TimeAttributesITCase.scala | 12 +- 19 files changed, 339 insertions(+), 100 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala -- diff --git a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala 
b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala index 2cdd8b8..665913e 100644 --- a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala +++ b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala @@ -62,7 +62,7 @@ object StreamSQLExample { "SELECT * FROM OrderA WHERE amount > 2 UNION ALL " + "SELECT * FROM OrderB WHERE amount < 2") -result.toDataStream[Order].print() +result.toAppendStream[Order].print() env.execute() } http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala -- diff --git a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala index 5c5012b..1c0ffea 100644 --- a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala +++ b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala @@ -55,7 +55,7 @@ object StreamTableExample { val result: DataStream[Order] = orderA.unionAll(orderB) .select('user, 'product, 'amount) .where('amount > 2) - .toDataStream[Order] + .toAppendStream[Order] result.print() http://git-wip-us.apache.org/repos/asf/flink/blob/3a65e5ac/flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/java/package-info.java -- diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/java/package-info.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/java/package-info.java index 2409872..3dbf50f 100644 --- a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/java/package-info.java 
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/java/package-info.java @@ -57,8 +57,9 @@ * * As seen above, a {@link org.apache.flink.table.api.Table} can be converted back to the * underlying API representation using - * {@link org.apache.flink.table.api.java.BatchTableEnvironment#toDataSet(Table, java.lang.Class)} - * or {@link org.apache.flink.table.api.java.StreamTableEnvironment#toDataStream(Table,
[2/3] flink git commit: [FLINK-6440][metrics] Downgrade fetching failure logging to DEBUG
[FLINK-6440][metrics] Downgrade fetching failure logging to DEBUG Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e03f1b52 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e03f1b52 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e03f1b52 Branch: refs/heads/master Commit: e03f1b52eee7e73d00846ec0dd102da808d9d63e Parents: 2b0a3fa Author: zentol Authored: Tue May 16 10:19:18 2017 +0200 Committer: zentol Committed: Thu May 18 17:44:05 2017 +0200 -- .../org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/e03f1b52/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java -- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java index 7ffadce..4f92148 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java @@ -169,7 +169,7 @@ public class MetricFetcher { future.onFailure(new OnFailure() { @Override public void onFailure(Throwable failure) throws Throwable { - LOG.warn(message, failure); + LOG.debug(message, failure); } }, ctx); }
[1/3] flink git commit: [FLINK-6596][travis] Disable javadoc generation for jdk 7
Repository: flink Updated Branches: refs/heads/master 4a314a80e -> c995ebd29 [FLINK-6596][travis] Disable javadoc generation for jdk 7 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2b0a3fa0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2b0a3fa0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2b0a3fa0 Branch: refs/heads/master Commit: 2b0a3fa038787c3a08b1c346e9345bcae13f14d6 Parents: 4a314a8 Author: zentolAuthored: Tue May 16 09:19:15 2017 +0200 Committer: zentol Committed: Thu May 18 17:43:55 2017 +0200 -- .travis.yml | 12 ++-- 1 file changed, 6 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/2b0a3fa0/.travis.yml -- diff --git a/.travis.yml b/.travis.yml index 74dcaf2..44ea97d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -30,18 +30,18 @@ matrix: env: PROFILE="-Dhadoop.version=2.6.3 -Pinclude-yarn-tests,flink-fast-tests-c,include-kinesis -Dmaven.javadoc.skip=true" - jdk: "openjdk7" - env: PROFILE="-Dhadoop.version=2.4.1 -Dscala-2.11 -Pinclude-yarn-tests,flink-fast-tests-a,include-kinesis" + env: PROFILE="-Dhadoop.version=2.4.1 -Dscala-2.11 -Pinclude-yarn-tests,flink-fast-tests-a,include-kinesis -Dmaven.javadoc.skip=true" - jdk: "openjdk7" - env: PROFILE="-Dhadoop.version=2.4.1 -Dscala-2.11 -Pinclude-yarn-tests,flink-fast-tests-b,include-kinesis" + env: PROFILE="-Dhadoop.version=2.4.1 -Dscala-2.11 -Pinclude-yarn-tests,flink-fast-tests-b,include-kinesis -Dmaven.javadoc.skip=true" - jdk: "openjdk7" - env: PROFILE="-Dhadoop.version=2.4.1 -Dscala-2.11 -Pinclude-yarn-tests,flink-fast-tests-c,include-kinesis" + env: PROFILE="-Dhadoop.version=2.4.1 -Dscala-2.11 -Pinclude-yarn-tests,flink-fast-tests-c,include-kinesis -Dmaven.javadoc.skip=true" - jdk: "oraclejdk7" - env: PROFILE="-Dhadoop.version=2.3.0 -Pflink-fast-tests-a,include-kinesis" + env: PROFILE="-Dhadoop.version=2.3.0 -Pflink-fast-tests-a,include-kinesis -Dmaven.javadoc.skip=true" - jdk: 
"oraclejdk7" - env: PROFILE="-Dhadoop.version=2.3.0 -Pflink-fast-tests-b,include-kinesis" + env: PROFILE="-Dhadoop.version=2.3.0 -Pflink-fast-tests-b,include-kinesis -Dmaven.javadoc.skip=true" - jdk: "oraclejdk7" - env: PROFILE="-Dhadoop.version=2.3.0 -Pflink-fast-tests-c,include-kinesis" + env: PROFILE="-Dhadoop.version=2.3.0 -Pflink-fast-tests-c,include-kinesis -Dmaven.javadoc.skip=true" git:
[3/3] flink git commit: [FLINK-6416] Fix divide-by-zero in InputGateMetrics
[FLINK-6416] Fix divide-by-zero in InputGateMetrics Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c995ebd2 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c995ebd2 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c995ebd2 Branch: refs/heads/master Commit: c995ebd290b04c6426deff5a515ab0d0f1246ac7 Parents: e03f1b5 Author: zentol Authored: Mon May 15 13:44:54 2017 +0200 Committer: zentol Committed: Thu May 18 17:44:15 2017 +0200 -- .../runtime/io/network/partition/consumer/InputGateMetrics.java| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/c995ebd2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java index c9fed9b..796a6db 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java @@ -130,7 +130,7 @@ public class InputGateMetrics { } } - return total / (float) count; + return count == 0 ? 0 : total / (float) count; } //
[3/3] flink git commit: [FLINK-6416] Fix divide-by-zero in InputGateMetrics
[FLINK-6416] Fix divide-by-zero in InputGateMetrics Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/979228b3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/979228b3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/979228b3 Branch: refs/heads/release-1.3 Commit: 979228b3e2cac52c79c78855645a536d515befc9 Parents: 5569c4f Author: zentol Authored: Mon May 15 13:44:54 2017 +0200 Committer: zentol Committed: Thu May 18 17:43:08 2017 +0200 -- .../runtime/io/network/partition/consumer/InputGateMetrics.java| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/979228b3/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java index c9fed9b..796a6db 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java @@ -130,7 +130,7 @@ public class InputGateMetrics { } } - return total / (float) count; + return count == 0 ? 0 : total / (float) count; } //
[2/3] flink git commit: [FLINK-6440][metrics] Downgrade fetching failure logging to DEBUG
[FLINK-6440][metrics] Downgrade fetching failure logging to DEBUG Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5569c4fa Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5569c4fa Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5569c4fa Branch: refs/heads/release-1.3 Commit: 5569c4fafb08755ef12b7a96a173170dad883184 Parents: 44be162 Author: zentol Authored: Tue May 16 10:19:18 2017 +0200 Committer: zentol Committed: Thu May 18 17:42:55 2017 +0200 -- .../org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/5569c4fa/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java -- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java index 7ffadce..4f92148 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java @@ -169,7 +169,7 @@ public class MetricFetcher { future.onFailure(new OnFailure() { @Override public void onFailure(Throwable failure) throws Throwable { - LOG.warn(message, failure); + LOG.debug(message, failure); } }, ctx); }
[1/3] flink git commit: [FLINK-6596][travis] Disable javadoc generation for jdk 7
Repository: flink Updated Branches: refs/heads/release-1.3 ca3e403e9 -> 979228b3e [FLINK-6596][travis] Disable javadoc generation for jdk 7 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/44be1629 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/44be1629 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/44be1629 Branch: refs/heads/release-1.3 Commit: 44be162980d7f18e5859f215cd3283b71ae9d447 Parents: ca3e403 Author: zentolAuthored: Tue May 16 09:19:15 2017 +0200 Committer: zentol Committed: Thu May 18 17:42:42 2017 +0200 -- .travis.yml | 12 ++-- 1 file changed, 6 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/44be1629/.travis.yml -- diff --git a/.travis.yml b/.travis.yml index 74dcaf2..44ea97d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -30,18 +30,18 @@ matrix: env: PROFILE="-Dhadoop.version=2.6.3 -Pinclude-yarn-tests,flink-fast-tests-c,include-kinesis -Dmaven.javadoc.skip=true" - jdk: "openjdk7" - env: PROFILE="-Dhadoop.version=2.4.1 -Dscala-2.11 -Pinclude-yarn-tests,flink-fast-tests-a,include-kinesis" + env: PROFILE="-Dhadoop.version=2.4.1 -Dscala-2.11 -Pinclude-yarn-tests,flink-fast-tests-a,include-kinesis -Dmaven.javadoc.skip=true" - jdk: "openjdk7" - env: PROFILE="-Dhadoop.version=2.4.1 -Dscala-2.11 -Pinclude-yarn-tests,flink-fast-tests-b,include-kinesis" + env: PROFILE="-Dhadoop.version=2.4.1 -Dscala-2.11 -Pinclude-yarn-tests,flink-fast-tests-b,include-kinesis -Dmaven.javadoc.skip=true" - jdk: "openjdk7" - env: PROFILE="-Dhadoop.version=2.4.1 -Dscala-2.11 -Pinclude-yarn-tests,flink-fast-tests-c,include-kinesis" + env: PROFILE="-Dhadoop.version=2.4.1 -Dscala-2.11 -Pinclude-yarn-tests,flink-fast-tests-c,include-kinesis -Dmaven.javadoc.skip=true" - jdk: "oraclejdk7" - env: PROFILE="-Dhadoop.version=2.3.0 -Pflink-fast-tests-a,include-kinesis" + env: PROFILE="-Dhadoop.version=2.3.0 -Pflink-fast-tests-a,include-kinesis -Dmaven.javadoc.skip=true" 
- jdk: "oraclejdk7" - env: PROFILE="-Dhadoop.version=2.3.0 -Pflink-fast-tests-b,include-kinesis" + env: PROFILE="-Dhadoop.version=2.3.0 -Pflink-fast-tests-b,include-kinesis -Dmaven.javadoc.skip=true" - jdk: "oraclejdk7" - env: PROFILE="-Dhadoop.version=2.3.0 -Pflink-fast-tests-c,include-kinesis" + env: PROFILE="-Dhadoop.version=2.3.0 -Pflink-fast-tests-c,include-kinesis -Dmaven.javadoc.skip=true" git:
flink git commit: [FLINK-6031][yarn] Add config parameter for user-jar inclusion in classpath
Repository: flink Updated Branches: refs/heads/release-1.3 5605feb5d -> ca3e403e9 [FLINK-6031][yarn] Add config parameter for user-jar inclusion in classpath This closes #3931 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ca3e403e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ca3e403e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ca3e403e Branch: refs/heads/release-1.3 Commit: ca3e403e9f7845d455be4e3df0b00f8e4bac02db Parents: 5605feb Author: zentolAuthored: Wed May 17 18:15:27 2017 +0200 Committer: Robert Metzger Committed: Thu May 18 14:03:17 2017 +0200 -- docs/setup/config.md| 2 + docs/setup/yarn_setup.md| 12 ++ .../yarn/AbstractYarnClusterDescriptor.java | 152 --- .../yarn/configuration/YarnConfigOptions.java | 18 +++ 4 files changed, 129 insertions(+), 55 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/ca3e403e/docs/setup/config.md -- diff --git a/docs/setup/config.md b/docs/setup/config.md index c4a7354..8a6f67d 100644 --- a/docs/setup/config.md +++ b/docs/setup/config.md @@ -457,6 +457,8 @@ use the `env.java.opts` setting, which is the `%jvmopts%` variable in the String - `yarn.tags` A comma-separated list of tags to apply to the Flink YARN application. +- `yarn.per-job-cluster.include-user-jar` (Default: ORDER) Control whether and how the user-jar is included in the system class path for per-job clusters. Setting this parameter to `DISABLED` causes the jar to be included in the user class path instead. Setting this parameter to one of `FIRST`, `LAST` or `ORDER` causes the jar to be included in the system class path, with the jar either being placed at the beginning of the class path (`FIRST`), at the end (`LAST`), or based on the lexicographic order (`ORDER`). 
+ ### Mesos http://git-wip-us.apache.org/repos/asf/flink/blob/ca3e403e/docs/setup/yarn_setup.md -- diff --git a/docs/setup/yarn_setup.md b/docs/setup/yarn_setup.md index 1ce45ad..190a796 100644 --- a/docs/setup/yarn_setup.md +++ b/docs/setup/yarn_setup.md @@ -245,6 +245,18 @@ Note: You can use a different configuration directory per job by setting the env Note: It is possible to combine `-m yarn-cluster` with a detached YARN submission (`-yd`) to "fire and forget" a Flink job to the YARN cluster. In this case, your application will not get any accumulator results or exceptions from the ExecutionEnvironment.execute() call! +### User jars & Classpath + +By default Flink will include the user jars into the system classpath when running a single job. This behavior can be controlled with the `yarn.per-job-cluster.include-user-jar` parameter. + +When setting this to `DISABLED` Flink will include the jar in the user classpath instead. + +The user-jars position in the class path can be controlled by setting the parameter to one of the following: + +- `ORDER`: (default) Adds the jar to the system class path based on the lexicographic order. +- `FIRST`: Adds the jar to the beginning of the system class path. +- `LAST`: Adds the jar to the end of the system class path. + ## Recovery behavior of Flink on YARN Flink's YARN client has the following configuration parameters to control how to behave in case of container failures. These parameters can be set either from the `conf/flink-conf.yaml` or when starting the YARN session, using `-D` parameters. 
http://git-wip-us.apache.org/repos/asf/flink/blob/ca3e403e/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java -- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java index 65525f2..3110a5b 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.yarn.configuration.YarnConfigOptions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -77,6 +78,7 @@ import java.nio.file.SimpleFileVisitor; import java.nio.file.attribute.BasicFileAttributes; import java.util.ArrayList;
flink git commit: [FLINK-6031][yarn] Add config parameter for user-jar inclusion in classpath
Repository: flink Updated Branches: refs/heads/master e28380152 -> 4a314a80e [FLINK-6031][yarn] Add config parameter for user-jar inclusion in classpath This closes #3931 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4a314a80 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4a314a80 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4a314a80 Branch: refs/heads/master Commit: 4a314a80e64ea2af624131e5c02e1f167a22a354 Parents: e283801 Author: zentolAuthored: Wed May 17 18:15:27 2017 +0200 Committer: Robert Metzger Committed: Thu May 18 14:01:53 2017 +0200 -- docs/setup/config.md| 2 + docs/setup/yarn_setup.md| 12 ++ .../yarn/AbstractYarnClusterDescriptor.java | 152 --- .../yarn/configuration/YarnConfigOptions.java | 18 +++ 4 files changed, 129 insertions(+), 55 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/4a314a80/docs/setup/config.md -- diff --git a/docs/setup/config.md b/docs/setup/config.md index c4a7354..8a6f67d 100644 --- a/docs/setup/config.md +++ b/docs/setup/config.md @@ -457,6 +457,8 @@ use the `env.java.opts` setting, which is the `%jvmopts%` variable in the String - `yarn.tags` A comma-separated list of tags to apply to the Flink YARN application. +- `yarn.per-job-cluster.include-user-jar` (Default: ORDER) Control whether and how the user-jar is included in the system class path for per-job clusters. Setting this parameter to `DISABLED` causes the jar to be included in the user class path instead. Setting this parameter to one of `FIRST`, `LAST` or `ORDER` causes the jar to be included in the system class path, with the jar either being placed at the beginning of the class path (`FIRST`), at the end (`LAST`), or based on the lexicographic order (`ORDER`). 
+ ### Mesos http://git-wip-us.apache.org/repos/asf/flink/blob/4a314a80/docs/setup/yarn_setup.md -- diff --git a/docs/setup/yarn_setup.md b/docs/setup/yarn_setup.md index 1ce45ad..190a796 100644 --- a/docs/setup/yarn_setup.md +++ b/docs/setup/yarn_setup.md @@ -245,6 +245,18 @@ Note: You can use a different configuration directory per job by setting the env Note: It is possible to combine `-m yarn-cluster` with a detached YARN submission (`-yd`) to "fire and forget" a Flink job to the YARN cluster. In this case, your application will not get any accumulator results or exceptions from the ExecutionEnvironment.execute() call! +### User jars & Classpath + +By default Flink will include the user jars into the system classpath when running a single job. This behavior can be controlled with the `yarn.per-job-cluster.include-user-jar` parameter. + +When setting this to `DISABLED` Flink will include the jar in the user classpath instead. + +The user-jars position in the class path can be controlled by setting the parameter to one of the following: + +- `ORDER`: (default) Adds the jar to the system class path based on the lexicographic order. +- `FIRST`: Adds the jar to the beginning of the system class path. +- `LAST`: Adds the jar to the end of the system class path. + ## Recovery behavior of Flink on YARN Flink's YARN client has the following configuration parameters to control how to behave in case of container failures. These parameters can be set either from the `conf/flink-conf.yaml` or when starting the YARN session, using `-D` parameters. 
http://git-wip-us.apache.org/repos/asf/flink/blob/4a314a80/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java -- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java index 65525f2..3110a5b 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.yarn.configuration.YarnConfigOptions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -77,6 +78,7 @@ import java.nio.file.SimpleFileVisitor; import java.nio.file.attribute.BasicFileAttributes; import java.util.ArrayList; import
flink git commit: [FLINK-6624] [cep] Fix SharedBuffer#hashCode().
Repository: flink Updated Branches: refs/heads/release-1.3 a30a8be98 -> 5605feb5d [FLINK-6624] [cep] Fix SharedBuffer#hashCode(). Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5605feb5 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5605feb5 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5605feb5 Branch: refs/heads/release-1.3 Commit: 5605feb5dd7c676a8e03102bd93dd6e1d43e19ad Parents: a30a8be Author: kkloudas Authored: Thu May 18 11:27:42 2017 +0200 Committer: kkloudas Committed: Thu May 18 11:35:34 2017 +0200 -- .../src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/5605feb5/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java -- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java index ab134d0..d0f6bf4 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java @@ -754,7 +754,7 @@ public class SharedBuffer implements Serializable { @Override public int hashCode() { - return (int) (31 * (timestamp ^ timestamp >>> 32) + 31 * value.hashCode()) + counter; + return (int) (31 * (31 * (timestamp ^ timestamp >>> 32) + value.hashCode()) + counter); } }
flink git commit: [FLINK-6624] [cep] Fix SharedBuffer#hashCode().
Repository: flink Updated Branches: refs/heads/master 85e281be3 -> e28380152 [FLINK-6624] [cep] Fix SharedBuffer#hashCode(). Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e2838015 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e2838015 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e2838015 Branch: refs/heads/master Commit: e28380152a27dcbc794fae5b023e3223699a5813 Parents: 85e281b Author: kkloudas Authored: Thu May 18 11:27:42 2017 +0200 Committer: kkloudas Committed: Thu May 18 11:27:42 2017 +0200 -- .../src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/e2838015/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java -- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java index ab134d0..d0f6bf4 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java @@ -754,7 +754,7 @@ public class SharedBuffer implements Serializable { @Override public int hashCode() { - return (int) (31 * (timestamp ^ timestamp >>> 32) + 31 * value.hashCode()) + counter; + return (int) (31 * (31 * (timestamp ^ timestamp >>> 32) + value.hashCode()) + counter); } }
flink git commit: [FLINK-6570] QueryableStateClient docs with matching constructor signature
Repository: flink Updated Branches: refs/heads/release-1.3 8b5ba676f -> a30a8be98 [FLINK-6570] QueryableStateClient docs with matching constructor signature This closes #3926. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a30a8be9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a30a8be9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a30a8be9 Branch: refs/heads/release-1.3 Commit: a30a8be98c4a7416e4621c48e961dd8a802b56b4 Parents: 8b5ba67 Author: gosubplAuthored: Wed May 17 02:03:45 2017 +0200 Committer: Till Rohrmann Committed: Thu May 18 09:55:45 2017 +0200 -- docs/dev/stream/queryable_state.md | 12 +--- 1 file changed, 9 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/a30a8be9/docs/dev/stream/queryable_state.md -- diff --git a/docs/dev/stream/queryable_state.md b/docs/dev/stream/queryable_state.md index 62d6a33..ceb5f76 100644 --- a/docs/dev/stream/queryable_state.md +++ b/docs/dev/stream/queryable_state.md @@ -208,10 +208,16 @@ Once used in a job, you can retrieve the job ID and then query any key's current {% highlight java %} final Configuration config = new Configuration(); -config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, queryAddress); -config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, queryPort); +config.setString(JobManagerOptions.ADDRESS, queryAddress); +config.setInteger(JobManagerOptions.PORT, queryPort); -QueryableStateClient client = new QueryableStateClient(config); +final HighAvailabilityServices highAvailabilityServices = + HighAvailabilityServicesUtils.createHighAvailabilityServices( + config, + TestingUtils.defaultExecutor(), + HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION); + +QueryableStateClient client = new QueryableStateClient(config, highAvailabilityServices); final TypeSerializer keySerializer = TypeInformation.of(new TypeHint() {}).createSerializer(new 
ExecutionConfig());
flink git commit: [FLINK-6570] QueryableStateClient docs with matching constructor signature
Repository: flink Updated Branches: refs/heads/master d8a467b01 -> 85e281be3 [FLINK-6570] QueryableStateClient docs with matching constructor signature This closes #3926. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/85e281be Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/85e281be Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/85e281be Branch: refs/heads/master Commit: 85e281be3835fe8bb0a5bcaede072e5f26b7f087 Parents: d8a467b Author: gosubplAuthored: Wed May 17 02:03:45 2017 +0200 Committer: Till Rohrmann Committed: Thu May 18 09:55:09 2017 +0200 -- docs/dev/stream/queryable_state.md | 12 +--- 1 file changed, 9 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/85e281be/docs/dev/stream/queryable_state.md -- diff --git a/docs/dev/stream/queryable_state.md b/docs/dev/stream/queryable_state.md index 62d6a33..ceb5f76 100644 --- a/docs/dev/stream/queryable_state.md +++ b/docs/dev/stream/queryable_state.md @@ -208,10 +208,16 @@ Once used in a job, you can retrieve the job ID and then query any key's current {% highlight java %} final Configuration config = new Configuration(); -config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, queryAddress); -config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, queryPort); +config.setString(JobManagerOptions.ADDRESS, queryAddress); +config.setInteger(JobManagerOptions.PORT, queryPort); -QueryableStateClient client = new QueryableStateClient(config); +final HighAvailabilityServices highAvailabilityServices = + HighAvailabilityServicesUtils.createHighAvailabilityServices( + config, + TestingUtils.defaultExecutor(), + HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION); + +QueryableStateClient client = new QueryableStateClient(config, highAvailabilityServices); final TypeSerializer keySerializer = TypeInformation.of(new TypeHint() {}).createSerializer(new ExecutionConfig());