This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 578a709  [FLINK-15577][table-planner] Add Window specs to WindowAggregate nodes' digests
578a709 is described below

commit 578a70901230e83351287ffa6b73b27f5a16d8ad
Author: Benoit Hanotte <b.hano...@criteo.com>
AuthorDate: Mon Jan 13 10:48:25 2020 +0100

    [FLINK-15577][table-planner] Add Window specs to WindowAggregate nodes' digests
    
    The RelNode's digest is used by the Calcite HepPlanner to avoid adding
    duplicate vertices to the graph. If an equivalent vertex was already
    present in the graph, then that vertex is used in place of the newly
    generated one.
    This means that the digest needs to contain all the information
    necessary to identify a vertex and to distinguish it from similar
    (but not equivalent) vertices.
    
    In the case of the `WindowAggregate` nodes, the window specs are
    currently not in the digest, meaning that two aggregations with the same
    signatures and expressions but different windows are considered
    equivalent by the planner, which is not correct and will lead to an
    invalid physical plan.
    
    This commit fixes this issue and adds tests ensuring that the window
    specs are in the digest, and that similar aggregations on two
    different windows are not considered equivalent.
    
    This closes #10854
    
    (cherry picked from commit 244718553742c086eefc95f927d7b26af597d40a)
---
 .../plan/logical/rel/LogicalWindowAggregate.scala  |  2 +-
 .../logical/rel/LogicalWindowTableAggregate.scala  |  2 +-
 .../logical/FlinkLogicalWindowAggregate.scala      | 10 +++-
 .../logical/FlinkLogicalWindowTableAggregate.scala | 10 +++-
 .../table/api/batch/sql/GroupWindowTest.scala      | 59 ++++++++++++++++++++
 .../table/api/stream/sql/GroupWindowTest.scala     | 64 ++++++++++++++++++++++
 .../table/GroupWindowTableAggregateTest.scala      | 59 ++++++++++++++++++++
 7 files changed, 202 insertions(+), 4 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala
