Repository: flink
Updated Branches:
  refs/heads/master 971dcc5de -> 1b327f1ae


[FLINK-3916] [table] Allow generic types to pass through the Table API

This closes #2197.
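
For context, a minimal Scala sketch of what this change enables (the tuple and
field names are hypothetical; the new FromDataSetITCase test below exercises the
same path): a field backed by a GenericTypeInfo, such as a java.util.HashMap,
can now be carried through Table API operations as a generic column instead of
failing during type conversion.

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._
    import org.apache.flink.api.table.TableEnvironment

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val attrs = new java.util.HashMap[String, String]()
    attrs.put("color", "blue")
    // the second field has no SQL counterpart; it is kept as an ANY column
    val table = env.fromElements(("Alice", attrs))
      .toTable(tEnv, 'name, 'attributes)
      .select('name, 'attributes)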


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1b327f1a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1b327f1a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1b327f1a

Branch: refs/heads/master
Commit: 1b327f1ae7d078e22700729524e374b449b0f209
Parents: 971dcc5
Author: twalthr <twal...@apache.org>
Authored: Tue May 17 16:14:29 2016 +0200
Committer: twalthr <twal...@apache.org>
Committed: Tue Jul 12 15:33:11 2016 +0200

----------------------------------------------------------------------
 .../flink/api/table/BatchTableEnvironment.scala |   2 +-
 .../flink/api/table/FlinkPlannerImpl.scala      |  44 +++----
 .../flink/api/table/FlinkRelBuilder.scala       |  87 +++++++++++++
 .../flink/api/table/FlinkTypeFactory.scala      | 124 +++++++++++++++++++
 .../api/table/StreamTableEnvironment.scala      |   2 +-
 .../flink/api/table/TableEnvironment.scala      |  40 +++---
 .../flink/api/table/codegen/CodeGenerator.scala |   7 +-
 .../flink/api/table/expressions/cast.scala      |  11 +-
 .../flink/api/table/expressions/literals.scala  |  15 ++-
 .../api/table/plan/logical/operators.scala      |   6 +-
 .../plan/nodes/dataset/DataSetAggregate.scala   |   5 +-
 .../api/table/plan/schema/DataStreamTable.scala |   5 +-
 .../api/table/plan/schema/FlinkTable.scala      |  24 ++--
 .../table/plan/schema/GenericRelDataType.scala  |  53 ++++++++
 .../table/runtime/aggregate/AggregateUtil.scala |   7 +-
 .../org/apache/flink/api/table/table.scala      |  15 ++-
 .../api/table/typeutils/TypeCheckUtils.scala    |  18 ++-
 .../api/table/typeutils/TypeConverter.scala     |  63 +---------
 .../api/java/batch/table/FromDataSetITCase.java |  58 +++++++++
 .../flink/api/java/stream/sql/SqlITCase.java    |   6 +-
 .../expressions/utils/ExpressionTestBase.scala  |  11 +-
 21 files changed, 439 insertions(+), 164 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1b327f1a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
index b1d5534..1ba13be 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
@@ -123,7 +123,7 @@ abstract class BatchTableEnvironment(
     */
   override def sql(query: String): Table = {
 
-    val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner)
+    val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
     // parse the sql query
     val parsed = planner.parse(query)
     // validate the sql query

http://git-wip-us.apache.org/repos/asf/flink/blob/1b327f1a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
index 9d0a146..f016d57 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala
@@ -21,7 +21,6 @@ package org.apache.flink.api.table
 import java.util
 
 import com.google.common.collect.ImmutableList
-import org.apache.calcite.adapter.java.JavaTypeFactory
 import org.apache.calcite.jdbc.CalciteSchema
 import org.apache.calcite.plan.RelOptTable.ViewExpander
 import org.apache.calcite.plan._
@@ -30,47 +29,37 @@ import org.apache.calcite.rel.RelRoot
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex.RexBuilder
 import org.apache.calcite.schema.SchemaPlus
-import org.apache.calcite.sql.parser.{SqlParser, SqlParseException}
+import org.apache.calcite.sql.parser.{SqlParseException, SqlParser}
 import org.apache.calcite.sql.validate.SqlValidator
 import org.apache.calcite.sql.{SqlNode, SqlOperatorTable}
-import org.apache.calcite.sql2rel.{RelDecorrelator, SqlToRelConverter, SqlRexConvertletTable}
-import org.apache.calcite.tools.{RelConversionException, ValidationException => CValidationException, Frameworks, FrameworkConfig}
-import org.apache.calcite.util.Util
+import org.apache.calcite.sql2rel.{RelDecorrelator, SqlRexConvertletTable, SqlToRelConverter}
+import org.apache.calcite.tools.{FrameworkConfig, RelConversionException, ValidationException => CValidationException}
+
 import scala.collection.JavaConversions._
 
-/** NOTE: this is heavily insipred by Calcite's PlannerImpl.
-  We need it in order to share the planner between the Table API relational plans
-  and the SQL relation plans that are created by the Calcite parser.
-  The only difference is that we initialize the RelOptPlanner planner
-  when instantiating, instead of creating a new one in the ready() method. **/
-class FlinkPlannerImpl(config: FrameworkConfig, var planner: RelOptPlanner) {
+/**
+  * NOTE: this is heavily inspired by Calcite's PlannerImpl.
+  * We need it in order to share the planner between the Table API relational plans
+  * and the SQL relational plans that are created by the Calcite parser.
+  * The main difference is that we do not create a new RelOptPlanner in the ready() method.
+  */
+class FlinkPlannerImpl(
+    config: FrameworkConfig,
+    planner: RelOptPlanner,
+    typeFactory: FlinkTypeFactory) {
 
   val operatorTable: SqlOperatorTable = config.getOperatorTable
   /** Holds the trait definitions to be registered with planner. May be null. */
   val traitDefs: ImmutableList[RelTraitDef[_ <: RelTrait]] = config.getTraitDefs
   val parserConfig: SqlParser.Config = config.getParserConfig
   val convertletTable: SqlRexConvertletTable = config.getConvertletTable
-  var defaultSchema: SchemaPlus = config.getDefaultSchema
+  val defaultSchema: SchemaPlus = config.getDefaultSchema
 
-  var typeFactory: JavaTypeFactory = null
   var validator: FlinkCalciteSqlValidator = null
   var validatedSqlNode: SqlNode = null
   var root: RelRoot = null
 
   private def ready() {
-    Frameworks.withPlanner(new Frameworks.PlannerAction[Unit] {
-      def apply(
-          cluster: RelOptCluster,
-          relOptSchema: RelOptSchema,
-          rootSchema: SchemaPlus): Unit = {
-
-        Util.discard(rootSchema)
-        typeFactory = cluster.getTypeFactory.asInstanceOf[JavaTypeFactory]
-        if (planner == null) {
-          planner = cluster.getPlanner
-        }
-      }
-    }, config)
     if (this.traitDefs != null) {
       planner.clearRelTraitDefs()
       for (traitDef <- this.traitDefs) {
@@ -95,9 +84,8 @@ class FlinkPlannerImpl(config: FrameworkConfig, var planner: RelOptPlanner) {
       validatedSqlNode = validator.validate(sqlNode)
     }
     catch {
-      case e: RuntimeException => {
+      case e: RuntimeException =>
         throw new CValidationException(e)
-      }
     }
     validatedSqlNode
   }
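
A short sketch of the new wiring, as used by BatchTableEnvironment.sql above:
the TableEnvironment now passes its own planner and FlinkTypeFactory in, so the
SQL path and the Table API path share one type factory (getPlanner and
getTypeFactory are the TableEnvironment accessors added below; the table name
is hypothetical).

    val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
    val parsed = planner.parse("SELECT a FROM MyTable")
    val validated = planner.validate(parsed)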

http://git-wip-us.apache.org/repos/asf/flink/blob/1b327f1a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
new file mode 100644
index 0000000..e3bb97e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table
+
+import org.apache.calcite.jdbc.CalciteSchema
+import org.apache.calcite.plan.{Context, RelOptCluster, RelOptSchema}
+import org.apache.calcite.prepare.CalciteCatalogReader
+import org.apache.calcite.rex.RexBuilder
+import org.apache.calcite.schema.SchemaPlus
+import org.apache.calcite.tools.Frameworks.PlannerAction
+import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RelBuilder}
+
+/**
+  * Flink specific [[RelBuilder]] that changes the default type factory to a [[FlinkTypeFactory]].
+  */
+class FlinkRelBuilder(
+    context: Context,
+    cluster: RelOptCluster,
+    relOptSchema: RelOptSchema)
+  extends RelBuilder(
+    context,
+    cluster,
+    relOptSchema) {
+
+  def getPlanner = cluster.getPlanner
+
+  def getCluster = cluster
+
+  override def getTypeFactory: FlinkTypeFactory =
+    super.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+}
+
+object FlinkRelBuilder {
+
+  def create(config: FrameworkConfig): FlinkRelBuilder = {
+    // prepare planner and collect context instances
+    val clusters: Array[RelOptCluster] = Array(null)
+    val relOptSchemas: Array[RelOptSchema] = Array(null)
+    val rootSchemas: Array[SchemaPlus] = Array(null)
+    Frameworks.withPlanner(new PlannerAction[Void] {
+      override def apply(
+          cluster: RelOptCluster,
+          relOptSchema: RelOptSchema,
+          rootSchema: SchemaPlus)
+        : Void = {
+        clusters(0) = cluster
+        relOptSchemas(0) = relOptSchema
+        rootSchemas(0) = rootSchema
+        null
+      }
+    })
+    val planner = clusters(0).getPlanner
+    val defaultRelOptSchema = relOptSchemas(0).asInstanceOf[CalciteCatalogReader]
+
+    // create Flink type factory
+    val typeSystem = config.getTypeSystem
+    val typeFactory = new FlinkTypeFactory(typeSystem)
+
+    // create context instances with Flink type factory
+    val cluster = RelOptCluster.create(planner, new RexBuilder(typeFactory))
+    val calciteSchema = CalciteSchema.from(config.getDefaultSchema)
+    val relOptSchema = new CalciteCatalogReader(
+      calciteSchema,
+      config.getParserConfig.caseSensitive(),
+      defaultRelOptSchema.getSchemaName,
+      typeFactory)
+
+    new FlinkRelBuilder(config.getContext, cluster, relOptSchema)
+  }
+
+}
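
Usage note: each TableEnvironment now obtains its planner and type factory from
this builder (see TableEnvironment.scala below); a minimal sketch, assuming a
FrameworkConfig named frameworkConfig is in scope:

    val relBuilder: FlinkRelBuilder = FlinkRelBuilder.create(frameworkConfig)
    val planner: RelOptPlanner = relBuilder.getPlanner
    val typeFactory: FlinkTypeFactory = relBuilder.getTypeFactory  // no cast needed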

http://git-wip-us.apache.org/repos/asf/flink/blob/1b327f1a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
new file mode 100644
index 0000000..6a31487
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table
+
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeSystem}
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.sql.`type`.SqlTypeName._
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ValueTypeInfo._
+import org.apache.flink.api.table.FlinkTypeFactory.typeInfoToSqlTypeName
+import org.apache.flink.api.table.plan.schema.GenericRelDataType
+import org.apache.flink.api.table.typeutils.TypeCheckUtils.isSimple
+
+import scala.collection.mutable
+
+/**
+  * Flink specific type factory that represents the interface between Flink's [[TypeInformation]]
+  * and Calcite's [[RelDataType]].
+  */
+class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImpl(typeSystem) {
+
+  private val seenTypes = mutable.HashMap[TypeInformation[_], RelDataType]()
+
+  def createTypeFromTypeInfo(typeInfo: TypeInformation[_]): RelDataType = {
+    // simple type can be converted to SQL types and vice versa
+    if (isSimple(typeInfo)) {
+      createSqlType(typeInfoToSqlTypeName(typeInfo))
+    }
+    // advanced types require specific RelDataType
+    // for storing the original TypeInformation
+    else {
+      seenTypes.getOrElseUpdate(typeInfo, canonize(createAdvancedType(typeInfo)))
+    }
+  }
+
+  private def createAdvancedType(typeInfo: TypeInformation[_]): RelDataType = typeInfo match {
+    // TODO add specific RelDataTypes
+    // for PrimitiveArrayTypeInfo, ObjectArrayTypeInfo, CompositeType
+    case ti: TypeInformation[_] =>
+      new GenericRelDataType(typeInfo, getTypeSystem.asInstanceOf[FlinkTypeSystem])
+
+    case ti@_ =>
+      throw new TableException(s"Unsupported type information: $ti")
+  }
+}
+
+object FlinkTypeFactory {
+
+  private def typeInfoToSqlTypeName(typeInfo: TypeInformation[_]): SqlTypeName = typeInfo match {
+      case BOOLEAN_TYPE_INFO => BOOLEAN
+      case BYTE_TYPE_INFO => TINYINT
+      case SHORT_TYPE_INFO => SMALLINT
+      case INT_TYPE_INFO => INTEGER
+      case LONG_TYPE_INFO => BIGINT
+      case FLOAT_TYPE_INFO => FLOAT
+      case DOUBLE_TYPE_INFO => DOUBLE
+      case STRING_TYPE_INFO => VARCHAR
+      case BIG_DEC_TYPE_INFO => DECIMAL
+
+      // date/time types
+      case SqlTimeTypeInfo.DATE => DATE
+      case SqlTimeTypeInfo.TIME => TIME
+      case SqlTimeTypeInfo.TIMESTAMP => TIMESTAMP
+
+      case CHAR_TYPE_INFO | CHAR_VALUE_TYPE_INFO =>
+        throw new TableException("Character type is not supported.")
+
+      case _@t =>
+        throw new TableException(s"Type is not supported: $t")
+  }
+
+  def toTypeInfo(relDataType: RelDataType): TypeInformation[_] = relDataType.getSqlTypeName match {
+    case BOOLEAN => BOOLEAN_TYPE_INFO
+    case TINYINT => BYTE_TYPE_INFO
+    case SMALLINT => SHORT_TYPE_INFO
+    case INTEGER => INT_TYPE_INFO
+    case BIGINT => LONG_TYPE_INFO
+    case FLOAT => FLOAT_TYPE_INFO
+    case DOUBLE => DOUBLE_TYPE_INFO
+    case VARCHAR | CHAR => STRING_TYPE_INFO
+    case DECIMAL => BIG_DEC_TYPE_INFO
+
+    // date/time types
+    case DATE => SqlTimeTypeInfo.DATE
+    case TIME => SqlTimeTypeInfo.TIME
+    case TIMESTAMP => SqlTimeTypeInfo.TIMESTAMP
+    case INTERVAL_DAY_TIME | INTERVAL_YEAR_MONTH =>
+      throw new TableException("Intervals are not supported yet.")
+
+    case NULL =>
+      throw new TableException("Type NULL is not supported. " +
+        "Null values must have a supported type.")
+
+    // symbol for special flags e.g. TRIM's BOTH, LEADING, TRAILING
+    // are represented as integer
+    case SYMBOL => INT_TYPE_INFO
+
+    // extract encapsulated TypeInformation
+    case ANY if relDataType.isInstanceOf[GenericRelDataType] =>
+      val genericRelDataType = relDataType.asInstanceOf[GenericRelDataType]
+      genericRelDataType.typeInfo
+
+    case _@t =>
+      throw new TableException(s"Type is not supported: $t")
+  }
+}
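
A sketch of the two conversion paths (assuming FlinkTypeSystem, which already
exists elsewhere in the codebase, has a no-argument constructor; GenericTypeInfo
is org.apache.flink.api.java.typeutils.GenericTypeInfo):

    val typeFactory = new FlinkTypeFactory(new FlinkTypeSystem)

    // simple type: round trips through a plain SQL type (INTEGER)
    val intType = typeFactory.createTypeFromTypeInfo(BasicTypeInfo.INT_TYPE_INFO)
    assert(FlinkTypeFactory.toTypeInfo(intType) == BasicTypeInfo.INT_TYPE_INFO)

    // generic type: wrapped in a GenericRelDataType (SQL type ANY) that keeps
    // the original TypeInformation, so the reverse mapping can recover it
    val mapInfo = new GenericTypeInfo(classOf[java.util.HashMap[_, _]])
    val anyType = typeFactory.createTypeFromTypeInfo(mapInfo)
    assert(FlinkTypeFactory.toTypeInfo(anyType) == mapInfo)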

http://git-wip-us.apache.org/repos/asf/flink/blob/1b327f1a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
index daa74da..4f57ae9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
@@ -124,7 +124,7 @@ abstract class StreamTableEnvironment(
     */
   override def sql(query: String): Table = {
 
-    val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner)
+    val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
     // parse the sql query
     val parsed = planner.parse(query)
     // validate the sql query

http://git-wip-us.apache.org/repos/asf/flink/blob/1b327f1a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
index 0f6cb24..73dbbaa 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
@@ -21,26 +21,23 @@ package org.apache.flink.api.table
 import java.util.concurrent.atomic.AtomicInteger
 
 import org.apache.calcite.config.Lex
-import org.apache.calcite.plan.{RelOptCluster, RelOptPlanner}
-import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.plan.RelOptPlanner
+import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.schema.SchemaPlus
 import org.apache.calcite.schema.impl.AbstractTable
 import org.apache.calcite.sql.parser.SqlParser
-import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RelBuilder}
-
+import org.apache.calcite.tools.{FrameworkConfig, Frameworks}
 import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
-import org.apache.flink.api.java.{ExecutionEnvironment => JavaBatchExecEnv}
-import org.apache.flink.api.java.table.{BatchTableEnvironment => JavaBatchTableEnv}
-import org.apache.flink.api.java.table.{StreamTableEnvironment => JavaStreamTableEnv}
+import org.apache.flink.api.java.table.{BatchTableEnvironment => JavaBatchTableEnv, StreamTableEnvironment => JavaStreamTableEnv}
 import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
-import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaBatchExecEnv}
-import org.apache.flink.api.scala.table.{BatchTableEnvironment => ScalaBatchTableEnv}
-import org.apache.flink.api.scala.table.{StreamTableEnvironment => ScalaStreamTableEnv}
+import org.apache.flink.api.java.{ExecutionEnvironment => JavaBatchExecEnv}
+import org.apache.flink.api.scala.table.{BatchTableEnvironment => ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv}
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaBatchExecEnv}
 import org.apache.flink.api.table.expressions.{Alias, Expression, UnresolvedFieldReference}
 import org.apache.flink.api.table.plan.cost.DataSetCostFactory
-import org.apache.flink.api.table.sinks.TableSink
 import org.apache.flink.api.table.plan.schema.{RelTable, TransStreamTable}
+import org.apache.flink.api.table.sinks.TableSink
 import org.apache.flink.api.table.validate.FunctionCatalog
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaStreamExecEnv}
 import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => ScalaStreamExecEnv}
@@ -73,16 +70,12 @@ abstract class TableEnvironment(val config: TableConfig) {
     .build
 
   // the builder for Calcite RelNodes, Calcite's representation of a relational expression tree.
-  protected val relBuilder: RelBuilder = RelBuilder.create(frameworkConfig)
-
-  private val cluster: RelOptCluster = relBuilder
-    .values(Array("dummy"), new Integer(1))
-    .build().getCluster
+  protected val relBuilder: FlinkRelBuilder = FlinkRelBuilder.create(frameworkConfig)
 
   // the planner instance used to optimize queries of this TableEnvironment
-  private val planner: RelOptPlanner = cluster.getPlanner
+  private val planner: RelOptPlanner = relBuilder.getPlanner
 
-  private val typeFactory: RelDataTypeFactory = cluster.getTypeFactory
+  private val typeFactory: FlinkTypeFactory = relBuilder.getTypeFactory
 
   private val functionCatalog: FunctionCatalog = FunctionCatalog.withBuildIns
 
@@ -200,16 +193,21 @@ abstract class TableEnvironment(val config: TableConfig) {
     "TMP_" + attrNameCntr.getAndIncrement()
   }
 
-  /** Returns the [[RelBuilder]] of this TableEnvironment. */
-  private[flink] def getRelBuilder: RelBuilder = {
+  /** Returns the [[FlinkRelBuilder]] of this TableEnvironment. */
+  private[flink] def getRelBuilder: FlinkRelBuilder = {
     relBuilder
   }
 
   /** Returns the Calcite [[org.apache.calcite.plan.RelOptPlanner]] of this TableEnvironment. */
-  protected def getPlanner: RelOptPlanner = {
+  private[flink] def getPlanner: RelOptPlanner = {
     planner
   }
 
+  /** Returns the [[FlinkTypeFactory]] of this TableEnvironment. */
+  private[flink] def getTypeFactory: FlinkTypeFactory = {
+    typeFactory
+  }
+
   private[flink] def getFunctionCatalog: FunctionCatalog = {
     functionCatalog
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/1b327f1a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
index 486ba53..135b6f6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
@@ -29,14 +29,13 @@ import org.apache.flink.api.common.typeinfo.{AtomicType, SqlTimeTypeInfo, TypeIn
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.{FlinkTypeFactory, TableConfig}
 import org.apache.flink.api.table.codegen.CodeGenUtils._
 import org.apache.flink.api.table.codegen.Indenter.toISC
 import org.apache.flink.api.table.codegen.calls.ScalarFunctions
 import org.apache.flink.api.table.codegen.calls.ScalarOperators._
 import org.apache.flink.api.table.typeutils.RowTypeInfo
 import org.apache.flink.api.table.typeutils.TypeCheckUtils.{isNumeric, isString, isTemporal}
-import org.apache.flink.api.table.typeutils.TypeConverter.sqlTypeToTypeInfo
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable
@@ -535,7 +534,7 @@ class CodeGenerator(
   override def visitFieldAccess(rexFieldAccess: RexFieldAccess): GeneratedExpression = ???
 
   override def visitLiteral(literal: RexLiteral): GeneratedExpression = {
-    val resultType = sqlTypeToTypeInfo(literal.getType.getSqlTypeName)
+    val resultType = FlinkTypeFactory.toTypeInfo(literal.getType)
     val value = literal.getValue3
     // null value with type
     if (value == null) {
@@ -635,7 +634,7 @@ class CodeGenerator(
 
   override def visitCall(call: RexCall): GeneratedExpression = {
     val operands = call.getOperands.map(_.accept(this))
-    val resultType = sqlTypeToTypeInfo(call.getType.getSqlTypeName)
+    val resultType = FlinkTypeFactory.toTypeInfo(call.getType)
 
     call.getOperator match {
       // arithmetic

http://git-wip-us.apache.org/repos/asf/flink/blob/1b327f1a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
index 7059424..525d010 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
@@ -19,9 +19,9 @@ package org.apache.flink.api.table.expressions
 
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.tools.RelBuilder
-
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.typeutils.{TypeCoercion, TypeConverter}
+import org.apache.flink.api.table.FlinkTypeFactory
+import org.apache.flink.api.table.typeutils.TypeCoercion
 import org.apache.flink.api.table.validate._
 
 case class Cast(child: Expression, resultType: TypeInformation[_]) extends UnaryExpression {
@@ -29,7 +29,12 @@ case class Cast(child: Expression, resultType: TypeInformation[_]) extends Unary
   override def toString = s"$child.cast($resultType)"
 
   override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    relBuilder.cast(child.toRexNode, TypeConverter.typeInfoToSqlType(resultType))
+    val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+    relBuilder
+      .getRexBuilder
+      .makeCast(
+        typeFactory.createTypeFromTypeInfo(resultType),
+        child.toRexNode)
   }
 
   override private[flink] def makeCopy(anyRefs: Array[AnyRef]): this.type = {

http://git-wip-us.apache.org/repos/asf/flink/blob/1b327f1a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
index e697d0c..cd3de60 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
@@ -17,14 +17,14 @@
  */
 package org.apache.flink.api.table.expressions
 
-import java.sql.{Timestamp, Time, Date}
-import java.util.{TimeZone, Calendar}
+import java.sql.{Date, Time, Timestamp}
+import java.util.{Calendar, TimeZone}
 
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.table.FlinkTypeFactory
 
 object Literal {
   private[flink] def apply(l: Any): Literal = l match {
@@ -81,6 +81,11 @@ case class Null(resultType: TypeInformation[_]) extends LeafExpression {
   override def toString = s"null"
 
   override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    relBuilder.getRexBuilder.makeNullLiteral(TypeConverter.typeInfoToSqlType(resultType))
+    val rexBuilder = relBuilder.getRexBuilder
+    val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+    rexBuilder
+      .makeCast(
+        typeFactory.createTypeFromTypeInfo(resultType),
+        rexBuilder.constantNull())
   }
 }
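
Both Cast and Null now follow the same pattern: fetch the FlinkTypeFactory from
the RelBuilder and build the RexNode against a full RelDataType (possibly a
GenericRelDataType) instead of a bare SqlTypeName, which could not carry a
generic type. A typed null therefore becomes CAST(NULL AS <type>); a condensed
sketch, assuming relBuilder and resultType are in scope:

    val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
    val targetType = typeFactory.createTypeFromTypeInfo(resultType)
    val typedNull = relBuilder.getRexBuilder.makeCast(targetType, relBuilder.getRexBuilder.constantNull())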

http://git-wip-us.apache.org/repos/asf/flink/blob/1b327f1a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
index 70d7724..f3f9412 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
@@ -440,8 +440,7 @@ case class CatalogNode(
     rowType: RelDataType) extends LeafNode {
 
   val output: Seq[Attribute] = rowType.getFieldList.asScala.map { field =>
-    ResolvedFieldReference(
-      field.getName, TypeConverter.sqlTypeToTypeInfo(field.getType.getSqlTypeName))
+    ResolvedFieldReference(field.getName, FlinkTypeFactory.toTypeInfo(field.getType))
   }
 
   override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
@@ -458,8 +457,7 @@ case class LogicalRelNode(
     relNode: RelNode) extends LeafNode {
 
   val output: Seq[Attribute] = relNode.getRowType.getFieldList.asScala.map { field =>
-    ResolvedFieldReference(
-      field.getName, TypeConverter.sqlTypeToTypeInfo(field.getType.getSqlTypeName))
+    ResolvedFieldReference(field.getName, FlinkTypeFactory.toTypeInfo(field.getType))
   }
 
   override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {

http://git-wip-us.apache.org/repos/asf/flink/blob/1b327f1a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
index e71ab6c..0fbac5e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
@@ -28,7 +28,7 @@ import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.table.runtime.aggregate.AggregateUtil
 import org.apache.flink.api.table.runtime.aggregate.AggregateUtil.CalcitePair
 import org.apache.flink.api.table.typeutils.{RowTypeInfo, TypeConverter}
-import org.apache.flink.api.table.{BatchTableEnvironment, Row}
+import org.apache.flink.api.table.{FlinkTypeFactory, BatchTableEnvironment, Row}
 
 import scala.collection.JavaConverters._
 
@@ -100,8 +100,7 @@ class DataSetAggregate(
 
     // get the output types
     val fieldTypes: Array[TypeInformation[_]] = rowType.getFieldList.asScala
-    .map(f => f.getType.getSqlTypeName)
-    .map(n => TypeConverter.sqlTypeToTypeInfo(n))
+    .map(field => FlinkTypeFactory.toTypeInfo(field.getType))
     .toArray
 
     val aggString = aggregationToString

http://git-wip-us.apache.org/repos/asf/flink/blob/1b327f1a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala
index ffc2692..0fb5db9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.api.table.plan.schema
 
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.flink.api.table.FlinkTypeFactory
 import org.apache.flink.streaming.api.datastream.DataStream
 
 class DataStreamTable[T](
@@ -28,9 +29,11 @@ class DataStreamTable[T](
   extends FlinkTable[T](dataStream.getType, fieldIndexes, fieldNames) {
 
   override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
+    val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
     val builder = typeFactory.builder
     fieldNames.zip(fieldTypes)
-      .foreach( f => builder.add(f._1, f._2).nullable(true) )
+      .foreach( f =>
+        builder.add(f._1, flinkTypeFactory.createTypeFromTypeInfo(f._2)).nullable(true) )
     builder.build
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1b327f1a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTable.scala
index 7024ce2..d95b513 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/FlinkTable.scala
@@ -20,11 +20,9 @@ package org.apache.flink.api.table.plan.schema
 
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
 import org.apache.calcite.schema.impl.AbstractTable
-import org.apache.calcite.sql.`type`.SqlTypeName
-import org.apache.flink.api.common.typeinfo.{TypeInformation, AtomicType}
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.table.TableException
-import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.table.{FlinkTypeFactory, TableException}
 
 abstract class FlinkTable[T](
     val typeInfo: TypeInformation[T],
@@ -43,7 +41,7 @@ abstract class FlinkTable[T](
       "Table field names must be unique.")
   }
 
-  val fieldTypes: Array[SqlTypeName] =
+  val fieldTypes: Array[TypeInformation[_]] =
     typeInfo match {
       case cType: CompositeType[T] =>
         if (fieldNames.length != cType.getArity) {
@@ -51,21 +49,23 @@ abstract class FlinkTable[T](
           s"Arity of type (" + cType.getFieldNames.deep + ") " +
             "not equal to number of field names " + fieldNames.deep + ".")
         }
-        fieldIndexes
-          .map(cType.getTypeAt(_))
-          .map(TypeConverter.typeInfoToSqlType(_))
+        fieldIndexes.map(cType.getTypeAt(_).asInstanceOf[TypeInformation[_]])
       case aType: AtomicType[T] =>
         if (fieldIndexes.length != 1 || fieldIndexes(0) != 0) {
           throw new TableException(
             "Non-composite input type may have only a single field and its 
index must be 0.")
         }
-        Array(TypeConverter.typeInfoToSqlType(aType))
+        Array(aType)
     }
 
   override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
-    val builder = typeFactory.builder
-    fieldNames.zip(fieldTypes)
-      .foreach( f => builder.add(f._1, f._2).nullable(true) )
+    val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
+    val builder = flinkTypeFactory.builder
+    fieldNames
+      .zip(fieldTypes)
+      .foreach { f =>
+        builder.add(f._1, flinkTypeFactory.createTypeFromTypeInfo(f._2)).nullable(true)
+      }
     builder.build
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1b327f1a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/GenericRelDataType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/GenericRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/GenericRelDataType.scala
new file mode 100644
index 0000000..a3012d1
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/GenericRelDataType.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.schema
+
+import org.apache.calcite.sql.`type`.{BasicSqlType, SqlTypeName}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.FlinkTypeSystem
+
+/**
+  * Generic type for encapsulating Flink's [[TypeInformation]].
+  *
+  * @param typeInfo TypeInformation to encapsulate
+  * @param typeSystem Flink's type system
+  */
+class GenericRelDataType(
+    val typeInfo: TypeInformation[_],
+    typeSystem: FlinkTypeSystem)
+  extends BasicSqlType(
+    typeSystem,
+    SqlTypeName.ANY) {
+
+  override def toString = s"ANY($typeInfo)"
+
+  def canEqual(other: Any): Boolean = other.isInstanceOf[GenericRelDataType]
+
+  override def equals(other: Any): Boolean = other match {
+    case that: GenericRelDataType =>
+      super.equals(that) &&
+        (that canEqual this) &&
+        typeInfo == that.typeInfo
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    typeInfo.hashCode()
+  }
+}
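
Equality note, as a small sketch (these types are normally created through
FlinkTypeFactory.createTypeFromTypeInfo; typeSystem stands for an in-scope
FlinkTypeSystem instance): two GenericRelDataType instances share the SQL type
ANY but stay distinct for the planner unless their TypeInformation matches.

    val mapType = new GenericRelDataType(new GenericTypeInfo(classOf[java.util.HashMap[_, _]]), typeSystem)
    val listType = new GenericRelDataType(new GenericTypeInfo(classOf[java.util.ArrayList[_]]), typeSystem)
    assert(mapType != listType)  // same SqlTypeName.ANY, different payloads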

http://git-wip-us.apache.org/repos/asf/flink/blob/1b327f1a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
index 44a67b6..65d12c3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
@@ -27,9 +27,8 @@ import org.apache.calcite.sql.`type`.{SqlTypeFactoryImpl, SqlTypeName}
 import org.apache.calcite.sql.fun._
 import org.apache.flink.api.common.functions.{GroupReduceFunction, MapFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.typeutils.TypeConverter
 import org.apache.flink.api.table.typeutils.RowTypeInfo
-import org.apache.flink.api.table.{TableException, Row, TableConfig}
+import org.apache.flink.api.table.{FlinkTypeFactory, Row, TableConfig, TableException}
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
@@ -250,8 +249,8 @@ object AggregateUtil {
 
     // get the field data types of group keys.
     val groupingTypes: Seq[TypeInformation[_]] = groupings
-      .map(inputType.getFieldList.get(_).getType.getSqlTypeName)
-      .map(TypeConverter.sqlTypeToTypeInfo)
+      .map(inputType.getFieldList.get(_).getType)
+      .map(FlinkTypeFactory.toTypeInfo)
 
     val aggPartialNameSuffix = "agg_buffer_"
     val factory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT)

http://git-wip-us.apache.org/repos/asf/flink/blob/1b327f1a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
index 0acf0f9..e719782 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
@@ -17,16 +17,15 @@
  */
 package org.apache.flink.api.table
 
-import scala.collection.JavaConverters._
 import org.apache.calcite.rel.RelNode
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.plan.RexNodeTranslator.extractAggregations
 import org.apache.flink.api.java.operators.join.JoinType
-import org.apache.flink.api.table.expressions._
-import org.apache.flink.api.table.plan.logical
+import org.apache.flink.api.table.expressions.{Asc, ExpressionParser, UnresolvedAlias, Expression, Ordering}
+import org.apache.flink.api.table.plan.RexNodeTranslator.extractAggregations
 import org.apache.flink.api.table.plan.logical._
 import org.apache.flink.api.table.sinks.TableSink
-import org.apache.flink.api.table.typeutils.TypeConverter
+
+import scala.collection.JavaConverters._
 
 /**
   * A Table is the core component of the Table API.
@@ -423,7 +422,7 @@ class Table(
       throw new ValidationException("Only tables from the same 
TableEnvironment can be " +
         "subtracted.")
     }
-    new Table(tableEnv, logical.Minus(logicalPlan, right.logicalPlan, all = false)
+    new Table(tableEnv, Minus(logicalPlan, right.logicalPlan, all = false)
       .validate(tableEnv))
   }
 
@@ -448,7 +447,7 @@ class Table(
       throw new ValidationException("Only tables from the same 
TableEnvironment can be " +
         "subtracted.")
     }
-    new Table(tableEnv, logical.Minus(logicalPlan, right.logicalPlan, all = true)
+    new Table(tableEnv, Minus(logicalPlan, right.logicalPlan, all = true)
       .validate(tableEnv))
   }
 
@@ -598,7 +597,7 @@ class Table(
     val rowType = getRelNode.getRowType
     val fieldNames: Array[String] = rowType.getFieldNames.asScala.toArray
     val fieldTypes: Array[TypeInformation[_]] = rowType.getFieldList.asScala
-      .map(f => TypeConverter.sqlTypeToTypeInfo(f.getType.getSqlTypeName)).toArray
+      .map(field => FlinkTypeFactory.toTypeInfo(field.getType)).toArray
 
     // configure the table sink
     val configuredSink = sink.configure(fieldNames, fieldTypes)

http://git-wip-us.apache.org/repos/asf/flink/blob/1b327f1a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCheckUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCheckUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCheckUtils.scala
index 0c29901..c19deec 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCheckUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCheckUtils.scala
@@ -18,11 +18,27 @@
 package org.apache.flink.api.table.typeutils
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{BIG_DEC_TYPE_INFO, BOOLEAN_TYPE_INFO, STRING_TYPE_INFO}
-import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, NumericTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, NumericTypeInfo, TypeInformation}
 import org.apache.flink.api.table.validate._
 
 object TypeCheckUtils {
 
+  /**
+    * Checks if type information is an advanced type that can be converted to a
+    * SQL type but NOT vice versa.
+    */
+  def isAdvanced(dataType: TypeInformation[_]): Boolean = dataType match {
+    case _: BasicTypeInfo[_] => false
+    case _: SqlTimeTypeInfo[_] => false
+    case _ => true
+  }
+
+  /**
+    * Checks if type information is a simple type that can be converted to a
+    * SQL type and vice versa.
+    */
+  def isSimple(dataType: TypeInformation[_]): Boolean = !isAdvanced(dataType)
+
   def isNumeric(dataType: TypeInformation[_]): Boolean = dataType match {
     case _: NumericTypeInfo[_] => true
     case BIG_DEC_TYPE_INFO => true
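
A quick sketch of the new predicate pair (GenericTypeInfo from
org.apache.flink.api.java.typeutils): "simple" types round trip through SQL
types, everything else is "advanced" and must be wrapped.

    TypeCheckUtils.isSimple(BasicTypeInfo.STRING_TYPE_INFO)          // true: VARCHAR round trip
    TypeCheckUtils.isSimple(SqlTimeTypeInfo.TIMESTAMP)               // true: TIMESTAMP round trip
    TypeCheckUtils.isAdvanced(new GenericTypeInfo(classOf[AnyRef]))  // true: becomes an ANY column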

http://git-wip-us.apache.org/repos/asf/flink/blob/1b327f1a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeConverter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeConverter.scala
index 95e50d5..ea1f7ce 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeConverter.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeConverter.scala
@@ -21,16 +21,12 @@ package org.apache.flink.api.table.typeutils
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.JoinRelType
 import org.apache.calcite.rel.core.JoinRelType._
-import org.apache.calcite.sql.`type`.SqlTypeName
-import org.apache.calcite.sql.`type`.SqlTypeName._
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.operators.join.JoinType
 import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.api.java.typeutils.ValueTypeInfo._
 import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
-import org.apache.flink.api.table.{Row, TableException}
+import org.apache.flink.api.table.{FlinkTypeFactory, Row, TableException}
 
 import scala.collection.JavaConversions._
 
@@ -38,59 +34,6 @@ object TypeConverter {
 
   val DEFAULT_ROW_TYPE = new RowTypeInfo(Seq(), Seq()).asInstanceOf[TypeInformation[Any]]
 
-  def typeInfoToSqlType(typeInfo: TypeInformation[_]): SqlTypeName = typeInfo match {
-    case BOOLEAN_TYPE_INFO => BOOLEAN
-    case BYTE_TYPE_INFO => TINYINT
-    case SHORT_TYPE_INFO => SMALLINT
-    case INT_TYPE_INFO => INTEGER
-    case LONG_TYPE_INFO => BIGINT
-    case FLOAT_TYPE_INFO => FLOAT
-    case DOUBLE_TYPE_INFO => DOUBLE
-    case STRING_TYPE_INFO => VARCHAR
-    case BIG_DEC_TYPE_INFO => DECIMAL
-
-    // date/time types
-    case SqlTimeTypeInfo.DATE => DATE
-    case SqlTimeTypeInfo.TIME => TIME
-    case SqlTimeTypeInfo.TIMESTAMP => TIMESTAMP
-
-    case CHAR_TYPE_INFO | CHAR_VALUE_TYPE_INFO =>
-      throw new TableException("Character type is not supported.")
-
-    case t@_ =>
-      throw new TableException(s"Type is not supported: $t")
-  }
-
-  def sqlTypeToTypeInfo(sqlType: SqlTypeName): TypeInformation[_] = sqlType match {
-    case BOOLEAN => BOOLEAN_TYPE_INFO
-    case TINYINT => BYTE_TYPE_INFO
-    case SMALLINT => SHORT_TYPE_INFO
-    case INTEGER => INT_TYPE_INFO
-    case BIGINT => LONG_TYPE_INFO
-    case FLOAT => FLOAT_TYPE_INFO
-    case DOUBLE => DOUBLE_TYPE_INFO
-    case VARCHAR | CHAR => STRING_TYPE_INFO
-    case DECIMAL => BIG_DEC_TYPE_INFO
-
-    // date/time types
-    case DATE => SqlTimeTypeInfo.DATE
-    case TIME => SqlTimeTypeInfo.TIME
-    case TIMESTAMP => SqlTimeTypeInfo.TIMESTAMP
-    case INTERVAL_DAY_TIME | INTERVAL_YEAR_MONTH =>
-      throw new TableException("Intervals are not supported yet.")
-
-    case NULL =>
-      throw new TableException("Type NULL is not supported. " +
-        "Null values must have a supported type.")
-
-    // symbol for special flags e.g. TRIM's BOTH, LEADING, TRAILING
-    // are represented as integer
-    case SYMBOL => INT_TYPE_INFO
-
-    case _ =>
-      throw new TableException("Type " + sqlType.toString + " is not 
supported.")
-  }
-
   /**
     * Determines the return type of Flink operators based on the logical fields, the expected
     * physical type and configuration parameters.
@@ -117,7 +60,7 @@ object TypeConverter {
     : TypeInformation[Any] = {
     // convert to type information
     val logicalFieldTypes = logicalRowType.getFieldList map { relDataType =>
-      TypeConverter.sqlTypeToTypeInfo(relDataType.getType.getSqlTypeName)
+      FlinkTypeFactory.toTypeInfo(relDataType.getType)
     }
     // field names
     val logicalFieldNames = logicalRowType.getFieldNames.toList

http://git-wip-us.apache.org/repos/asf/flink/blob/1b327f1a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FromDataSetITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FromDataSetITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FromDataSetITCase.java
index ecd916f..af96a04 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FromDataSetITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FromDataSetITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.batch.table;
 
+import java.util.HashMap;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple3;
@@ -221,6 +222,38 @@ public class FromDataSetITCase extends TableProgramsTestBase {
                compareResultAsText(results, expected);
        }
 
+       @Test
+       public void testAsWithPojoAndGenericTypes() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+               List<PojoWithGeneric> data = new ArrayList<>();
+               data.add(new PojoWithGeneric("Peter", 28, new HashMap<String, String>(), new ArrayList<String>()));
+               HashMap<String, String> hm1 = new HashMap<>();
+               hm1.put("test1", "test1");
+               data.add(new PojoWithGeneric("Anna", 56, hm1, new ArrayList<String>()));
+               HashMap<String, String> hm2 = new HashMap<>();
+               hm2.put("abc", "cde");
+               data.add(new PojoWithGeneric("Lucy", 42, hm2, new ArrayList<String>()));
+
+               Table table = tableEnv
+                       .fromDataSet(env.fromCollection(data),
+                               "name AS a, " +
+                               "age AS b, " +
+                               "generic AS c, " +
+                               "generic2 AS d")
+                       .select("a, b, c, c as c2, d")
+                       .select("a, b, c, c === c2, d");
+
+               DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
+               List<Row> results = ds.collect();
+               String expected =
+                       "Peter,28,{},true,[]\n" +
+                       "Anna,56,{test1=test1},true,[]\n" +
+                       "Lucy,42,{abc=cde},true,[]\n";
+               compareResultAsText(results, expected);
+       }
+
        @Test(expected = TableException.class)
        public void testAsWithToFewFields() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -287,6 +320,31 @@ public class FromDataSetITCase extends TableProgramsTestBase {
        }
 
        @SuppressWarnings("unused")
+       public static class PojoWithGeneric {
+               public String name;
+               public int age;
+               public HashMap<String, String> generic;
+               public ArrayList<String> generic2;
+
+               public PojoWithGeneric() {
+                       // default constructor
+               }
+
+               public PojoWithGeneric(String name, int age, HashMap<String, String> generic,
+                               ArrayList<String> generic2) {
+                       this.name = name;
+                       this.age = age;
+                       this.generic = generic;
+                       this.generic2 = generic2;
+               }
+
+               @Override
+               public String toString() {
+                       return name + "," + age + "," + generic + "," + generic2;
+               }
+       }
+
+       @SuppressWarnings("unused")
        public static class PrivateSmallPojo {
 
                public PrivateSmallPojo() { }

http://git-wip-us.apache.org/repos/asf/flink/blob/1b327f1a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/sql/SqlITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/sql/SqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/sql/SqlITCase.java
index 4161b1e..1743981 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/sql/SqlITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/stream/sql/SqlITCase.java
@@ -53,7 +53,7 @@ public class SqlITCase extends StreamingMultipleProgramsTestBase {
                resultSet.addSink(new StreamITCase.StringSink());
                env.execute();
 
-               List<String> expected = new ArrayList();
+               List<String> expected = new ArrayList<>();
                expected.add("1,1,Hi");
                expected.add("2,2,Hello");
                expected.add("3,2,Hello world");
@@ -77,7 +77,7 @@ public class SqlITCase extends StreamingMultipleProgramsTestBase {
                resultSet.addSink(new StreamITCase.StringSink());
                env.execute();
 
-               List<String> expected = new ArrayList();
+               List<String> expected = new ArrayList<>();
                expected.add("1,1,1");
                expected.add("2,2,2");
                expected.add("2,3,1");
@@ -108,7 +108,7 @@ public class SqlITCase extends StreamingMultipleProgramsTestBase {
                resultSet.addSink(new StreamITCase.StringSink());
                env.execute();
 
-               List<String> expected = new ArrayList();
+               List<String> expected = new ArrayList<>();
                expected.add("1,1,Hi");
                expected.add("2,2,Hello");
                expected.add("3,2,Hello world");

http://git-wip-us.apache.org/repos/asf/flink/blob/1b327f1a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala
index 4345dd8..32b0e3a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionTestBase.scala
@@ -21,17 +21,17 @@ package org.apache.flink.api.table.expressions.utils
 import org.apache.calcite.rel.logical.LogicalProject
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.sql.`type`.SqlTypeName._
-import org.apache.calcite.tools.{Frameworks, RelBuilder}
+import org.apache.calcite.tools.RelBuilder
 import org.apache.flink.api.common.functions.{Function, MapFunction}
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.{DataSet => JDataSet}
 import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
+import org.apache.flink.api.table._
 import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedFunction}
 import org.apache.flink.api.table.expressions.{Expression, ExpressionParser}
 import org.apache.flink.api.table.runtime.FunctionCompiler
 import org.apache.flink.api.table.typeutils.RowTypeInfo
-import org.apache.flink.api.table.{BatchTableEnvironment, Row, TableConfig, TableEnvironment}
 import org.junit.Assert._
 import org.junit.{After, Before}
 import org.mockito.Mockito._
@@ -48,7 +48,10 @@ abstract class ExpressionTestBase {
   // setup test utils
   private val tableName = "testTable"
   private val context = prepareContext(typeInfo)
-  private val planner = Frameworks.getPlanner(context._2.getFrameworkConfig)
+  private val planner = new FlinkPlannerImpl(
+    context._2.getFrameworkConfig,
+    context._2.getPlanner,
+    context._2.getTypeFactory)
 
   private def prepareContext(typeInfo: TypeInformation[Any]): (RelBuilder, TableEnvironment) = {
     // create DataSetTable
@@ -128,8 +131,6 @@ abstract class ExpressionTestBase {
     // extract RexNode
     val expr: RexNode = converted.rel.asInstanceOf[LogicalProject].getChildExps.get(0)
     testExprs.add((expr, expected))
-
-    planner.close()
   }
 
   private def addTableApiTestExpr(tableApiExpr: Expression, expected: String): Unit = {
