Repository: flink
Updated Branches:
  refs/heads/master a997dd615 -> 31a2de86d


[FLINK-2961] [table] Add support for basic type Date in Table API

Fix nullCheck enabled

Fix test

TableConfig introduced

Improvements and bug fixing

This closes #1322.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/31a2de86
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/31a2de86
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/31a2de86

Branch: refs/heads/master
Commit: 31a2de86d9034def9c58ef7f3d3dcae3d4dafd6f
Parents: a997dd6
Author: twalthr <twal...@apache.org>
Authored: Tue Nov 3 15:18:19 2015 +0100
Committer: twalthr <twal...@apache.org>
Committed: Sun Nov 29 16:48:27 2015 +0100

----------------------------------------------------------------------
 docs/libs/table.md                              |   6 +-
 .../apache/flink/api/table/TableConfig.scala    |  66 +++++
 .../table/codegen/ExpressionCodeGenerator.scala | 246 ++++++++++++++++---
 .../api/table/codegen/GenerateFilter.scala      |  27 +-
 .../flink/api/table/codegen/GenerateJoin.scala  |  37 ++-
 .../table/codegen/GenerateResultAssembler.scala |  13 +-
 .../api/table/codegen/GenerateSelect.scala      |  20 +-
 .../flink/api/table/expressions/literals.scala  |   2 +
 .../api/table/parser/ExpressionParser.scala     |   3 +-
 .../runtime/ExpressionFilterFunction.scala      |   9 +-
 .../table/runtime/ExpressionJoinFunction.scala  |   9 +-
 .../runtime/ExpressionSelectFunction.scala      |   9 +-
 .../api/java/table/test/CastingITCase.java      |  23 ++
 .../api/scala/table/test/CastingITCase.scala    |  22 +-
 .../scala/table/test/ExpressionsITCase.scala    |  18 ++
 15 files changed, 440 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/31a2de86/docs/libs/table.md
----------------------------------------------------------------------
diff --git a/docs/libs/table.md b/docs/libs/table.md
index be0b45e..d1d42cc 100644
--- a/docs/libs/table.md
+++ b/docs/libs/table.md
@@ -351,11 +351,11 @@ unary = [ "!" | "-" | "~" ] , suffix ;
 
 suffix = atom | aggregation | cast | as | substring ;
 
-aggregation = atom , [ ".sum" | ".min" | ".max" | ".count" | "avg" ] ;
+aggregation = atom , [ ".sum" | ".min" | ".max" | ".count" | ".avg" ] ;
 
 cast = atom , ".cast(" , data type , ")" ;
 
-data type = "BYTE" | "SHORT" | "INT" | "LONG" | "FLOAT" | "DOUBLE" | "BOOL" | 
"BOOLEAN" | "STRING" ;
+data type = "BYTE" | "SHORT" | "INT" | "LONG" | "FLOAT" | "DOUBLE" | "BOOL" | 
"BOOLEAN" | "STRING" | "DATE" ;
 
 as = atom , ".as(" , field reference , ")" ;
 
@@ -372,3 +372,5 @@ atom = ( "(" , single expression , ")" ) | literal | field 
reference ;
 Here, `literal` is a valid Java literal and `field reference` specifies a 
column in the data. The
 column names follow Java identifier syntax.
 
+Only the types `LONG` and `STRING` can be casted to `DATE` and vice versa. A 
`LONG` casted to `DATE` must be a milliseconds timestamp. A `STRING` casted to 
`DATE` must have the format "`yyyy-MM-dd HH:mm:ss.SSS`", "`yyyy-MM-dd`", 
"`HH:mm:ss`", or a milliseconds timestamp. By default, all timestamps refer to 
the UTC timezone beginning from January 1, 1970, 00:00:00 in milliseconds.
+

