Repository: flink Updated Branches: refs/heads/master 971dcc5de -> 1b327f1ae
[FLINK-3916] [table] Allow generic types passing the Table API This closes #2197. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1b327f1a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1b327f1a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1b327f1a Branch: refs/heads/master Commit: 1b327f1ae7d078e22700729524e374b449b0f209 Parents: 971dcc5 Author: twalthr <twal...@apache.org> Authored: Tue May 17 16:14:29 2016 +0200 Committer: twalthr <twal...@apache.org> Committed: Tue Jul 12 15:33:11 2016 +0200 ---------------------------------------------------------------------- .../flink/api/table/BatchTableEnvironment.scala | 2 +- .../flink/api/table/FlinkPlannerImpl.scala | 44 +++---- .../flink/api/table/FlinkRelBuilder.scala | 87 +++++++++++++ .../flink/api/table/FlinkTypeFactory.scala | 124 +++++++++++++++++++ .../api/table/StreamTableEnvironment.scala | 2 +- .../flink/api/table/TableEnvironment.scala | 40 +++--- .../flink/api/table/codegen/CodeGenerator.scala | 7 +- .../flink/api/table/expressions/cast.scala | 11 +- .../flink/api/table/expressions/literals.scala | 15 ++- .../api/table/plan/logical/operators.scala | 6 +- .../plan/nodes/dataset/DataSetAggregate.scala | 5 +- .../api/table/plan/schema/DataStreamTable.scala | 5 +- .../api/table/plan/schema/FlinkTable.scala | 24 ++-- .../table/plan/schema/GenericRelDataType.scala | 53 ++++++++ .../table/runtime/aggregate/AggregateUtil.scala | 7 +- .../org/apache/flink/api/table/table.scala | 15 ++- .../api/table/typeutils/TypeCheckUtils.scala | 18 ++- .../api/table/typeutils/TypeConverter.scala | 63 +--------- .../api/java/batch/table/FromDataSetITCase.java | 58 +++++++++ .../flink/api/java/stream/sql/SqlITCase.java | 6 +- .../expressions/utils/ExpressionTestBase.scala | 11 +- 21 files changed, 439 insertions(+), 164 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1b327f1a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala index b1d5534..1ba13be 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala @@ -123,7 +123,7 @@ abstract class BatchTableEnvironment( */ override def sql(query: String): Table = { - val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner) + val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory) // parse the sql query val parsed = planner.parse(query) // validate the sql query http://git-wip-us.apache.org/repos/asf/flink/blob/1b327f1a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala index 9d0a146..f016d57 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala @@ -21,7 +21,6 @@ package org.apache.flink.api.table import java.util import com.google.common.collect.ImmutableList -import org.apache.calcite.adapter.java.JavaTypeFactory import org.apache.calcite.jdbc.CalciteSchema import org.apache.calcite.plan.RelOptTable.ViewExpander import org.apache.calcite.plan._ @@ -30,47 +29,37 @@ import org.apache.calcite.rel.RelRoot import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rex.RexBuilder import org.apache.calcite.schema.SchemaPlus -import org.apache.calcite.sql.parser.{SqlParser, SqlParseException} +import org.apache.calcite.sql.parser.{SqlParseException, SqlParser} import org.apache.calcite.sql.validate.SqlValidator import org.apache.calcite.sql.{SqlNode, SqlOperatorTable} -import org.apache.calcite.sql2rel.{RelDecorrelator, SqlToRelConverter, SqlRexConvertletTable} -import org.apache.calcite.tools.{RelConversionException, ValidationException => CValidationException, Frameworks, FrameworkConfig} -import org.apache.calcite.util.Util +import org.apache.calcite.sql2rel.{RelDecorrelator, SqlRexConvertletTable, SqlToRelConverter} +import org.apache.calcite.tools.{FrameworkConfig, RelConversionException, ValidationException => CValidationException} + import scala.collection.JavaConversions._ -/** NOTE: this is heavily insipred by Calcite's PlannerImpl. - We need it in order to share the planner between the Table API relational plans - and the SQL relation plans that are created by the Calcite parser. - The only difference is that we initialize the RelOptPlanner planner - when instantiating, instead of creating a new one in the ready() method. **/ -class FlinkPlannerImpl(config: FrameworkConfig, var planner: RelOptPlanner) { +/** + * NOTE: this is heavily inspired by Calcite's PlannerImpl. + * We need it in order to share the planner between the Table API relational plans + * and the SQL relation plans that are created by the Calcite parser. + * The main difference is that we do not create a new RelOptPlanner in the ready() method. + */ +class FlinkPlannerImpl( + config: FrameworkConfig, + planner: RelOptPlanner, + typeFactory: FlinkTypeFactory) { val operatorTable: SqlOperatorTable = config.getOperatorTable /** Holds the trait definitions to be registered with planner. May be null. */ val traitDefs: ImmutableList[RelTraitDef[_ <: RelTrait]] = config.getTraitDefs val parserConfig: SqlParser.Config = config.getParserConfig val convertletTable: SqlRexConvertletTable = config.getConvertletTable - var defaultSchema: SchemaPlus = config.getDefaultSchema + val defaultSchema: SchemaPlus = config.getDefaultSchema - var typeFactory: JavaTypeFactory = null var validator: FlinkCalciteSqlValidator = null var validatedSqlNode: SqlNode = null var root: RelRoot = null private def ready() { - Frameworks.withPlanner(new Frameworks.PlannerAction[Unit] { - def apply( - cluster: RelOptCluster, - relOptSchema: RelOptSchema, - rootSchema: SchemaPlus): Unit = { - - Util.discard(rootSchema) - typeFactory = cluster.getTypeFactory.asInstanceOf[JavaTypeFactory] - if (planner == null) { - planner = cluster.getPlanner - } - } - }, config) if (this.traitDefs != null) { planner.clearRelTraitDefs() for (traitDef <- this.traitDefs) { @@ -95,9 +84,8 @@ class FlinkPlannerImpl(config: FrameworkConfig, var planner: RelOptPlanner) { validatedSqlNode = validator.validate(sqlNode) } catch { - case e: RuntimeException => { + case e: RuntimeException => throw new CValidationException(e) - } } validatedSqlNode } http://git-wip-us.apache.org/repos/asf/flink/blob/1b327f1a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala new file mode 100644 index 0000000..e3bb97e --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table + +import org.apache.calcite.jdbc.CalciteSchema +import org.apache.calcite.plan.{Context, RelOptCluster, RelOptSchema} +import org.apache.calcite.prepare.CalciteCatalogReader +import org.apache.calcite.rex.RexBuilder +import org.apache.calcite.schema.SchemaPlus +import org.apache.calcite.tools.Frameworks.PlannerAction +import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RelBuilder} + +/** + * Flink specific [[RelBuilder]] that changes the default type factory to a [[FlinkTypeFactory]]. + */ +class FlinkRelBuilder( + context: Context, + cluster: RelOptCluster, + relOptSchema: RelOptSchema) + extends RelBuilder( + context, + cluster, + relOptSchema) { + + def getPlanner = cluster.getPlanner + + def getCluster = cluster + + override def getTypeFactory: FlinkTypeFactory = + super.getTypeFactory.asInstanceOf[FlinkTypeFactory] +} + +object FlinkRelBuilder { + + def create(config: FrameworkConfig): FlinkRelBuilder = { + // prepare planner and collect context instances + val clusters: Array[RelOptCluster] = Array(null) + val relOptSchemas: Array[RelOptSchema] = Array(null) + val rootSchemas: Array[SchemaPlus] = Array(null) + Frameworks.withPlanner(new PlannerAction[Void] { + override def apply( + cluster: RelOptCluster, + relOptSchema: RelOptSchema, + rootSchema: SchemaPlus) + : Void = { + clusters(0) = cluster + relOptSchemas(0) = relOptSchema + rootSchemas(0) = rootSchema + null + } + }) + val planner = clusters(0).getPlanner + val defaultRelOptSchema = relOptSchemas(0).asInstanceOf[CalciteCatalogReader] + + // create Flink type factory + val typeSystem = config.getTypeSystem + val typeFactory = new FlinkTypeFactory(typeSystem) + + // create context instances with Flink type factory + val cluster = RelOptCluster.create(planner, new RexBuilder(typeFactory)) + val calciteSchema = CalciteSchema.from(config.getDefaultSchema) + val relOptSchema = new CalciteCatalogReader( + calciteSchema, + config.getParserConfig.caseSensitive(), + defaultRelOptSchema.getSchemaName, + typeFactory) + + new FlinkRelBuilder(config.getContext, cluster, relOptSchema) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/1b327f1a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala new file mode 100644 index 0000000..6a31487 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table + +import org.apache.calcite.jdbc.JavaTypeFactoryImpl +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeSystem} +import org.apache.calcite.sql.`type`.SqlTypeName +import org.apache.calcite.sql.`type`.SqlTypeName._ +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.ValueTypeInfo._ +import org.apache.flink.api.table.FlinkTypeFactory.typeInfoToSqlTypeName +import org.apache.flink.api.table.plan.schema.GenericRelDataType +import org.apache.flink.api.table.typeutils.TypeCheckUtils.isSimple + +import scala.collection.mutable + +/** + * Flink specific type factory that represents the interface between Flink's [[TypeInformation]] + * and Calcite's [[RelDataType]]. + */ +class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImpl(typeSystem) { + + private val seenTypes = mutable.HashMap[TypeInformation[_], RelDataType]() + + def createTypeFromTypeInfo(typeInfo: TypeInformation[_]): RelDataType = { + // simple type can be converted to SQL types and vice versa + if (isSimple(typeInfo)) { + createSqlType(typeInfoToSqlTypeName(typeInfo)) + } + // advanced types require specific RelDataType + // for storing the original TypeInformation + else { + seenTypes.getOrElseUpdate(typeInfo, canonize(createAdvancedType(typeInfo))) + } + } + + private def createAdvancedType(typeInfo: TypeInformation[_]): RelDataType = typeInfo match { + // TODO add specific RelDataTypes + // for PrimitiveArrayTypeInfo, ObjectArrayTypeInfo, CompositeType + case ti: TypeInformation[_] => + new GenericRelDataType(typeInfo, getTypeSystem.asInstanceOf[FlinkTypeSystem]) + + case ti@_ => + throw new TableException(s"Unsupported type information: $ti") + } +} + +object FlinkTypeFactory { + + private def typeInfoToSqlTypeName(typeInfo: TypeInformation[_]): SqlTypeName = typeInfo match { + case BOOLEAN_TYPE_INFO => BOOLEAN + case BYTE_TYPE_INFO => TINYINT + case SHORT_TYPE_INFO => SMALLINT + case INT_TYPE_INFO => INTEGER + case LONG_TYPE_INFO => BIGINT + case FLOAT_TYPE_INFO => FLOAT + case DOUBLE_TYPE_INFO => DOUBLE + case STRING_TYPE_INFO => VARCHAR + case BIG_DEC_TYPE_INFO => DECIMAL + + // date/time types + case SqlTimeTypeInfo.DATE => DATE + case SqlTimeTypeInfo.TIME => TIME + case SqlTimeTypeInfo.TIMESTAMP => TIMESTAMP + + case CHAR_TYPE_INFO | CHAR_VALUE_TYPE_INFO => + throw new TableException("Character type is not supported.") + + case _@t => + throw new TableException(s"Type is not supported: $t") + } + + def toTypeInfo(relDataType: RelDataType): TypeInformation[_] = relDataType.getSqlTypeName match { + case BOOLEAN => BOOLEAN_TYPE_INFO + case TINYINT => BYTE_TYPE_INFO + case SMALLINT => SHORT_TYPE_INFO + case INTEGER => INT_TYPE_INFO + case BIGINT => LONG_TYPE_INFO + case FLOAT => FLOAT_TYPE_INFO + case DOUBLE => DOUBLE_TYPE_INFO + case VARCHAR | CHAR => STRING_TYPE_INFO + case DECIMAL => BIG_DEC_TYPE_INFO + + // date/time types + case DATE => SqlTimeTypeInfo.DATE + case TIME => SqlTimeTypeInfo.TIME + case TIMESTAMP => SqlTimeTypeInfo.TIMESTAMP + case INTERVAL_DAY_TIME | INTERVAL_YEAR_MONTH => + throw new TableException("Intervals are not supported yet.") + + case NULL => + throw new TableException("Type NULL is not supported. " + + "Null values must have a supported type.") + + // symbol for special flags e.g. TRIM's BOTH, LEADING, TRAILING + // are represented as integer + case SYMBOL => INT_TYPE_INFO + + // extract encapsulated TypeInformation + case ANY if relDataType.isInstanceOf[GenericRelDataType] => + val genericRelDataType = relDataType.asInstanceOf[GenericRelDataType] + genericRelDataType.typeInfo + + case _@t => + throw new TableException(s"Type is not supported: $t") + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1b327f1a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala index daa74da..4f57ae9 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala @@ -124,7 +124,7 @@ abstract class StreamTableEnvironment( */ override def sql(query: String): Table = { - val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner) + val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory) // parse the sql query val parsed = planner.parse(query) // validate the sql query http://git-wip-us.apache.org/repos/asf/flink/blob/1b327f1a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala index 0f6cb24..73dbbaa 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala @@ -21,26 +21,23 @@ package org.apache.flink.api.table import java.util.concurrent.atomic.AtomicInteger import org.apache.calcite.config.Lex -import org.apache.calcite.plan.{RelOptCluster, RelOptPlanner} -import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} +import org.apache.calcite.plan.RelOptPlanner +import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.schema.SchemaPlus import org.apache.calcite.schema.impl.AbstractTable import org.apache.calcite.sql.parser.SqlParser -import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RelBuilder} - +import org.apache.calcite.tools.{FrameworkConfig, Frameworks} import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} -import org.apache.flink.api.java.{ExecutionEnvironment => JavaBatchExecEnv} -import org.apache.flink.api.java.table.{BatchTableEnvironment => JavaBatchTableEnv} -import org.apache.flink.api.java.table.{StreamTableEnvironment => JavaStreamTableEnv} +import org.apache.flink.api.java.table.{BatchTableEnvironment => JavaBatchTableEnv, StreamTableEnvironment => JavaStreamTableEnv} import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo} -import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaBatchExecEnv} -import org.apache.flink.api.scala.table.{BatchTableEnvironment => ScalaBatchTableEnv} -import org.apache.flink.api.scala.table.{StreamTableEnvironment => ScalaStreamTableEnv} +import org.apache.flink.api.java.{ExecutionEnvironment => JavaBatchExecEnv} +import org.apache.flink.api.scala.table.{BatchTableEnvironment => ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv} import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo +import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaBatchExecEnv} import org.apache.flink.api.table.expressions.{Alias, Expression, UnresolvedFieldReference} import org.apache.flink.api.table.plan.cost.DataSetCostFactory -import org.apache.flink.api.table.sinks.TableSink import org.apache.flink.api.table.plan.schema.{RelTable, TransStreamTable} +import org.apache.flink.api.table.sinks.TableSink import org.apache.flink.api.table.validate.FunctionCatalog import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaStreamExecEnv} import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => ScalaStreamExecEnv} @@ -73,16 +70,12 @@ abstract class TableEnvironment(val config: TableConfig) { .build // the builder for Calcite RelNodes, Calcite's representation of a relational expression tree. - protected val relBuilder: RelBuilder = RelBuilder.create(frameworkConfig) - - private val cluster: RelOptCluster = relBuilder - .values(Array("dummy"), new Integer(1)) - .build().getCluster + protected val relBuilder: FlinkRelBuilder = FlinkRelBuilder.create(frameworkConfig) // the planner instance used to optimize queries of this TableEnvironment - private val planner: RelOptPlanner = cluster.getPlanner + private val planner: RelOptPlanner = relBuilder.getPlanner - private val typeFactory: RelDataTypeFactory = cluster.getTypeFactory + private val typeFactory: FlinkTypeFactory = relBuilder.getTypeFactory private val functionCatalog: FunctionCatalog = FunctionCatalog.withBuildIns @@ -200,16 +193,21 @@ abstract class TableEnvironment(val config: TableConfig) { "TMP_" + attrNameCntr.getAndIncrement() } - /** Returns the [[RelBuilder]] of this TableEnvironment. */ - private[flink] def getRelBuilder: RelBuilder = { + /** Returns the [[FlinkRelBuilder]] of this TableEnvironment. */ + private[flink] def getRelBuilder: FlinkRelBuilder = { relBuilder } /** Returns the Calcite [[org.apache.calcite.plan.RelOptPlanner]] of this TableEnvironment. */ - protected def getPlanner: RelOptPlanner = { + private[flink] def getPlanner: RelOptPlanner = { planner } + /** Returns the [[FlinkTypeFactory]] of this TableEnvironment. */ + private[flink] def getTypeFactory: FlinkTypeFactory = { + typeFactory + } + private[flink] def getFunctionCatalog: FunctionCatalog = { functionCatalog } http://git-wip-us.apache.org/repos/asf/flink/blob/1b327f1a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala index 486ba53..135b6f6 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala @@ -29,14 +29,13 @@ import org.apache.flink.api.common.typeinfo.{AtomicType, SqlTimeTypeInfo, TypeIn import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo} import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo -import org.apache.flink.api.table.TableConfig +import org.apache.flink.api.table.{FlinkTypeFactory, TableConfig} import org.apache.flink.api.table.codegen.CodeGenUtils._ import org.apache.flink.api.table.codegen.Indenter.toISC import org.apache.flink.api.table.codegen.calls.ScalarFunctions import org.apache.flink.api.table.codegen.calls.ScalarOperators._ import org.apache.flink.api.table.typeutils.RowTypeInfo import org.apache.flink.api.table.typeutils.TypeCheckUtils.{isNumeric, isString, isTemporal} -import org.apache.flink.api.table.typeutils.TypeConverter.sqlTypeToTypeInfo import scala.collection.JavaConversions._ import scala.collection.mutable @@ -535,7 +534,7 @@ class CodeGenerator( override def visitFieldAccess(rexFieldAccess: RexFieldAccess): GeneratedExpression = ??? override def visitLiteral(literal: RexLiteral): GeneratedExpression = { - val resultType = sqlTypeToTypeInfo(literal.getType.getSqlTypeName) + val resultType = FlinkTypeFactory.toTypeInfo(literal.getType) val value = literal.getValue3 // null value with type if (value == null) { @@ -635,7 +634,7 @@ class CodeGenerator( override def visitCall(call: RexCall): GeneratedExpression = { val operands = call.getOperands.map(_.accept(this)) - val resultType = sqlTypeToTypeInfo(call.getType.getSqlTypeName) + val resultType = FlinkTypeFactory.toTypeInfo(call.getType) call.getOperator match { // arithmetic http://git-wip-us.apache.org/repos/asf/flink/blob/1b327f1a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala index 7059424..525d010 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala @@ -19,9 +19,9 @@ package org.apache.flink.api.table.expressions import org.apache.calcite.rex.RexNode import org.apache.calcite.tools.RelBuilder - import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.table.typeutils.{TypeCoercion, TypeConverter} +import org.apache.flink.api.table.FlinkTypeFactory +import org.apache.flink.api.table.typeutils.TypeCoercion import org.apache.flink.api.table.validate._ case class Cast(child: Expression, resultType: TypeInformation[_]) extends UnaryExpression { @@ -29,7 +29,12 @@ case class Cast(child: Expression, resultType: TypeInformation[_]) extends Unary override def toString = s"$child.cast($resultType)" override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { - relBuilder.cast(child.toRexNode, TypeConverter.typeInfoToSqlType(resultType)) + val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory] + relBuilder + .getRexBuilder + .makeCast( + typeFactory.createTypeFromTypeInfo(resultType), + child.toRexNode) } override private[flink] def makeCopy(anyRefs: Array[AnyRef]): this.type = { http://git-wip-us.apache.org/repos/asf/flink/blob/1b327f1a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala index e697d0c..cd3de60 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala @@ -17,14 +17,14 @@ */ package org.apache.flink.api.table.expressions -import java.sql.{Timestamp, Time, Date} -import java.util.{TimeZone, Calendar} +import java.sql.{Date, Time, Timestamp} +import java.util.{Calendar, TimeZone} import org.apache.calcite.rex.RexNode import org.apache.calcite.sql.`type`.SqlTypeName import org.apache.calcite.tools.RelBuilder -import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, BasicTypeInfo, TypeInformation} -import org.apache.flink.api.table.typeutils.TypeConverter +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation} +import org.apache.flink.api.table.FlinkTypeFactory object Literal { private[flink] def apply(l: Any): Literal = l match { @@ -81,6 +81,11 @@ case class Null(resultType: TypeInformation[_]) extends LeafExpression { override def toString = s"null" override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { - relBuilder.getRexBuilder.makeNullLiteral(TypeConverter.typeInfoToSqlType(resultType)) + val rexBuilder = relBuilder.getRexBuilder + val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory] + rexBuilder + .makeCast( + typeFactory.createTypeFromTypeInfo(resultType), + rexBuilder.constantNull()) } } http://git-wip-us.apache.org/repos/asf/flink/blob/1b327f1a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala index 70d7724..f3f9412 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala @@ -440,8 +440,7 @@ case class CatalogNode( rowType: RelDataType) extends LeafNode { val output: Seq[Attribute] = rowType.getFieldList.asScala.map { field => - ResolvedFieldReference( - field.getName, TypeConverter.sqlTypeToTypeInfo(field.getType.getSqlTypeName)) + ResolvedFieldReference(field.getName, FlinkTypeFactory.toTypeInfo(field.getType)) } override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = { @@ -458,8 +457,7 @@ case class LogicalRelNode( relNode: RelNode) extends LeafNode { val output: Seq[Attribute] = relNode.getRowType.getFieldList.asScala.map { field => - ResolvedFieldReference( - field.getName, TypeConverter.sqlTypeToTypeInfo(field.getType.getSqlTypeName)) + ResolvedFieldReference(field.getName, FlinkTypeFactory.toTypeInfo(field.getType)) } override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = { http://git-wip-us.apache.org/repos/asf/flink/blob/1b327f1a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala index e71ab6c..0fbac5e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala @@ -28,7 +28,7 @@ import org.apache.flink.api.java.DataSet import org.apache.flink.api.table.runtime.aggregate.AggregateUtil import org.apache.flink.api.table.runtime.aggregate.AggregateUtil.CalcitePair import org.apache.flink.api.table.typeutils.{RowTypeInfo, TypeConverter} -import org.apache.flink.api.table.{BatchTableEnvironment, Row} +import org.apache.flink.api.table.{FlinkTypeFactory, BatchTableEnvironment, Row} import scala.collection.JavaConverters._ @@ -100,8 +100,7 @@ class DataSetAggregate( // get the output types val fieldTypes: Array[TypeInformation[_]] = rowType.getFieldList.asScala - .map(f => f.getType.getSqlTypeName) - .map(n => TypeConverter.sqlTypeToTypeInfo(n)) + .map(field => FlinkTypeFactory.toTypeInfo(field.getType)) .toArray val aggString = aggregationToString http://git-wip-us.apache.org/repos/asf/flink/blob/1b327f1a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala index ffc2692..0fb5db9 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala @@ -19,6 +19,7 @@ package org.apache.flink.api.table.plan.schema import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} +import org.apache.flink.api.table.FlinkTypeFactory import org.apache.flink.streaming.api.datastream.DataStream class DataStreamTable[T]( @@ -28,9 +29,11 @@ class DataStreamTable[T]( extends FlinkTable[T](dataStream.getType, fieldIndexes, fieldNames) { override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = { + val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory] val builder = typeFactory.builder fieldNames.zip(fieldTypes) - .foreach( f => builder.add(f._1, f._2).nullable(true) ) + .foreach( f => + builder.add(f._1, flinkTypeFactory.createTypeFromTypeInfo(f._2)).nullable(true) ) builder.build } } http://git-wip-us.apache.org/repos/asf/flink/blob/1b327f1a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTable.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTable.scala index 7024ce2..d95b513 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTable.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTable.scala @@ -20,11 +20,9 @@ package org.apache.flink.api.table.plan.schema import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} import org.apache.calcite.schema.impl.AbstractTable -import org.apache.calcite.sql.`type`.SqlTypeName -import org.apache.flink.api.common.typeinfo.{TypeInformation, AtomicType} +import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.table.TableException -import org.apache.flink.api.table.typeutils.TypeConverter +import org.apache.flink.api.table.{FlinkTypeFactory, TableException} abstract class FlinkTable[T]( val typeInfo: TypeInformation[T], @@ -43,7 +41,7 @@ abstract class FlinkTable[T]( "Table field names must be unique.") } - val fieldTypes: Array[SqlTypeName] = + val fieldTypes: Array[TypeInformation[_]] = typeInfo match { case cType: CompositeType[T] => if (fieldNames.length != cType.getArity) { @@ -51,21 +49,23 @@ abstract class FlinkTable[T]( s"Arity of type (" + cType.getFieldNames.deep + ") " + "not equal to number of field names " + fieldNames.deep + ".") } - fieldIndexes - .map(cType.getTypeAt(_)) - .map(TypeConverter.typeInfoToSqlType(_)) + fieldIndexes.map(cType.getTypeAt(_).asInstanceOf[TypeInformation[_]]) case aType: AtomicType[T] => if (fieldIndexes.length != 1 || fieldIndexes(0) != 0) { throw new TableException( "Non-composite input type may have only a single field and its index must be 0.") } - Array(TypeConverter.typeInfoToSqlType(aType)) + Array(aType) } override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = { - val builder = typeFactory.builder - fieldNames.zip(fieldTypes) - .foreach( f => builder.add(f._1, f._2).nullable(true) ) + val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory] + val builder = flinkTypeFactory.builder + fieldNames + .zip(fieldTypes) + .foreach { f => + builder.add(f._1, flinkTypeFactory.createTypeFromTypeInfo(f._2)).nullable(true) + } builder.build } http://git-wip-us.apache.org/repos/asf/flink/blob/1b327f1a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/GenericRelDataType.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/GenericRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/GenericRelDataType.scala new file mode 100644 index 0000000..a3012d1 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/GenericRelDataType.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.schema + +import org.apache.calcite.sql.`type`.{BasicSqlType, SqlTypeName} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.FlinkTypeSystem + +/** + * Generic type for encapsulating Flink's [[TypeInformation]]. + * + * @param typeInfo TypeInformation to encapsulate + * @param typeSystem Flink's type system + */ +class GenericRelDataType( + val typeInfo: TypeInformation[_], + typeSystem: FlinkTypeSystem) + extends BasicSqlType( + typeSystem, + SqlTypeName.ANY) { + + override def toString = s"ANY($typeInfo)" + + def canEqual(other: Any): Boolean = other.isInstanceOf[GenericRelDataType] + + override def equals(other: Any): Boolean = other match { + case that: GenericRelDataType => + super.equals(that) && + (that canEqual this) && + typeInfo == that.typeInfo + case _ => false + } + + override def hashCode(): Int = { + typeInfo.hashCode() + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1b327f1a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala index 44a67b6..65d12c3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala @@ -27,9 +27,8 @@ import org.apache.calcite.sql.`type`.{SqlTypeFactoryImpl, SqlTypeName} import org.apache.calcite.sql.fun._ import org.apache.flink.api.common.functions.{GroupReduceFunction, MapFunction} import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.table.typeutils.TypeConverter import org.apache.flink.api.table.typeutils.RowTypeInfo -import org.apache.flink.api.table.{TableException, Row, TableConfig} +import org.apache.flink.api.table.{FlinkTypeFactory, Row, TableConfig, TableException} import scala.collection.JavaConversions._ import scala.collection.mutable.ArrayBuffer @@ -250,8 +249,8 @@ object AggregateUtil { // get the field data types of group keys. val groupingTypes: Seq[TypeInformation[_]] = groupings - .map(inputType.getFieldList.get(_).getType.getSqlTypeName) - .map(TypeConverter.sqlTypeToTypeInfo) + .map(inputType.getFieldList.get(_).getType) + .map(FlinkTypeFactory.toTypeInfo) val aggPartialNameSuffix = "agg_buffer_" val factory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT) http://git-wip-us.apache.org/repos/asf/flink/blob/1b327f1a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala index 0acf0f9..e719782 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala @@ -17,16 +17,15 @@ */ package org.apache.flink.api.table -import scala.collection.JavaConverters._ import org.apache.calcite.rel.RelNode import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.table.plan.RexNodeTranslator.extractAggregations import org.apache.flink.api.java.operators.join.JoinType -import org.apache.flink.api.table.expressions._ -import org.apache.flink.api.table.plan.logical +import org.apache.flink.api.table.expressions.{Asc, ExpressionParser, UnresolvedAlias, Expression, Ordering} +import org.apache.flink.api.table.plan.RexNodeTranslator.extractAggregations import org.apache.flink.api.table.plan.logical._ import org.apache.flink.api.table.sinks.TableSink -import org.apache.flink.api.table.typeutils.TypeConverter + +import scala.collection.JavaConverters._ /** * A Table is the core component of the Table API. @@ -423,7 +422,7 @@ class Table( throw new ValidationException("Only tables from the same TableEnvironment can be " + "subtracted.") } - new Table(tableEnv, logical.Minus(logicalPlan, right.logicalPlan, all = false) + new Table(tableEnv, Minus(logicalPlan, right.logicalPlan, all = false) .validate(tableEnv)) } @@ -448,7 +447,7 @@ class Table( throw new ValidationException("Only tables from the same TableEnvironment can be " + "subtracted.") } - new Table(tableEnv, logical.Minus(logicalPlan, right.logicalPlan, all = true) + new Table(tableEnv, Minus(logicalPlan, right.logicalPlan, all = true) .validate(tableEnv)) } @@ -598,7 +597,7 @@ class Table( val rowType = getRelNode.getRowType val fieldNames: Array[String] = rowType.getFieldNames.asScala.toArray val fieldTypes: Array[TypeInformation[_]] = rowType.getFieldList.asScala - .map(f => TypeConverter.sqlTypeToTypeInfo(f.getType.getSqlTypeName)).toArray + .map(field => FlinkTypeFactory.toTypeInfo(field.getType)).toArray // configure the table sink val configuredSink = sink.configure(fieldNames, fieldTypes) http://git-wip-us.apache.org/repos/asf/flink/blob/1b327f1a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCheckUtils.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCheckUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCheckUtils.scala index 0c29901..c19deec 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCheckUtils.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCheckUtils.scala @@ -18,11 +18,27 @@ package org.apache.flink.api.table.typeutils import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{BIG_DEC_TYPE_INFO, BOOLEAN_TYPE_INFO, STRING_TYPE_INFO} -import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, NumericTypeInfo, TypeInformation} +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, NumericTypeInfo, TypeInformation} import org.apache.flink.api.table.validate._ object TypeCheckUtils { + /** + * Checks if type information is an advanced type that can be converted to a + * SQL type but NOT vice versa. + */ + def isAdvanced(dataType: TypeInformation[_]): Boolean = dataType match { + case _: BasicTypeInfo[_] => false + case _: SqlTimeTypeInfo[_] => false + case _ => true + } + + /** + * Checks if type information is a simple type that can be converted to a + * SQL type and vice versa. + */ + def isSimple(dataType: TypeInformation[_]): Boolean = !isAdvanced(dataType) + def isNumeric(dataType: TypeInformation[_]): Boolean = dataType match { case _: NumericTypeInfo[_] => true case BIG_DEC_TYPE_INFO => true http://git-wip-us.apache.org/repos/asf/flink/blob/1b327f1a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeConverter.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeConverter.scala index 95e50d5..ea1f7ce 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeConverter.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeConverter.scala @@ -21,16 +21,12 @@ package org.apache.flink.api.table.typeutils import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.JoinRelType import org.apache.calcite.rel.core.JoinRelType._ -import org.apache.calcite.sql.`type`.SqlTypeName -import org.apache.calcite.sql.`type`.SqlTypeName._ -import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ -import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, AtomicType, TypeInformation} +import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.api.java.operators.join.JoinType import org.apache.flink.api.java.tuple.Tuple -import org.apache.flink.api.java.typeutils.ValueTypeInfo._ import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo} -import org.apache.flink.api.table.{Row, TableException} +import org.apache.flink.api.table.{FlinkTypeFactory, Row, TableException} import scala.collection.JavaConversions._ @@ -38,59 +34,6 @@ object TypeConverter { val DEFAULT_ROW_TYPE = new RowTypeInfo(Seq(), Seq()).asInstanceOf[TypeInformation[Any]] - def typeInfoToSqlType(typeInfo: TypeInformation[_]): SqlTypeName = typeInfo match { - case BOOLEAN_TYPE_INFO => BOOLEAN - case BYTE_TYPE_INFO => TINYINT - case SHORT_TYPE_INFO => SMALLINT - case INT_TYPE_INFO => INTEGER - case LONG_TYPE_INFO => BIGINT - case FLOAT_TYPE_INFO => FLOAT - case DOUBLE_TYPE_INFO => DOUBLE - case STRING_TYPE_INFO => VARCHAR - case BIG_DEC_TYPE_INFO => DECIMAL - - // date/time types - case SqlTimeTypeInfo.DATE => DATE - case SqlTimeTypeInfo.TIME => TIME - case SqlTimeTypeInfo.TIMESTAMP => TIMESTAMP - - case CHAR_TYPE_INFO | CHAR_VALUE_TYPE_INFO => - throw new TableException("Character type is not supported.") - - case t@_ => - throw new TableException(s"Type is not supported: $t") - } - - def sqlTypeToTypeInfo(sqlType: SqlTypeName): TypeInformation[_] = sqlType match { - case BOOLEAN => BOOLEAN_TYPE_INFO - case TINYINT => BYTE_TYPE_INFO - case SMALLINT => SHORT_TYPE_INFO - case INTEGER => INT_TYPE_INFO - case BIGINT => LONG_TYPE_INFO - case FLOAT => FLOAT_TYPE_INFO - case DOUBLE => DOUBLE_TYPE_INFO - case VARCHAR | CHAR => STRING_TYPE_INFO - case DECIMAL => BIG_DEC_TYPE_INFO - - // date/time types - case DATE => SqlTimeTypeInfo.DATE - case TIME => SqlTimeTypeInfo.TIME - case TIMESTAMP => SqlTimeTypeInfo.TIMESTAMP - case INTERVAL_DAY_TIME | INTERVAL_YEAR_MONTH => - throw new TableException("Intervals are not supported yet.") - - case NULL => - throw new TableException("Type NULL is not supported. " + - "Null values must have a supported type.") - - // symbol for special flags e.g. TRIM's BOTH, LEADING, TRAILING - // are represented as integer - case SYMBOL => INT_TYPE_INFO - - case _ => - throw new TableException("Type " + sqlType.toString + " is not supported.") - } - /** * Determines the return type of Flink operators based on the logical fields, the expected * physical type and configuration parameters. @@ -117,7 +60,7 @@ object TypeConverter { : TypeInformation[Any] = { // convert to type information val logicalFieldTypes = logicalRowType.getFieldList map { relDataType => - TypeConverter.sqlTypeToTypeInfo(relDataType.getType.getSqlTypeName) + FlinkTypeFactory.toTypeInfo(relDataType.getType) } // field names val logicalFieldNames = logicalRowType.getFieldNames.toList http://git-wip-us.apache.org/repos/asf/flink/blob/1b327f1a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FromDataSetITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FromDataSetITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FromDataSetITCase.java index ecd916f..af96a04 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FromDataSetITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FromDataSetITCase.java @@ -18,6 +18,7 @@ package org.apache.flink.api.java.batch.table; +import java.util.HashMap; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple3; @@ -221,6 +222,38 @@ public class FromDataSetITCase extends TableProgramsTestBase { compareResultAsText(results, expected); } + @Test + public void testAsWithPojoAndGenericTypes() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); + + List<PojoWithGeneric> data = new ArrayList<>(); + data.add(new PojoWithGeneric("Peter", 28, new HashMap<String, String>(), new ArrayList<String>())); + HashMap<String, String> hm1 = new HashMap<>(); + hm1.put("test1", "test1"); + data.add(new PojoWithGeneric("Anna", 56, hm1, new ArrayList<String>())); + HashMap<String, String> hm2 = new HashMap<>(); + hm2.put("abc", "cde"); + data.add(new PojoWithGeneric("Lucy", 42, hm2, new ArrayList<String>())); + + Table table = tableEnv + .fromDataSet(env.fromCollection(data), + "name AS a, " + + "age AS b, " + + "generic AS c, " + + "generic2 AS d") + .select("a, b, c, c as c2, d") + .select("a, b, c, c === c2, d"); + + DataSet<Row> ds = tableEnv.toDataSet(table, Row.class); + List<Row> results = ds.collect(); + String expected = + "Peter,28,{},true,[]\n" + + "Anna,56,{test1=test1},true,[]\n" + + "Lucy,42,{abc=cde},true,[]\n"; + compareResultAsText(results, expected); + } + @Test(expected = TableException.class) public void testAsWithToFewFields() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); @@ -287,6 +320,31 @@ public class FromDataSetITCase extends TableProgramsTestBase { } @SuppressWarnings("unused") + public static class PojoWithGeneric { + public String name; + public int age; + public HashMap<String, String> generic; + public ArrayList<String> generic2; + + public PojoWithGeneric() { + // default constructor + } + + public PojoWithGeneric(String name, int age, HashMap<String, String> generic, + ArrayList<String> generic2) { + this.name = name; + this.age = age; + this.generic = generic; + this.generic2 = generic2; + } + + @Override + public String toString() { + return name + "," + age + "," + generic + "," + generic2; + } + } + + @SuppressWarnings("unused") public static class PrivateSmallPojo { public PrivateSmallPojo() { } http://git-wip-us.apache.org/repos/asf/flink/blob/1b327f1a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/sql/SqlITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/sql/SqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/sql/SqlITCase.java index 4161b1e..1743981 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/sql/SqlITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/sql/SqlITCase.java @@ -53,7 +53,7 @@ public class SqlITCase extends StreamingMultipleProgramsTestBase { resultSet.addSink(new StreamITCase.StringSink()); env.execute(); - List<String> expected = new ArrayList(); + List<String> expected = new ArrayList<>(); expected.add("1,1,Hi"); expected.add("2,2,Hello"); expected.add("3,2,Hello world"); @@ -77,7 +77,7 @@ public class SqlITCase extends StreamingMultipleProgramsTestBase { resultSet.addSink(new StreamITCase.StringSink()); env.execute(); - List<String> expected = new ArrayList(); + List<String> expected = new ArrayList<>(); expected.add("1,1,1"); expected.add("2,2,2"); expected.add("2,3,1"); @@ -108,7 +108,7 @@ public class SqlITCase extends StreamingMultipleProgramsTestBase { resultSet.addSink(new StreamITCase.StringSink()); env.execute(); - List<String> expected = new ArrayList(); + List<String> expected = new ArrayList<>(); expected.add("1,1,Hi"); expected.add("2,2,Hello"); expected.add("3,2,Hello world"); http://git-wip-us.apache.org/repos/asf/flink/blob/1b327f1a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala index 4345dd8..32b0e3a 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala @@ -21,17 +21,17 @@ package org.apache.flink.api.table.expressions.utils import org.apache.calcite.rel.logical.LogicalProject import org.apache.calcite.rex.RexNode import org.apache.calcite.sql.`type`.SqlTypeName._ -import org.apache.calcite.tools.{Frameworks, RelBuilder} +import org.apache.calcite.tools.RelBuilder import org.apache.flink.api.common.functions.{Function, MapFunction} import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.{DataSet => JDataSet} import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment} +import org.apache.flink.api.table._ import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedFunction} import org.apache.flink.api.table.expressions.{Expression, ExpressionParser} import org.apache.flink.api.table.runtime.FunctionCompiler import org.apache.flink.api.table.typeutils.RowTypeInfo -import org.apache.flink.api.table.{BatchTableEnvironment, Row, TableConfig, TableEnvironment} import org.junit.Assert._ import org.junit.{After, Before} import org.mockito.Mockito._ @@ -48,7 +48,10 @@ abstract class ExpressionTestBase { // setup test utils private val tableName = "testTable" private val context = prepareContext(typeInfo) - private val planner = Frameworks.getPlanner(context._2.getFrameworkConfig) + private val planner = new FlinkPlannerImpl( + context._2.getFrameworkConfig, + context._2.getPlanner, + context._2.getTypeFactory) private def prepareContext(typeInfo: TypeInformation[Any]): (RelBuilder, TableEnvironment) = { // create DataSetTable @@ -128,8 +131,6 @@ abstract class ExpressionTestBase { // extract RexNode val expr: RexNode = converted.rel.asInstanceOf[LogicalProject].getChildExps.get(0) testExprs.add((expr, expected)) - - planner.close() } private def addTableApiTestExpr(tableApiExpr: Expression, expected: String): Unit = {