This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d1b22b56a92eef26998555dc35371447593afc8c
Author: JingsongLi <lzljs3620...@aliyun.com>
AuthorDate: Tue Jul 2 09:56:38 2019 +0800

    [FLINK-13049][table-planner-blink] Rename windowProperties and 
PlannerResolvedFieldReference to avoid name conflit
---
 .../flink/table/calcite/FlinkRelBuilder.scala      |  4 ++--
 .../codegen/agg/AggsHandlerCodeGenerator.scala     | 14 ++++++------
 .../agg/batch/HashWindowCodeGenerator.scala        |  4 ++--
 .../agg/batch/SortWindowCodeGenerator.scala        |  4 ++--
 .../codegen/agg/batch/WindowCodeGenerator.scala    |  4 ++--
 ...nce.scala => ExestingFieldFieldReference.scala} |  2 +-
 ...perties.scala => plannerWindowProperties.scala} | 24 ++++++++++++---------
 .../flink/table/plan/logical/groupWindows.scala    |  8 +++----
 .../plan/metadata/FlinkRelMdColumnUniqueness.scala |  4 ++--
 .../plan/metadata/FlinkRelMdUniqueGroups.scala     |  4 ++--
 .../table/plan/metadata/FlinkRelMdUniqueKeys.scala |  4 ++--
 .../nodes/calcite/LogicalWindowAggregate.scala     |  8 +++----
 .../table/plan/nodes/calcite/WindowAggregate.scala |  6 +++---
 .../logical/FlinkLogicalWindowAggregate.scala      |  4 ++--
 .../batch/BatchExecHashWindowAggregate.scala       |  4 ++--
 .../batch/BatchExecHashWindowAggregateBase.scala   |  4 ++--
 .../batch/BatchExecLocalHashWindowAggregate.scala  |  4 ++--
 .../batch/BatchExecLocalSortWindowAggregate.scala  |  4 ++--
 .../batch/BatchExecSortWindowAggregate.scala       |  4 ++--
 .../batch/BatchExecSortWindowAggregateBase.scala   |  4 ++--
 .../batch/BatchExecWindowAggregateBase.scala       |  6 +++---
 .../stream/StreamExecGroupWindowAggregate.scala    |  6 +++---
 .../logical/LogicalWindowAggregateRuleBase.scala   |  8 +++----
 .../plan/rules/logical/WindowPropertiesRule.scala  | 20 ++++++++++-------
 .../flink/table/plan/util/AggregateUtil.scala      | 20 ++++++++---------
 .../flink/table/plan/util/FlinkRelMdUtil.scala     |  4 ++--
 .../flink/table/plan/util/RelExplainUtil.scala     |  4 ++--
 .../flink/table/sources/TableSourceUtil.scala      | 25 +++++++++++-----------
 .../table/sources/tsextractors/ExistingField.scala |  4 ++--
 .../plan/metadata/FlinkRelMdHandlerTestBase.scala  | 16 +++++++-------
 30 files changed, 120 insertions(+), 111 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala
index 4683424..8b5cf8a 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.calcite
 
 import org.apache.flink.table.calcite.FlinkRelFactories.{ExpandFactory, 
RankFactory, SinkFactory}
-import org.apache.flink.table.expressions.WindowProperty
+import org.apache.flink.table.expressions.PlannerWindowProperty
 import org.apache.flink.table.operations.QueryOperation
 import org.apache.flink.table.plan.QueryOperationConverter
 import org.apache.flink.table.runtime.rank.{RankRange, RankType}
