buildbot success in on flink-docs-release-0.9
The Buildbot has detected a restored build on builder flink-docs-release-0.9 while building . Full details are available at:
    https://ci.apache.org/builders/flink-docs-release-0.9/builds/652

Buildbot URL: https://ci.apache.org/

Buildslave for this Build: bb_slave1_ubuntu

Build Reason: The Nightly scheduler named 'flink-nightly-docs-release-0.9' triggered this build
Build Source Stamp: [branch release-0.9] HEAD
Blamelist:

Build succeeded!

Sincerely,
 -The Buildbot
buildbot success in on flink-docs-release-0.10
The Buildbot has detected a restored build on builder flink-docs-release-0.10 while building . Full details are available at:
    https://ci.apache.org/builders/flink-docs-release-0.10/builds/537

Buildbot URL: https://ci.apache.org/

Buildslave for this Build: bb_slave1_ubuntu

Build Reason: The Nightly scheduler named 'flink-nightly-docs-release-0.10' triggered this build
Build Source Stamp: [branch release-0.10] HEAD
Blamelist:

Build succeeded!

Sincerely,
 -The Buildbot
flink git commit: [FLINK-6162] [core] Fix bug in ByteArrayOutputStreamWithPos
Repository: flink
Updated Branches:
  refs/heads/master 697cc9610 -> af34e5bd0


[FLINK-6162] [core] Fix bug in ByteArrayOutputStreamWithPos

Fix off-by-one error in 'setPosition' and expand array when seeking outside the original bounds.

This closes #3595


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/af34e5bd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/af34e5bd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/af34e5bd

Branch: refs/heads/master
Commit: af34e5bd0bc8bf93c78b8bf37033ee660e7913be
Parents: 697cc96
Author: wenlong.lwl
Authored: Wed Mar 22 09:48:42 2017 +0800
Committer: Greg Hogan
Committed: Fri Apr 7 20:10:32 2017 -0400

----------------------------------------------------------------------
 .../memory/ByteArrayInputStreamWithPos.java     |   3 +-
 .../memory/ByteArrayOutputStreamWithPos.java    |   7 +-
 .../memory/ByteArrayInputStreamWithPosTest.java |  81 ++
 .../ByteArrayOutputStreamWithPosTest.java       | 112 +++
 4 files changed, 197 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/af34e5bd/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java
index dd381a4..1447e96 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java
@@ -118,7 +118,8 @@ public class ByteArrayInputStreamWithPos extends InputStream {
 		return position;
 	}
 
-	public void setPos(int pos) {
+	public void setPosition(int pos) {
+		Preconditions.checkArgument(pos >= 0 && pos <= count, "Position out of bounds.");
 		this.position = pos;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/af34e5bd/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
index abf65b1..22330c5 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
@@ -101,16 +101,13 @@ public class ByteArrayOutputStreamWithPos extends OutputStream {
 		return new String(buffer, 0, count, ConfigConstants.DEFAULT_CHARSET);
 	}
 
-	private int getEndPosition() {
-		return buffer.length;
-	}
-
 	public int getPosition() {
 		return count;
 	}
 
 	public void setPosition(int position) {
-		Preconditions.checkArgument(position < getEndPosition(), "Position out of bounds.");
+		Preconditions.checkArgument(position >= 0, "Position out of bounds.");
+		ensureCapacity(position + 1);
 		count = position;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/af34e5bd/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPosTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPosTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPosTest.java
new file mode 100644
index 0000000..1e1902e
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPosTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
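
The setPosition change in the FLINK-6162 diff above can be illustrated with a small standalone sketch. This is not the Flink class: the name PositionedByteBuffer, the capacity() accessor, and the doubling growth policy are assumptions made for illustration. Only the two steps inside setPosition (reject negative positions, then call ensureCapacity(position + 1)) mirror the patched lines.

```java
import java.util.Arrays;

/** Hypothetical stand-in for a byte-array output stream with a settable position. */
final class PositionedByteBuffer {
    private byte[] buffer;
    private int count; // current write position

    PositionedByteBuffer(int initialCapacity) {
        this.buffer = new byte[initialCapacity];
    }

    /** Grows the backing array if it holds fewer than requiredCapacity bytes. */
    private void ensureCapacity(int requiredCapacity) {
        if (requiredCapacity > buffer.length) {
            // Doubling is an assumed growth policy for this sketch.
            int newCapacity = Math.max(buffer.length * 2, requiredCapacity);
            buffer = Arrays.copyOf(buffer, newCapacity);
        }
    }

    void write(int b) {
        ensureCapacity(count + 1);
        buffer[count++] = (byte) b;
    }

    int getPosition() {
        return count;
    }

    int capacity() {
        return buffer.length;
    }

    /** Mirrors the patched logic: reject negatives, grow instead of failing on large positions. */
    void setPosition(int position) {
        if (position < 0) {
            throw new IllegalArgumentException("Position out of bounds.");
        }
        ensureCapacity(position + 1);
        count = position;
    }

    /** Returns a copy of the bytes up to the current position. */
    byte[] toByteArray() {
        return Arrays.copyOf(buffer, count);
    }
}
```

With the pre-patch check (position < buffer.length), a seek to a position equal to or beyond the array length was rejected; in this sketch such a seek grows the array instead, and any gap is left zero-filled.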
flink git commit: [FLINK-6279] [table] Fix toString() of TableSourceScan.
Repository: flink
Updated Branches:
  refs/heads/master 0038da415 -> 697cc9610


[FLINK-6279] [table] Fix toString() of TableSourceScan.

- The digest of VolcanoRuleMatch matched different table sources with same field names as the same.

This closes #3699.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/697cc961
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/697cc961
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/697cc961

Branch: refs/heads/master
Commit: 697cc96106846547ff856aa5e478fee037ffde1a
Parents: 0038da4
Author: godfreyhe
Authored: Fri Apr 7 21:41:29 2017 +0800
Committer: Fabian Hueske
Committed: Fri Apr 7 21:42:52 2017 +0200

----------------------------------------------------------------------
 .../table/plan/nodes/TableSourceScan.scala      | 10 ++-
 .../apache/flink/table/TableSourceTest.scala    | 29 
 2 files changed, 38 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/697cc961/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/TableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/TableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/TableSourceScan.scala
index e0f7786..7bd5c5b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/TableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/TableSourceScan.scala
@@ -55,7 +55,15 @@ abstract class TableSourceScan(
   }
 
   override def toString: String = {
-    s"Source(from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")}))"
+    val tableName = getTable.getQualifiedName
+    val s = s"table:$tableName, fields:(${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
+
+    val sourceDesc = tableSource.explainSource()
+    if (sourceDesc.nonEmpty) {
+      s"Scan($s, source:$sourceDesc)"
+    } else {
+      s"Scan($s)"
+    }
   }
 
   def copy(traitSet: RelTraitSet, tableSource: TableSource[_]): TableSourceScan

http://git-wip-us.apache.org/repos/asf/flink/blob/697cc961/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala
index 1866e3c..24a32de 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala
@@ -31,6 +31,35 @@ class TableSourceTest extends TableTestBase {
   private val projectedFields: Array[String] = Array("last", "id", "score")
   private val noCalcFields: Array[String] = Array("id", "score", "first")
 
+  @Test
+  def testTableSourceScanToString(): Unit = {
+    val (tableSource1, _) = filterableTableSource
+    val (tableSource2, _) = filterableTableSource
+    val util = batchTestUtil()
+    val tEnv = util.tEnv
+
+    tEnv.registerTableSource("table1", tableSource1)
+    tEnv.registerTableSource("table2", tableSource2)
+
+    val table1 = tEnv.scan("table1").where("amount > 2")
+    val table2 = tEnv.scan("table2").where("amount > 2")
+    val result = table1.unionAll(table2)
+
+    val expected = binaryNode(
+      "DataSetUnion",
+      batchFilterableSourceTableNode(
+        "table1",
+        Array("name", "id", "amount", "price"),
+        "'amount > 2"),
+      batchFilterableSourceTableNode(
+        "table2",
+        Array("name", "id", "amount", "price"),
+        "'amount > 2"),
+      term("union", "name, id, amount, price")
+    )
+    util.verifyTable(result, expected)
+  }
+
   // batch plan
 
   @Test
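
The digest collision described in the FLINK-6279 commit message above can be shown with a short standalone sketch. It is an illustration only, not Flink or Calcite code: the class ScanDigestSketch and both method names are hypothetical, and the table name is passed as a plain String for simplicity. The two string formats are copied from the removed and added toString lines in the diff.

```java
import java.util.List;

/** Hypothetical helper that reproduces the old and new toString formats as plain strings. */
final class ScanDigestSketch {

    /** Old format: only the field names, so two sources with equal schemas look identical. */
    static String oldDigest(List<String> fields) {
        return "Source(from: (" + String.join(", ", fields) + "))";
    }

    /** New format: table name and fields, plus the source description when it is non-empty. */
    static String newDigest(String tableName, List<String> fields, String sourceDesc) {
        String s = "table:" + tableName + ", fields:(" + String.join(", ", fields) + ")";
        if (!sourceDesc.isEmpty()) {
            return "Scan(" + s + ", source:" + sourceDesc + ")";
        }
        return "Scan(" + s + ")";
    }
}
```

For two tables registered as table1 and table2 with the same four fields, oldDigest returns the same string for both, while newDigest differs in the table name. This matches the situation exercised by testTableSourceScanToString in the diff.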
[2/2] flink git commit: [FLINK-5545] [table] Remove FlinkAggregateExpandDistinctAggregatesRule after bumping Calcite to v1.12.
[FLINK-5545] [table] Remove FlinkAggregateExpandDistinctAggregatesRule after bumping Calcite to v1.12.

This closes #3695.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0038da41
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0038da41
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0038da41

Branch: refs/heads/master
Commit: 0038da41553908a427dd20be75838cccb48c6bcf
Parents: fa7907a
Author: Kurt Young
Authored: Fri Apr 7 17:46:06 2017 +0800
Committer: Fabian Hueske
Committed: Fri Apr 7 14:09:56 2017 +0200

----------------------------------------------------------------------
 ...nkAggregateExpandDistinctAggregatesRule.java | 1158 --
 .../flink/table/plan/rules/FlinkRuleSets.scala  |    8 +-
 2 files changed, 3 insertions(+), 1163 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0038da41/flink-libraries/flink-table/src/main/java/org/apache/flink/table/calcite/rules/FlinkAggregateExpandDistinctAggregatesRule.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/calcite/rules/FlinkAggregateExpandDistinctAggregatesRule.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/calcite/rules/FlinkAggregateExpandDistinctAggregatesRule.java
deleted file mode 100644
index 9d4e08e..0000000
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/calcite/rules/FlinkAggregateExpandDistinctAggregatesRule.java
+++ /dev/null
@@ -1,1158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.calcite.rules;
-
-import org.apache.calcite.plan.Contexts;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Aggregate;
-import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.calcite.rel.core.RelFactories;
-import org.apache.calcite.rel.logical.LogicalAggregate;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexInputRef;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.sql.SqlAggFunction;
-import org.apache.calcite.sql.fun.SqlCountAggFunction;
-import org.apache.calcite.sql.fun.SqlMinMaxAggFunction;
-import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.apache.calcite.sql.fun.SqlSumAggFunction;
-import org.apache.calcite.sql.fun.SqlSumEmptyIsZeroAggFunction;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.tools.RelBuilder;
-import org.apache.calcite.tools.RelBuilderFactory;
-import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.calcite.util.ImmutableIntList;
-import org.apache.calcite.util.Pair;
-import org.apache.calcite.util.Util;
-
-import org.apache.flink.util.Preconditions;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
-import java.math.BigDecimal;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-/**
- Copy calcite's AggregateExpandDistinctAggregatesRule to Flink project,
- and do a quick fix to avoid some bad case mentioned in CALCITE-1558.
- Should drop it and use calcite's AggregateExpandDistinctAggregatesRule
- when we upgrade to calcite 1.12(above)
- */
-
-/**
- * Planner rule that expands distinct aggregates
- * (such as {@code COUNT(DISTINCT x)}) from a
- * {@link org.apache.calcite.rel.logical.LogicalAggregate}.
- *
- * How this is done depends upon the arguments to the function. If all
- * functions have the same argument
- * (e.g. {@code
[1/2] flink git commit: [FLINK-6012] [table] Support SQL WindowStart and WindowEnd functions.
Repository: flink
Updated Branches:
  refs/heads/master 635394751 -> 0038da415


[FLINK-6012] [table] Support SQL WindowStart and WindowEnd functions.

This closes #3693.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fa7907ab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fa7907ab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fa7907ab

Branch: refs/heads/master
Commit: fa7907ab0e90d182b6386802c97f9b4e001dc440
Parents: 6353947
Author: Haohui Mai
Authored: Fri Apr 7 01:08:12 2017 -0700
Committer: Fabian Hueske
Committed: Fri Apr 7 14:07:05 2017 +0200

----------------------------------------------------------------------
 docs/dev/table_api.md                           |  67 +-
 .../flink/table/plan/rules/FlinkRuleSets.scala  |   5 +-
 .../common/WindowStartEndPropertiesRule.scala   | 122 +++
 .../flink/table/validate/FunctionCatalog.scala  |   8 +-
 .../scala/batch/sql/WindowAggregateTest.scala   |  38 --
 .../scala/stream/sql/WindowAggregateTest.scala  |  42 ---
 6 files changed, 252 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fa7907ab/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index 6f96920..2a838c7 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -1439,7 +1439,7 @@ Group windows are defined in the `GROUP BY` clause of a SQL query. Just like que
 TUMBLE(time_attr, interval)
- Defines are tumbling time window. A tumbling time window assigns rows to non-overlapping, continuous windows with a fixed duration (interval). For example, a tumbling window of 5 minutes groups rows in 5 minutes intervals. Tumbling windows can be defined on event-time (stream + batch) or processing-time (stream).
+ Defines a tumbling time window. A tumbling time window assigns rows to non-overlapping, continuous windows with a fixed duration (interval). For example, a tumbling window of 5 minutes groups rows in 5 minutes intervals. Tumbling windows can be defined on event-time (stream + batch) or processing-time (stream).
 HOP(time_attr, interval, interval)
@@ -1454,6 +1454,40 @@ Group windows are defined in the `GROUP BY` clause of a SQL query. Just like que
 For SQL queries on streaming tables, the `time_attr` argument of the group window function must be one of the `rowtime()` or `proctime()` time-indicators, which distinguish between event or processing time, respectively. For SQL on batch tables, the `time_attr` argument of the group window function must be an attribute of type `TIMESTAMP`.
 
+ Selecting Group Window Start and End Timestamps
+
+The start and end timestamps of group windows can be selected with the following auxiliary functions:
+
+
+
+
+ Auxiliary Function
+ Description
+
+
+
+
+
+
+TUMBLE_START(time_attr, interval)
+HOP_START(time_attr, interval, interval)
+SESSION_START(time_attr, interval)
+
+ Returns the start timestamp of the corresponding tumbling, hopping, and session window.
+
+
+
+TUMBLE_END(time_attr, interval)
+HOP_END(time_attr, interval, interval)
+SESSION_END(time_attr, interval)
+
+ Returns the end timestamp of the corresponding tumbling, hopping, and session window.
+
+
+
+
+Note that the auxiliary functions must be called with exactly same arguments as the group window function in the `GROUP BY` clause.
+
 The following examples show how to specify SQL queries with group windows on streaming tables.
@@ -1469,7 +1503,10 @@ tableEnv.registerDataStream("Orders", ds, "user, product, amount");
 
 // compute SUM(amount) per day (in event-time)
 Table result1 = tableEnv.sql(
-  "SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(rowtime(), INTERVAL '1' DAY), user");
+  "SELECT user, " +
+  " TUMBLE_START(rowtime(), INTERVAL '1' DAY) as wStart, " +
+  " SUM(amount) FROM Orders " +
+  "GROUP BY TUMBLE(rowtime(), INTERVAL '1' DAY), user");
 
 // compute SUM(amount) per day (in processing-time)
 Table result2 = tableEnv.sql(
@@ -1481,7 +1518,12 @@ Table result3 = tableEnv.sql(
 
 // compute SUM(amount) per session with 12 hour inactivity gap (in event-time)
 Table result4 = tableEnv.sql(
-  "SELECT user, SUM(amount) FROM Orders GROUP BY SESSION(rowtime(), INTERVAL '12' HOUR), user");
+  "SELECT user, " +
+  " SESSION_START(rowtime(), INTERVAL '12' HOUR) AS sStart, " +
+  " SESSION_END(rowtime(), INTERVAL '12' HOUR) AS snd, " +
+  " SUM(amount) " +
+  "FROM Orders " +
+  "GROUP BY SESSION(rowtime(), INTERVAL '12' HOUR), user");
 
 {% endhighlight %}
@@ -1498,7 +1540,14 @@