buildbot success on flink-docs-release-0.9

2017-04-07 Thread buildbot
The Buildbot has detected a restored build on builder flink-docs-release-0.9
while building. Full details are available at:
https://ci.apache.org/builders/flink-docs-release-0.9/builds/652

Buildbot URL: https://ci.apache.org/

Buildslave for this Build: bb_slave1_ubuntu

Build Reason: The Nightly scheduler named 'flink-nightly-docs-release-0.9' 
triggered this build
Build Source Stamp: [branch release-0.9] HEAD
Blamelist: 

Build succeeded!

Sincerely,
 -The Buildbot





buildbot success on flink-docs-release-0.10

2017-04-07 Thread buildbot
The Buildbot has detected a restored build on builder flink-docs-release-0.10
while building. Full details are available at:
https://ci.apache.org/builders/flink-docs-release-0.10/builds/537

Buildbot URL: https://ci.apache.org/

Buildslave for this Build: bb_slave1_ubuntu

Build Reason: The Nightly scheduler named 'flink-nightly-docs-release-0.10' 
triggered this build
Build Source Stamp: [branch release-0.10] HEAD
Blamelist: 

Build succeeded!

Sincerely,
 -The Buildbot





flink git commit: [FLINK-6162] [core] Fix bug in ByteArrayOutputStreamWithPos

2017-04-07 Thread greg
Repository: flink
Updated Branches:
  refs/heads/master 697cc9610 -> af34e5bd0


[FLINK-6162] [core] Fix bug in ByteArrayOutputStreamWithPos

Fix off-by-one error in 'setPosition' and expand array when seeking
outside the original bounds.

This closes #3595
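
For illustration, a minimal sketch of the corrected semantics (not part of the
patch; assumes the public int-capacity constructor and the write methods
inherited from OutputStream):

import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;

public class SetPositionSketch {
    public static void main(String[] args) throws Exception {
        // Start with a small backing buffer so growth is observable.
        ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos(4);
        out.write(new byte[] {1, 2, 3, 4});

        // Off-by-one fix: seeking to the current end of the buffer is valid,
        // but the old check 'position < getEndPosition()' rejected it.
        out.setPosition(4);

        // Seeking past the current capacity now grows the backing array via
        // ensureCapacity(position + 1) instead of throwing.
        out.setPosition(10);
        out.write(42);

        System.out.println(out.getPosition()); // prints 11
    }
}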


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/af34e5bd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/af34e5bd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/af34e5bd

Branch: refs/heads/master
Commit: af34e5bd0bc8bf93c78b8bf37033ee660e7913be
Parents: 697cc96
Author: wenlong.lwl 
Authored: Wed Mar 22 09:48:42 2017 +0800
Committer: Greg Hogan 
Committed: Fri Apr 7 20:10:32 2017 -0400

--
 .../memory/ByteArrayInputStreamWithPos.java |   3 +-
 .../memory/ByteArrayOutputStreamWithPos.java|   7 +-
 .../memory/ByteArrayInputStreamWithPosTest.java |  81 ++
 .../ByteArrayOutputStreamWithPosTest.java   | 112 +++
 4 files changed, 197 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/af34e5bd/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java
--
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java
index dd381a4..1447e96 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java
@@ -118,7 +118,8 @@ public class ByteArrayInputStreamWithPos extends InputStream {
 		return position;
 	}
 
-	public void setPos(int pos) {
+	public void setPosition(int pos) {
+		Preconditions.checkArgument(pos >= 0 && pos <= count, "Position out of bounds.");
 		this.position = pos;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/af34e5bd/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
--
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
index abf65b1..22330c5 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
@@ -101,16 +101,13 @@ public class ByteArrayOutputStreamWithPos extends OutputStream {
 		return new String(buffer, 0, count, ConfigConstants.DEFAULT_CHARSET);
 	}
 
-	private int getEndPosition() {
-		return buffer.length;
-	}
-
 	public int getPosition() {
 		return count;
 	}
 
 	public void setPosition(int position) {
-		Preconditions.checkArgument(position < getEndPosition(), "Position out of bounds.");
+		Preconditions.checkArgument(position >= 0, "Position out of bounds.");
+		ensureCapacity(position + 1);
 		count = position;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/af34e5bd/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPosTest.java
--
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPosTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPosTest.java
new file mode 100644
index 0000000..1e1902e
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPosTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;

flink git commit: [FLINK-6279] [table] Fix toString() of TableSourceScan.

2017-04-07 Thread fhueske
Repository: flink
Updated Branches:
  refs/heads/master 0038da415 -> 697cc9610


[FLINK-6279] [table] Fix toString() of TableSourceScan.

- The digest of VolcanoRuleMatch treated different table sources with the same
field names as identical.

This closes #3699.
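
To see why the old digest was ambiguous, compare the rendered strings for two
different sources with identical field names (a sketch; the exact rendering of
the qualified-name list is approximate):

public class ScanDigestSketch {
    public static void main(String[] args) {
        String fields = "name, id, amount, price";

        // Old format: scans over two *different* sources rendered identically,
        // so the VolcanoRuleMatch digest treated them as the same node.
        String oldScan1 = "Source(from: (" + fields + "))";
        String oldScan2 = "Source(from: (" + fields + "))";
        System.out.println(oldScan1.equals(oldScan2)); // true -> collision

        // New format: the qualified table name disambiguates the digest.
        String newScan1 = "Scan(table:[table1], fields:(" + fields + "))";
        String newScan2 = "Scan(table:[table2], fields:(" + fields + "))";
        System.out.println(newScan1.equals(newScan2)); // false
    }
}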


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/697cc961
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/697cc961
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/697cc961

Branch: refs/heads/master
Commit: 697cc96106846547ff856aa5e478fee037ffde1a
Parents: 0038da4
Author: godfreyhe 
Authored: Fri Apr 7 21:41:29 2017 +0800
Committer: Fabian Hueske 
Committed: Fri Apr 7 21:42:52 2017 +0200

--
 .../table/plan/nodes/TableSourceScan.scala  | 10 ++-
 .../apache/flink/table/TableSourceTest.scala| 29 
 2 files changed, 38 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/697cc961/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/TableSourceScan.scala
--
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/TableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/TableSourceScan.scala
index e0f7786..7bd5c5b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/TableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/TableSourceScan.scala
@@ -55,7 +55,15 @@ abstract class TableSourceScan(
   }
 
   override def toString: String = {
-    s"Source(from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")}))"
+    val tableName = getTable.getQualifiedName
+    val s = s"table:$tableName, fields:(${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
+
+    val sourceDesc = tableSource.explainSource()
+    if (sourceDesc.nonEmpty) {
+      s"Scan($s, source:$sourceDesc)"
+    } else {
+      s"Scan($s)"
+    }
   }
 
   def copy(traitSet: RelTraitSet, tableSource: TableSource[_]): TableSourceScan

http://git-wip-us.apache.org/repos/asf/flink/blob/697cc961/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala
--
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala
index 1866e3c..24a32de 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala
@@ -31,6 +31,35 @@ class TableSourceTest extends TableTestBase {
   private val projectedFields: Array[String] = Array("last", "id", "score")
   private val noCalcFields: Array[String] = Array("id", "score", "first")
 
+  @Test
+  def testTableSourceScanToString(): Unit = {
+val (tableSource1, _) = filterableTableSource
+val (tableSource2, _) = filterableTableSource
+val util = batchTestUtil()
+val tEnv = util.tEnv
+
+tEnv.registerTableSource("table1", tableSource1)
+tEnv.registerTableSource("table2", tableSource2)
+
+val table1 = tEnv.scan("table1").where("amount > 2")
+val table2 = tEnv.scan("table2").where("amount > 2")
+val result = table1.unionAll(table2)
+
+val expected = binaryNode(
+  "DataSetUnion",
+  batchFilterableSourceTableNode(
+"table1",
+Array("name", "id", "amount", "price"),
+"'amount > 2"),
+  batchFilterableSourceTableNode(
+"table2",
+Array("name", "id", "amount", "price"),
+"'amount > 2"),
+  term("union", "name, id, amount, price")
+)
+util.verifyTable(result, expected)
+  }
+
   // batch plan
 
   @Test



[2/2] flink git commit: [FLINK-5545] [table] Remove FlinkAggregateExpandDistinctAggregatesRule after bumping Calcite to v1.12.

2017-04-07 Thread fhueske
[FLINK-5545] [table] Remove FlinkAggregateExpandDistinctAggregatesRule after 
bumping Calcite to v1.12.

This closes #3695.
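
For context, the distinct aggregates that this forked rule used to expand are
now handled by Calcite's own AggregateExpandDistinctAggregatesRule. A minimal
sketch, assuming the 1.x batch Table API entry points used elsewhere in this
digest (the data set and table names are illustrative):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

public class DistinctAggSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        DataSet<Tuple3<String, String, Integer>> ds = env.fromElements(
            Tuple3.of("beer", "bob", 3),
            Tuple3.of("beer", "alice", 2),
            Tuple3.of("wine", "bob", 1));
        tableEnv.registerDataSet("Orders", ds, "product, user, amount");

        // COUNT(DISTINCT user) is logically expanded into an inner
        // GROUP BY product, user feeding an outer COUNT per product.
        Table result = tableEnv.sql(
            "SELECT product, COUNT(DISTINCT user) AS users FROM Orders GROUP BY product");
        tableEnv.toDataSet(result, Row.class).print();
    }
}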


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0038da41
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0038da41
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0038da41

Branch: refs/heads/master
Commit: 0038da41553908a427dd20be75838cccb48c6bcf
Parents: fa7907a
Author: Kurt Young 
Authored: Fri Apr 7 17:46:06 2017 +0800
Committer: Fabian Hueske 
Committed: Fri Apr 7 14:09:56 2017 +0200

--
 ...nkAggregateExpandDistinctAggregatesRule.java | 1158 --
 .../flink/table/plan/rules/FlinkRuleSets.scala  |8 +-
 2 files changed, 3 insertions(+), 1163 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/0038da41/flink-libraries/flink-table/src/main/java/org/apache/flink/table/calcite/rules/FlinkAggregateExpandDistinctAggregatesRule.java
--
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/calcite/rules/FlinkAggregateExpandDistinctAggregatesRule.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/calcite/rules/FlinkAggregateExpandDistinctAggregatesRule.java
deleted file mode 100644
index 9d4e08e..0000000
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/calcite/rules/FlinkAggregateExpandDistinctAggregatesRule.java
+++ /dev/null
@@ -1,1158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.calcite.rules;
-
-import org.apache.calcite.plan.Contexts;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Aggregate;
-import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.calcite.rel.core.RelFactories;
-import org.apache.calcite.rel.logical.LogicalAggregate;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexInputRef;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.sql.SqlAggFunction;
-import org.apache.calcite.sql.fun.SqlCountAggFunction;
-import org.apache.calcite.sql.fun.SqlMinMaxAggFunction;
-import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.apache.calcite.sql.fun.SqlSumAggFunction;
-import org.apache.calcite.sql.fun.SqlSumEmptyIsZeroAggFunction;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.tools.RelBuilder;
-import org.apache.calcite.tools.RelBuilderFactory;
-import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.calcite.util.ImmutableIntList;
-import org.apache.calcite.util.Pair;
-import org.apache.calcite.util.Util;
-
-import org.apache.flink.util.Preconditions;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
-import java.math.BigDecimal;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-/**
- Copy calcite's AggregateExpandDistinctAggregatesRule to Flink project,
- and do a quick fix to avoid some bad case mentioned in CALCITE-1558.
- Should drop it and use calcite's AggregateExpandDistinctAggregatesRule
- when we upgrade to calcite 1.12(above)
- */
-
-/**
- * Planner rule that expands distinct aggregates
- * (such as {@code COUNT(DISTINCT x)}) from a
- * {@link org.apache.calcite.rel.logical.LogicalAggregate}.
- *
- * How this is done depends upon the arguments to the function. If all
- * functions have the same argument
- * (e.g. {@code 

[1/2] flink git commit: [FLINK-6012] [table] Support SQL WindowStart and WindowEnd functions.

2017-04-07 Thread fhueske
Repository: flink
Updated Branches:
  refs/heads/master 635394751 -> 0038da415


[FLINK-6012] [table] Support SQL WindowStart and WindowEnd functions.

This closes #3693.
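
The documentation diff below spells out the new functions; condensed here into
a single sketch (the stream setup is illustrative, mirroring the
registerDataStream call in the patch):

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class WindowStartEndSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        DataStream<Tuple3<String, String, Integer>> ds =
            env.fromElements(Tuple3.of("bob", "beer", 3));
        tableEnv.registerDataStream("Orders", ds, "user, product, amount");

        // The auxiliary functions must repeat exactly the same arguments as
        // the TUMBLE group window function in the GROUP BY clause.
        Table result = tableEnv.sql(
            "SELECT user, " +
            "  TUMBLE_START(rowtime(), INTERVAL '1' DAY) AS wStart, " +
            "  TUMBLE_END(rowtime(), INTERVAL '1' DAY) AS wEnd, " +
            "  SUM(amount) " +
            "FROM Orders " +
            "GROUP BY TUMBLE(rowtime(), INTERVAL '1' DAY), user");
    }
}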


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fa7907ab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fa7907ab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fa7907ab

Branch: refs/heads/master
Commit: fa7907ab0e90d182b6386802c97f9b4e001dc440
Parents: 6353947
Author: Haohui Mai 
Authored: Fri Apr 7 01:08:12 2017 -0700
Committer: Fabian Hueske 
Committed: Fri Apr 7 14:07:05 2017 +0200

--
 docs/dev/table_api.md   |  67 +-
 .../flink/table/plan/rules/FlinkRuleSets.scala  |   5 +-
 .../common/WindowStartEndPropertiesRule.scala   | 122 +++
 .../flink/table/validate/FunctionCatalog.scala  |   8 +-
 .../scala/batch/sql/WindowAggregateTest.scala   |  38 --
 .../scala/stream/sql/WindowAggregateTest.scala  |  42 ---
 6 files changed, 252 insertions(+), 30 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/fa7907ab/docs/dev/table_api.md
--
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index 6f96920..2a838c7 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -1439,7 +1439,7 @@ Group windows are defined in the `GROUP BY` clause of a SQL query. Just like que
 
 
   TUMBLE(time_attr, interval)
-  Defines are tumbling time window. A tumbling time window assigns rows to non-overlapping, continuous windows with a fixed duration (interval). For example, a tumbling window of 5 minutes groups rows in 5 minutes intervals. Tumbling windows can be defined on event-time (stream + batch) or processing-time (stream).
+  Defines a tumbling time window. A tumbling time window assigns rows to non-overlapping, continuous windows with a fixed duration (interval). For example, a tumbling window of 5 minutes groups rows in 5 minutes intervals. Tumbling windows can be defined on event-time (stream + batch) or processing-time (stream).
 
 
   HOP(time_attr, interval, interval)
@@ -1454,6 +1454,40 @@ Group windows are defined in the `GROUP BY` clause of a SQL query. Just like que
 
 For SQL queries on streaming tables, the `time_attr` argument of the group window function must be one of the `rowtime()` or `proctime()` time-indicators, which distinguish between event or processing time, respectively. For SQL on batch tables, the `time_attr` argument of the group window function must be an attribute of type `TIMESTAMP`.
 
+ Selecting Group Window Start and End Timestamps
+
+The start and end timestamps of group windows can be selected with the following auxiliary functions:
+
+  Auxiliary Function
+  Description
+
+  TUMBLE_START(time_attr, interval)
+  HOP_START(time_attr, interval, interval)
+  SESSION_START(time_attr, interval)
+  Returns the start timestamp of the corresponding tumbling, hopping, and session window.
+
+  TUMBLE_END(time_attr, interval)
+  HOP_END(time_attr, interval, interval)
+  SESSION_END(time_attr, interval)
+  Returns the end timestamp of the corresponding tumbling, hopping, and session window.
+
+Note that the auxiliary functions must be called with exactly same arguments as the group window function in the `GROUP BY` clause.
+
 The following examples show how to specify SQL queries with group windows on streaming tables.
 
 
@@ -1469,7 +1503,10 @@ tableEnv.registerDataStream("Orders", ds, "user, product, amount");
 
 // compute SUM(amount) per day (in event-time)
 Table result1 = tableEnv.sql(
-  "SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(rowtime(), INTERVAL '1' DAY), user");
+  "SELECT user, " +
+  "  TUMBLE_START(rowtime(), INTERVAL '1' DAY) as wStart,  " +
+  "  SUM(amount) FROM Orders " +
+  "GROUP BY TUMBLE(rowtime(), INTERVAL '1' DAY), user");
 
 // compute SUM(amount) per day (in processing-time)
 Table result2 = tableEnv.sql(
@@ -1481,7 +1518,12 @@ Table result3 = tableEnv.sql(
 
 // compute SUM(amount) per session with 12 hour inactivity gap (in event-time)
 Table result4 = tableEnv.sql(
-  "SELECT user, SUM(amount) FROM Orders GROUP BY SESSION(rowtime(), INTERVAL 
'12' HOUR), user");
+  "SELECT user, " +
+  "  SESSION_START(rowtime(), INTERVAL '12' HOUR) AS sStart, " +
+  "  SESSION_END(rowtime(), INTERVAL '12' HOUR) AS snd, " + 
+  "  SUM(amount) " + 
+  "FROM Orders " + 
+  "GROUP BY SESSION(rowtime(), INTERVAL '12' HOUR), user");
 
 {% endhighlight %}
 
@@ -1498,7 +1540,14 @@