@@ -111,7 +111,7 @@ object FlinkRelBuilder {
     *
     * Similar to [[RelBuilder.AggCall]] or [[RelBuilder.GroupKey]].
     */
-  case class NamedWindowProperty(name: String, property: WindowProperty)
+  case class PlannerNamedWindowProperty(name: String, property: 
PlannerWindowProperty)
 
   def proto(context: Context): RelBuilderFactory = new RelBuilderFactory() {
     def create(cluster: RelOptCluster, schema: RelOptSchema): RelBuilder =
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/AggsHandlerCodeGenerator.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/AggsHandlerCodeGenerator.scala
index f77810b..bf40b04 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/AggsHandlerCodeGenerator.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/AggsHandlerCodeGenerator.scala
@@ -58,7 +58,7 @@ class AggsHandlerCodeGenerator(
 
   /** window properties like window_start and window_end, only used in window 
aggregates */
   private var namespaceClassName: String = _
-  private var windowProperties: Seq[WindowProperty] = Seq()
+  private var windowProperties: Seq[PlannerWindowProperty] = Seq()
   private var hasNamespace: Boolean = false
 
   /** Aggregates informations */
@@ -182,7 +182,7 @@ class AggsHandlerCodeGenerator(
     * Adds window properties such as window_start, window_end
     */
   private def initialWindowProperties(
-      windowProperties: Seq[WindowProperty],
+      windowProperties: Seq[PlannerWindowProperty],
       windowClass: Class[_]): Unit = {
     this.windowProperties = windowProperties
     this.namespaceClassName = windowClass.getCanonicalName
@@ -404,7 +404,7 @@ class AggsHandlerCodeGenerator(
   def generateNamespaceAggsHandler[N](
       name: String,
       aggInfoList: AggregateInfoList,
-      windowProperties: Seq[WindowProperty],
+      windowProperties: Seq[PlannerWindowProperty],
       windowClass: Class[N]): GeneratedNamespaceAggsHandleFunction[N] = {
 
     initialWindowProperties(windowProperties, windowClass)
@@ -663,19 +663,19 @@ class AggsHandlerCodeGenerator(
     if (hasNamespace) {
       // append window property results
       val windowExprs = windowProperties.map {
-        case w: WindowStart =>
+        case w: PlannerWindowStart =>
           // return a Timestamp(Internal is long)
           GeneratedExpression(
             s"$NAMESPACE_TERM.getStart()", "false", "", w.resultType)
-        case w: WindowEnd =>
+        case w: PlannerWindowEnd =>
           // return a Timestamp(Internal is long)
           GeneratedExpression(
             s"$NAMESPACE_TERM.getEnd()", "false", "", w.resultType)
-        case r: RowtimeAttribute =>
+        case r: PlannerRowtimeAttribute =>
           // return a rowtime, use long as internal type
           GeneratedExpression(
             s"$NAMESPACE_TERM.getEnd() - 1", "false", "", r.resultType)
-        case p: ProctimeAttribute =>
+        case p: PlannerProctimeAttribute =>
           // ignore this property, it will be null at the position later
           GeneratedExpression("-1L", "true", "", p.resultType)
       }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/HashWindowCodeGenerator.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/HashWindowCodeGenerator.scala
index 1612c77..21f7200 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/HashWindowCodeGenerator.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/HashWindowCodeGenerator.scala
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.operators.sort.QuickSort
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator
 import org.apache.flink.table.api.Types
 import org.apache.flink.table.api.window.TimeWindow
-import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import 
org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
 import org.apache.flink.table.codegen.CodeGenUtils.{BINARY_ROW, newName}
 import org.apache.flink.table.codegen.OperatorCodeGenerator.generateCollect
 import 
org.apache.flink.table.codegen.agg.batch.AggCodeGenHelper.genGroupKeyChangedCheckCode
@@ -68,7 +68,7 @@ class HashWindowCodeGenerator(
     window: LogicalWindow,
     inputTimeFieldIndex: Int,
     inputTimeIsDate: Boolean,
-    namedProperties: Seq[NamedWindowProperty],
+    namedProperties: Seq[PlannerNamedWindowProperty],
     aggInfoList: AggregateInfoList,
     inputRowType: RelDataType,
     grouping: Array[Int],
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/SortWindowCodeGenerator.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/SortWindowCodeGenerator.scala
index c99e850..c97a2ef 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/SortWindowCodeGenerator.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/SortWindowCodeGenerator.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.codegen.agg.batch
 
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator
 import org.apache.flink.table.api.window.TimeWindow
-import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import 
org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
 import org.apache.flink.table.codegen.CodeGenUtils.BINARY_ROW
 import 
org.apache.flink.table.codegen.agg.batch.AggCodeGenHelper.genGroupKeyChangedCheckCode
 import org.apache.flink.table.codegen.{CodeGenUtils, CodeGeneratorContext, 
ProjectionCodeGenerator}
@@ -60,7 +60,7 @@ class SortWindowCodeGenerator(
     window: LogicalWindow,
     inputTimeFieldIndex: Int,
     inputTimeIsDate: Boolean,
-    namedProperties: Seq[NamedWindowProperty],
+    namedProperties: Seq[PlannerNamedWindowProperty],
     aggInfoList: AggregateInfoList,
     inputRowType: RelDataType,
     inputType: RowType,
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/WindowCodeGenerator.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/WindowCodeGenerator.scala
index df2549f..060036f 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/WindowCodeGenerator.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/WindowCodeGenerator.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.codegen.agg.batch
 import org.apache.flink.table.JLong
 import org.apache.flink.table.api.DataTypes
 import org.apache.flink.table.api.window.TimeWindow
-import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import 
org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.CodeGenUtils.{BINARY_ROW, 
boxedTypeTermForType, newName}
 import org.apache.flink.table.codegen.GenerateUtils.generateFieldAccess
@@ -59,7 +59,7 @@ abstract class WindowCodeGenerator(
     window: LogicalWindow,
     inputTimeFieldIndex: Int,
     inputTimeIsDate: Boolean,
-    namedProperties: Seq[NamedWindowProperty],
+    namedProperties: Seq[PlannerNamedWindowProperty],
     aggInfoList: AggregateInfoList,
     inputRowType: RelDataType,
     grouping: Array[Int],
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerResolvedFieldReference.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/ExestingFieldFieldReference.scala
similarity index 96%
rename from 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerResolvedFieldReference.scala
rename to 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/ExestingFieldFieldReference.scala
index a3406f8..0ad1f5e 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerResolvedFieldReference.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/ExestingFieldFieldReference.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.expressions
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
 
-case class PlannerResolvedFieldReference(
+case class ExestingFieldFieldReference(
     name: String,
     resultType: TypeInformation[_],
     fieldIndex: Int) extends ResolvedFieldReference
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/windowProperties.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/plannerWindowProperties.scala
similarity index 73%
rename from 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/windowProperties.scala
rename to 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/plannerWindowProperties.scala
index 0adf024..c851a6a 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/windowProperties.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/plannerWindowProperties.scala
@@ -21,45 +21,49 @@ package org.apache.flink.table.expressions
 import org.apache.flink.table.api.TableException
 import org.apache.flink.table.types.logical.{BigIntType, LogicalType, 
TimestampKind, TimestampType}
 
-trait WindowProperty {
+trait PlannerWindowProperty {
   def resultType: LogicalType
 }
 
-abstract class AbstractWindowProperty(reference: WindowReference) extends 
WindowProperty {
+abstract class AbstractPlannerWindowProperty(
+    reference: PlannerWindowReference) extends PlannerWindowProperty {
   override def toString = s"WindowProperty($reference)"
 }
 
 /**
   * Indicate timeField type.
   */
-case class WindowReference(name: String, tpe: Option[LogicalType] = None) {
+case class PlannerWindowReference(name: String, tpe: Option[LogicalType] = 
None) {
   override def toString: String = s"'$name"
 }
 
-case class WindowStart(reference: WindowReference) extends 
AbstractWindowProperty(reference) {
+case class PlannerWindowStart(
+    reference: PlannerWindowReference) extends 
AbstractPlannerWindowProperty(reference) {
 
   override def resultType: TimestampType = new TimestampType(3)
 
   override def toString: String = s"start($reference)"
 }
 
-case class WindowEnd(reference: WindowReference) extends 
AbstractWindowProperty(reference) {
+case class PlannerWindowEnd(
+    reference: PlannerWindowReference) extends 
AbstractPlannerWindowProperty(reference) {
 
   override def resultType: TimestampType = new TimestampType(3)
 
   override def toString: String = s"end($reference)"
 }
 
-case class RowtimeAttribute(reference: WindowReference) extends 
AbstractWindowProperty(reference) {
+case class PlannerRowtimeAttribute(
+    reference: PlannerWindowReference) extends 
AbstractPlannerWindowProperty(reference) {
 
   override def resultType: LogicalType = {
     reference match {
-      case WindowReference(_, Some(tpe))
+      case PlannerWindowReference(_, Some(tpe))
         if tpe.isInstanceOf[TimestampType] &&
             tpe.asInstanceOf[TimestampType].getKind == TimestampKind.ROWTIME =>
         // rowtime window
         new TimestampType(true, TimestampKind.ROWTIME, 3)
-      case WindowReference(_, Some(tpe))
+      case PlannerWindowReference(_, Some(tpe))
         if tpe.isInstanceOf[BigIntType] || tpe.isInstanceOf[TimestampType] =>
         // batch time window
         new TimestampType(3)
@@ -72,8 +76,8 @@ case class RowtimeAttribute(reference: WindowReference) 
extends AbstractWindowPr
   override def toString: String = s"rowtime($reference)"
 }
 
-case class ProctimeAttribute(reference: WindowReference)
-  extends AbstractWindowProperty(reference) {
+case class PlannerProctimeAttribute(reference: PlannerWindowReference)
+  extends AbstractPlannerWindowProperty(reference) {
 
   override def resultType: LogicalType =
     new TimestampType(true, TimestampKind.PROCTIME, 3)
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
index d9638f2..f9f01c4 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
@@ -27,7 +27,7 @@ import org.apache.flink.table.expressions._
   * @param timeAttribute time field indicating event-time or processing-time
   */
 abstract class LogicalWindow(
-    val aliasAttribute: WindowReference,
+    val aliasAttribute: PlannerWindowReference,
     val timeAttribute: FieldReferenceExpression) {
 
   override def toString: String = getClass.getSimpleName
@@ -38,7 +38,7 @@ abstract class LogicalWindow(
 // 
------------------------------------------------------------------------------------------------
 
 case class TumblingGroupWindow(
-    alias: WindowReference,
+    alias: PlannerWindowReference,
     timeField: FieldReferenceExpression,
     size: ValueLiteralExpression)
   extends LogicalWindow(
@@ -51,7 +51,7 @@ case class TumblingGroupWindow(
 // 
------------------------------------------------------------------------------------------------
 
 case class SlidingGroupWindow(
-    alias: WindowReference,
+    alias: PlannerWindowReference,
     timeField: FieldReferenceExpression,
     size: ValueLiteralExpression,
     slide: ValueLiteralExpression)
@@ -67,7 +67,7 @@ case class SlidingGroupWindow(
 // 
------------------------------------------------------------------------------------------------
 
 case class SessionGroupWindow(
-    alias: WindowReference,
+    alias: PlannerWindowReference,
     timeField: FieldReferenceExpression,
     gap: ValueLiteralExpression)
   extends LogicalWindow(
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnUniqueness.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnUniqueness.scala
index 0afaa62..7993d67 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnUniqueness.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnUniqueness.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.plan.metadata
 
 import org.apache.flink.table.JBoolean
 import org.apache.flink.table.api.TableException
-import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import 
org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
 import org.apache.flink.table.plan.nodes.FlinkRelNode
 import org.apache.flink.table.plan.nodes.calcite.{Expand, Rank, 
WindowAggregate}
 import org.apache.flink.table.plan.nodes.common.CommonLookupJoin
@@ -402,7 +402,7 @@ class FlinkRelMdColumnUniqueness private extends 
MetadataHandler[BuiltInMetadata
 
   private def areColumnsUniqueOnWindowAggregate(
       grouping: Array[Int],
-      namedProperties: Seq[NamedWindowProperty],
+      namedProperties: Seq[PlannerNamedWindowProperty],
       outputFieldCount: Int,
       mq: RelMetadataQuery,
       columns: ImmutableBitSet,
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueGroups.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueGroups.scala
index 62ab472..f743dd5 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueGroups.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueGroups.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.plan.metadata
 
-import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import 
org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
 import org.apache.flink.table.plan.metadata.FlinkMetadata.UniqueGroups
 import org.apache.flink.table.plan.nodes.calcite.{Expand, Rank, 
WindowAggregate}
 import org.apache.flink.table.plan.nodes.physical.batch._
@@ -276,7 +276,7 @@ class FlinkRelMdUniqueGroups private extends 
MetadataHandler[UniqueGroups] {
       windowAgg: SingleRel,
       grouping: Array[Int],
       auxGrouping: Array[Int],
-      namedProperties: Seq[NamedWindowProperty],
+      namedProperties: Seq[PlannerNamedWindowProperty],
       mq: RelMetadataQuery,
       columns: ImmutableBitSet): ImmutableBitSet = {
     val fieldCount = windowAgg.getRowType.getFieldCount
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueKeys.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueKeys.scala
index 84d66cd..89ba591 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueKeys.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueKeys.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.plan.metadata
 
-import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import 
org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
 import org.apache.flink.table.plan.nodes.calcite.{Expand, Rank, 
WindowAggregate}
 import org.apache.flink.table.plan.nodes.common.CommonLookupJoin
 import org.apache.flink.table.plan.nodes.logical._
@@ -342,7 +342,7 @@ class FlinkRelMdUniqueKeys private extends 
MetadataHandler[BuiltInMetadata.Uniqu
 
   private def getUniqueKeysOnWindowAgg(
       fieldCount: Int,
-      namedProperties: Seq[NamedWindowProperty],
+      namedProperties: Seq[PlannerNamedWindowProperty],
       grouping: Array[Int],
       mq: RelMetadataQuery,
       ignoreNulls: Boolean): util.Set[ImmutableBitSet] = {
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalWindowAggregate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalWindowAggregate.scala
index 32803f6..be8288e 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalWindowAggregate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalWindowAggregate.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.plan.nodes.calcite
 
-import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import 
org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
 import org.apache.flink.table.plan.logical.LogicalWindow
 
 import org.apache.calcite.plan.{Convention, RelOptCluster, RelTraitSet}
@@ -36,7 +36,7 @@ final class LogicalWindowAggregate(
     groupSet: ImmutableBitSet,
     aggCalls: util.List[AggregateCall],
     window: LogicalWindow,
-    namedProperties: Seq[NamedWindowProperty])
+    namedProperties: Seq[PlannerNamedWindowProperty])
   extends WindowAggregate(cluster, traitSet, child, groupSet, aggCalls, 
window, namedProperties) {
 
   override def copy(
@@ -56,7 +56,7 @@ final class LogicalWindowAggregate(
       namedProperties)
   }
 
-  def copy(namedProperties: Seq[NamedWindowProperty]): LogicalWindowAggregate 
= {
+  def copy(namedProperties: Seq[PlannerNamedWindowProperty]): 
LogicalWindowAggregate = {
     new LogicalWindowAggregate(
       cluster,
       traitSet,
@@ -72,7 +72,7 @@ object LogicalWindowAggregate {
 
   def create(
       window: LogicalWindow,
-      namedProperties: Seq[NamedWindowProperty],
+      namedProperties: Seq[PlannerNamedWindowProperty],
       agg: Aggregate): LogicalWindowAggregate = {
     require(!agg.indicator && (agg.getGroupType == Group.SIMPLE))
     val cluster: RelOptCluster = agg.getCluster
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/WindowAggregate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/WindowAggregate.scala
index efe2b8a..e206b65 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/WindowAggregate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/WindowAggregate.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.plan.nodes.calcite
 
-import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import 
org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.logical.LogicalWindow
 
@@ -43,7 +43,7 @@ abstract class WindowAggregate(
     groupSet: ImmutableBitSet,
     aggCalls: util.List[AggregateCall],
     window: LogicalWindow,
-    namedProperties: Seq[NamedWindowProperty])
+    namedProperties: Seq[PlannerNamedWindowProperty])
   extends Aggregate(
     cluster,
     traitSet,
@@ -55,7 +55,7 @@ abstract class WindowAggregate(
 
   def getWindow: LogicalWindow = window
 
-  def getNamedProperties: Seq[NamedWindowProperty] = namedProperties
+  def getNamedProperties: Seq[PlannerNamedWindowProperty] = namedProperties
 
   override def accept(shuttle: RelShuttle): RelNode = shuttle.visit(this)
 
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
index 10068dc..ebea49c 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.plan.nodes.logical
 
-import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import 
org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
 import org.apache.flink.table.plan.logical.LogicalWindow
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.calcite.{LogicalWindowAggregate, 
WindowAggregate}
@@ -43,7 +43,7 @@ class FlinkLogicalWindowAggregate(
     groupSet: ImmutableBitSet,
     aggCalls: util.List[AggregateCall],
     window: LogicalWindow,
-    namedProperties: Seq[NamedWindowProperty])
+    namedProperties: Seq[PlannerNamedWindowProperty])
   extends WindowAggregate(cluster, traitSet, child, groupSet, aggCalls, 
window, namedProperties)
   with FlinkLogicalRel {
 
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashWindowAggregate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashWindowAggregate.scala
index c7eb08b..e42fb81 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashWindowAggregate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashWindowAggregate.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.plan.nodes.physical.batch
 
 import org.apache.flink.runtime.operators.DamBehavior
-import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import 
org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
 import org.apache.flink.table.functions.UserDefinedFunction
 import org.apache.flink.table.plan.logical.LogicalWindow
 
@@ -45,7 +45,7 @@ class BatchExecHashWindowAggregate(
     window: LogicalWindow,
     inputTimeFieldIndex: Int,
     inputTimeIsDate: Boolean,
-    namedProperties: Seq[NamedWindowProperty],
+    namedProperties: Seq[PlannerNamedWindowProperty],
     enableAssignPane: Boolean = false,
     isMerge: Boolean)
   extends BatchExecHashWindowAggregateBase(
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala
index 4ca99f6..59e2d3f 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.plan.nodes.physical.batch
 
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
 import org.apache.flink.table.api.{BatchTableEnvironment, TableConfigOptions}
-import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import 
org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.CodeGeneratorContext
 import org.apache.flink.table.codegen.agg.batch.{HashWindowCodeGenerator, 
WindowCodeGenerator}
@@ -61,7 +61,7 @@ abstract class BatchExecHashWindowAggregateBase(
     window: LogicalWindow,
     inputTimeFieldIndex: Int,
     inputTimeIsDate: Boolean,
-    namedProperties: Seq[NamedWindowProperty],
+    namedProperties: Seq[PlannerNamedWindowProperty],
     enableAssignPane: Boolean = false,
     isMerge: Boolean,
     isFinal: Boolean)
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLocalHashWindowAggregate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLocalHashWindowAggregate.scala
index 34752c8..eea8eb2 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLocalHashWindowAggregate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLocalHashWindowAggregate.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.plan.nodes.physical.batch
 
 import org.apache.flink.runtime.operators.DamBehavior
-import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import 
org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
 import org.apache.flink.table.functions.UserDefinedFunction
 import org.apache.flink.table.plan.logical.LogicalWindow
 
@@ -44,7 +44,7 @@ class BatchExecLocalHashWindowAggregate(
     window: LogicalWindow,
     val inputTimeFieldIndex: Int,
     inputTimeIsDate: Boolean,
-    namedProperties: Seq[NamedWindowProperty],
+    namedProperties: Seq[PlannerNamedWindowProperty],
     enableAssignPane: Boolean = false)
   extends BatchExecHashWindowAggregateBase(
     cluster,
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLocalSortWindowAggregate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLocalSortWindowAggregate.scala
index d8b68fc..0ebc072 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLocalSortWindowAggregate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLocalSortWindowAggregate.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.plan.nodes.physical.batch
 
 import org.apache.flink.runtime.operators.DamBehavior
-import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import 
org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
 import org.apache.flink.table.functions.UserDefinedFunction
 import org.apache.flink.table.plan.logical.LogicalWindow
 
@@ -44,7 +44,7 @@ class BatchExecLocalSortWindowAggregate(
     window: LogicalWindow,
     val inputTimeFieldIndex: Int,
     inputTimeIsDate: Boolean,
-    namedProperties: Seq[NamedWindowProperty],
+    namedProperties: Seq[PlannerNamedWindowProperty],
     enableAssignPane: Boolean = false)
   extends BatchExecSortWindowAggregateBase(
     cluster,
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortWindowAggregate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortWindowAggregate.scala
index 959aa9d..297488e 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortWindowAggregate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortWindowAggregate.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.plan.nodes.physical.batch
 
 import org.apache.flink.runtime.operators.DamBehavior
 import org.apache.flink.table.api.TableConfig
-import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import 
org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
 import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.functions.UserDefinedFunction
 import org.apache.flink.table.plan.logical.LogicalWindow
@@ -45,7 +45,7 @@ class BatchExecSortWindowAggregate(
     window: LogicalWindow,
     inputTimeFieldIndex: Int,
     inputTimeIsDate: Boolean,
-    namedProperties: Seq[NamedWindowProperty],
+    namedProperties: Seq[PlannerNamedWindowProperty],
     enableAssignPane: Boolean = false,
     isMerge: Boolean)
   extends BatchExecSortWindowAggregateBase(
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala
index 36dabfd..fc282bb 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.plan.nodes.physical.batch
 
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
 import org.apache.flink.table.api.{BatchTableEnvironment, TableConfig, 
TableConfigOptions}
-import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import 
org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.CodeGeneratorContext
 import org.apache.flink.table.codegen.agg.batch.{SortWindowCodeGenerator, 
WindowCodeGenerator}
@@ -58,7 +58,7 @@ abstract class BatchExecSortWindowAggregateBase(
     window: LogicalWindow,
     inputTimeFieldIndex: Int,
     inputTimeIsDate: Boolean,
-    namedProperties: Seq[NamedWindowProperty],
+    namedProperties: Seq[PlannerNamedWindowProperty],
     enableAssignPane: Boolean = false,
     isMerge: Boolean,
     isFinal: Boolean)
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecWindowAggregateBase.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecWindowAggregateBase.scala
index 49ec65e..4982d37 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecWindowAggregateBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecWindowAggregateBase.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.plan.nodes.physical.batch
 
 import org.apache.flink.table.api.TableException
-import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import 
org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
 import org.apache.flink.table.functions.UserDefinedFunction
 import org.apache.flink.table.plan.logical.LogicalWindow
 import org.apache.flink.table.plan.util.RelExplainUtil
@@ -39,7 +39,7 @@ abstract class BatchExecWindowAggregateBase(
     auxGrouping: Array[Int],
     aggCallToAggFunction: Seq[(AggregateCall, UserDefinedFunction)],
     window: LogicalWindow,
-    namedProperties: Seq[NamedWindowProperty],
+    namedProperties: Seq[PlannerNamedWindowProperty],
     enableAssignPane: Boolean = true,
     val isMerge: Boolean,
     val isFinal: Boolean)
@@ -56,7 +56,7 @@ abstract class BatchExecWindowAggregateBase(
 
   def getWindow: LogicalWindow = window
 
-  def getNamedProperties: Seq[NamedWindowProperty] = namedProperties
+  def getNamedProperties: Seq[PlannerNamedWindowProperty] = namedProperties
 
   def getAggCallList: Seq[AggregateCall] = aggCallToAggFunction.map(_._1)
 
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGroupWindowAggregate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGroupWindowAggregate.scala
index 5812780..680bed3 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGroupWindowAggregate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGroupWindowAggregate.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.plan.nodes.physical.stream
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
 import org.apache.flink.table.api.window.{CountWindow, TimeWindow}
 import org.apache.flink.table.api.{StreamTableEnvironment, TableConfig, 
TableException}
-import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import 
org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.agg.AggsHandlerCodeGenerator
 import org.apache.flink.table.codegen.{CodeGeneratorContext, 
EqualiserCodeGenerator}
@@ -61,7 +61,7 @@ class StreamExecGroupWindowAggregate(
     grouping: Array[Int],
     val aggCalls: Seq[AggregateCall],
     val window: LogicalWindow,
-    namedProperties: Seq[NamedWindowProperty],
+    namedProperties: Seq[PlannerNamedWindowProperty],
     inputTimeFieldIndex: Int,
     val emitStrategy: WindowEmitStrategy)
   extends SingleRel(cluster, traitSet, inputRel)
@@ -88,7 +88,7 @@ class StreamExecGroupWindowAggregate(
 
   def getGrouping: Array[Int] = grouping
 
-  def getWindowProperties: Seq[NamedWindowProperty] = namedProperties
+  def getWindowProperties: Seq[PlannerNamedWindowProperty] = namedProperties
 
   override def deriveRowType(): RelDataType = outputRowType
 
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalWindowAggregateRuleBase.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalWindowAggregateRuleBase.scala
index 557f3c2..905b3f3 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalWindowAggregateRuleBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalWindowAggregateRuleBase.scala
@@ -18,9 +18,9 @@
 package org.apache.flink.table.plan.rules.logical
 
 import org.apache.flink.table.api._
-import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import 
org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
 import 
org.apache.flink.table.expressions.utils.ApiExpressionUtils.intervalOfMillis
-import org.apache.flink.table.expressions.{FieldReferenceExpression, 
WindowReference}
+import org.apache.flink.table.expressions.{FieldReferenceExpression, 
PlannerWindowReference}
 import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable
 import org.apache.flink.table.plan.logical.{LogicalWindow, SessionGroupWindow, 
SlidingGroupWindow, TumblingGroupWindow}
 import org.apache.flink.table.plan.nodes.calcite.LogicalWindowAggregate
@@ -105,7 +105,7 @@ abstract class LogicalWindowAggregateRuleBase(description: 
String)
     val transformed = call.builder()
     val windowAgg = LogicalWindowAggregate.create(
       window,
-      Seq[NamedWindowProperty](),
+      Seq[PlannerNamedWindowProperty](),
       newAgg)
     // The transformation adds an additional LogicalProject at the top to 
ensure
     // that the types are equivalent.
@@ -176,7 +176,7 @@ abstract class LogicalWindowAggregateRuleBase(description: 
String)
 
     val timeField = getTimeFieldReference(windowExpr.getOperands.get(0), 
windowExprIdx, rowType)
     val resultType = 
Some(fromDataTypeToLogicalType(timeField.getOutputDataType))
-    val windowRef = WindowReference("w$", resultType)
+    val windowRef = PlannerWindowReference("w$", resultType)
     windowExpr.getOperator match {
       case FlinkSqlOperatorTable.TUMBLE =>
         val interval = getOperandAsLong(windowExpr, 1)
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/WindowPropertiesRule.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/WindowPropertiesRule.scala
index af6878f..c242f3e 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/WindowPropertiesRule.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/WindowPropertiesRule.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.plan.rules.logical
 
 import org.apache.flink.table.api.{TableException, Types, ValidationException}
-import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import 
org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable
 import org.apache.flink.table.plan.logical.LogicalWindow
@@ -104,19 +104,23 @@ object WindowPropertiesRules {
     val windowType = getWindowType(w)
 
     val startEndProperties = Seq(
-      NamedWindowProperty(propertyName(w, "start"), 
WindowStart(w.aliasAttribute)),
-      NamedWindowProperty(propertyName(w, "end"), WindowEnd(w.aliasAttribute)))
+      PlannerNamedWindowProperty(propertyName(w, "start"), 
PlannerWindowStart(w.aliasAttribute)),
+      PlannerNamedWindowProperty(propertyName(w, "end"), 
PlannerWindowEnd(w.aliasAttribute)))
 
     // allow rowtime/proctime for rowtime windows and proctime for proctime 
windows
     val timeProperties = windowType match {
       case 'streamRowtime =>
         Seq(
-          NamedWindowProperty(propertyName(w, "rowtime"), 
RowtimeAttribute(w.aliasAttribute)),
-          NamedWindowProperty(propertyName(w, "proctime"), 
ProctimeAttribute(w.aliasAttribute)))
+          PlannerNamedWindowProperty(propertyName(w, "rowtime"),
+            PlannerRowtimeAttribute(w.aliasAttribute)),
+          PlannerNamedWindowProperty(propertyName(w, "proctime"),
+            PlannerProctimeAttribute(w.aliasAttribute)))
       case 'streamProctime =>
-        Seq(NamedWindowProperty(propertyName(w, "proctime"), 
ProctimeAttribute(w.aliasAttribute)))
+        Seq(PlannerNamedWindowProperty(propertyName(w, "proctime"),
+          PlannerProctimeAttribute(w.aliasAttribute)))
       case 'batchRowtime =>
-        Seq(NamedWindowProperty(propertyName(w, "rowtime"), 
RowtimeAttribute(w.aliasAttribute)))
+        Seq(PlannerNamedWindowProperty(propertyName(w, "rowtime"),
+          PlannerRowtimeAttribute(w.aliasAttribute)))
       case _ =>
         throw new TableException("Unknown window type encountered. Please 
report this bug.")
     }
@@ -161,7 +165,7 @@ object WindowPropertiesRules {
 
   /** Generates a property name for a window. */
   private def propertyName(window: LogicalWindow, name: String): String =
-    window.aliasAttribute.asInstanceOf[WindowReference].name + name
+    window.aliasAttribute.asInstanceOf[PlannerWindowReference].name + name
 
   /** Replace group auxiliaries with field references. */
   def replaceGroupAuxiliaries(
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala
index c325eba..20ebf10 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.plan.util
 import org.apache.flink.api.common.typeinfo.Types
 import org.apache.flink.table.JLong
 import org.apache.flink.table.api.{DataTypes, TableConfig, TableConfigOptions, 
TableException}
-import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import 
org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
 import org.apache.flink.table.calcite.{FlinkTypeFactory, FlinkTypeSystem}
 import org.apache.flink.table.dataformat.BaseRow
 import 
org.apache.flink.table.dataview.DataViewUtils.useNullSerializerForStateViewFieldsFromAccType
@@ -688,27 +688,27 @@ object AggregateUtil extends Enumeration {
     * Computes the positions of (window start, window end, row time).
     */
   private[flink] def computeWindowPropertyPos(
-      properties: Seq[NamedWindowProperty]): (Option[Int], Option[Int], 
Option[Int]) = {
+      properties: Seq[PlannerNamedWindowProperty]): (Option[Int], Option[Int], 
Option[Int]) = {
     val propPos = properties.foldRight(
       (None: Option[Int], None: Option[Int], None: Option[Int], 0)) {
       case (p, (s, e, rt, i)) => p match {
-        case NamedWindowProperty(_, prop) =>
+        case PlannerNamedWindowProperty(_, prop) =>
           prop match {
-            case WindowStart(_) if s.isDefined =>
+            case PlannerWindowStart(_) if s.isDefined =>
               throw new TableException(
                 "Duplicate window start property encountered. This is a bug.")
-            case WindowStart(_) =>
+            case PlannerWindowStart(_) =>
               (Some(i), e, rt, i - 1)
-            case WindowEnd(_) if e.isDefined =>
+            case PlannerWindowEnd(_) if e.isDefined =>
               throw new TableException("Duplicate window end property 
encountered. This is a bug.")
-            case WindowEnd(_) =>
+            case PlannerWindowEnd(_) =>
               (s, Some(i), rt, i - 1)
-            case RowtimeAttribute(_) if rt.isDefined =>
+            case PlannerRowtimeAttribute(_) if rt.isDefined =>
               throw new TableException(
                 "Duplicate window rowtime property encountered. This is a 
bug.")
-            case RowtimeAttribute(_) =>
+            case PlannerRowtimeAttribute(_) =>
               (s, e, Some(i), i - 1)
-            case ProctimeAttribute(_) =>
+            case PlannerProctimeAttribute(_) =>
               // ignore this property, it will be null at the position later
               (s, e, rt, i - 1)
           }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelMdUtil.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelMdUtil.scala
index dc34bb6..49d67a8 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelMdUtil.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelMdUtil.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.plan.util
 
 import org.apache.flink.table.JDouble
-import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import 
org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.dataformat.BinaryRow
 import org.apache.flink.table.plan.nodes.calcite.{Expand, Rank, 
WindowAggregate}
@@ -196,7 +196,7 @@ object FlinkRelMdUtil {
   def makeNamePropertiesSelectivityRexNode(
       winAgg: SingleRel,
       fullGrouping: Array[Int],
-      namedProperties: Seq[NamedWindowProperty],
+      namedProperties: Seq[PlannerNamedWindowProperty],
       predicate: RexNode): RexNode = {
     if (predicate == null || predicate.isAlwaysTrue || 
namedProperties.isEmpty) {
       return predicate
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/RelExplainUtil.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/RelExplainUtil.scala
index f0ec825..fb7f5e6 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/RelExplainUtil.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/RelExplainUtil.scala
@@ -19,7 +19,7 @@ package org.apache.flink.table.plan.util
 
 import org.apache.flink.table.CalcitePair
 import org.apache.flink.table.api.TableException
-import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import 
org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
 import 
org.apache.flink.table.functions.aggfunctions.DeclarativeAggregateFunction
 import org.apache.flink.table.functions.utils.TableSqlFunction
 import org.apache.flink.table.functions.{AggregateFunction, 
UserDefinedFunction}
@@ -781,7 +781,7 @@ object RelExplainUtil {
       grouping: Array[Int],
       rowType: RelDataType,
       aggs: Seq[AggregateCall],
-      namedProperties: Seq[NamedWindowProperty],
+      namedProperties: Seq[PlannerNamedWindowProperty],
       withOutputFieldNames: Boolean = true): String = {
     val inFields = inputType.getFieldNames
     val outFields = rowType.getFieldNames
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
index 46fda8f..59a8e03 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala
@@ -18,21 +18,12 @@
 
 package org.apache.flink.table.sources
 
-import java.sql.Timestamp
-
-import com.google.common.collect.ImmutableList
-import org.apache.calcite.plan.RelOptCluster
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.logical.LogicalValues
-import org.apache.calcite.rex.{RexLiteral, RexNode}
-import org.apache.calcite.tools.RelBuilder
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.table.api.{DataTypes, ValidationException}
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import 
org.apache.flink.table.expressions.utils.ApiExpressionUtils.{unresolvedCall, 
typeLiteral}
-import org.apache.flink.table.expressions.{PlannerResolvedFieldReference, 
ResolvedFieldReference, RexNodeConverter}
+import 
org.apache.flink.table.expressions.utils.ApiExpressionUtils.{typeLiteral, 
unresolvedCall}
+import org.apache.flink.table.expressions.{ExestingFieldFieldReference, 
ResolvedFieldReference, RexNodeConverter}
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions
 import org.apache.flink.table.types.LogicalTypeDataTypeConverter
 import 
org.apache.flink.table.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
@@ -40,6 +31,16 @@ import org.apache.flink.table.types.logical.{LogicalType, 
TimestampKind, Timesta
 import 
org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
 
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.plan.RelOptCluster
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.logical.LogicalValues
+import org.apache.calcite.rex.{RexLiteral, RexNode}
+import org.apache.calcite.tools.RelBuilder
+
+import java.sql.Timestamp
+
 import scala.collection.JavaConversions._
 
 /** Util class for [[TableSource]]. */
@@ -272,7 +273,7 @@ object TableSourceUtil {
           // push an empty values node with the physical schema on the 
relbuilder
           relBuilder.push(createSchemaRelNode(resolvedFields))
           // get extraction expression
-          resolvedFields.map(f => PlannerResolvedFieldReference(f._1, f._3, 
f._2))
+          resolvedFields.map(f => ExestingFieldFieldReference(f._1, f._3, 
f._2))
         } else {
           new Array[ResolvedFieldReference](0)
         }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
index 540f79f..3c20e85 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
@@ -59,8 +59,8 @@ final class ExistingField(val field: String) extends 
TimestampExtractor {
     * into a rowtime attribute.
     */
   override def getExpression(fieldAccesses: Array[ResolvedFieldReference]): 
Expression = {
-    val fieldAccess: PlannerResolvedFieldReference = fieldAccesses(0)
-      .asInstanceOf[PlannerResolvedFieldReference]
+    val fieldAccess: ExestingFieldFieldReference = fieldAccesses(0)
+      .asInstanceOf[ExestingFieldFieldReference]
 
     val fieldReferenceExpr = new FieldReferenceExpression(
       fieldAccess.name,
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala
index 94144b9..2184aae 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.plan.metadata
 
 import org.apache.flink.table.api.{TableConfig, TableException}
-import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import 
org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty
 import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
 import org.apache.flink.table.catalog.FunctionCatalog
 import 
org.apache.flink.table.expressions.utils.ApiExpressionUtils.intervalOfMillis
@@ -959,8 +959,8 @@ class FlinkRelMdHandlerTestBase {
   // For window start/end/proc_time the windowAttribute inferred type is a 
hard code val,
   // only for row_time we distinguish by batch row time, for what we hard code 
DataTypes.TIMESTAMP,
   // which is ok here for testing.
-  private lazy val windowRef: WindowReference =
-  WindowReference.apply("w$", Some(new TimestampType(3)))
+  private lazy val windowRef: PlannerWindowReference =
+  PlannerWindowReference.apply("w$", Some(new TimestampType(3)))
 
   protected lazy val tumblingGroupWindow: LogicalWindow =
     TumblingGroupWindow(
@@ -973,11 +973,11 @@ class FlinkRelMdHandlerTestBase {
       intervalOfMillis(900000)
     )
 
-  protected lazy val namedPropertiesOfWindowAgg: Seq[NamedWindowProperty] =
-    Seq(NamedWindowProperty("w$start", WindowStart(windowRef)),
-      NamedWindowProperty("w$end", WindowStart(windowRef)),
-      NamedWindowProperty("w$rowtime", RowtimeAttribute(windowRef)),
-      NamedWindowProperty("w$proctime", ProctimeAttribute(windowRef)))
+  protected lazy val namedPropertiesOfWindowAgg: 
Seq[PlannerNamedWindowProperty] =
+    Seq(PlannerNamedWindowProperty("w$start", PlannerWindowStart(windowRef)),
+      PlannerNamedWindowProperty("w$end", PlannerWindowStart(windowRef)),
+      PlannerNamedWindowProperty("w$rowtime", 
PlannerRowtimeAttribute(windowRef)),
+      PlannerNamedWindowProperty("w$proctime", 
PlannerProctimeAttribute(windowRef)))
 
   // equivalent SQL is
   // select a, b, count(c) as s,

Reply via email to