http://git-wip-us.apache.org/repos/asf/flink/blob/31a2de86/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/TableConfig.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/TableConfig.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/TableConfig.scala
new file mode 100644
index 0000000..ffa2bec
--- /dev/null
+++ 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/TableConfig.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table
+
+import java.util.TimeZone
+
+/**
+ * A config to define the runtime behavior of the Table API.
+ */
+class TableConfig extends Serializable {
+
+  /**
+   * Defines the timezone for date/time/timestamp conversions.
+   */
+  private var timeZone: TimeZone = TimeZone.getTimeZone("UTC")
+
+  /**
+   * Defines if all fields need to be checked for NULL first.
+   */
+  private var nullCheck: Boolean = false
+
+  /**
+   * Sets the timezone for date/time/timestamp conversions.
+   */
+  def setTimeZone(timeZone: TimeZone) = {
+    require(timeZone != null, "timeZone must not be null.")
+    this.timeZone = timeZone
+  }
+
+  /**
+   * Returns the timezone for date/time/timestamp conversions.
+   */
+  def getTimeZone = timeZone
+
+  /**
+   * Returns the NULL check. If enabled, all fields need to be checked for 
NULL first.
+   */
+  def getNullCheck = nullCheck
+
+  /**
+   * Sets the NULL check. If enabled, all fields need to be checked for NULL 
first.
+   */
+  def setNullCheck(nullCheck: Boolean) = {
+    this.nullCheck = nullCheck
+  }
+
+}
+
+object TableConfig {
+  val DEFAULT = new TableConfig()
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/31a2de86/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala
index c9220e9..42dec0f 100644
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala
+++ 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala
@@ -17,11 +17,9 @@
  */
 package org.apache.flink.api.table.codegen
 
+import java.util.Date
 import java.util.concurrent.atomic.AtomicInteger
 
-import org.codehaus.janino.SimpleCompiler
-import org.slf4j.LoggerFactory
-
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
PrimitiveArrayTypeInfo, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
@@ -29,22 +27,26 @@ import org.apache.flink.api.java.typeutils.{PojoTypeInfo, 
TupleTypeInfo}
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.api.table.expressions._
 import org.apache.flink.api.table.typeinfo.{RenamingProxyTypeInfo, RowTypeInfo}
-import org.apache.flink.api.table.{ExpressionException, expressions}
+import org.apache.flink.api.table.{ExpressionException, TableConfig, 
expressions}
+import org.codehaus.janino.SimpleCompiler
+import org.slf4j.LoggerFactory
+
+import scala.collection.mutable
 
 /** Base class for all code generation classes. This provides the 
functionality for generating
   * code from an [[Expression]] tree. Derived classes must embed this in a 
lambda function
   * to form an executable code block.
   *
   * @param inputs List of input variable names with corresponding 
[[TypeInformation]].
-  * @param nullCheck Whether the generated code should include checks for NULL 
values.
   * @param cl The ClassLoader that is used to create the Scala reflection 
ToolBox
+  * @param config General configuration specifying runtime behaviour.
   * @tparam R The type of the generated code block. In most cases a lambda 
function such
   *           as "(IN1, IN2) => OUT".
   */
 abstract class ExpressionCodeGenerator[R](
     inputs: Seq[(String, CompositeType[_])],
-    val nullCheck: Boolean = false,
-    cl: ClassLoader) {
+    cl: ClassLoader,
+    config: TableConfig) {
   protected val log = 
LoggerFactory.getLogger(classOf[ExpressionCodeGenerator[_]])
 
   import scala.reflect.runtime.universe._
@@ -57,6 +59,19 @@ abstract class ExpressionCodeGenerator[R](
   val compiler = new SimpleCompiler()
   compiler.setParentClassLoader(cl)
 
+  protected val reusableMemberStatements = mutable.Set[String]()
+
+  protected val reusableInitStatements = mutable.Set[String]()
+
+  protected def reuseMemberCode(): String = {
+    reusableMemberStatements.mkString("", "\n", "\n")
+  }
+
+  protected def reuseInitCode(): String = {
+    reusableInitStatements.mkString("", "\n", "\n")
+  }
+
+  protected def nullCheck: Boolean = config.getNullCheck
 
   // This is to be implemented by subclasses, we have it like this
   // so that we only call it from here with the Scala Reflection Lock.
@@ -94,9 +109,9 @@ abstract class ExpressionCodeGenerator[R](
             |boolean $nullTerm = ${leftCode.nullTerm} || ${rightCode.nullTerm};
             |$resultTpe $resultTerm;
             |if ($nullTerm) {
-            |  $resultTerm = ${defaultPrimitive(resultType)}
+            |  $resultTerm = ${defaultPrimitive(resultType)};
             |} else {
-            |  $resultTerm = ${expr(leftCode.resultTerm, rightCode.resultTerm)}
+            |  $resultTerm = ${expr(leftCode.resultTerm, 
rightCode.resultTerm)};
             |}
           """.stripMargin
       } else {
@@ -157,7 +172,7 @@ abstract class ExpressionCodeGenerator[R](
       case expressions.Literal(doubleValue: Double, DOUBLE_TYPE_INFO) =>
         if (nullCheck) {
           s"""
-            |val $nullTerm = false
+            |boolean $nullTerm = false;
             |$resultTpe $resultTerm = $doubleValue;
           """.stripMargin
         } else {
@@ -169,7 +184,7 @@ abstract class ExpressionCodeGenerator[R](
       case expressions.Literal(floatValue: Float, FLOAT_TYPE_INFO) =>
         if (nullCheck) {
           s"""
-            |val $nullTerm = false
+            |boolean $nullTerm = false;
             |$resultTpe $resultTerm = ${floatValue}f;
           """.stripMargin
         } else {
@@ -181,7 +196,7 @@ abstract class ExpressionCodeGenerator[R](
       case expressions.Literal(strValue: String, STRING_TYPE_INFO) =>
         if (nullCheck) {
           s"""
-            |val $nullTerm = false
+            |boolean $nullTerm = false;
             |$resultTpe $resultTerm = "$strValue";
           """.stripMargin
         } else {
@@ -193,7 +208,7 @@ abstract class ExpressionCodeGenerator[R](
       case expressions.Literal(boolValue: Boolean, BOOLEAN_TYPE_INFO) =>
         if (nullCheck) {
           s"""
-            |val $nullTerm = false
+            |boolean $nullTerm = false;
             |$resultTpe $resultTerm = $boolValue;
           """.stripMargin
         } else {
@@ -202,6 +217,23 @@ abstract class ExpressionCodeGenerator[R](
           """.stripMargin
         }
 
+      case expressions.Literal(dateValue: Date, DATE_TYPE_INFO) =>
+        val dateName = s"""date_${dateValue.getTime}"""
+        val dateStmt = s"""static java.util.Date $dateName
+             |= new java.util.Date(${dateValue.getTime});""".stripMargin
+        reusableMemberStatements.add(dateStmt)
+
+        if (nullCheck) {
+          s"""
+            |boolean $nullTerm = false;
+            |$resultTpe $resultTerm = $dateName;
+          """.stripMargin
+        } else {
+          s"""
+            |$resultTpe $resultTerm = $dateName;
+          """.stripMargin
+        }
+
       case Substring(str, beginIndex, endIndex) =>
         val strCode = generateExpression(str)
         val beginIndexCode = generateExpression(beginIndex)
@@ -243,6 +275,29 @@ abstract class ExpressionCodeGenerator[R](
             """
         }
 
+      case expressions.Cast(child: Expression, STRING_TYPE_INFO)
+        if child.typeInfo == BasicTypeInfo.DATE_TYPE_INFO =>
+        val childGen = generateExpression(child)
+
+        addTimestampFormatter()
+
+        val castCode = if (nullCheck) {
+          s"""
+            |boolean $nullTerm = ${childGen.nullTerm};
+            |$resultTpe $resultTerm;
+            |if ($nullTerm) {
+            |  $resultTerm = null;
+            |} else {
+            |  $resultTerm = timestampFormatter.format(${childGen.resultTerm});
+            |}
+          """.stripMargin
+        } else {
+          s"""
+            |$resultTpe $resultTerm = 
timestampFormatter.format(${childGen.resultTerm});
+          """.stripMargin
+        }
+        childGen.code + castCode
+
       case expressions.Cast(child: Expression, STRING_TYPE_INFO) =>
         val childGen = generateExpression(child)
         val castCode = if (nullCheck) {
@@ -262,6 +317,104 @@ abstract class ExpressionCodeGenerator[R](
         }
         childGen.code + castCode
 
+      case expressions.Cast(child: Expression, DATE_TYPE_INFO)
+        if child.typeInfo == BasicTypeInfo.LONG_TYPE_INFO =>
+        val childGen = generateExpression(child)
+        val castCode = if (nullCheck) {
+          s"""
+            |boolean $nullTerm = ${childGen.nullTerm};
+            |$resultTpe $resultTerm;
+            |if ($nullTerm) {
+            |  $resultTerm = null;
+            |} else {
+            |  $resultTerm = new java.util.Date(${childGen.resultTerm});
+            |}
+          """.stripMargin
+        } else {
+          s"""
+            |$resultTpe $resultTerm = new 
java.util.Date(${childGen.resultTerm});
+          """.stripMargin
+        }
+        childGen.code + castCode
+
+      case expressions.Cast(child: Expression, DATE_TYPE_INFO)
+        if child.typeInfo == BasicTypeInfo.STRING_TYPE_INFO =>
+        val childGen = generateExpression(child)
+
+        addDateFormatter()
+        addTimeFormatter()
+        addTimestampFormatter()
+
+        // tries to parse
+        // "2011-05-03 15:51:36.234"
+        // then "2011-05-03"
+        // then "15:51:36"
+        // then "1446473775"
+        val parsedName = freshName("parsed")
+        val parsingCode =
+          s"""
+            |java.util.Date $parsedName = null;
+            |try {
+            |  $parsedName = timestampFormatter.parse(${childGen.resultTerm});
+            |} catch (java.text.ParseException e1) {
+            |  try {
+            |    $parsedName = dateFormatter.parse(${childGen.resultTerm});
+            |  } catch (java.text.ParseException e2) {
+            |    try {
+            |      $parsedName = timeFormatter.parse(${childGen.resultTerm});
+            |    } catch (java.text.ParseException e3) {
+            |      $parsedName = new 
java.util.Date(Long.valueOf(${childGen.resultTerm}));
+            |    }
+            |  }
+            |}
+           """.stripMargin
+
+        val castCode = if (nullCheck) {
+          s"""
+            |boolean $nullTerm = ${childGen.nullTerm};
+            |$resultTpe $resultTerm;
+            |if ($nullTerm) {
+            |  $resultTerm = null;
+            |} else {
+            |  $parsingCode
+            |  $resultTerm = $parsedName;
+            |}
+          """.stripMargin
+        } else {
+          s"""
+            |$parsingCode
+            |$resultTpe $resultTerm = $parsedName;
+          """.stripMargin
+        }
+        childGen.code + castCode
+
+      case expressions.Cast(child: Expression, DATE_TYPE_INFO) =>
+        throw new ExpressionException("Only Long and String can be casted to 
Date.")
+
+      case expressions.Cast(child: Expression, LONG_TYPE_INFO)
+        if child.typeInfo == BasicTypeInfo.DATE_TYPE_INFO =>
+        val childGen = generateExpression(child)
+        val castCode = if (nullCheck) {
+          s"""
+            |boolean $nullTerm = ${childGen.nullTerm};
+            |$resultTpe $resultTerm;
+            |if ($nullTerm) {
+            |  $resultTerm = null;
+            |} else {
+            |  $resultTerm = ${childGen.resultTerm}.getTime();
+            |}
+          """.stripMargin
+        } else {
+          s"""
+            |$resultTerm = ${childGen.resultTerm}.getTime();
+          """.stripMargin
+        }
+        childGen.code + castCode
+
+      case expressions.Cast(child: Expression, tpe: BasicTypeInfo[_])
+        if child.typeInfo == BasicTypeInfo.DATE_TYPE_INFO =>
+        throw new ExpressionException("Date can only be casted to Long or 
String.")
+
       case expressions.Cast(child: Expression, tpe: BasicTypeInfo[_])
         if child.typeInfo == BasicTypeInfo.STRING_TYPE_INFO =>
         val childGen = generateExpression(child)
@@ -391,10 +544,11 @@ abstract class ExpressionCodeGenerator[R](
           childCode.code +
             s"""
               |boolean $nullTerm = ${childCode.nullTerm};
+              |$resultTpe $resultTerm;
               |if ($nullTerm) {
-              |  ${defaultPrimitive(child.typeInfo)};
+              |  $resultTerm = ${defaultPrimitive(child.typeInfo)};
               |} else {
-              |  $resultTpe $resultTerm = -(${childCode.resultTerm});
+              |  $resultTerm = -(${childCode.resultTerm});
               |}
             """.stripMargin
         } else {
@@ -425,10 +579,11 @@ abstract class ExpressionCodeGenerator[R](
           childCode.code +
             s"""
               |boolean $nullTerm = ${childCode.nullTerm};
+              |$resultTpe $resultTerm;
               |if ($nullTerm) {
-              |  ${defaultPrimitive(child.typeInfo)};
+              |  $resultTerm = ${defaultPrimitive(child.typeInfo)};
               |} else {
-              |  $resultTpe $resultTerm = ~((int) ${childCode.resultTerm});
+              |  $resultTerm = ~((int) ${childCode.resultTerm});
               |}
             """.stripMargin
         } else {
@@ -444,10 +599,11 @@ abstract class ExpressionCodeGenerator[R](
           childCode.code +
             s"""
               |boolean $nullTerm = ${childCode.nullTerm};
+              |$resultTpe $resultTerm;
               |if ($nullTerm) {
-              |  ${defaultPrimitive(child.typeInfo)};
+              |  $resultTerm = ${defaultPrimitive(child.typeInfo)};
               |} else {
-              |  $resultTpe $resultTerm = !(${childCode.resultTerm});
+              |  $resultTerm = !(${childCode.resultTerm});
               |}
             """.stripMargin
         } else {
@@ -462,12 +618,7 @@ abstract class ExpressionCodeGenerator[R](
         if (nullCheck) {
           childCode.code +
             s"""
-              |boolean $nullTerm = ${childCode.nullTerm};
-              |if ($nullTerm) {
-              |  ${defaultPrimitive(child.typeInfo)};
-              |} else {
-              |  $resultTpe $resultTerm = (${childCode.resultTerm}) == null;
-              |}
+              |$resultTpe $resultTerm = ${childCode.nullTerm};
             """.stripMargin
         } else {
           childCode.code +
@@ -481,12 +632,7 @@ abstract class ExpressionCodeGenerator[R](
         if (nullCheck) {
           childCode.code +
             s"""
-              |boolean $nullTerm = ${childCode.nullTerm};
-              |if ($nullTerm) {
-              |  ${defaultPrimitive(child.typeInfo)};
-              |} else {
-              |  $resultTpe $resultTerm = (${childCode.resultTerm}) != null;
-              |}
+              |$resultTpe $resultTerm = !${childCode.nullTerm};
             """.stripMargin
         } else {
           childCode.code +
@@ -501,10 +647,11 @@ abstract class ExpressionCodeGenerator[R](
           childCode.code +
             s"""
               |boolean $nullTerm = ${childCode.nullTerm};
+              |$resultTpe $resultTerm;
               |if ($nullTerm) {
-              |  ${defaultPrimitive(child.typeInfo)};
+              |  $resultTerm = ${defaultPrimitive(child.typeInfo)};
               |} else {
-              |  $resultTpe $resultTerm = Math.abs(${childCode.resultTerm});
+              |  $resultTerm = Math.abs(${childCode.resultTerm});
               |}
             """.stripMargin
         } else {
@@ -643,4 +790,37 @@ abstract class ExpressionCodeGenerator[R](
       tpe.getTypeClass.getCanonicalName
 
   }
+
+  def addDateFormatter(): Unit = {
+    reusableMemberStatements.add(s"""
+    |java.text.SimpleDateFormat dateFormatter =
+    |  new java.text.SimpleDateFormat("yyyy-MM-dd");
+    |""".stripMargin)
+
+    reusableInitStatements.add(s"""
+    |dateFormatter.setTimeZone(config.getTimeZone());
+    |""".stripMargin)
+  }
+
+  def addTimeFormatter(): Unit = {
+    reusableMemberStatements.add(s"""
+    |java.text.SimpleDateFormat timeFormatter =
+    |  new java.text.SimpleDateFormat("HH:mm:ss");
+    |""".stripMargin)
+
+    reusableInitStatements.add(s"""
+    |timeFormatter.setTimeZone(config.getTimeZone());
+    |""".stripMargin)
+  }
+
+  def addTimestampFormatter(): Unit = {
+    reusableMemberStatements.add(s"""
+    |java.text.SimpleDateFormat timestampFormatter =
+    |  new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+    |""".stripMargin)
+
+    reusableInitStatements.add(s"""
+    |timestampFormatter.setTimeZone(config.getTimeZone());
+    |""".stripMargin)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/31a2de86/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateFilter.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateFilter.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateFilter.scala
index 06a7076..50b8c69 100644
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateFilter.scala
+++ 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateFilter.scala
@@ -19,12 +19,12 @@ package org.apache.flink.api.table.codegen
 
 import java.io.StringReader
 
-import org.slf4j.LoggerFactory
-
 import org.apache.flink.api.common.functions.FilterFunction
 import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.TableConfig
 import org.apache.flink.api.table.codegen.Indenter._
 import org.apache.flink.api.table.expressions.Expression
+import org.slf4j.LoggerFactory
 
 /**
  * Code generator for a unary predicate, i.e. a Filter.
@@ -32,9 +32,11 @@ import org.apache.flink.api.table.expressions.Expression
 class GenerateFilter[T](
     inputType: CompositeType[T],
     predicate: Expression,
-    cl: ClassLoader) extends ExpressionCodeGenerator[FilterFunction[T]](
+    cl: ClassLoader,
+    config: TableConfig) extends ExpressionCodeGenerator[FilterFunction[T]](
       Seq(("in0", inputType)),
-      cl = cl) {
+      cl = cl,
+      config) {
 
   val LOG = LoggerFactory.getLogger(this.getClass)
 
@@ -50,6 +52,13 @@ class GenerateFilter[T](
       j"""
         public class $generatedName
             implements 
org.apache.flink.api.common.functions.FilterFunction<$tpe> {
+
+          org.apache.flink.api.table.TableConfig config = null;
+
+          public $generatedName(org.apache.flink.api.table.TableConfig config) 
{
+            this.config = config;
+          }
+
           public boolean filter(Object _in0) {
             $tpe in0 = ($tpe) _in0;
             ${pred.code}
@@ -65,6 +74,13 @@ class GenerateFilter[T](
       j"""
         public class $generatedName
             implements 
org.apache.flink.api.common.functions.FilterFunction<$tpe> {
+
+          org.apache.flink.api.table.TableConfig config = null;
+
+          public $generatedName(org.apache.flink.api.table.TableConfig config) 
{
+            this.config = config;
+          }
+
           public boolean filter(Object _in0) {
             $tpe in0 = ($tpe) _in0;
             ${pred.code}
@@ -77,6 +93,7 @@ class GenerateFilter[T](
     LOG.debug(s"""Generated unary predicate "$predicate":\n$code""")
     compiler.cook(new StringReader(code))
     val clazz = compiler.getClassLoader().loadClass(generatedName)
-    clazz.newInstance().asInstanceOf[FilterFunction[T]]
+    val constructor = clazz.getConstructor(classOf[TableConfig])
+    constructor.newInstance(config).asInstanceOf[FilterFunction[T]]
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/31a2de86/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateJoin.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateJoin.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateJoin.scala
index 8c0cec3..b706e6d 100644
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateJoin.scala
+++ 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateJoin.scala
@@ -19,12 +19,12 @@ package org.apache.flink.api.table.codegen
 
 import java.io.StringReader
 
-import org.slf4j.LoggerFactory
-
 import org.apache.flink.api.common.functions.FlatJoinFunction
 import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.TableConfig
 import org.apache.flink.api.table.codegen.Indenter._
 import org.apache.flink.api.table.expressions.{Expression, NopExpression}
+import org.slf4j.LoggerFactory
 
 /**
  * Code generator for assembling the result of a binary operation.
@@ -35,10 +35,12 @@ class GenerateJoin[L, R, O](
     resultTypeInfo: CompositeType[O],
     predicate: Expression,
     outputFields: Seq[Expression],
-    cl: ClassLoader)
+    cl: ClassLoader,
+    config: TableConfig)
   extends GenerateResultAssembler[FlatJoinFunction[L, R, O]](
     Seq(("in0", leftTypeInfo), ("in1", rightTypeInfo)),
-    cl = cl) {
+    cl = cl,
+    config) {
 
   val LOG = LoggerFactory.getLogger(this.getClass)
 
@@ -66,6 +68,11 @@ class GenerateJoin[L, R, O](
 
           ${reuseCode(resultTypeInfo)}
 
+          public org.apache.flink.api.table.TableConfig config = null;
+          public $generatedName(org.apache.flink.api.table.TableConfig config) 
{
+            this.config = config;
+          }
+
           public void join(Object _in0, Object _in1, 
org.apache.flink.util.Collector coll) {
             $leftTpe in0 = ($leftTpe) _in0;
             $rightTpe in1 = ($rightTpe) _in1;
@@ -81,6 +88,11 @@ class GenerateJoin[L, R, O](
 
           ${reuseCode(resultTypeInfo)}
 
+          public org.apache.flink.api.table.TableConfig config = null;
+          public $generatedName(org.apache.flink.api.table.TableConfig config) 
{
+            this.config = config;
+          }
+
           public void join(Object _in0, Object _in1, 
org.apache.flink.util.Collector coll) {
             $leftTpe in0 = ($leftTpe) _in0;
             $rightTpe in1 = ($rightTpe) _in1;
@@ -102,6 +114,13 @@ class GenerateJoin[L, R, O](
 
           ${reuseCode(resultTypeInfo)}
 
+          org.apache.flink.api.table.TableConfig config = null;
+
+          public $generatedName(org.apache.flink.api.table.TableConfig config) 
{
+            this.config = config;
+            ${reuseInitCode()}
+          }
+
           public void join(Object _in0, Object _in1, 
org.apache.flink.util.Collector coll) {
             $leftTpe in0 = ($leftTpe) _in0;
             $rightTpe in1 = ($rightTpe) _in1;
@@ -121,6 +140,13 @@ class GenerateJoin[L, R, O](
 
           ${reuseCode(resultTypeInfo)}
 
+          org.apache.flink.api.table.TableConfig config = null;
+
+          public $generatedName(org.apache.flink.api.table.TableConfig config) 
{
+            this.config = config;
+            ${reuseInitCode()}
+          }
+
           public void join(Object _in0, Object _in1, 
org.apache.flink.util.Collector coll) {
             $leftTpe in0 = ($leftTpe) _in0;
             $rightTpe in1 = ($rightTpe) _in1;
@@ -139,6 +165,7 @@ class GenerateJoin[L, R, O](
     LOG.debug(s"""Generated join:\n$code""")
     compiler.cook(new StringReader(code))
     val clazz = compiler.getClassLoader().loadClass(generatedName)
-    clazz.newInstance().asInstanceOf[FlatJoinFunction[L, R, O]]
+    val constructor = clazz.getConstructor(classOf[TableConfig])
+    constructor.newInstance(config).asInstanceOf[FlatJoinFunction[L, R, O]]
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/31a2de86/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateResultAssembler.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateResultAssembler.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateResultAssembler.scala
index 5172eab..3916410 100644
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateResultAssembler.scala
+++ 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateResultAssembler.scala
@@ -20,6 +20,7 @@ package org.apache.flink.api.table.codegen
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.api.table.TableConfig
 import org.apache.flink.api.table.expressions.Expression
 import org.apache.flink.api.table.typeinfo.RowTypeInfo
 
@@ -28,16 +29,20 @@ import org.apache.flink.api.table.typeinfo.RowTypeInfo
  */
 abstract class GenerateResultAssembler[R](
     inputs: Seq[(String, CompositeType[_])],
-    cl: ClassLoader)
-  extends ExpressionCodeGenerator[R](inputs, cl = cl) {
+    cl: ClassLoader,
+    config: TableConfig)
+  extends ExpressionCodeGenerator[R](inputs, cl = cl, config) {
 
   def reuseCode[A](resultTypeInfo: CompositeType[A]) = {
       val resultTpe = typeTermForTypeInfo(resultTypeInfo)
       resultTypeInfo match {
-        case pj: PojoTypeInfo[_] => s"$resultTpe out = new 
${pj.getTypeClass.getCanonicalName}();"
+        case pj: PojoTypeInfo[_] =>
+          super.reuseMemberCode() +
+            s"$resultTpe out = new ${pj.getTypeClass.getCanonicalName}();"
 
         case row: RowTypeInfo =>
-          s"org.apache.flink.api.table.Row out =" +
+          super.reuseMemberCode() +
+            s"org.apache.flink.api.table.Row out =" +
             s" new org.apache.flink.api.table.Row(${row.getArity});"
 
         case _ => ""

http://git-wip-us.apache.org/repos/asf/flink/blob/31a2de86/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateSelect.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateSelect.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateSelect.scala
index 5941662..a75d15b 100644
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateSelect.scala
+++ 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateSelect.scala
@@ -19,12 +19,12 @@ package org.apache.flink.api.table.codegen
 
 import java.io.StringReader
 
-import org.slf4j.LoggerFactory
-
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.TableConfig
 import org.apache.flink.api.table.codegen.Indenter._
 import org.apache.flink.api.table.expressions.Expression
+import org.slf4j.LoggerFactory
 
 /**
  * Code generator for assembling the result of a select operation.
@@ -33,10 +33,12 @@ class GenerateSelect[I, O](
     inputTypeInfo: CompositeType[I],
     resultTypeInfo: CompositeType[O],
     outputFields: Seq[Expression],
-    cl: ClassLoader)
+    cl: ClassLoader,
+    config: TableConfig)
   extends GenerateResultAssembler[MapFunction[I, O]](
     Seq(("in0", inputTypeInfo)),
-    cl = cl) {
+    cl = cl,
+    config) {
 
   val LOG = LoggerFactory.getLogger(this.getClass)
 
@@ -58,6 +60,13 @@ class GenerateSelect[I, O](
 
           ${reuseCode(resultTypeInfo)}
 
+          org.apache.flink.api.table.TableConfig config = null;
+
+          public $generatedName(org.apache.flink.api.table.TableConfig config) 
{
+            this.config = config;
+            ${reuseInitCode()}
+          }
+
           @Override
           public Object map(Object _in0) {
             $inputTpe in0 = ($inputTpe) _in0;
@@ -69,6 +78,7 @@ class GenerateSelect[I, O](
     LOG.debug(s"""Generated select:\n$code""")
     compiler.cook(new StringReader(code))
     val clazz = compiler.getClassLoader().loadClass(generatedName)
-    clazz.newInstance().asInstanceOf[MapFunction[I, O]]
+    val constructor = clazz.getConstructor(classOf[TableConfig])
+    constructor.newInstance(config).asInstanceOf[MapFunction[I, O]]
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/31a2de86/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
index 5654649..f909cab 100644
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
+++ 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.api.table.expressions
 
+import java.util.Date
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.scala.table.ImplicitExpressionOperations
 
@@ -28,6 +29,7 @@ object Literal {
     case f: Float => Literal(f, BasicTypeInfo.FLOAT_TYPE_INFO)
     case str: String => Literal(str, BasicTypeInfo.STRING_TYPE_INFO)
     case bool: Boolean => Literal(bool, BasicTypeInfo.BOOLEAN_TYPE_INFO)
+    case date: Date => Literal(date, BasicTypeInfo.DATE_TYPE_INFO)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/31a2de86/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
index 075f070..2cbd8fa 100644
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
+++ 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
@@ -117,7 +117,8 @@ object ExpressionParser extends JavaTokenParsers with 
PackratParsers {
     atom <~ ".cast(DOUBLE)" ^^ { e => Cast(e, BasicTypeInfo.DOUBLE_TYPE_INFO) 
} |
     atom <~ ".cast(BOOL)" ^^ { e => Cast(e, BasicTypeInfo.BOOLEAN_TYPE_INFO) } 
|
     atom <~ ".cast(BOOLEAN)" ^^ { e => Cast(e, 
BasicTypeInfo.BOOLEAN_TYPE_INFO) } |
-    atom <~ ".cast(STRING)" ^^ { e => Cast(e, BasicTypeInfo.STRING_TYPE_INFO) }
+    atom <~ ".cast(STRING)" ^^ { e => Cast(e, BasicTypeInfo.STRING_TYPE_INFO) 
} |
+    atom <~ ".cast(DATE)" ^^ { e => Cast(e, BasicTypeInfo.DATE_TYPE_INFO) }
 
   lazy val as: PackratParser[Expression] = atom ~ ".as(" ~ fieldReference ~ 
")" ^^ {
     case e ~ _ ~ as ~ _ => Naming(e, as.name)

http://git-wip-us.apache.org/repos/asf/flink/blob/31a2de86/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala
index f1ba847..4e50272 100644
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala
+++ 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala
@@ -19,6 +19,7 @@ package org.apache.flink.api.table.runtime
 
 import org.apache.flink.api.common.functions.{FilterFunction, 
RichFilterFunction}
 import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.TableConfig
 import org.apache.flink.api.table.codegen.GenerateFilter
 import org.apache.flink.api.table.expressions.Expression
 import org.apache.flink.configuration.Configuration
@@ -29,16 +30,18 @@ import org.apache.flink.configuration.Configuration
  */
 class ExpressionFilterFunction[T](
     predicate: Expression,
-    inputType: CompositeType[T]) extends RichFilterFunction[T] {
+    inputType: CompositeType[T],
+    config: TableConfig = TableConfig.DEFAULT) extends RichFilterFunction[T] {
 
   var compiledFilter: FilterFunction[T] = null
 
-  override def open(config: Configuration): Unit = {
+  override def open(c: Configuration): Unit = {
     if (compiledFilter == null) {
       val codegen = new GenerateFilter[T](
         inputType,
         predicate,
-        getRuntimeContext.getUserCodeClassLoader)
+        getRuntimeContext.getUserCodeClassLoader,
+        config)
       compiledFilter = codegen.generate()
     }
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/31a2de86/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala
index 5743211..cf2c90f 100644
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala
+++ 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala
@@ -19,6 +19,7 @@ package org.apache.flink.api.table.runtime
 
 import org.apache.flink.api.common.functions.{FlatJoinFunction, 
RichFlatJoinFunction}
 import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.TableConfig
 import org.apache.flink.api.table.codegen.GenerateJoin
 import org.apache.flink.api.table.expressions.Expression
 import org.apache.flink.configuration.Configuration
@@ -33,18 +34,20 @@ class ExpressionJoinFunction[L, R, O](
     leftType: CompositeType[L],
     rightType: CompositeType[R],
     resultType: CompositeType[O],
-    outputFields: Seq[Expression]) extends RichFlatJoinFunction[L, R, O] {
+    outputFields: Seq[Expression],
+    config: TableConfig = TableConfig.DEFAULT) extends RichFlatJoinFunction[L, 
R, O] {
 
   var compiledJoin: FlatJoinFunction[L, R, O] = null
 
-  override def open(config: Configuration): Unit = {
+  override def open(c: Configuration): Unit = {
     val codegen = new GenerateJoin[L, R, O](
       leftType,
       rightType,
       resultType,
       predicate,
       outputFields,
-      getRuntimeContext.getUserCodeClassLoader)
+      getRuntimeContext.getUserCodeClassLoader,
+      config)
     compiledJoin = codegen.generate()
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/31a2de86/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala
index 32098c3..ab7adb1 100644
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala
+++ 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.api.table.runtime
 
+import org.apache.flink.api.table.TableConfig
 import org.apache.flink.api.table.expressions.Expression
 import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
 import org.apache.flink.api.common.typeutils.CompositeType
@@ -30,18 +31,20 @@ import org.apache.flink.configuration.Configuration
 class ExpressionSelectFunction[I, O](
      inputType: CompositeType[I],
      resultType: CompositeType[O],
-     outputFields: Seq[Expression]) extends RichMapFunction[I, O] {
+     outputFields: Seq[Expression],
+     config: TableConfig = TableConfig.DEFAULT) extends RichMapFunction[I, O] {
 
   var compiledSelect: MapFunction[I, O] = null
 
-  override def open(config: Configuration): Unit = {
+  override def open(c: Configuration): Unit = {
 
     if (compiledSelect == null) {
       val resultCodegen = new GenerateSelect[I, O](
         inputType,
         resultType,
         outputFields,
-        getRuntimeContext.getUserCodeClassLoader)
+        getRuntimeContext.getUserCodeClassLoader,
+        config)
 
       compiledSelect = resultCodegen.generate()
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/31a2de86/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
 
b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
index aa4ef2a..6a83d17 100644
--- 
a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
+++ 
b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.java.table.test;
 
 import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.api.table.Table;
 import org.apache.flink.api.table.Row;
 import org.apache.flink.api.java.DataSet;
@@ -122,5 +123,27 @@ public class CastingITCase extends 
MultipleProgramsTestBase {
                String expected = "1,1,1,1,2.0,2.0,true\n";
                compareResultAsText(results, expected);
        }
+
+       @Test
+       public void testCastDateFromString() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSource<Tuple4<String, String, String, String>> input =
+                               env.fromElements(new Tuple4<>("2011-05-03", 
"15:51:36", "2011-05-03 15:51:36.000", "1446473775"));
+
+               Table table =
+                               tableEnv.fromDataSet(input);
+
+               Table result = table
+                               .select("f0.cast(DATE) AS f0, f1.cast(DATE) AS 
f1, f2.cast(DATE) AS f2, f3.cast(DATE) AS f3")
+                               .select("f0.cast(STRING), f1.cast(STRING), 
f2.cast(STRING), f3.cast(STRING)");
+
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "2011-05-03 00:00:00.000,1970-01-01 
15:51:36.000,2011-05-03 15:51:36.000," +
+                               "1970-01-17 17:47:53.775\n";
+               compareResultAsText(results, expected);
+       }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/31a2de86/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
 
b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
index 9c58b64..77fd7b7 100644
--- 
a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
+++ 
b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala
@@ -18,6 +18,8 @@
 
 package org.apache.flink.api.scala.table.test
 
+import java.util.Date
+
 import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
@@ -38,10 +40,10 @@ class CastingITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBase(mo
   def testAutoCastToString(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d)).toTable
-      .select('_1 + "b", '_2 + "s", '_3 + "i", '_4 + "L", '_5 + "f", '_6 + "d")
+    val ds = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, new 
Date(0))).toTable
+      .select('_1 + "b", '_2 + "s", '_3 + "i", '_4 + "L", '_5 + "f", '_6 + 
"d", '_7 + "Date")
       .toDataSet[Row]
-    val expected = "1b,1s,1i,1L,1.0f,1.0d"
+    val expected = "1b,1s,1i,1L,1.0f,1.0d,1970-01-01 00:00:00.000Date"
     val results = ds.collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
@@ -80,7 +82,9 @@ class CastingITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBase(mo
   def testCastFromString: Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements(("1", "true", "2.0")).toTable
+    val ds = env.fromElements(("1", "true", "2.0",
+        "2011-05-03", "15:51:36", "2011-05-03 15:51:36.000", "1446473775"))
+      .toTable
       .select(
         '_1.cast(BasicTypeInfo.BYTE_TYPE_INFO),
         '_1.cast(BasicTypeInfo.SHORT_TYPE_INFO),
@@ -88,9 +92,15 @@ class CastingITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBase(mo
         '_1.cast(BasicTypeInfo.LONG_TYPE_INFO),
         '_3.cast(BasicTypeInfo.DOUBLE_TYPE_INFO),
         '_3.cast(BasicTypeInfo.FLOAT_TYPE_INFO),
-        '_2.cast(BasicTypeInfo.BOOLEAN_TYPE_INFO))
+        '_2.cast(BasicTypeInfo.BOOLEAN_TYPE_INFO),
+        
'_4.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO),
+        
'_5.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO),
+        
'_6.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO),
+        
'_7.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO))
     .toDataSet[Row]
-    val expected = "1,1,1,1,2.0,2.0,true\n"
+    val expected = "1,1,1,1,2.0,2.0,true," +
+      "2011-05-03 00:00:00.000,1970-01-01 15:51:36.000,2011-05-03 
15:51:36.000," +
+      "1970-01-17 17:47:53.775\n"
     val results = ds.collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/31a2de86/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
 
b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
index f5d7a4d..017cbf1 100644
--- 
a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
+++ 
b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
@@ -18,6 +18,10 @@
 
 package org.apache.flink.api.scala.table.test
 
+import java.util.Date
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.table.expressions.Literal
 import org.apache.flink.api.table.{Row, ExpressionException}
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
@@ -113,4 +117,18 @@ class ExpressionsITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBas
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  @Test
+  def testDateLiteral(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val ds = env.fromElements((0L, "test")).as('a, 'b)
+      .select('a,
+        Literal(new Date(0)).cast(BasicTypeInfo.STRING_TYPE_INFO),
+        
'a.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO))
+      .toDataSet[Row]
+    val expected = "0,1970-01-01 00:00:00.000,1970-01-01 00:00:00.000"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
 }

Reply via email to