index b87afd6..ee456c4 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala
@@ -49,7 +49,7 @@ class LogicalWindowAggregate(
     for (property <- namedProperties) {
       pw.item(property.name, property.property)
     }
-    pw
+    pw.item("window", window.toString)
   }
 
   override def copy(
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowTableAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowTableAggregate.scala
index 02db874..3c722f5 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowTableAggregate.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowTableAggregate.scala
@@ -50,7 +50,7 @@ class LogicalWindowTableAggregate(
     for (property <- namedProperties) {
       pw.item(property.name, property.property)
     }
-    pw
+    pw.item("window", window.toString)
   }
 
   override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): TableAggregate = {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
index 0c289c1..26deb4a 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
@@ -25,7 +25,7 @@ import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.calcite.rel.core.{Aggregate, AggregateCall}
 import org.apache.calcite.rel.metadata.RelMetadataQuery
-import org.apache.calcite.rel.{RelNode, RelShuttle}
+import org.apache.calcite.rel.{RelNode, RelShuttle, RelWriter}
 import org.apache.calcite.sql.SqlKind
 import org.apache.calcite.util.ImmutableBitSet
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
@@ -52,6 +52,14 @@ class FlinkLogicalWindowAggregate(
 
   def getNamedProperties: Seq[NamedWindowProperty] = namedProperties
 
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw)
+    for (property <- namedProperties) {
+      pw.item(property.name, property.property)
+    }
+    pw.item("window", window.toString)
+  }
+
   override def copy(
       traitSet: RelTraitSet,
       input: RelNode,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowTableAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowTableAggregate.scala
index f80cc2d..6ba272f 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowTableAggregate.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowTableAggregate.scala
@@ -25,7 +25,7 @@ import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.calcite.rel.core.AggregateCall
 import org.apache.calcite.rel.metadata.RelMetadataQuery
-import org.apache.calcite.rel.{RelNode, RelShuttle}
+import org.apache.calcite.rel.{RelNode, RelShuttle, RelWriter}
 import org.apache.calcite.util.ImmutableBitSet
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.calcite.FlinkTypeFactory
@@ -50,6 +50,14 @@ class FlinkLogicalWindowTableAggregate(
 
   def getNamedProperties: Seq[NamedWindowProperty] = namedProperties
 
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw)
+    for (property <- namedProperties) {
+      pw.item(property.name, property.property)
+    }
+    pw.item("window", window.toString)
+  }
+
   override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): TableAggregate = {
     new FlinkLogicalWindowTableAggregate(
       window,
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala
index b5091ee..77a5a83 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala
@@ -387,4 +387,63 @@ class GroupWindowTest extends TableTestBase {
 
     util.verifySql(sqlQuery, expected)
   }
+
+  @Test
+  def testWindowAggregateWithDifferentWindows(): Unit = {
+    // This test ensures that the LogicalWindowAggregate and FlinkLogicalWindowAggregate nodes'
+    // digests contain the window specs. This allows the planner to make the distinction between
+    // similar aggregations using different windows (see FLINK-15577).
+    val util = batchTestUtil()
+    val table = util.addTable[(Timestamp)]("MyTable", 'rowtime)
+
+    val sql =
+      """
+        |WITH window_1h AS (
+        |    SELECT 1
+        |    FROM MyTable
+        |    GROUP BY HOP(`rowtime`, INTERVAL '1' HOUR, INTERVAL '1' HOUR)
+        |),
+        |
+        |window_2h AS (
+        |    SELECT 1
+        |    FROM MyTable
+        |    GROUP BY HOP(`rowtime`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
+        |)
+        |
+        |(SELECT * FROM window_1h)
+        |UNION ALL
+        |(SELECT * FROM window_2h)
+        |""".stripMargin
+
+    val expected =
+      binaryNode(
+        "DataSetUnion",
+        unaryNode(
+          "DataSetCalc",
+          unaryNode(
+            "DataSetWindowAggregate",
+            batchTableNode(table),
+            // This window is the 1hr window
+            term("window", "SlidingGroupWindow('w$, 'rowtime, 3600000.millis, 3600000.millis)"),
+            term("select")
+          ),
+          term("select", "1 AS EXPR$0")
+        ),
+        unaryNode(
+          "DataSetCalc",
+          unaryNode(
+            "DataSetWindowAggregate",
+            batchTableNode(table),
+            // This window is the 2hr window
+            term("window", "SlidingGroupWindow('w$, 'rowtime, 7200000.millis, 3600000.millis)"),
+            term("select")
+          ),
+          term("select", "1 AS EXPR$0")
+        ),
+        term("all", "true"),
+        term("union", "EXPR$0")
+      )
+
+    util.verifySql(sql, expected)
+  }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala
index 5acef08..c7c4aeb 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala
@@ -344,4 +344,68 @@ class GroupWindowTest extends TableTestBase {
       )
     streamUtil.verifySql(sql, expected)
   }
+
+  @Test
+  def testWindowAggregateWithDifferentWindows(): Unit = {
+    // This test ensures that the LogicalWindowAggregate and FlinkLogicalWindowAggregate nodes'
+    // digests contain the window specs. This allows the planner to make the distinction between
+    // similar aggregations using different windows (see FLINK-15577).
+    val sql =
+      """
+        |WITH window_1h AS (
+        |    SELECT 1
+        |    FROM MyTable
+        |    GROUP BY HOP(`rowtime`, INTERVAL '1' HOUR, INTERVAL '1' HOUR)
+        |),
+        |
+        |window_2h AS (
+        |    SELECT 1
+        |    FROM MyTable
+        |    GROUP BY HOP(`rowtime`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
+        |)
+        |
+        |(SELECT * FROM window_1h)
+        |UNION ALL
+        |(SELECT * FROM window_2h)
+        |""".stripMargin
+
+    val expected =
+      binaryNode(
+        "DataStreamUnion",
+        unaryNode(
+          "DataStreamCalc",
+          unaryNode(
+            "DataStreamGroupWindowAggregate",
+            unaryNode(
+              "DataStreamCalc",
+              streamTableNode(table),
+              term("select", "rowtime")
+            ),
+            // This window is the 1hr window
+            term("window", "SlidingGroupWindow('w$, 'rowtime, 3600000.millis, 3600000.millis)"),
+            term("select")
+          ),
+          term("select", "1 AS EXPR$0")
+        ),
+        unaryNode(
+          "DataStreamCalc",
+          unaryNode(
+            "DataStreamGroupWindowAggregate",
+            unaryNode(
+              "DataStreamCalc",
+              streamTableNode(table),
+              term("select", "rowtime")
+            ),
+            // This window is the 2hr window
+            term("window", "SlidingGroupWindow('w$, 'rowtime, 7200000.millis, 3600000.millis)"),
+            term("select")
+          ),
+          term("select", "1 AS EXPR$0")
+        ),
+        term("all", "true"),
+        term("union all", "EXPR$0")
+      )
+
+    streamUtil.verifySql(sql, expected)
+  }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTableAggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTableAggregateTest.scala
index d5f59ee..db869ae 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTableAggregateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTableAggregateTest.scala
@@ -549,4 +549,63 @@ class GroupWindowTableAggregateTest extends TableTestBase {
 
     util.verifyTable(windowedTable, expected)
   }
+
+  @Test
+  def testWindowAggregateWithDifferentWindows(): Unit = {
+    // This test ensures that the LogicalWindowTableAggregate and FlinkLogicalWindowTableAggregate
+    // nodes' digests contain the window specs. This allows the planner to make the distinction
+    // between similar aggregations using different windows (see FLINK-15577).
+    val tableWindow1hr = table
+      .window(Slide over 1.hour every 1.hour on 'd as 'w1)
+      .groupBy('w1)
+      .flatAggregate(emptyFunc('a, 'b))
+      .select(1 as 'a)
+
+    val tableWindow2hr = table
+      .window(Slide over 2.hour every 1.hour on 'd as 'w1)
+      .groupBy('w1)
+      .flatAggregate(emptyFunc('a, 'b))
+      .select(1 as 'b)
+
+    val joinTable = tableWindow1hr.fullOuterJoin(tableWindow2hr, 'a === 'b)
+
+    val expected =
+      binaryNode(
+        "DataStreamJoin",
+        unaryNode(
+          "DataStreamCalc",
+          unaryNode(
+            "DataStreamGroupWindowTableAggregate",
+            unaryNode(
+              "DataStreamCalc",
+              streamTableNode(table),
+              term("select", "a", "b", "d")
+            ),
+            // This window is the 1hr window
+            term("window", "SlidingGroupWindow('w1, 'd, 3600000.millis, 3600000.millis)"),
+            term("select", "EmptyTableAggFunc(a, b) AS (f0, f1)")
+          ),
+          term("select", "1 AS a")
+        ),
+        unaryNode(
+          "DataStreamCalc",
+          unaryNode(
+            "DataStreamGroupWindowTableAggregate",
+            unaryNode(
+              "DataStreamCalc",
+              streamTableNode(table),
+              term("select", "a", "b", "d")
+            ),
+            // This window is the 2hr window
+            term("window", "SlidingGroupWindow('w1, 'd, 7200000.millis, 3600000.millis)"),
+            term("select", "EmptyTableAggFunc(a, b) AS (f0, f1)")
+          ),
+          term("select", "1 AS b")
+        ),
+        term("where", "=(a, b)"),
+        term("join", "a", "b"),
+        term("joinType", "FullOuterJoin")
+      )
+    util.verifyTable(joinTable, expected)
+  }
 }

Reply via email to