This is an automated email from the ASF dual-hosted git repository. kurt pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit d1b22b56a92eef26998555dc35371447593afc8c Author: JingsongLi <lzljs3620...@aliyun.com> AuthorDate: Tue Jul 2 09:56:38 2019 +0800 [FLINK-13049][table-planner-blink] Rename windowProperties and PlannerResolvedFieldReference to avoid name conflict --- .../flink/table/calcite/FlinkRelBuilder.scala | 4 ++-- .../codegen/agg/AggsHandlerCodeGenerator.scala | 14 ++++++------ .../agg/batch/HashWindowCodeGenerator.scala | 4 ++-- .../agg/batch/SortWindowCodeGenerator.scala | 4 ++-- .../codegen/agg/batch/WindowCodeGenerator.scala | 4 ++-- ...nce.scala => ExestingFieldFieldReference.scala} | 2 +- ...perties.scala => plannerWindowProperties.scala} | 24 ++++++++++++--------- .../flink/table/plan/logical/groupWindows.scala | 8 +++---- .../plan/metadata/FlinkRelMdColumnUniqueness.scala | 4 ++-- .../plan/metadata/FlinkRelMdUniqueGroups.scala | 4 ++-- .../table/plan/metadata/FlinkRelMdUniqueKeys.scala | 4 ++-- .../nodes/calcite/LogicalWindowAggregate.scala | 8 +++---- .../table/plan/nodes/calcite/WindowAggregate.scala | 6 +++--- .../logical/FlinkLogicalWindowAggregate.scala | 4 ++-- .../batch/BatchExecHashWindowAggregate.scala | 4 ++-- .../batch/BatchExecHashWindowAggregateBase.scala | 4 ++-- .../batch/BatchExecLocalHashWindowAggregate.scala | 4 ++-- .../batch/BatchExecLocalSortWindowAggregate.scala | 4 ++-- .../batch/BatchExecSortWindowAggregate.scala | 4 ++-- .../batch/BatchExecSortWindowAggregateBase.scala | 4 ++-- .../batch/BatchExecWindowAggregateBase.scala | 6 +++--- .../stream/StreamExecGroupWindowAggregate.scala | 6 +++--- .../logical/LogicalWindowAggregateRuleBase.scala | 8 +++---- .../plan/rules/logical/WindowPropertiesRule.scala | 20 ++++++++++------- .../flink/table/plan/util/AggregateUtil.scala | 20 ++++++++--------- .../flink/table/plan/util/FlinkRelMdUtil.scala | 4 ++-- .../flink/table/plan/util/RelExplainUtil.scala | 4 ++-- .../flink/table/sources/TableSourceUtil.scala | 25 +++++++++++----------- .../table/sources/tsextractors/ExistingField.scala | 4 ++-- 
.../plan/metadata/FlinkRelMdHandlerTestBase.scala | 16 +++++++------- 30 files changed, 120 insertions(+), 111 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala index 4683424..8b5cf8a 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.calcite import org.apache.flink.table.calcite.FlinkRelFactories.{ExpandFactory, RankFactory, SinkFactory} -import org.apache.flink.table.expressions.WindowProperty +import org.apache.flink.table.expressions.PlannerWindowProperty import org.apache.flink.table.operations.QueryOperation import org.apache.flink.table.plan.QueryOperationConverter import org.apache.flink.table.runtime.rank.{RankRange, RankType} @@ -111,7 +111,7 @@ object FlinkRelBuilder { * * Similar to [[RelBuilder.AggCall]] or [[RelBuilder.GroupKey]]. 
*/ - case class NamedWindowProperty(name: String, property: WindowProperty) + case class PlannerNamedWindowProperty(name: String, property: PlannerWindowProperty) def proto(context: Context): RelBuilderFactory = new RelBuilderFactory() { def create(cluster: RelOptCluster, schema: RelOptSchema): RelBuilder = diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/AggsHandlerCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/AggsHandlerCodeGenerator.scala index f77810b..bf40b04 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/AggsHandlerCodeGenerator.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/AggsHandlerCodeGenerator.scala @@ -58,7 +58,7 @@ class AggsHandlerCodeGenerator( /** window properties like window_start and window_end, only used in window aggregates */ private var namespaceClassName: String = _ - private var windowProperties: Seq[WindowProperty] = Seq() + private var windowProperties: Seq[PlannerWindowProperty] = Seq() private var hasNamespace: Boolean = false /** Aggregates informations */ @@ -182,7 +182,7 @@ class AggsHandlerCodeGenerator( * Adds window properties such as window_start, window_end */ private def initialWindowProperties( - windowProperties: Seq[WindowProperty], + windowProperties: Seq[PlannerWindowProperty], windowClass: Class[_]): Unit = { this.windowProperties = windowProperties this.namespaceClassName = windowClass.getCanonicalName @@ -404,7 +404,7 @@ class AggsHandlerCodeGenerator( def generateNamespaceAggsHandler[N]( name: String, aggInfoList: AggregateInfoList, - windowProperties: Seq[WindowProperty], + windowProperties: Seq[PlannerWindowProperty], windowClass: Class[N]): GeneratedNamespaceAggsHandleFunction[N] = { initialWindowProperties(windowProperties, windowClass) @@ -663,19 +663,19 @@ class AggsHandlerCodeGenerator( if 
(hasNamespace) { // append window property results val windowExprs = windowProperties.map { - case w: WindowStart => + case w: PlannerWindowStart => // return a Timestamp(Internal is long) GeneratedExpression( s"$NAMESPACE_TERM.getStart()", "false", "", w.resultType) - case w: WindowEnd => + case w: PlannerWindowEnd => // return a Timestamp(Internal is long) GeneratedExpression( s"$NAMESPACE_TERM.getEnd()", "false", "", w.resultType) - case r: RowtimeAttribute => + case r: PlannerRowtimeAttribute => // return a rowtime, use long as internal type GeneratedExpression( s"$NAMESPACE_TERM.getEnd() - 1", "false", "", r.resultType) - case p: ProctimeAttribute => + case p: PlannerProctimeAttribute => // ignore this property, it will be null at the position later GeneratedExpression("-1L", "true", "", p.resultType) } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/HashWindowCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/HashWindowCodeGenerator.scala index 1612c77..21f7200 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/HashWindowCodeGenerator.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/HashWindowCodeGenerator.scala @@ -24,7 +24,7 @@ import org.apache.flink.runtime.operators.sort.QuickSort import org.apache.flink.streaming.api.operators.OneInputStreamOperator import org.apache.flink.table.api.Types import org.apache.flink.table.api.window.TimeWindow -import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty +import org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty import org.apache.flink.table.codegen.CodeGenUtils.{BINARY_ROW, newName} import org.apache.flink.table.codegen.OperatorCodeGenerator.generateCollect import 
org.apache.flink.table.codegen.agg.batch.AggCodeGenHelper.genGroupKeyChangedCheckCode @@ -68,7 +68,7 @@ class HashWindowCodeGenerator( window: LogicalWindow, inputTimeFieldIndex: Int, inputTimeIsDate: Boolean, - namedProperties: Seq[NamedWindowProperty], + namedProperties: Seq[PlannerNamedWindowProperty], aggInfoList: AggregateInfoList, inputRowType: RelDataType, grouping: Array[Int], diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/SortWindowCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/SortWindowCodeGenerator.scala index c99e850..c97a2ef 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/SortWindowCodeGenerator.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/SortWindowCodeGenerator.scala @@ -20,7 +20,7 @@ package org.apache.flink.table.codegen.agg.batch import org.apache.flink.streaming.api.operators.OneInputStreamOperator import org.apache.flink.table.api.window.TimeWindow -import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty +import org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty import org.apache.flink.table.codegen.CodeGenUtils.BINARY_ROW import org.apache.flink.table.codegen.agg.batch.AggCodeGenHelper.genGroupKeyChangedCheckCode import org.apache.flink.table.codegen.{CodeGenUtils, CodeGeneratorContext, ProjectionCodeGenerator} @@ -60,7 +60,7 @@ class SortWindowCodeGenerator( window: LogicalWindow, inputTimeFieldIndex: Int, inputTimeIsDate: Boolean, - namedProperties: Seq[NamedWindowProperty], + namedProperties: Seq[PlannerNamedWindowProperty], aggInfoList: AggregateInfoList, inputRowType: RelDataType, inputType: RowType, diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/WindowCodeGenerator.scala 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/WindowCodeGenerator.scala index df2549f..060036f 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/WindowCodeGenerator.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/WindowCodeGenerator.scala @@ -21,7 +21,7 @@ package org.apache.flink.table.codegen.agg.batch import org.apache.flink.table.JLong import org.apache.flink.table.api.DataTypes import org.apache.flink.table.api.window.TimeWindow -import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty +import org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.codegen.CodeGenUtils.{BINARY_ROW, boxedTypeTermForType, newName} import org.apache.flink.table.codegen.GenerateUtils.generateFieldAccess @@ -59,7 +59,7 @@ abstract class WindowCodeGenerator( window: LogicalWindow, inputTimeFieldIndex: Int, inputTimeIsDate: Boolean, - namedProperties: Seq[NamedWindowProperty], + namedProperties: Seq[PlannerNamedWindowProperty], aggInfoList: AggregateInfoList, inputRowType: RelDataType, grouping: Array[Int], diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerResolvedFieldReference.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/ExestingFieldFieldReference.scala similarity index 96% rename from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerResolvedFieldReference.scala rename to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/ExestingFieldFieldReference.scala index a3406f8..0ad1f5e 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerResolvedFieldReference.scala 
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/ExestingFieldFieldReference.scala @@ -20,7 +20,7 @@ package org.apache.flink.table.expressions import org.apache.flink.api.common.typeinfo.TypeInformation -case class PlannerResolvedFieldReference( +case class ExestingFieldFieldReference( name: String, resultType: TypeInformation[_], fieldIndex: Int) extends ResolvedFieldReference diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/windowProperties.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/plannerWindowProperties.scala similarity index 73% rename from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/windowProperties.scala rename to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/plannerWindowProperties.scala index 0adf024..c851a6a 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/windowProperties.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/plannerWindowProperties.scala @@ -21,45 +21,49 @@ package org.apache.flink.table.expressions import org.apache.flink.table.api.TableException import org.apache.flink.table.types.logical.{BigIntType, LogicalType, TimestampKind, TimestampType} -trait WindowProperty { +trait PlannerWindowProperty { def resultType: LogicalType } -abstract class AbstractWindowProperty(reference: WindowReference) extends WindowProperty { +abstract class AbstractPlannerWindowProperty( + reference: PlannerWindowReference) extends PlannerWindowProperty { override def toString = s"WindowProperty($reference)" } /** * Indicate timeField type. 
*/ -case class WindowReference(name: String, tpe: Option[LogicalType] = None) { +case class PlannerWindowReference(name: String, tpe: Option[LogicalType] = None) { override def toString: String = s"'$name" } -case class WindowStart(reference: WindowReference) extends AbstractWindowProperty(reference) { +case class PlannerWindowStart( + reference: PlannerWindowReference) extends AbstractPlannerWindowProperty(reference) { override def resultType: TimestampType = new TimestampType(3) override def toString: String = s"start($reference)" } -case class WindowEnd(reference: WindowReference) extends AbstractWindowProperty(reference) { +case class PlannerWindowEnd( + reference: PlannerWindowReference) extends AbstractPlannerWindowProperty(reference) { override def resultType: TimestampType = new TimestampType(3) override def toString: String = s"end($reference)" } -case class RowtimeAttribute(reference: WindowReference) extends AbstractWindowProperty(reference) { +case class PlannerRowtimeAttribute( + reference: PlannerWindowReference) extends AbstractPlannerWindowProperty(reference) { override def resultType: LogicalType = { reference match { - case WindowReference(_, Some(tpe)) + case PlannerWindowReference(_, Some(tpe)) if tpe.isInstanceOf[TimestampType] && tpe.asInstanceOf[TimestampType].getKind == TimestampKind.ROWTIME => // rowtime window new TimestampType(true, TimestampKind.ROWTIME, 3) - case WindowReference(_, Some(tpe)) + case PlannerWindowReference(_, Some(tpe)) if tpe.isInstanceOf[BigIntType] || tpe.isInstanceOf[TimestampType] => // batch time window new TimestampType(3) @@ -72,8 +76,8 @@ case class RowtimeAttribute(reference: WindowReference) extends AbstractWindowPr override def toString: String = s"rowtime($reference)" } -case class ProctimeAttribute(reference: WindowReference) - extends AbstractWindowProperty(reference) { +case class PlannerProctimeAttribute(reference: PlannerWindowReference) + extends AbstractPlannerWindowProperty(reference) { override def 
resultType: LogicalType = new TimestampType(true, TimestampKind.PROCTIME, 3) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala index d9638f2..f9f01c4 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala @@ -27,7 +27,7 @@ import org.apache.flink.table.expressions._ * @param timeAttribute time field indicating event-time or processing-time */ abstract class LogicalWindow( - val aliasAttribute: WindowReference, + val aliasAttribute: PlannerWindowReference, val timeAttribute: FieldReferenceExpression) { override def toString: String = getClass.getSimpleName @@ -38,7 +38,7 @@ abstract class LogicalWindow( // ------------------------------------------------------------------------------------------------ case class TumblingGroupWindow( - alias: WindowReference, + alias: PlannerWindowReference, timeField: FieldReferenceExpression, size: ValueLiteralExpression) extends LogicalWindow( @@ -51,7 +51,7 @@ case class TumblingGroupWindow( // ------------------------------------------------------------------------------------------------ case class SlidingGroupWindow( - alias: WindowReference, + alias: PlannerWindowReference, timeField: FieldReferenceExpression, size: ValueLiteralExpression, slide: ValueLiteralExpression) @@ -67,7 +67,7 @@ case class SlidingGroupWindow( // ------------------------------------------------------------------------------------------------ case class SessionGroupWindow( - alias: WindowReference, + alias: PlannerWindowReference, timeField: FieldReferenceExpression, gap: ValueLiteralExpression) extends LogicalWindow( diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnUniqueness.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnUniqueness.scala index 0afaa62..7993d67 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnUniqueness.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnUniqueness.scala @@ -20,7 +20,7 @@ package org.apache.flink.table.plan.metadata import org.apache.flink.table.JBoolean import org.apache.flink.table.api.TableException -import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty +import org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty import org.apache.flink.table.plan.nodes.FlinkRelNode import org.apache.flink.table.plan.nodes.calcite.{Expand, Rank, WindowAggregate} import org.apache.flink.table.plan.nodes.common.CommonLookupJoin @@ -402,7 +402,7 @@ class FlinkRelMdColumnUniqueness private extends MetadataHandler[BuiltInMetadata private def areColumnsUniqueOnWindowAggregate( grouping: Array[Int], - namedProperties: Seq[NamedWindowProperty], + namedProperties: Seq[PlannerNamedWindowProperty], outputFieldCount: Int, mq: RelMetadataQuery, columns: ImmutableBitSet, diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueGroups.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueGroups.scala index 62ab472..f743dd5 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueGroups.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueGroups.scala @@ -18,7 +18,7 @@ package org.apache.flink.table.plan.metadata -import 
org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty +import org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty import org.apache.flink.table.plan.metadata.FlinkMetadata.UniqueGroups import org.apache.flink.table.plan.nodes.calcite.{Expand, Rank, WindowAggregate} import org.apache.flink.table.plan.nodes.physical.batch._ @@ -276,7 +276,7 @@ class FlinkRelMdUniqueGroups private extends MetadataHandler[UniqueGroups] { windowAgg: SingleRel, grouping: Array[Int], auxGrouping: Array[Int], - namedProperties: Seq[NamedWindowProperty], + namedProperties: Seq[PlannerNamedWindowProperty], mq: RelMetadataQuery, columns: ImmutableBitSet): ImmutableBitSet = { val fieldCount = windowAgg.getRowType.getFieldCount diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueKeys.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueKeys.scala index 84d66cd..89ba591 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueKeys.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueKeys.scala @@ -18,7 +18,7 @@ package org.apache.flink.table.plan.metadata -import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty +import org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty import org.apache.flink.table.plan.nodes.calcite.{Expand, Rank, WindowAggregate} import org.apache.flink.table.plan.nodes.common.CommonLookupJoin import org.apache.flink.table.plan.nodes.logical._ @@ -342,7 +342,7 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu private def getUniqueKeysOnWindowAgg( fieldCount: Int, - namedProperties: Seq[NamedWindowProperty], + namedProperties: Seq[PlannerNamedWindowProperty], grouping: Array[Int], mq: RelMetadataQuery, ignoreNulls: 
Boolean): util.Set[ImmutableBitSet] = { diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalWindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalWindowAggregate.scala index 32803f6..be8288e 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalWindowAggregate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalWindowAggregate.scala @@ -18,7 +18,7 @@ package org.apache.flink.table.plan.nodes.calcite -import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty +import org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty import org.apache.flink.table.plan.logical.LogicalWindow import org.apache.calcite.plan.{Convention, RelOptCluster, RelTraitSet} @@ -36,7 +36,7 @@ final class LogicalWindowAggregate( groupSet: ImmutableBitSet, aggCalls: util.List[AggregateCall], window: LogicalWindow, - namedProperties: Seq[NamedWindowProperty]) + namedProperties: Seq[PlannerNamedWindowProperty]) extends WindowAggregate(cluster, traitSet, child, groupSet, aggCalls, window, namedProperties) { override def copy( @@ -56,7 +56,7 @@ final class LogicalWindowAggregate( namedProperties) } - def copy(namedProperties: Seq[NamedWindowProperty]): LogicalWindowAggregate = { + def copy(namedProperties: Seq[PlannerNamedWindowProperty]): LogicalWindowAggregate = { new LogicalWindowAggregate( cluster, traitSet, @@ -72,7 +72,7 @@ object LogicalWindowAggregate { def create( window: LogicalWindow, - namedProperties: Seq[NamedWindowProperty], + namedProperties: Seq[PlannerNamedWindowProperty], agg: Aggregate): LogicalWindowAggregate = { require(!agg.indicator && (agg.getGroupType == Group.SIMPLE)) val cluster: RelOptCluster = agg.getCluster diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/WindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/WindowAggregate.scala index efe2b8a..e206b65 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/WindowAggregate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/WindowAggregate.scala @@ -18,7 +18,7 @@ package org.apache.flink.table.plan.nodes.calcite -import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty +import org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.plan.logical.LogicalWindow @@ -43,7 +43,7 @@ abstract class WindowAggregate( groupSet: ImmutableBitSet, aggCalls: util.List[AggregateCall], window: LogicalWindow, - namedProperties: Seq[NamedWindowProperty]) + namedProperties: Seq[PlannerNamedWindowProperty]) extends Aggregate( cluster, traitSet, @@ -55,7 +55,7 @@ abstract class WindowAggregate( def getWindow: LogicalWindow = window - def getNamedProperties: Seq[NamedWindowProperty] = namedProperties + def getNamedProperties: Seq[PlannerNamedWindowProperty] = namedProperties override def accept(shuttle: RelShuttle): RelNode = shuttle.visit(this) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala index 10068dc..ebea49c 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala 
@@ -18,7 +18,7 @@ package org.apache.flink.table.plan.nodes.logical -import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty +import org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty import org.apache.flink.table.plan.logical.LogicalWindow import org.apache.flink.table.plan.nodes.FlinkConventions import org.apache.flink.table.plan.nodes.calcite.{LogicalWindowAggregate, WindowAggregate} @@ -43,7 +43,7 @@ class FlinkLogicalWindowAggregate( groupSet: ImmutableBitSet, aggCalls: util.List[AggregateCall], window: LogicalWindow, - namedProperties: Seq[NamedWindowProperty]) + namedProperties: Seq[PlannerNamedWindowProperty]) extends WindowAggregate(cluster, traitSet, child, groupSet, aggCalls, window, namedProperties) with FlinkLogicalRel { diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashWindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashWindowAggregate.scala index c7eb08b..e42fb81 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashWindowAggregate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashWindowAggregate.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.plan.nodes.physical.batch import org.apache.flink.runtime.operators.DamBehavior -import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty +import org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty import org.apache.flink.table.functions.UserDefinedFunction import org.apache.flink.table.plan.logical.LogicalWindow @@ -45,7 +45,7 @@ class BatchExecHashWindowAggregate( window: LogicalWindow, inputTimeFieldIndex: Int, inputTimeIsDate: Boolean, - namedProperties: Seq[NamedWindowProperty], + namedProperties: 
Seq[PlannerNamedWindowProperty], enableAssignPane: Boolean = false, isMerge: Boolean) extends BatchExecHashWindowAggregateBase( diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala index 4ca99f6..59e2d3f 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala @@ -20,7 +20,7 @@ package org.apache.flink.table.plan.nodes.physical.batch import org.apache.flink.streaming.api.transformations.OneInputTransformation import org.apache.flink.table.api.{BatchTableEnvironment, TableConfigOptions} -import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty +import org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.codegen.CodeGeneratorContext import org.apache.flink.table.codegen.agg.batch.{HashWindowCodeGenerator, WindowCodeGenerator} @@ -61,7 +61,7 @@ abstract class BatchExecHashWindowAggregateBase( window: LogicalWindow, inputTimeFieldIndex: Int, inputTimeIsDate: Boolean, - namedProperties: Seq[NamedWindowProperty], + namedProperties: Seq[PlannerNamedWindowProperty], enableAssignPane: Boolean = false, isMerge: Boolean, isFinal: Boolean) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLocalHashWindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLocalHashWindowAggregate.scala index 34752c8..eea8eb2 100644 --- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLocalHashWindowAggregate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLocalHashWindowAggregate.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.plan.nodes.physical.batch import org.apache.flink.runtime.operators.DamBehavior -import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty +import org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty import org.apache.flink.table.functions.UserDefinedFunction import org.apache.flink.table.plan.logical.LogicalWindow @@ -44,7 +44,7 @@ class BatchExecLocalHashWindowAggregate( window: LogicalWindow, val inputTimeFieldIndex: Int, inputTimeIsDate: Boolean, - namedProperties: Seq[NamedWindowProperty], + namedProperties: Seq[PlannerNamedWindowProperty], enableAssignPane: Boolean = false) extends BatchExecHashWindowAggregateBase( cluster, diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLocalSortWindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLocalSortWindowAggregate.scala index d8b68fc..0ebc072 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLocalSortWindowAggregate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecLocalSortWindowAggregate.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.plan.nodes.physical.batch import org.apache.flink.runtime.operators.DamBehavior -import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty +import org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty import org.apache.flink.table.functions.UserDefinedFunction import 
org.apache.flink.table.plan.logical.LogicalWindow @@ -44,7 +44,7 @@ class BatchExecLocalSortWindowAggregate( window: LogicalWindow, val inputTimeFieldIndex: Int, inputTimeIsDate: Boolean, - namedProperties: Seq[NamedWindowProperty], + namedProperties: Seq[PlannerNamedWindowProperty], enableAssignPane: Boolean = false) extends BatchExecSortWindowAggregateBase( cluster, diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortWindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortWindowAggregate.scala index 959aa9d..297488e 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortWindowAggregate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortWindowAggregate.scala @@ -20,7 +20,7 @@ package org.apache.flink.table.plan.nodes.physical.batch import org.apache.flink.runtime.operators.DamBehavior import org.apache.flink.table.api.TableConfig -import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty +import org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty import org.apache.flink.table.dataformat.BaseRow import org.apache.flink.table.functions.UserDefinedFunction import org.apache.flink.table.plan.logical.LogicalWindow @@ -45,7 +45,7 @@ class BatchExecSortWindowAggregate( window: LogicalWindow, inputTimeFieldIndex: Int, inputTimeIsDate: Boolean, - namedProperties: Seq[NamedWindowProperty], + namedProperties: Seq[PlannerNamedWindowProperty], enableAssignPane: Boolean = false, isMerge: Boolean) extends BatchExecSortWindowAggregateBase( diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala index 36dabfd..fc282bb 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala @@ -20,7 +20,7 @@ package org.apache.flink.table.plan.nodes.physical.batch import org.apache.flink.streaming.api.transformations.OneInputTransformation import org.apache.flink.table.api.{BatchTableEnvironment, TableConfig, TableConfigOptions} -import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty +import org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.codegen.CodeGeneratorContext import org.apache.flink.table.codegen.agg.batch.{SortWindowCodeGenerator, WindowCodeGenerator} @@ -58,7 +58,7 @@ abstract class BatchExecSortWindowAggregateBase( window: LogicalWindow, inputTimeFieldIndex: Int, inputTimeIsDate: Boolean, - namedProperties: Seq[NamedWindowProperty], + namedProperties: Seq[PlannerNamedWindowProperty], enableAssignPane: Boolean = false, isMerge: Boolean, isFinal: Boolean) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecWindowAggregateBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecWindowAggregateBase.scala index 49ec65e..4982d37 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecWindowAggregateBase.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecWindowAggregateBase.scala @@ -19,7 
+19,7 @@ package org.apache.flink.table.plan.nodes.physical.batch import org.apache.flink.table.api.TableException -import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty +import org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty import org.apache.flink.table.functions.UserDefinedFunction import org.apache.flink.table.plan.logical.LogicalWindow import org.apache.flink.table.plan.util.RelExplainUtil @@ -39,7 +39,7 @@ abstract class BatchExecWindowAggregateBase( auxGrouping: Array[Int], aggCallToAggFunction: Seq[(AggregateCall, UserDefinedFunction)], window: LogicalWindow, - namedProperties: Seq[NamedWindowProperty], + namedProperties: Seq[PlannerNamedWindowProperty], enableAssignPane: Boolean = true, val isMerge: Boolean, val isFinal: Boolean) @@ -56,7 +56,7 @@ abstract class BatchExecWindowAggregateBase( def getWindow: LogicalWindow = window - def getNamedProperties: Seq[NamedWindowProperty] = namedProperties + def getNamedProperties: Seq[PlannerNamedWindowProperty] = namedProperties def getAggCallList: Seq[AggregateCall] = aggCallToAggFunction.map(_._1) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGroupWindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGroupWindowAggregate.scala index 5812780..680bed3 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGroupWindowAggregate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGroupWindowAggregate.scala @@ -21,7 +21,7 @@ package org.apache.flink.table.plan.nodes.physical.stream import org.apache.flink.streaming.api.transformations.OneInputTransformation import org.apache.flink.table.api.window.{CountWindow, TimeWindow} import 
org.apache.flink.table.api.{StreamTableEnvironment, TableConfig, TableException} -import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty +import org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.codegen.agg.AggsHandlerCodeGenerator import org.apache.flink.table.codegen.{CodeGeneratorContext, EqualiserCodeGenerator} @@ -61,7 +61,7 @@ class StreamExecGroupWindowAggregate( grouping: Array[Int], val aggCalls: Seq[AggregateCall], val window: LogicalWindow, - namedProperties: Seq[NamedWindowProperty], + namedProperties: Seq[PlannerNamedWindowProperty], inputTimeFieldIndex: Int, val emitStrategy: WindowEmitStrategy) extends SingleRel(cluster, traitSet, inputRel) @@ -88,7 +88,7 @@ class StreamExecGroupWindowAggregate( def getGrouping: Array[Int] = grouping - def getWindowProperties: Seq[NamedWindowProperty] = namedProperties + def getWindowProperties: Seq[PlannerNamedWindowProperty] = namedProperties override def deriveRowType(): RelDataType = outputRowType diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalWindowAggregateRuleBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalWindowAggregateRuleBase.scala index 557f3c2..905b3f3 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalWindowAggregateRuleBase.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalWindowAggregateRuleBase.scala @@ -18,9 +18,9 @@ package org.apache.flink.table.plan.rules.logical import org.apache.flink.table.api._ -import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty +import org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty import 
org.apache.flink.table.expressions.utils.ApiExpressionUtils.intervalOfMillis -import org.apache.flink.table.expressions.{FieldReferenceExpression, WindowReference} +import org.apache.flink.table.expressions.{FieldReferenceExpression, PlannerWindowReference} import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable import org.apache.flink.table.plan.logical.{LogicalWindow, SessionGroupWindow, SlidingGroupWindow, TumblingGroupWindow} import org.apache.flink.table.plan.nodes.calcite.LogicalWindowAggregate @@ -105,7 +105,7 @@ abstract class LogicalWindowAggregateRuleBase(description: String) val transformed = call.builder() val windowAgg = LogicalWindowAggregate.create( window, - Seq[NamedWindowProperty](), + Seq[PlannerNamedWindowProperty](), newAgg) // The transformation adds an additional LogicalProject at the top to ensure // that the types are equivalent. @@ -176,7 +176,7 @@ abstract class LogicalWindowAggregateRuleBase(description: String) val timeField = getTimeFieldReference(windowExpr.getOperands.get(0), windowExprIdx, rowType) val resultType = Some(fromDataTypeToLogicalType(timeField.getOutputDataType)) - val windowRef = WindowReference("w$", resultType) + val windowRef = PlannerWindowReference("w$", resultType) windowExpr.getOperator match { case FlinkSqlOperatorTable.TUMBLE => val interval = getOperandAsLong(windowExpr, 1) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/WindowPropertiesRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/WindowPropertiesRule.scala index af6878f..c242f3e 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/WindowPropertiesRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/WindowPropertiesRule.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.plan.rules.logical import 
org.apache.flink.table.api.{TableException, Types, ValidationException} -import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty +import org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty import org.apache.flink.table.expressions._ import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable import org.apache.flink.table.plan.logical.LogicalWindow @@ -104,19 +104,23 @@ object WindowPropertiesRules { val windowType = getWindowType(w) val startEndProperties = Seq( - NamedWindowProperty(propertyName(w, "start"), WindowStart(w.aliasAttribute)), - NamedWindowProperty(propertyName(w, "end"), WindowEnd(w.aliasAttribute))) + PlannerNamedWindowProperty(propertyName(w, "start"), PlannerWindowStart(w.aliasAttribute)), + PlannerNamedWindowProperty(propertyName(w, "end"), PlannerWindowEnd(w.aliasAttribute))) // allow rowtime/proctime for rowtime windows and proctime for proctime windows val timeProperties = windowType match { case 'streamRowtime => Seq( - NamedWindowProperty(propertyName(w, "rowtime"), RowtimeAttribute(w.aliasAttribute)), - NamedWindowProperty(propertyName(w, "proctime"), ProctimeAttribute(w.aliasAttribute))) + PlannerNamedWindowProperty(propertyName(w, "rowtime"), + PlannerRowtimeAttribute(w.aliasAttribute)), + PlannerNamedWindowProperty(propertyName(w, "proctime"), + PlannerProctimeAttribute(w.aliasAttribute))) case 'streamProctime => - Seq(NamedWindowProperty(propertyName(w, "proctime"), ProctimeAttribute(w.aliasAttribute))) + Seq(PlannerNamedWindowProperty(propertyName(w, "proctime"), + PlannerProctimeAttribute(w.aliasAttribute))) case 'batchRowtime => - Seq(NamedWindowProperty(propertyName(w, "rowtime"), RowtimeAttribute(w.aliasAttribute))) + Seq(PlannerNamedWindowProperty(propertyName(w, "rowtime"), + PlannerRowtimeAttribute(w.aliasAttribute))) case _ => throw new TableException("Unknown window type encountered. 
Please report this bug.") } @@ -161,7 +165,7 @@ object WindowPropertiesRules { /** Generates a property name for a window. */ private def propertyName(window: LogicalWindow, name: String): String = - window.aliasAttribute.asInstanceOf[WindowReference].name + name + window.aliasAttribute.asInstanceOf[PlannerWindowReference].name + name /** Replace group auxiliaries with field references. */ def replaceGroupAuxiliaries( diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala index c325eba..20ebf10 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala @@ -20,7 +20,7 @@ package org.apache.flink.table.plan.util import org.apache.flink.api.common.typeinfo.Types import org.apache.flink.table.JLong import org.apache.flink.table.api.{DataTypes, TableConfig, TableConfigOptions, TableException} -import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty +import org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty import org.apache.flink.table.calcite.{FlinkTypeFactory, FlinkTypeSystem} import org.apache.flink.table.dataformat.BaseRow import org.apache.flink.table.dataview.DataViewUtils.useNullSerializerForStateViewFieldsFromAccType @@ -688,27 +688,27 @@ object AggregateUtil extends Enumeration { * Computes the positions of (window start, window end, row time). 
*/ private[flink] def computeWindowPropertyPos( - properties: Seq[NamedWindowProperty]): (Option[Int], Option[Int], Option[Int]) = { + properties: Seq[PlannerNamedWindowProperty]): (Option[Int], Option[Int], Option[Int]) = { val propPos = properties.foldRight( (None: Option[Int], None: Option[Int], None: Option[Int], 0)) { case (p, (s, e, rt, i)) => p match { - case NamedWindowProperty(_, prop) => + case PlannerNamedWindowProperty(_, prop) => prop match { - case WindowStart(_) if s.isDefined => + case PlannerWindowStart(_) if s.isDefined => throw new TableException( "Duplicate window start property encountered. This is a bug.") - case WindowStart(_) => + case PlannerWindowStart(_) => (Some(i), e, rt, i - 1) - case WindowEnd(_) if e.isDefined => + case PlannerWindowEnd(_) if e.isDefined => throw new TableException("Duplicate window end property encountered. This is a bug.") - case WindowEnd(_) => + case PlannerWindowEnd(_) => (s, Some(i), rt, i - 1) - case RowtimeAttribute(_) if rt.isDefined => + case PlannerRowtimeAttribute(_) if rt.isDefined => throw new TableException( "Duplicate window rowtime property encountered. 
This is a bug.") - case RowtimeAttribute(_) => + case PlannerRowtimeAttribute(_) => (s, e, Some(i), i - 1) - case ProctimeAttribute(_) => + case PlannerProctimeAttribute(_) => // ignore this property, it will be null at the position later (s, e, rt, i - 1) } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelMdUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelMdUtil.scala index dc34bb6..49d67a8 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelMdUtil.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelMdUtil.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.plan.util import org.apache.flink.table.JDouble -import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty +import org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.dataformat.BinaryRow import org.apache.flink.table.plan.nodes.calcite.{Expand, Rank, WindowAggregate} @@ -196,7 +196,7 @@ object FlinkRelMdUtil { def makeNamePropertiesSelectivityRexNode( winAgg: SingleRel, fullGrouping: Array[Int], - namedProperties: Seq[NamedWindowProperty], + namedProperties: Seq[PlannerNamedWindowProperty], predicate: RexNode): RexNode = { if (predicate == null || predicate.isAlwaysTrue || namedProperties.isEmpty) { return predicate diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/RelExplainUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/RelExplainUtil.scala index f0ec825..fb7f5e6 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/RelExplainUtil.scala +++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/RelExplainUtil.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.plan.util import org.apache.flink.table.CalcitePair import org.apache.flink.table.api.TableException -import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty +import org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty import org.apache.flink.table.functions.aggfunctions.DeclarativeAggregateFunction import org.apache.flink.table.functions.utils.TableSqlFunction import org.apache.flink.table.functions.{AggregateFunction, UserDefinedFunction} @@ -781,7 +781,7 @@ object RelExplainUtil { grouping: Array[Int], rowType: RelDataType, aggs: Seq[AggregateCall], - namedProperties: Seq[NamedWindowProperty], + namedProperties: Seq[PlannerNamedWindowProperty], withOutputFieldNames: Boolean = true): String = { val inFields = inputType.getFieldNames val outFields = rowType.getFieldNames diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala index 46fda8f..59a8e03 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala @@ -18,21 +18,12 @@ package org.apache.flink.table.sources -import java.sql.Timestamp - -import com.google.common.collect.ImmutableList -import org.apache.calcite.plan.RelOptCluster -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rel.logical.LogicalValues -import org.apache.calcite.rex.{RexLiteral, RexNode} -import org.apache.calcite.tools.RelBuilder import org.apache.flink.api.common.typeinfo.TypeInformation import 
org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.table.api.{DataTypes, ValidationException} import org.apache.flink.table.calcite.FlinkTypeFactory -import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{unresolvedCall, typeLiteral} -import org.apache.flink.table.expressions.{PlannerResolvedFieldReference, ResolvedFieldReference, RexNodeConverter} +import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{typeLiteral, unresolvedCall} +import org.apache.flink.table.expressions.{ExestingFieldFieldReference, ResolvedFieldReference, RexNodeConverter} import org.apache.flink.table.functions.BuiltInFunctionDefinitions import org.apache.flink.table.types.LogicalTypeDataTypeConverter import org.apache.flink.table.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType @@ -40,6 +31,16 @@ import org.apache.flink.table.types.logical.{LogicalType, TimestampKind, Timesta import org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo +import com.google.common.collect.ImmutableList +import org.apache.calcite.plan.RelOptCluster +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.logical.LogicalValues +import org.apache.calcite.rex.{RexLiteral, RexNode} +import org.apache.calcite.tools.RelBuilder + +import java.sql.Timestamp + import scala.collection.JavaConversions._ /** Util class for [[TableSource]]. 
*/ @@ -272,7 +273,7 @@ object TableSourceUtil { // push an empty values node with the physical schema on the relbuilder relBuilder.push(createSchemaRelNode(resolvedFields)) // get extraction expression - resolvedFields.map(f => PlannerResolvedFieldReference(f._1, f._3, f._2)) + resolvedFields.map(f => ExestingFieldFieldReference(f._1, f._3, f._2)) } else { new Array[ResolvedFieldReference](0) } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala index 540f79f..3c20e85 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala @@ -59,8 +59,8 @@ final class ExistingField(val field: String) extends TimestampExtractor { * into a rowtime attribute. 
*/ override def getExpression(fieldAccesses: Array[ResolvedFieldReference]): Expression = { - val fieldAccess: PlannerResolvedFieldReference = fieldAccesses(0) - .asInstanceOf[PlannerResolvedFieldReference] + val fieldAccess: ExestingFieldFieldReference = fieldAccesses(0) + .asInstanceOf[ExestingFieldFieldReference] val fieldReferenceExpr = new FieldReferenceExpression( fieldAccess.name, diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala index 94144b9..2184aae 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.plan.metadata import org.apache.flink.table.api.{TableConfig, TableException} -import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty +import org.apache.flink.table.calcite.FlinkRelBuilder.PlannerNamedWindowProperty import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory} import org.apache.flink.table.catalog.FunctionCatalog import org.apache.flink.table.expressions.utils.ApiExpressionUtils.intervalOfMillis @@ -959,8 +959,8 @@ class FlinkRelMdHandlerTestBase { // For window start/end/proc_time the windowAttribute inferred type is a hard code val, // only for row_time we distinguish by batch row time, for what we hard code DataTypes.TIMESTAMP, // which is ok here for testing. 
- private lazy val windowRef: WindowReference = - WindowReference.apply("w$", Some(new TimestampType(3))) + private lazy val windowRef: PlannerWindowReference = + PlannerWindowReference.apply("w$", Some(new TimestampType(3))) protected lazy val tumblingGroupWindow: LogicalWindow = TumblingGroupWindow( @@ -973,11 +973,11 @@ class FlinkRelMdHandlerTestBase { intervalOfMillis(900000) ) - protected lazy val namedPropertiesOfWindowAgg: Seq[NamedWindowProperty] = - Seq(NamedWindowProperty("w$start", WindowStart(windowRef)), - NamedWindowProperty("w$end", WindowStart(windowRef)), - NamedWindowProperty("w$rowtime", RowtimeAttribute(windowRef)), - NamedWindowProperty("w$proctime", ProctimeAttribute(windowRef))) + protected lazy val namedPropertiesOfWindowAgg: Seq[PlannerNamedWindowProperty] = + Seq(PlannerNamedWindowProperty("w$start", PlannerWindowStart(windowRef)), + PlannerNamedWindowProperty("w$end", PlannerWindowStart(windowRef)), + PlannerNamedWindowProperty("w$rowtime", PlannerRowtimeAttribute(windowRef)), + PlannerNamedWindowProperty("w$proctime", PlannerProctimeAttribute(windowRef))) // equivalent SQL is // select a, b, count(c) as s,