Repository: flink
Updated Branches:
  refs/heads/master f0992aa13 -> f170e04ab


[FLINK-4359] [table] Add INTERVAL type

This closes #2348.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f170e04a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f170e04a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f170e04a

Branch: refs/heads/master
Commit: f170e04ab93fba75c517b0f9184587d7b306113e
Parents: f0992aa
Author: twalthr <twal...@apache.org>
Authored: Mon Aug 8 10:15:49 2016 +0200
Committer: twalthr <twal...@apache.org>
Committed: Thu Aug 11 14:39:24 2016 +0200

----------------------------------------------------------------------
 docs/apis/table.md                              |  42 +-
 .../flink/api/scala/table/expressionDsl.scala   |  59 ++-
 .../flink/api/table/FlinkTypeFactory.scala      |  39 +-
 .../org/apache/flink/api/table/Types.scala      |   3 +
 .../flink/api/table/codegen/CodeGenUtils.scala  |  46 ++-
 .../flink/api/table/codegen/CodeGenerator.scala |  50 ++-
 .../table/codegen/calls/ScalarOperators.scala   | 113 +++++-
 .../table/expressions/ExpressionParser.scala    |  62 ++-
 .../api/table/expressions/ExpressionUtils.scala |  40 ++
 .../api/table/expressions/arithmetic.scala      |  48 ++-
 .../flink/api/table/expressions/cast.scala      |   3 +-
 .../flink/api/table/expressions/literals.scala  |  20 +
 .../api/table/typeutils/IntervalTypeInfo.scala  | 109 ++++++
 .../api/table/typeutils/TypeCheckUtils.scala    |  10 +-
 .../api/table/typeutils/TypeCoercion.scala      |   8 +
 .../api/java/batch/table/ExpressionsITCase.java |   5 +-
 .../scala/expression/TemporalTypesTest.scala    | 389 +++++++++++++++++++
 .../api/scala/expression/TimeTypesTest.scala    | 231 -----------
 18 files changed, 947 insertions(+), 330 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f170e04a/docs/apis/table.md
----------------------------------------------------------------------
diff --git a/docs/apis/table.md b/docs/apis/table.md
index 57252d9..ea8e343 100644
--- a/docs/apis/table.md
+++ b/docs/apis/table.md
@@ -957,11 +957,13 @@ unary = [ "!" | "-" ] , composite ;
 
 composite = suffixed | atom ;
 
-suffixed = cast | as | aggregation | nullCheck | if | functionCall ;
+suffixed = interval | cast | as | aggregation | nullCheck | if | functionCall ;
+
+interval = composite , "." , ("year" | "month" | "day" | "hour" | "minute" | "second" | "milli") ;
 
 cast = composite , ".cast(" , dataType , ")" ;
 
-dataType = "BYTE" | "SHORT" | "INT" | "LONG" | "FLOAT" | "DOUBLE" | "BOOLEAN" | "STRING" | "DECIMAL" | "DATE" | "TIME" | "TIMESTAMP";
+dataType = "BYTE" | "SHORT" | "INT" | "LONG" | "FLOAT" | "DOUBLE" | "BOOLEAN" | "STRING" | "DECIMAL" | "DATE" | "TIME" | "TIMESTAMP" | "INTERVAL_MONTHS" | "INTERVAL_MILLIS" ;
 
 as = composite , ".as(" , fieldReference , ")" ;
 
@@ -986,6 +988,8 @@ If working with exact numeric values or large decimals is required, the Table AP
 
 In order to work with temporal values the Table API supports Java SQL's Date, Time, and Timestamp types. In the Scala Table API literals can be defined by using `java.sql.Date.valueOf("2016-06-27")`, `java.sql.Time.valueOf("10:10:42")`, or `java.sql.Timestamp.valueOf("2016-06-27 10:10:42.123")`. The Java and Scala Table API also support calling `"2016-06-27".toDate()`, `"10:10:42".toTime()`, and `"2016-06-27 10:10:42.123".toTimestamp()` for converting Strings into temporal types. *Note:* Since Java's temporal SQL types are time zone dependent, please make sure that the Flink Client and all TaskManagers use the same time zone.
 
+Temporal intervals can be represented as a number of months (`Types.INTERVAL_MONTHS`) or a number of milliseconds (`Types.INTERVAL_MILLIS`). Intervals of the same type can be added or subtracted (e.g. `2.hour + 10.minute`). Intervals of milliseconds can be added to time points (e.g. `"2016-08-10".toDate + 5.day`).
+
 {% top %}
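
A minimal sketch (not part of the patch) of the interval DSL documented above, using only the suffixes and conversions this change introduces:

    import org.apache.flink.api.scala.table._

    // intervals of the same kind can be combined
    val millisInterval = 2.hour + 10.minute            // Types.INTERVAL_MILLIS
    val monthsInterval = 1.year + 6.month              // Types.INTERVAL_MONTHS

    // an interval of milliseconds can shift a time point
    val shiftedDate = "2016-08-10".toDate + 5.day      // still a DATE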
 
 
@@ -1037,8 +1041,8 @@ The current version supports selection (filter), projection, inner equi-joins, g
 
 Among others, the following SQL features are not supported, yet:
 
-- Time interval data type (`INTERVAL`)
-- Timestamps are limited to milliseconds precision
+- Timestamps and intervals are limited to milliseconds precision
+- Interval arithmetic is currently limited
 - Distinct aggregates (e.g., `COUNT(DISTINCT name)`)
 - Non-equi joins and Cartesian products
 - Grouping sets
@@ -1171,20 +1175,22 @@ Data Types
 
 The Table API is built on top of Flink's DataSet and DataStream API. Internally, it also uses Flink's `TypeInformation` to distinguish between types. The Table API does not support all Flink types so far. All supported simple types are listed in `org.apache.flink.api.table.Types`. The following table summarizes the relation between Table API types, SQL types, and the resulting Java class.
 
-| Table API              | SQL             | Java type              |
-| :--------------------- | :-------------- | :--------------------- |
-| `Types.STRING`         | `VARCHAR`       | `java.lang.String`     |
-| `Types.BOOLEAN`        | `BOOLEAN`       | `java.lang.Boolean`    |
-| `Types.BYTE`           | `TINYINT`       | `java.lang.Byte`       |
-| `Types.SHORT`          | `SMALLINT`      | `java.lang.Short`      |
-| `Types.INT`            | `INTEGER, INT`  | `java.lang.Integer`    |
-| `Types.LONG`           | `BIGINT`        | `java.lang.Long`       |
-| `Types.FLOAT`          | `REAL, FLOAT`   | `java.lang.Float`      |
-| `Types.DOUBLE`         | `DOUBLE`        | `java.lang.Double`     |
-| `Types.DECIMAL`        | `DECIMAL`       | `java.math.BigDecimal` |
-| `Types.DATE`           | `DATE`          | `java.sql.Date`        |
-| `Types.TIME`           | `TIME`          | `java.sql.Time`        |
-| `Types.TIMESTAMP`      | `TIMESTAMP`     | `java.sql.Timestamp`   |
+| Table API              | SQL                         | Java type              |
+| :--------------------- | :-------------------------- | :--------------------- |
+| `Types.STRING`         | `VARCHAR`                   | `java.lang.String`     |
+| `Types.BOOLEAN`        | `BOOLEAN`                   | `java.lang.Boolean`    |
+| `Types.BYTE`           | `TINYINT`                   | `java.lang.Byte`       |
+| `Types.SHORT`          | `SMALLINT`                  | `java.lang.Short`      |
+| `Types.INT`            | `INTEGER, INT`              | `java.lang.Integer`    |
+| `Types.LONG`           | `BIGINT`                    | `java.lang.Long`       |
+| `Types.FLOAT`          | `REAL, FLOAT`               | `java.lang.Float`      |
+| `Types.DOUBLE`         | `DOUBLE`                    | `java.lang.Double`     |
+| `Types.DECIMAL`        | `DECIMAL`                   | `java.math.BigDecimal` |
+| `Types.DATE`           | `DATE`                      | `java.sql.Date`        |
+| `Types.TIME`           | `TIME`                      | `java.sql.Time`        |
+| `Types.TIMESTAMP`      | `TIMESTAMP(3)`              | `java.sql.Timestamp`   |
+| `Types.INTERVAL_MONTHS`| `INTERVAL YEAR TO MONTH`    | `java.lang.Integer`    |
+| `Types.INTERVAL_MILLIS`| `INTERVAL DAY TO SECOND(3)` | `java.lang.Long`       |
 
 Advanced types such as generic types, composite types (e.g. POJOs or Tuples), and arrays can be fields of a row but can not be accessed yet. They are treated like a black box within Table API and SQL.
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f170e04a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
index cb91066..9ca9c8a 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
@@ -17,13 +17,15 @@
  */
 package org.apache.flink.api.scala.table
 
-import java.sql.{Timestamp, Time, Date}
-
-import scala.language.implicitConversions
+import java.sql.{Date, Time, Timestamp}
 
+import org.apache.calcite.avatica.util.DateTimeUtils._
 import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
+import 
org.apache.flink.api.table.expressions.ExpressionUtils.{toMilliInterval, 
toMonthInterval}
 import org.apache.flink.api.table.expressions._
 
+import scala.language.implicitConversions
+
 /**
  * These are all the operations that can be used to construct an 
[[Expression]] AST for expression
  * operations.
@@ -227,6 +229,57 @@ trait ImplicitExpressionOperations {
     * Parses a timestamp String in the form "yy-mm-dd hh:mm:ss.fff" to a SQL 
Timestamp.
     */
   def toTimestamp = Cast(expr, SqlTimeTypeInfo.TIMESTAMP)
+
+  // interval types
+
+  /**
+    * Creates an interval of the given number of years.
+    *
+    * @return interval of months
+    */
+  def year = toMonthInterval(expr, 12)
+
+  /**
+    * Creates an interval of the given number of months.
+    *
+    * @return interval of months
+    */
+  def month = toMonthInterval(expr, 1)
+
+  /**
+    * Creates an interval of the given number of days.
+    *
+    * @return interval of milliseconds
+    */
+  def day = toMilliInterval(expr, MILLIS_PER_DAY)
+
+    /**
+    * Creates an interval of the given number of hours.
+    *
+    * @return interval of milliseconds
+    */
+  def hour = toMilliInterval(expr, MILLIS_PER_HOUR)
+
+    /**
+    * Creates an interval of the given number of minutes.
+    *
+    * @return interval of milliseconds
+    */
+  def minute = toMilliInterval(expr, MILLIS_PER_MINUTE)
+
+    /**
+    * Creates an interval of the given number of seconds.
+    *
+    * @return interval of milliseconds
+    */
+  def second = toMilliInterval(expr, MILLIS_PER_SECOND)
+
+    /**
+    * Creates an interval of the given number of milliseconds.
+    *
+    * @return interval of milliseconds
+    */
+  def milli = toMilliInterval(expr, 1)
 }
 
 /**

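The suffixes above reduce to fixed multipliers (12 months per year, Calcite's MILLIS_PER_* constants for the millisecond units). A plain-Scala sketch of the raw values they produce, not Flink code:

    val MillisPerSecond = 1000L
    val MillisPerMinute = 60 * MillisPerSecond
    val MillisPerHour   = 60 * MillisPerMinute
    val MillisPerDay    = 24 * MillisPerHour   // same value as DateTimeUtils.MILLIS_PER_DAY

    val twoHoursTenMinutes = 2 * MillisPerHour + 10 * MillisPerMinute  // 7800000 ms
    val threeYearsInMonths = 3 * 12                                    // 36 months
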
http://git-wip-us.apache.org/repos/asf/flink/blob/f170e04a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
index 6a31487..5a116db 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala
@@ -18,15 +18,19 @@
 
 package org.apache.flink.api.table
 
+import org.apache.calcite.avatica.util.TimeUnit
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeSystem}
+import org.apache.calcite.sql.SqlIntervalQualifier
 import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.calcite.sql.`type`.SqlTypeName._
+import org.apache.calcite.sql.parser.SqlParserPos
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.ValueTypeInfo._
 import org.apache.flink.api.table.FlinkTypeFactory.typeInfoToSqlTypeName
 import org.apache.flink.api.table.plan.schema.GenericRelDataType
+import org.apache.flink.api.table.typeutils.IntervalTypeInfo
 import org.apache.flink.api.table.typeutils.TypeCheckUtils.isSimple
 
 import scala.collection.mutable
@@ -42,7 +46,20 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) 
extends JavaTypeFactoryImp
   def createTypeFromTypeInfo(typeInfo: TypeInformation[_]): RelDataType = {
     // simple type can be converted to SQL types and vice versa
     if (isSimple(typeInfo)) {
-      createSqlType(typeInfoToSqlTypeName(typeInfo))
+      val sqlType = typeInfoToSqlTypeName(typeInfo)
+      sqlType match {
+
+        case INTERVAL_YEAR_MONTH =>
+          createSqlIntervalType(
+            new SqlIntervalQualifier(TimeUnit.YEAR, TimeUnit.MONTH, 
SqlParserPos.ZERO))
+
+        case INTERVAL_DAY_TIME =>
+          createSqlIntervalType(
+            new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.SECOND, 
SqlParserPos.ZERO))
+
+        case _ =>
+          createSqlType(sqlType)
+      }
     }
     // advanced types require specific RelDataType
     // for storing the original TypeInformation
@@ -58,7 +75,7 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends 
JavaTypeFactoryImp
       new GenericRelDataType(typeInfo, 
getTypeSystem.asInstanceOf[FlinkTypeSystem])
 
     case ti@_ =>
-      throw new TableException(s"Unsupported type information: $ti")
+      throw TableException(s"Unsupported type information: $ti")
   }
 }
 
@@ -75,16 +92,18 @@ object FlinkTypeFactory {
       case STRING_TYPE_INFO => VARCHAR
       case BIG_DEC_TYPE_INFO => DECIMAL
 
-      // date/time types
+      // temporal types
       case SqlTimeTypeInfo.DATE => DATE
       case SqlTimeTypeInfo.TIME => TIME
       case SqlTimeTypeInfo.TIMESTAMP => TIMESTAMP
+      case IntervalTypeInfo.INTERVAL_MONTHS => INTERVAL_YEAR_MONTH
+      case IntervalTypeInfo.INTERVAL_MILLIS => INTERVAL_DAY_TIME
 
       case CHAR_TYPE_INFO | CHAR_VALUE_TYPE_INFO =>
-        throw new TableException("Character type is not supported.")
+        throw TableException("Character type is not supported.")
 
       case _@t =>
-        throw new TableException(s"Type is not supported: $t")
+        throw TableException(s"Type is not supported: $t")
   }
 
   def toTypeInfo(relDataType: RelDataType): TypeInformation[_] = 
relDataType.getSqlTypeName match {
@@ -98,15 +117,15 @@ object FlinkTypeFactory {
     case VARCHAR | CHAR => STRING_TYPE_INFO
     case DECIMAL => BIG_DEC_TYPE_INFO
 
-    // date/time types
+    // temporal types
     case DATE => SqlTimeTypeInfo.DATE
     case TIME => SqlTimeTypeInfo.TIME
     case TIMESTAMP => SqlTimeTypeInfo.TIMESTAMP
-    case INTERVAL_DAY_TIME | INTERVAL_YEAR_MONTH =>
-      throw new TableException("Intervals are not supported yet.")
+    case INTERVAL_YEAR_MONTH => IntervalTypeInfo.INTERVAL_MONTHS
+    case INTERVAL_DAY_TIME => IntervalTypeInfo.INTERVAL_MILLIS
 
     case NULL =>
-      throw new TableException("Type NULL is not supported. " +
+      throw TableException("Type NULL is not supported. " +
         "Null values must have a supported type.")
 
     // symbol for special flags e.g. TRIM's BOTH, LEADING, TRAILING
@@ -119,6 +138,6 @@ object FlinkTypeFactory {
       genericRelDataType.typeInfo
 
     case _@t =>
-      throw new TableException(s"Type is not supported: $t")
+      throw TableException(s"Type is not supported: $t")
   }
 }
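
For reference, the interval qualifiers built above are plain Calcite objects; this standalone sketch shows the two variants the factory now produces (INTERVAL_MONTHS maps to INTERVAL YEAR TO MONTH, INTERVAL_MILLIS to INTERVAL DAY TO SECOND):

    import org.apache.calcite.avatica.util.TimeUnit
    import org.apache.calcite.sql.SqlIntervalQualifier
    import org.apache.calcite.sql.parser.SqlParserPos

    val yearToMonth = new SqlIntervalQualifier(TimeUnit.YEAR, TimeUnit.MONTH, SqlParserPos.ZERO)
    val dayToSecond = new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO)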

http://git-wip-us.apache.org/repos/asf/flink/blob/f170e04a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Types.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Types.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Types.scala
index d63683e..64d4612 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Types.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Types.scala
@@ -18,6 +18,7 @@
 package org.apache.flink.api.table
 
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo}
+import org.apache.flink.api.table.typeutils.IntervalTypeInfo
 
 /**
   * This class enumerates all supported types of the Table API.
@@ -38,5 +39,7 @@ object Types {
   val DATE = SqlTimeTypeInfo.DATE
   val TIME = SqlTimeTypeInfo.TIME
   val TIMESTAMP = SqlTimeTypeInfo.TIMESTAMP
+  val INTERVAL_MONTHS = IntervalTypeInfo.INTERVAL_MONTHS
+  val INTERVAL_MILLIS = IntervalTypeInfo.INTERVAL_MILLIS
 
 }
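
A usage sketch for the two new entries (the field name 'millis is made up; the cast itself is permitted by the TypeCoercion change further below):

    import org.apache.flink.api.scala.table._
    import org.apache.flink.api.table.Types

    // reinterpret a plain BIGINT field as an interval of milliseconds
    val asInterval = 'millis.cast(Types.INTERVAL_MILLIS)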

http://git-wip-us.apache.org/repos/asf/flink/blob/f170e04a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala
index c0f4fc8..18bf49f 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala
@@ -28,7 +28,7 @@ import 
org.apache.flink.api.common.typeinfo.{FractionalTypeInfo, SqlTimeTypeInfo
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo, 
TypeExtractor}
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.apache.flink.api.table.typeutils.{RowTypeInfo, TypeCheckUtils}
+import org.apache.flink.api.table.typeutils.{IntervalTypeInfo, RowTypeInfo, 
TypeCheckUtils}
 
 object CodeGenUtils {
 
@@ -65,11 +65,15 @@ object CodeGenUtils {
     case BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO => "boolean[]"
     case CHAR_PRIMITIVE_ARRAY_TYPE_INFO => "char[]"
 
-    // internal primitive representation of Date/Time/Timestamp
+    // internal primitive representation of time points
     case SqlTimeTypeInfo.DATE => "int"
     case SqlTimeTypeInfo.TIME => "int"
     case SqlTimeTypeInfo.TIMESTAMP => "long"
 
+    // internal primitive representation of time intervals
+    case IntervalTypeInfo.INTERVAL_MONTHS => "int"
+    case IntervalTypeInfo.INTERVAL_MILLIS => "long"
+
     case _ =>
       tpe.getTypeClass.getCanonicalName
   }
@@ -100,7 +104,10 @@ object CodeGenUtils {
     case BOOLEAN_TYPE_INFO => "false"
     case STRING_TYPE_INFO => "\"\""
     case CHAR_TYPE_INFO => "'\\0'"
-    case SqlTimeTypeInfo.DATE | SqlTimeTypeInfo.TIME | 
SqlTimeTypeInfo.TIMESTAMP => "-1"
+    case SqlTimeTypeInfo.DATE | SqlTimeTypeInfo.TIME => "-1"
+    case SqlTimeTypeInfo.TIMESTAMP => "-1L"
+    case IntervalTypeInfo.INTERVAL_MONTHS => "-1"
+    case IntervalTypeInfo.INTERVAL_MILLIS => "-1L"
 
     case _ => "null"
   }
@@ -113,7 +120,10 @@ object CodeGenUtils {
   def qualifyMethod(method: Method): String =
     method.getDeclaringClass.getCanonicalName + "." + method.getName
 
-  def internalToTemporalCode(resultType: TypeInformation[_], resultTerm: 
String) =
+  def qualifyEnum(enum: Enum[_]): String =
+    enum.getClass.getCanonicalName + "." + enum.name()
+
+  def internalToTimePointCode(resultType: TypeInformation[_], resultTerm: 
String) =
     resultType match {
       case SqlTimeTypeInfo.DATE =>
         s"${qualifyMethod(BuiltInMethod.INTERNAL_TO_DATE.method)}($resultTerm)"
@@ -123,7 +133,7 @@ object CodeGenUtils {
         
s"${qualifyMethod(BuiltInMethod.INTERNAL_TO_TIMESTAMP.method)}($resultTerm)"
     }
 
-  def temporalToInternalCode(resultType: TypeInformation[_], resultTerm: 
String) =
+  def timePointToInternalCode(resultType: TypeInformation[_], resultTerm: 
String) =
     resultType match {
       case SqlTimeTypeInfo.DATE =>
         s"${qualifyMethod(BuiltInMethod.DATE_TO_INT.method)}($resultTerm)"
@@ -146,15 +156,25 @@ object CodeGenUtils {
       throw new CodeGenException(s"Comparable type expected, but was 
'${genExpr.resultType}'.")
     }
 
-  def requireString(genExpr: GeneratedExpression) = genExpr.resultType match {
-    case STRING_TYPE_INFO => // ok
-    case _ => throw new CodeGenException("String expression type expected.")
-  }
+  def requireString(genExpr: GeneratedExpression) =
+    if (!TypeCheckUtils.isString(genExpr.resultType)) {
+      throw new CodeGenException("String expression type expected.")
+    }
 
-  def requireBoolean(genExpr: GeneratedExpression) = genExpr.resultType match {
-    case BOOLEAN_TYPE_INFO => // ok
-    case _ => throw new CodeGenException("Boolean expression type expected.")
-  }
+  def requireBoolean(genExpr: GeneratedExpression) =
+    if (!TypeCheckUtils.isBoolean(genExpr.resultType)) {
+      throw new CodeGenException("Boolean expression type expected.")
+    }
+
+  def requireTemporal(genExpr: GeneratedExpression) =
+    if (!TypeCheckUtils.isTemporal(genExpr.resultType)) {
+      throw new CodeGenException("Temporal expression type expected.")
+    }
+
+  def requireTimeInterval(genExpr: GeneratedExpression) =
+    if (!TypeCheckUtils.isTimeInterval(genExpr.resultType)) {
+      throw new CodeGenException("Interval expression type expected.")
+    }
 
   // 
----------------------------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f170e04a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
index c1508d8..92f9761 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
@@ -29,13 +29,13 @@ import org.apache.flink.api.common.typeinfo.{AtomicType, 
SqlTimeTypeInfo, TypeIn
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.apache.flink.api.table.{FlinkTypeFactory, TableConfig}
 import org.apache.flink.api.table.codegen.CodeGenUtils._
 import org.apache.flink.api.table.codegen.Indenter.toISC
 import org.apache.flink.api.table.codegen.calls.ScalarFunctions
 import org.apache.flink.api.table.codegen.calls.ScalarOperators._
 import org.apache.flink.api.table.typeutils.RowTypeInfo
-import org.apache.flink.api.table.typeutils.TypeCheckUtils.{isNumeric, 
isString, isTemporal}
+import org.apache.flink.api.table.typeutils.TypeCheckUtils._
+import org.apache.flink.api.table.{FlinkTypeFactory, TableConfig}
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable
@@ -618,6 +618,20 @@ class CodeGenerator(
         generateNonNullLiteral(resultType, value.toString)
       case TIMESTAMP =>
         generateNonNullLiteral(resultType, value.toString + "L")
+      case INTERVAL_YEAR_MONTH =>
+        val decimal = BigDecimal(value.asInstanceOf[JBigDecimal])
+        if (decimal.isValidInt) {
+          generateNonNullLiteral(resultType, decimal.intValue().toString)
+        } else {
+          throw new CodeGenException("Decimal can not be converted to interval 
of months.")
+        }
+      case INTERVAL_DAY_TIME =>
+        val decimal = BigDecimal(value.asInstanceOf[JBigDecimal])
+        if (decimal.isValidLong) {
+          generateNonNullLiteral(resultType, decimal.longValue().toString + 
"L")
+        } else {
+          throw new CodeGenException("Decimal can not be converted to interval 
of milliseconds.")
+        }
 
       case t@_ =>
         throw new CodeGenException(s"Type not supported: $t")
@@ -645,11 +659,12 @@ class CodeGenerator(
         requireNumeric(right)
         generateArithmeticOperator("+", nullCheck, resultType, left, right)
 
-      case PLUS if isString(resultType) =>
+      case PLUS | DATETIME_PLUS if isTemporal(resultType) =>
         val left = operands.head
         val right = operands(1)
-        requireString(left)
-        generateStringConcatOperator(nullCheck, left, right)
+        requireTemporal(left)
+        requireTemporal(right)
+        generateTemporalPlusMinus(plus = true, nullCheck, left, right)
 
       case MINUS if isNumeric(resultType) =>
         val left = operands.head
@@ -658,6 +673,13 @@ class CodeGenerator(
         requireNumeric(right)
         generateArithmeticOperator("-", nullCheck, resultType, left, right)
 
+      case MINUS if isTemporal(resultType) =>
+        val left = operands.head
+        val right = operands(1)
+        requireTemporal(left)
+        requireTemporal(right)
+        generateTemporalPlusMinus(plus = false, nullCheck, left, right)
+
       case MULTIPLY if isNumeric(resultType) =>
         val left = operands.head
         val right = operands(1)
@@ -684,11 +706,21 @@ class CodeGenerator(
         requireNumeric(operand)
         generateUnaryArithmeticOperator("-", nullCheck, resultType, operand)
 
+      case UNARY_MINUS if isTimeInterval(resultType) =>
+        val operand = operands.head
+        requireTimeInterval(operand)
+        generateUnaryIntervalPlusMinus(plus = false, nullCheck, operand)
+
       case UNARY_PLUS if isNumeric(resultType) =>
         val operand = operands.head
         requireNumeric(operand)
         generateUnaryArithmeticOperator("+", nullCheck, resultType, operand)
 
+      case UNARY_PLUS if isTimeInterval(resultType) =>
+        val operand = operands.head
+        requireTimeInterval(operand)
+        generateUnaryIntervalPlusMinus(plus = true, nullCheck, operand)
+
       // comparison
       case EQUALS =>
         val left = operands.head
@@ -760,7 +792,7 @@ class CodeGenerator(
         generateIfElse(nullCheck, operands, resultType)
 
       // casting
-      case CAST =>
+      case CAST | REINTERPRET =>
         val operand = operands.head
         generateCast(nullCheck, operand, resultType)
 
@@ -946,8 +978,8 @@ class CodeGenerator(
     val defaultValue = primitiveDefaultValue(literalType)
 
     // explicit unboxing
-    val unboxedLiteralCode = if (isTemporal(literalType)) {
-      temporalToInternalCode(literalType, literalCode)
+    val unboxedLiteralCode = if (isTimePoint(literalType)) {
+      timePointToInternalCode(literalType, literalCode)
     } else {
       literalCode
     }
@@ -1023,7 +1055,7 @@ class CodeGenerator(
       case SqlTimeTypeInfo.DATE | SqlTimeTypeInfo.TIME | 
SqlTimeTypeInfo.TIMESTAMP =>
         val resultTerm = newName("result")
         val resultTypeTerm = boxedTypeTermForTypeInfo(expr.resultType)
-        val convMethod = internalToTemporalCode(expr.resultType, 
expr.resultTerm)
+        val convMethod = internalToTimePointCode(expr.resultType, 
expr.resultTerm)
 
         val resultCode = if (nullCheck) {
           s"""

http://git-wip-us.apache.org/repos/asf/flink/blob/f170e04a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
index 28c66b6..81187a5 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
@@ -17,12 +17,14 @@
  */
 package org.apache.flink.api.table.codegen.calls
 
-import org.apache.calcite.avatica.util.DateTimeUtils
+import org.apache.calcite.avatica.util.DateTimeUtils.MILLIS_PER_DAY
+import org.apache.calcite.avatica.util.{DateTimeUtils, TimeUnitRange}
 import org.apache.calcite.util.BuiltInMethod
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, SqlTimeTypeInfo, 
TypeInformation}
 import org.apache.flink.api.table.codegen.CodeGenUtils._
 import org.apache.flink.api.table.codegen.{CodeGenException, 
GeneratedExpression}
+import org.apache.flink.api.table.typeutils.IntervalTypeInfo
 import org.apache.flink.api.table.typeutils.TypeCheckUtils._
 
 object ScalarOperators {
@@ -70,10 +72,8 @@ object ScalarOperators {
           s"$operandTerm.negate()"
         } else if (isDecimal(operand.resultType) && operator == "+") {
           s"$operandTerm"
-        } else if (isNumeric(operand.resultType)) {
+        } else {
           s"$operator($operandTerm)"
-        }  else {
-          throw new CodeGenException("Unsupported unary operator.")
         }
     }
   }
@@ -396,7 +396,23 @@ object ScalarOperators {
     // Date/Time/Timestamp -> String
     case (dtt: SqlTimeTypeInfo[_], STRING_TYPE_INFO) =>
       generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
-        (operandTerm) => s"""${internalToTemporalCode(dtt, 
operandTerm)}.toString()"""
+        (operandTerm) => s"""${internalToTimePointCode(dtt, 
operandTerm)}.toString()"""
+      }
+
+    // Interval Months -> String
+    case (IntervalTypeInfo.INTERVAL_MONTHS, STRING_TYPE_INFO) =>
+      val method = 
qualifyMethod(BuiltInMethod.INTERVAL_YEAR_MONTH_TO_STRING.method)
+      val timeUnitRange = qualifyEnum(TimeUnitRange.YEAR_TO_MONTH)
+      generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+        (operandTerm) => s"$method($operandTerm, $timeUnitRange)"
+      }
+
+    // Interval Millis -> String
+    case (IntervalTypeInfo.INTERVAL_MILLIS, STRING_TYPE_INFO) =>
+      val method = 
qualifyMethod(BuiltInMethod.INTERVAL_DAY_TIME_TO_STRING.method)
+      val timeUnitRange = qualifyEnum(TimeUnitRange.DAY_TO_SECOND)
+      generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+        (operandTerm) => s"$method($operandTerm, $timeUnitRange, 3)" // milli 
second precision
       }
 
     // * (not Date/Time/Timestamp) -> String
@@ -508,25 +524,28 @@ object ScalarOperators {
             s"${classOf[DateTimeUtils].getCanonicalName}.MILLIS_PER_DAY)"
       }
 
-    // Date -> Integer, Time -> Integer
-    case (SqlTimeTypeInfo.DATE, INT_TYPE_INFO) | (SqlTimeTypeInfo.TIME, 
INT_TYPE_INFO) =>
-      internalExprCasting(operand, INT_TYPE_INFO)
-
+    // internal temporal casting
+    // Date -> Integer
+    // Time -> Integer
     // Timestamp -> Long
-    case (SqlTimeTypeInfo.TIMESTAMP, LONG_TYPE_INFO) =>
-      internalExprCasting(operand, LONG_TYPE_INFO)
-
     // Integer -> Date
-    case (INT_TYPE_INFO, SqlTimeTypeInfo.DATE) =>
-      internalExprCasting(operand, SqlTimeTypeInfo.DATE)
-
     // Integer -> Time
-    case (INT_TYPE_INFO, SqlTimeTypeInfo.TIME) =>
-      internalExprCasting(operand, SqlTimeTypeInfo.TIME)
-
     // Long -> Timestamp
-    case (LONG_TYPE_INFO, SqlTimeTypeInfo.TIMESTAMP) =>
-      internalExprCasting(operand, SqlTimeTypeInfo.TIMESTAMP)
+    // Integer -> Interval Months
+    // Long -> Interval Millis
+    // Interval Months -> Integer
+    // Interval Millis -> Long
+    case (SqlTimeTypeInfo.DATE, INT_TYPE_INFO) |
+         (SqlTimeTypeInfo.TIME, INT_TYPE_INFO) |
+         (SqlTimeTypeInfo.TIMESTAMP, LONG_TYPE_INFO) |
+         (INT_TYPE_INFO, SqlTimeTypeInfo.DATE) |
+         (INT_TYPE_INFO, SqlTimeTypeInfo.TIME) |
+         (LONG_TYPE_INFO, SqlTimeTypeInfo.TIMESTAMP) |
+         (INT_TYPE_INFO, IntervalTypeInfo.INTERVAL_MONTHS) |
+         (LONG_TYPE_INFO, IntervalTypeInfo.INTERVAL_MILLIS) |
+         (IntervalTypeInfo.INTERVAL_MONTHS, INT_TYPE_INFO) |
+         (IntervalTypeInfo.INTERVAL_MONTHS, LONG_TYPE_INFO) =>
+      internalExprCasting(operand, targetType)
 
     case (from, to) =>
       throw new CodeGenException(s"Unsupported cast from '$from' to '$to'.")
@@ -591,6 +610,60 @@ object ScalarOperators {
     }
   }
 
+  def generateTemporalPlusMinus(
+      plus: Boolean,
+      nullCheck: Boolean,
+      left: GeneratedExpression,
+      right: GeneratedExpression)
+    : GeneratedExpression = {
+
+    val operator = if (plus) "+" else "-"
+
+    (left.resultType, right.resultType) match {
+      case (l: IntervalTypeInfo[_], r: IntervalTypeInfo[_]) if l == r =>
+        generateArithmeticOperator(operator, nullCheck, l, left, right)
+
+      case (SqlTimeTypeInfo.DATE, IntervalTypeInfo.INTERVAL_MILLIS) |
+           (IntervalTypeInfo.INTERVAL_MILLIS, SqlTimeTypeInfo.DATE) =>
+        generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.DATE, left, 
right) {
+          if (isTimePoint(left.resultType)) {
+            (leftTerm, rightTerm) => s"$leftTerm + ((int) ($rightTerm / 
${MILLIS_PER_DAY}L))"
+          } else {
+            (leftTerm, rightTerm) => s"((int) ($leftTerm / 
${MILLIS_PER_DAY}L)) + $rightTerm"
+          }
+        }
+
+      case (SqlTimeTypeInfo.TIME, IntervalTypeInfo.INTERVAL_MILLIS) |
+           (IntervalTypeInfo.INTERVAL_MILLIS, SqlTimeTypeInfo.TIME) =>
+        generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.TIME, left, 
right) {
+          if (isTimePoint(left.resultType)) {
+            (leftTerm, rightTerm) => s"$leftTerm + ((int) ($rightTerm))"
+          } else {
+            (leftTerm, rightTerm) => s"((int) ($leftTerm)) + $rightTerm"
+          }
+        }
+
+      case (SqlTimeTypeInfo.TIMESTAMP, IntervalTypeInfo.INTERVAL_MILLIS) =>
+        generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.TIMESTAMP, left, 
right) {
+          (leftTerm, rightTerm) => s"$leftTerm + $rightTerm"
+        }
+
+      // TODO more operations when CALCITE-308 is fixed
+
+      case _ =>
+        throw new CodeGenException("Unsupported temporal arithmetic.")
+    }
+  }
+
+  def generateUnaryIntervalPlusMinus(
+      plus: Boolean,
+      nullCheck: Boolean,
+      operand: GeneratedExpression)
+    : GeneratedExpression = {
+    val operator = if (plus) "+" else "-"
+    generateUnaryArithmeticOperator(operator, nullCheck, operand.resultType, 
operand)
+  }
+
   // 
----------------------------------------------------------------------------------------------
 
   private def generateUnaryOperatorIfNotNull(

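The generated code works on the internal primitives (dates as int days since epoch, timestamps as long milliseconds, following Calcite's conventions). A plain-Scala sketch, not Flink code, of the arithmetic emitted by generateTemporalPlusMinus:

    val MillisPerDay = 86400000L  // same value as DateTimeUtils.MILLIS_PER_DAY

    // DATE (int days since epoch) + INTERVAL_MILLIS -> DATE
    def datePlusInterval(dateDays: Int, intervalMillis: Long): Int =
      dateDays + (intervalMillis / MillisPerDay).toInt

    // TIMESTAMP (long millis since epoch) + INTERVAL_MILLIS -> TIMESTAMP
    def timestampPlusInterval(tsMillis: Long, intervalMillis: Long): Long =
      tsMillis + intervalMillis

    // e.g. 2016-08-10 is day 17023 since the epoch; adding 5.day yields day 17028 (2016-08-15)
    datePlusInterval(17023, 5 * MillisPerDay)
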
http://git-wip-us.apache.org/repos/asf/flink/blob/f170e04a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
index 6aa4f89..41222c8 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
@@ -17,8 +17,11 @@
  */
 package org.apache.flink.api.table.expressions
 
-import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation, 
BasicTypeInfo}
+import org.apache.calcite.avatica.util.DateTimeUtils.{MILLIS_PER_DAY, 
MILLIS_PER_HOUR, MILLIS_PER_MINUTE, MILLIS_PER_SECOND}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, 
TypeInformation}
 import org.apache.flink.api.table.ExpressionParserException
+import 
org.apache.flink.api.table.expressions.ExpressionUtils.{toMilliInterval, 
toMonthInterval}
+import org.apache.flink.api.table.typeutils.IntervalTypeInfo
 
 import scala.util.parsing.combinator.{JavaTokenParsers, PackratParsers}
 
@@ -57,6 +60,13 @@ object ExpressionParser extends JavaTokenParsers with 
PackratParsers {
   lazy val TO_DATE: Keyword = Keyword("toDate")
   lazy val TO_TIME: Keyword = Keyword("toTime")
   lazy val TO_TIMESTAMP: Keyword = Keyword("toTimestamp")
+  lazy val YEAR: Keyword = Keyword("year")
+  lazy val MONTH: Keyword = Keyword("month")
+  lazy val DAY: Keyword = Keyword("day")
+  lazy val HOUR: Keyword = Keyword("hour")
+  lazy val MINUTE: Keyword = Keyword("minute")
+  lazy val SECOND: Keyword = Keyword("second")
+  lazy val MILLI: Keyword = Keyword("milli")
 
   def functionIdent: ExpressionParser.Parser[String] =
     not(AS) ~ not(COUNT) ~ not(AVG) ~ not(MIN) ~ not(MAX) ~
@@ -68,6 +78,12 @@ object ExpressionParser extends JavaTokenParsers with 
PackratParsers {
   lazy val dataType: PackratParser[TypeInformation[_]] =
     "BYTE" ^^ { ti => BasicTypeInfo.BYTE_TYPE_INFO } |
       "SHORT" ^^ { ti => BasicTypeInfo.SHORT_TYPE_INFO } |
+      "INTERVAL_MONTHS" ^^ {
+        ti => IntervalTypeInfo.INTERVAL_MONTHS.asInstanceOf[TypeInformation[_]]
+      } |
+      "INTERVAL_MILLIS" ^^ {
+        ti => IntervalTypeInfo.INTERVAL_MILLIS.asInstanceOf[TypeInformation[_]]
+      } |
       "INT" ^^ { ti => BasicTypeInfo.INT_TYPE_INFO } |
       "LONG" ^^ { ti => BasicTypeInfo.LONG_TYPE_INFO } |
       "FLOAT" ^^ { ti => BasicTypeInfo.FLOAT_TYPE_INFO } |
@@ -81,10 +97,14 @@ object ExpressionParser extends JavaTokenParsers with 
PackratParsers {
 
   // Literals
 
+  // same as floatingPointNumber but we do not allow trailing dot "12.d" or 
"2."
+  lazy val floatingPointNumberFlink: Parser[String] =
+    """-?(\d+(\.\d+)?|\d*\.\d+)([eE][+-]?\d+)?[fFdD]?""".r
+
   lazy val numberLiteral: PackratParser[Expression] =
     (wholeNumber <~ ("l" | "L")) ^^ { n => Literal(n.toLong) } |
       (decimalNumber <~ ("p" | "P")) ^^ { n => Literal(BigDecimal(n)) } |
-      (floatingPointNumber | decimalNumber) ^^ {
+      (floatingPointNumberFlink | decimalNumber) ^^ {
         n =>
           if (n.matches("""-?\d+""")) {
             Literal(n.toInt)
@@ -109,7 +129,7 @@ object ExpressionParser extends JavaTokenParsers with 
PackratParsers {
   }
 
   lazy val nullLiteral: PackratParser[Expression] = NULL ~ "(" ~> dataType <~ 
")" ^^ {
-    case dt => Null(dt)
+    dt => Null(dt)
   }
 
   lazy val literalExpr: PackratParser[Expression] =
@@ -169,7 +189,7 @@ object ExpressionParser extends JavaTokenParsers with 
PackratParsers {
   }
 
   lazy val suffixTrimWithoutArgs = composite <~ ".trim" ~ opt("()") ^^ {
-    case e =>
+    e =>
       Trim(TrimConstants.TRIM_BOTH, TrimConstants.TRIM_DEFAULT_CHAR, e)
   }
 
@@ -198,10 +218,29 @@ object ExpressionParser extends JavaTokenParsers with 
PackratParsers {
   lazy val suffixToTime: PackratParser[Expression] =
     composite <~ "." ~ TO_TIME ~ opt("()") ^^ { e => Cast(e, 
SqlTimeTypeInfo.TIME) }
 
+  lazy val suffixTimeInterval : PackratParser[Expression] =
+    composite ~ "." ~ (YEAR | MONTH | DAY | HOUR | MINUTE | SECOND | MILLI) ^^ 
{
+
+    case expr ~ _ ~ YEAR.key => toMonthInterval(expr, 12)
+
+    case expr ~ _ ~ MONTH.key => toMonthInterval(expr, 1)
+
+    case expr ~ _ ~ DAY.key => toMilliInterval(expr, MILLIS_PER_DAY)
+
+    case expr ~ _ ~ HOUR.key => toMilliInterval(expr, MILLIS_PER_HOUR)
+
+    case expr ~ _ ~ MINUTE.key => toMilliInterval(expr, MILLIS_PER_MINUTE)
+
+    case expr ~ _ ~ SECOND.key => toMilliInterval(expr, MILLIS_PER_SECOND)
+
+    case expr ~ _ ~ MILLI.key => toMilliInterval(expr, 1)
+  }
+
   lazy val suffixed: PackratParser[Expression] =
-    suffixIsNull | suffixIsNotNull | suffixSum | suffixMin | suffixMax | 
suffixCount | suffixAvg |
-      suffixCast | suffixAs | suffixTrim | suffixTrimWithoutArgs | suffixIf | 
suffixFunctionCall |
-        suffixAsc | suffixDesc | suffixToDate | suffixToTimestamp | 
suffixToTime
+    suffixTimeInterval | suffixIsNull | suffixIsNotNull | suffixSum | 
suffixMin | suffixMax |
+      suffixCount | suffixAvg | suffixCast | suffixAs | suffixTrim | 
suffixTrimWithoutArgs |
+      suffixIf | suffixAsc | suffixDesc | suffixToDate | suffixToTimestamp | 
suffixToTime |
+      suffixFunctionCall // function call must always be at the end
 
   // prefix operators
 
@@ -263,7 +302,8 @@ object ExpressionParser extends JavaTokenParsers with 
PackratParsers {
 
   lazy val prefixed: PackratParser[Expression] =
     prefixIsNull | prefixIsNotNull | prefixSum | prefixMin | prefixMax | 
prefixCount | prefixAvg |
-      prefixCast | prefixAs | prefixTrim | prefixTrimWithoutArgs | prefixIf | 
prefixFunctionCall
+      prefixCast | prefixAs | prefixTrim | prefixTrimWithoutArgs | prefixIf |
+      prefixFunctionCall // function call must always be at the end
 
   // suffix/prefix composite
 
@@ -339,10 +379,10 @@ object ExpressionParser extends JavaTokenParsers with 
PackratParsers {
     parseAll(expressionList, expression) match {
       case Success(lst, _) => lst
 
-      case Failure(msg, _) => throw new ExpressionParserException(
+      case Failure(msg, _) => throw ExpressionParserException(
         "Could not parse expression: " + msg)
 
-      case Error(msg, _) => throw new ExpressionParserException(
+      case Error(msg, _) => throw ExpressionParserException(
         "Could not parse expression: " + msg)
     }
   }
@@ -352,7 +392,7 @@ object ExpressionParser extends JavaTokenParsers with 
PackratParsers {
       case Success(lst, _) => lst
 
       case fail =>
-        throw new ExpressionParserException("Could not parse expression: " + 
fail.toString)
+        throw ExpressionParserException("Could not parse expression: " + 
fail.toString)
     }
   }
 }

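The same suffixes work in the String-based expression syntax used by the Java Table API. A small sketch; the entry point name parseExpression is assumed from the surrounding parser object, only the suffix grammar itself is shown in this diff:

    import org.apache.flink.api.table.expressions.ExpressionParser

    // parsed by suffixTimeInterval into interval expressions
    val expr = ExpressionParser.parseExpression("2.day + 12.hour")
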
http://git-wip-us.apache.org/repos/asf/flink/blob/f170e04a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionUtils.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionUtils.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionUtils.scala
new file mode 100644
index 0000000..89671de
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionUtils.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.table.typeutils.IntervalTypeInfo
+
+object ExpressionUtils {
+
+  private[flink] def toMonthInterval(expr: Expression, multiplier: Int): 
Expression = expr match {
+    case Literal(value: Int, BasicTypeInfo.INT_TYPE_INFO) =>
+      Literal(value * multiplier, IntervalTypeInfo.INTERVAL_MONTHS)
+    case _ =>
+      Cast(Mul(expr, Literal(multiplier)), IntervalTypeInfo.INTERVAL_MONTHS)
+  }
+
+  private[flink] def toMilliInterval(expr: Expression, multiplier: Long): 
Expression = expr match {
+    case Literal(value: Int, BasicTypeInfo.INT_TYPE_INFO) =>
+      Literal(value * multiplier, IntervalTypeInfo.INTERVAL_MILLIS)
+    case _ =>
+      Cast(Mul(expr, Literal(multiplier)), IntervalTypeInfo.INTERVAL_MILLIS)
+  }
+
+}
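
The helpers are private[flink]; as a sketch, these are the rewrites they perform (values derived from the code above):

    // literal inputs are folded directly:
    //   toMonthInterval(Literal(3), 12)                 == Literal(36, INTERVAL_MONTHS)
    //   toMilliInterval(Literal(90), MILLIS_PER_SECOND) == Literal(90000L, INTERVAL_MILLIS)
    // non-literal inputs become a multiplication plus a cast:
    //   toMilliInterval('f0, MILLIS_PER_DAY) == Cast(Mul('f0, Literal(86400000L)), INTERVAL_MILLIS)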

http://git-wip-us.apache.org/repos/asf/flink/blob/f170e04a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
index e4ed08e..b301f22 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
@@ -22,8 +22,8 @@ import org.apache.calcite.sql.SqlOperator
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.tools.RelBuilder
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.table.typeutils.TypeCheckUtils.{isNumeric, 
isString}
-import org.apache.flink.api.table.typeutils.{TypeCheckUtils, TypeCoercion}
+import org.apache.flink.api.table.typeutils.TypeCheckUtils._
+import org.apache.flink.api.table.typeutils.TypeCoercion
 import org.apache.flink.api.table.validate._
 
 import scala.collection.JavaConversions._
@@ -61,10 +61,14 @@ case class Plus(left: Expression, right: Expression) 
extends BinaryArithmetic {
   override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
     if(isString(left.resultType)) {
       val castedRight = Cast(right, BasicTypeInfo.STRING_TYPE_INFO)
-      relBuilder.call(SqlStdOperatorTable.PLUS, left.toRexNode, 
castedRight.toRexNode)
+      relBuilder.call(SqlStdOperatorTable.CONCAT, left.toRexNode, 
castedRight.toRexNode)
     } else if(isString(right.resultType)) {
       val castedLeft = Cast(left, BasicTypeInfo.STRING_TYPE_INFO)
-      relBuilder.call(SqlStdOperatorTable.PLUS, castedLeft.toRexNode, 
right.toRexNode)
+      relBuilder.call(SqlStdOperatorTable.CONCAT, castedLeft.toRexNode, 
right.toRexNode)
+    } else if (isTimeInterval(left.resultType) && left.resultType == 
right.resultType) {
+      relBuilder.call(SqlStdOperatorTable.PLUS, left.toRexNode, 
right.toRexNode)
+    } else if (isTemporal(left.resultType) && isTemporal(right.resultType)) {
+      relBuilder.call(SqlStdOperatorTable.DATETIME_PLUS, left.toRexNode, 
right.toRexNode)
     } else {
       val castedLeft = Cast(left, resultType)
       val castedRight = Cast(right, resultType)
@@ -72,15 +76,22 @@ case class Plus(left: Expression, right: Expression) 
extends BinaryArithmetic {
     }
   }
 
-  // TODO: tighten this rule once we implemented type coercion rules during 
validation
   override private[flink] def validateInput(): ExprValidationResult = {
     if (isString(left.resultType) || isString(right.resultType)) {
       ValidationSuccess
-    } else if (!isNumeric(left.resultType) || !isNumeric(right.resultType)) {
-      ValidationFailure(s"$this requires Numeric or String input," +
-        s" get $left : ${left.resultType} and $right : ${right.resultType}")
-    } else {
+    } else if (isTimeInterval(left.resultType) && left.resultType == 
right.resultType) {
+      ValidationSuccess
+    } else if (isTimePoint(left.resultType) && 
isTimeInterval(right.resultType)) {
+      ValidationSuccess
+    } else if (isTimeInterval(left.resultType) && 
isTimePoint(right.resultType)) {
+      ValidationSuccess
+    } else if (isNumeric(left.resultType) && isNumeric(right.resultType)) {
       ValidationSuccess
+    } else {
+      ValidationFailure(
+        s"$this requires Numeric, String, Intervals of same type, " +
+        s"or Interval and a time point input, " +
+        s"get $left : ${left.resultType} and $right : ${right.resultType}")
     }
   }
 }
@@ -94,14 +105,29 @@ case class UnaryMinus(child: Expression) extends 
UnaryExpression {
 
   override private[flink] def resultType = child.resultType
 
-  override private[flink] def validateInput(): ExprValidationResult =
-    TypeCheckUtils.assertNumericExpr(child.resultType, "unary minus")
+  override private[flink] def validateInput(): ExprValidationResult = {
+    if (isNumeric(child.resultType)) {
+      ValidationSuccess
+    } else if (isTimeInterval(child.resultType)) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"$this requires Numeric, or Interval input, get 
${child.resultType}")
+    }
+  }
 }
 
 case class Minus(left: Expression, right: Expression) extends BinaryArithmetic 
{
   override def toString = s"($left - $right)"
 
   private[flink] val sqlOperator = SqlStdOperatorTable.MINUS
+
+  override private[flink] def validateInput(): ExprValidationResult = {
+    if (isTimeInterval(left.resultType) && left.resultType == 
right.resultType) {
+      ValidationSuccess
+    } else {
+      super.validateInput()
+    }
+  }
 }
 
 case class Div(left: Expression, right: Expression) extends BinaryArithmetic {

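In DSL terms, the relaxed validation accepts and rejects combinations like these (sketch only, field-free expressions):

    import org.apache.flink.api.scala.table._

    // accepted by Plus.validateInput
    val a = 12.hour + 30.minute              // intervals of the same type
    val b = "2016-08-10".toDate + 5.day      // time point + milli interval
    val c = 5.day + "2016-08-10".toDate      // milli interval + time point

    // rejected with a ValidationFailure: mixing interval types, e.g. 5.day + 3.month
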
http://git-wip-us.apache.org/repos/asf/flink/blob/f170e04a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
index 525d010..f65dd5b 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
@@ -32,7 +32,8 @@ case class Cast(child: Expression, resultType: 
TypeInformation[_]) extends Unary
     val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
     relBuilder
       .getRexBuilder
-      .makeCast(
+      // we use abstract cast here because RelBuilder.cast() has too many side effects
+      .makeAbstractCast(
         typeFactory.createTypeFromTypeInfo(resultType),
         child.toRexNode)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/f170e04a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
index cd3de60..677160a 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
@@ -20,11 +20,15 @@ package org.apache.flink.api.table.expressions
 import java.sql.{Date, Time, Timestamp}
 import java.util.{Calendar, TimeZone}
 
+import org.apache.calcite.avatica.util.TimeUnit
 import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.SqlIntervalQualifier
 import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.sql.parser.SqlParserPos
 import org.apache.calcite.tools.RelBuilder
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, 
TypeInformation}
 import org.apache.flink.api.table.FlinkTypeFactory
+import org.apache.flink.api.table.typeutils.IntervalTypeInfo
 
 object Literal {
   private[flink] def apply(l: Any): Literal = l match {
@@ -63,6 +67,22 @@ case class Literal(value: Any, resultType: 
TypeInformation[_]) extends LeafExpre
       case SqlTimeTypeInfo.TIMESTAMP =>
         relBuilder.getRexBuilder.makeTimestampLiteral(dateToCalendar, 3)
 
+      case IntervalTypeInfo.INTERVAL_MONTHS =>
+        val interval = java.math.BigDecimal.valueOf(value.asInstanceOf[Int])
+        val intervalQualifier = new SqlIntervalQualifier(
+          TimeUnit.YEAR,
+          TimeUnit.MONTH,
+          SqlParserPos.ZERO)
+        relBuilder.getRexBuilder.makeIntervalLiteral(interval, 
intervalQualifier)
+
+      case IntervalTypeInfo.INTERVAL_MILLIS =>
+        val interval = java.math.BigDecimal.valueOf(value.asInstanceOf[Long])
+        val intervalQualifier = new SqlIntervalQualifier(
+          TimeUnit.DAY,
+          TimeUnit.SECOND,
+          SqlParserPos.ZERO)
+        relBuilder.getRexBuilder.makeIntervalLiteral(interval, 
intervalQualifier)
+
       case _ => relBuilder.literal(value)
     }
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/f170e04a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/IntervalTypeInfo.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/IntervalTypeInfo.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/IntervalTypeInfo.scala
new file mode 100644
index 0000000..85524fb
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/IntervalTypeInfo.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.typeutils
+
+import java.util.Objects
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeutils.base.{IntComparator, 
IntSerializer, LongComparator, LongSerializer}
+import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
+import 
org.apache.flink.api.table.typeutils.IntervalTypeInfo.instantiateComparator
+import org.apache.flink.util.Preconditions._
+
+/**
+  * TypeInformation for SQL INTERVAL types.
+  */
+@SerialVersionUID(-1816179424364825258L)
+class IntervalTypeInfo[T](
+    val clazz: Class[T],
+    val serializer: TypeSerializer[T],
+    val comparatorClass: Class[_ <: TypeComparator[T]])
+  extends TypeInformation[T]
+  with AtomicType[T] {
+
+  checkNotNull(clazz)
+  checkNotNull(serializer)
+  checkNotNull(comparatorClass)
+
+  override def isBasicType: Boolean = false
+
+  override def isTupleType: Boolean = false
+
+  override def getArity: Int = 1
+
+  override def getTotalFields: Int = 1
+
+  override def getTypeClass: Class[T] = clazz
+
+  override def isKeyType: Boolean = true
+
+  override def createSerializer(config: ExecutionConfig): TypeSerializer[T] = 
serializer
+
+  override def createComparator(
+      sortOrderAscending: Boolean,
+      executionConfig: ExecutionConfig)
+    : TypeComparator[T] = instantiateComparator(comparatorClass, 
sortOrderAscending)
+
+  // 
----------------------------------------------------------------------------------------------
+
+  override def hashCode: Int = Objects.hash(clazz, serializer, comparatorClass)
+
+  def canEqual(obj: Any): Boolean = obj.isInstanceOf[IntervalTypeInfo[_]]
+
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case other: IntervalTypeInfo[_] =>
+        other.canEqual(this) &&
+          (this.clazz eq other.clazz) &&
+          serializer == other.serializer &&
+          (this.comparatorClass eq other.comparatorClass)
+      case _ =>
+        false
+    }
+  }
+
+  override def toString: String = s"IntervalTypeInfo(${clazz.getSimpleName})"
+}
+
+object IntervalTypeInfo {
+
+  val INTERVAL_MONTHS =
+    new IntervalTypeInfo(classOf[java.lang.Integer], IntSerializer.INSTANCE, 
classOf[IntComparator])
+
+  val INTERVAL_MILLIS =
+    new IntervalTypeInfo(classOf[java.lang.Long], LongSerializer.INSTANCE, 
classOf[LongComparator])
+
+  // 
----------------------------------------------------------------------------------------------
+
+  private def instantiateComparator[X](
+      comparatorClass: Class[_ <: TypeComparator[X]],
+      ascendingOrder: java.lang.Boolean)
+    : TypeComparator[X] = {
+    try {
+      val constructor = 
comparatorClass.getConstructor(classOf[java.lang.Boolean])
+      constructor.newInstance(ascendingOrder)
+    } catch {
+      case e: Exception =>
+        throw new RuntimeException(
+          s"Could not initialize comparator ${comparatorClass.getName}", e)
+    }
+  }
+
+}
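
A short usage sketch of the new type information; at runtime the interval types behave like INT and LONG:

    import org.apache.flink.api.common.ExecutionConfig
    import org.apache.flink.api.table.typeutils.IntervalTypeInfo

    val info = IntervalTypeInfo.INTERVAL_MILLIS
    val serializer = info.createSerializer(new ExecutionConfig)        // LongSerializer
    val comparator = info.createComparator(true, new ExecutionConfig)  // ascending LongComparator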

http://git-wip-us.apache.org/repos/asf/flink/blob/f170e04a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCheckUtils.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCheckUtils.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCheckUtils.scala
index c19deec..4ff7888 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCheckUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCheckUtils.scala
@@ -30,6 +30,7 @@ object TypeCheckUtils {
   def isAdvanced(dataType: TypeInformation[_]): Boolean = dataType match {
     case _: BasicTypeInfo[_] => false
     case _: SqlTimeTypeInfo[_] => false
+    case _: IntervalTypeInfo[_] => false
     case _ => true
   }
 
@@ -45,7 +46,14 @@ object TypeCheckUtils {
     case _ => false
   }
 
-  def isTemporal(dataType: TypeInformation[_]): Boolean = dataType.isInstanceOf[SqlTimeTypeInfo[_]]
+  def isTemporal(dataType: TypeInformation[_]): Boolean =
+    isTimePoint(dataType) || isTimeInterval(dataType)
+
+  def isTimePoint(dataType: TypeInformation[_]): Boolean =
+    dataType.isInstanceOf[SqlTimeTypeInfo[_]]
+
+  def isTimeInterval(dataType: TypeInformation[_]): Boolean =
+    dataType.isInstanceOf[IntervalTypeInfo[_]]
 
  def isString(dataType: TypeInformation[_]): Boolean = dataType == STRING_TYPE_INFO
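
The old isTemporal check is split into time points and time intervals. A minimal sketch of the intended semantics (illustrative, not part of the patch):

    // Time points are the java.sql-backed types; intervals are the new IntervalTypeInfo types.
    TypeCheckUtils.isTimePoint(SqlTimeTypeInfo.TIMESTAMP)           // true
    TypeCheckUtils.isTimeInterval(IntervalTypeInfo.INTERVAL_MILLIS) // true
    TypeCheckUtils.isTemporal(IntervalTypeInfo.INTERVAL_MONTHS)     // true (point or interval)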
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f170e04a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCoercion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCoercion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCoercion.scala
index dad13fb..bb3d060 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCoercion.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCoercion.scala
@@ -45,6 +45,9 @@ object TypeCoercion {
       case (_, BIG_DEC_TYPE_INFO) => Some(BIG_DEC_TYPE_INFO)
       case (BIG_DEC_TYPE_INFO, _) => Some(BIG_DEC_TYPE_INFO)
 
+      case (stti: SqlTimeTypeInfo[_], _: IntervalTypeInfo[_]) => Some(stti)
+      case (_: IntervalTypeInfo[_], stti: SqlTimeTypeInfo[_]) => Some(stti)
+
      case tuple if tuple.productIterator.forall(numericWideningPrecedence.contains) =>
        val higherIndex = numericWideningPrecedence.lastIndexWhere(t => t == tp1 || t == tp2)
         Some(numericWideningPrecedence(higherIndex))
@@ -100,6 +103,8 @@ object TypeCoercion {
     case (INT_TYPE_INFO, SqlTimeTypeInfo.DATE) => true
     case (INT_TYPE_INFO, SqlTimeTypeInfo.TIME) => true
     case (LONG_TYPE_INFO, SqlTimeTypeInfo.TIMESTAMP) => true
+    case (INT_TYPE_INFO, IntervalTypeInfo.INTERVAL_MONTHS) => true
+    case (LONG_TYPE_INFO, IntervalTypeInfo.INTERVAL_MILLIS) => true
 
     case (SqlTimeTypeInfo.DATE, SqlTimeTypeInfo.TIME) => false
     case (SqlTimeTypeInfo.TIME, SqlTimeTypeInfo.DATE) => false
@@ -108,6 +113,9 @@ object TypeCoercion {
     case (SqlTimeTypeInfo.TIME, INT_TYPE_INFO) => true
     case (SqlTimeTypeInfo.TIMESTAMP, LONG_TYPE_INFO) => true
 
+    case (IntervalTypeInfo.INTERVAL_MONTHS, INT_TYPE_INFO) => true
+    case (IntervalTypeInfo.INTERVAL_MILLIS, LONG_TYPE_INFO) => true
+
     case _ => false
   }
 }
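
The added cases let an interval widen to the time point it is combined with and allow explicit casts between the interval types and their backing primitives (INT for months, LONG for millis). In Table API terms this enables expressions like the following sketch, using the fields of the new TemporalTypesTest below:

    // Casts permitted by the new coercion rules (sketch).
    'f7.cast(Types.INTERVAL_MONTHS)   // INT  -> months interval
    'f8.cast(Types.INTERVAL_MILLIS)   // LONG -> millis interval
    'f9.cast(Types.INT)               // months interval -> INT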

http://git-wip-us.apache.org/repos/asf/flink/blob/f170e04a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/ExpressionsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/ExpressionsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/ExpressionsITCase.java
index 7d225f3..b060540 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/ExpressionsITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/ExpressionsITCase.java
@@ -192,11 +192,12 @@ public class ExpressionsITCase extends TableProgramsTestBase {
                                        "a.cast(STRING) + a.cast(STRING)," +
                                        "CAST(ISNULL(b), INT)," +
                                        "ISNULL(CAST(b, INT).abs()) === false," 
+
-                                       "((((true) === true) || 
false).cast(STRING) + 'X ').trim");
+                                       "((((true) === true) || 
false).cast(STRING) + 'X ').trim," +
+                                       "12.isNull");
 
                DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
                List<Row> results = ds.collect();
-               String expected = "false,10,55,0,true,trueX";
+               String expected = "false,10,55,0,true,trueX,false";
                compareResultAsText(results, expected);
        }
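
The appended expression checks isNull on a literal, which can never be NULL, hence the extra ",false" in the expected result. Roughly the Scala DSL equivalent (sketch only):

    // A non-null literal is never NULL, so isNull evaluates to false.
    val expr = 12.isNull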
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f170e04a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/expression/TemporalTypesTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/expression/TemporalTypesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/expression/TemporalTypesTest.scala
new file mode 100644
index 0000000..63d6346
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/expression/TemporalTypesTest.scala
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.expression
+
+import java.sql.{Date, Time, Timestamp}
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.expressions.utils.ExpressionTestBase
+import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.table.{Row, Types}
+import org.junit.Test
+
+class TemporalTypesTest extends ExpressionTestBase {
+
+  @Test
+  def testTimePointLiterals(): Unit = {
+    testAllApis(
+      "1990-10-14".toDate,
+      "'1990-10-14'.toDate",
+      "DATE '1990-10-14'",
+      "1990-10-14")
+
+    testTableApi(
+      Date.valueOf("2040-09-11"),
+      "'2040-09-11'.toDate",
+      "2040-09-11")
+
+    testAllApis(
+      "1500-04-30".cast(Types.DATE),
+      "'1500-04-30'.cast(DATE)",
+      "CAST('1500-04-30' AS DATE)",
+      "1500-04-30")
+
+    testAllApis(
+      "15:45:59".toTime,
+      "'15:45:59'.toTime",
+      "TIME '15:45:59'",
+      "15:45:59")
+
+    testTableApi(
+      Time.valueOf("00:00:00"),
+      "'00:00:00'.toTime",
+      "00:00:00")
+
+    testAllApis(
+      "1:30:00".cast(Types.TIME),
+      "'1:30:00'.cast(TIME)",
+      "CAST('1:30:00' AS TIME)",
+      "01:30:00")
+
+    testAllApis(
+      "1990-10-14 23:00:00.123".toTimestamp,
+      "'1990-10-14 23:00:00.123'.toTimestamp",
+      "TIMESTAMP '1990-10-14 23:00:00.123'",
+      "1990-10-14 23:00:00.123")
+
+    testTableApi(
+      Timestamp.valueOf("2040-09-11 00:00:00.000"),
+      "'2040-09-11 00:00:00.000'.toTimestamp",
+      "2040-09-11 00:00:00.0")
+
+    testAllApis(
+      "1500-04-30 12:00:00".cast(Types.TIMESTAMP),
+      "'1500-04-30 12:00:00'.cast(TIMESTAMP)",
+      "CAST('1500-04-30 12:00:00' AS TIMESTAMP)",
+      "1500-04-30 12:00:00.0")
+  }
+
+  @Test
+  def testTimeIntervalLiterals(): Unit = {
+    testAllApis(
+      1.year,
+      "1.year",
+      "INTERVAL '1' YEAR",
+      "+1-00")
+
+    testAllApis(
+      1.month,
+      "1.month",
+      "INTERVAL '1' MONTH",
+      "+0-01")
+
+    testAllApis(
+      12.day,
+      "12.day",
+      "INTERVAL '12' DAY",
+      "+12 00:00:00.000")
+
+    testAllApis(
+      1.hour,
+      "1.hour",
+      "INTERVAL '1' HOUR",
+      "+0 01:00:00.000")
+
+    testAllApis(
+      3.minute,
+      "3.minute",
+      "INTERVAL '3' MINUTE",
+      "+0 00:03:00.000")
+
+    testAllApis(
+      3.second,
+      "3.second",
+      "INTERVAL '3' SECOND",
+      "+0 00:00:03.000")
+
+    testAllApis(
+      3.milli,
+      "3.milli",
+      "INTERVAL '0.003' SECOND",
+      "+0 00:00:00.003")
+  }
+
+  @Test
+  def testTimePointInput(): Unit = {
+    testAllApis(
+      'f0,
+      "f0",
+      "f0",
+      "1990-10-14")
+
+    testAllApis(
+      'f1,
+      "f1",
+      "f1",
+      "10:20:45")
+
+    testAllApis(
+      'f2,
+      "f2",
+      "f2",
+      "1990-10-14 10:20:45.123")
+  }
+
+  @Test
+  def testTimeIntervalInput(): Unit = {
+    testAllApis(
+      'f9,
+      "f9",
+      "f9",
+      "+2-00")
+
+    testAllApis(
+      'f10,
+      "f10",
+      "f10",
+      "+0 00:00:12.000")
+  }
+
+  @Test
+  def testTimePointCasting(): Unit = {
+    testAllApis(
+      'f0.cast(Types.TIMESTAMP),
+      "f0.cast(TIMESTAMP)",
+      "CAST(f0 AS TIMESTAMP)",
+      "1990-10-14 00:00:00.0")
+
+    testAllApis(
+      'f1.cast(Types.TIMESTAMP),
+      "f1.cast(TIMESTAMP)",
+      "CAST(f1 AS TIMESTAMP)",
+      "1970-01-01 10:20:45.0")
+
+    testAllApis(
+      'f2.cast(Types.DATE),
+      "f2.cast(DATE)",
+      "CAST(f2 AS DATE)",
+      "1990-10-14")
+
+    testAllApis(
+      'f2.cast(Types.TIME),
+      "f2.cast(TIME)",
+      "CAST(f2 AS TIME)",
+      "10:20:45")
+
+    testAllApis(
+      'f2.cast(Types.TIME),
+      "f2.cast(TIME)",
+      "CAST(f2 AS TIME)",
+      "10:20:45")
+
+    testTableApi(
+      'f7.cast(Types.DATE),
+      "f7.cast(DATE)",
+      "2002-11-09")
+
+    testTableApi(
+      'f7.cast(Types.DATE).cast(Types.INT),
+      "f7.cast(DATE).cast(INT)",
+      "12000")
+
+    testTableApi(
+      'f7.cast(Types.TIME),
+      "f7.cast(TIME)",
+      "00:00:12")
+
+    testTableApi(
+      'f7.cast(Types.TIME).cast(Types.INT),
+      "f7.cast(TIME).cast(INT)",
+      "12000")
+
+    testTableApi(
+      'f8.cast(Types.TIMESTAMP),
+      "f8.cast(TIMESTAMP)",
+      "2016-06-27 07:23:33.0")
+
+    testTableApi(
+      'f8.cast(Types.TIMESTAMP).cast(Types.LONG),
+      "f8.cast(TIMESTAMP).cast(LONG)",
+      "1467012213000")
+  }
+
+  @Test
+  def testTimeIntervalCasting(): Unit = {
+    testTableApi(
+      'f7.cast(Types.INTERVAL_MONTHS),
+      "f7.cast(INTERVAL_MONTHS)",
+      "+1000-00")
+
+    testTableApi(
+      'f8.cast(Types.INTERVAL_MILLIS),
+      "f8.cast(INTERVAL_MILLIS)",
+      "+16979 07:23:33.000")
+  }
+
+  @Test
+  def testTimePointComparison(): Unit = {
+    testAllApis(
+      'f0 < 'f3,
+      "f0 < f3",
+      "f0 < f3",
+      "false")
+
+    testAllApis(
+      'f0 < 'f4,
+      "f0 < f4",
+      "f0 < f4",
+      "true")
+
+    testAllApis(
+      'f1 < 'f5,
+      "f1 < f5",
+      "f1 < f5",
+      "false")
+
+    testAllApis(
+      'f0.cast(Types.TIMESTAMP) !== 'f2,
+      "f0.cast(TIMESTAMP) !== f2",
+      "CAST(f0 AS TIMESTAMP) <> f2",
+      "true")
+
+    testAllApis(
+      'f0.cast(Types.TIMESTAMP) === 'f6,
+      "f0.cast(TIMESTAMP) === f6",
+      "CAST(f0 AS TIMESTAMP) = f6",
+      "true")
+  }
+
+  @Test
+  def testTimeIntervalArithmetic(): Unit = {
+    testAllApis(
+      12.month < 24.month,
+      "12.month < 24.month",
+      "INTERVAL '12' MONTH < INTERVAL '24' MONTH",
+      "true")
+
+    testAllApis(
+      8.milli > 10.milli,
+      "8.milli > 10.milli",
+      "INTERVAL '0.008' SECOND > INTERVAL '0.010' SECOND",
+      "false")
+
+    testAllApis(
+      8.year === 8.year,
+      "8.year === 8.year",
+      "INTERVAL '8' YEAR = INTERVAL '8' YEAR",
+      "true")
+
+    testAllApis(
+      8.year + 10.month,
+      "8.year + 10.month",
+      "INTERVAL '8' YEAR + INTERVAL '10' MONTH",
+      "+8-10")
+
+    testAllApis(
+      8.hour + 10.minute + 12.second + 5.milli,
+      "8.hour + 10.minute + 12.second + 5.milli",
+      "INTERVAL '8' HOUR + INTERVAL '10' MINUTE + INTERVAL '12.005' SECOND",
+      "+0 08:10:12.005")
+
+    testAllApis(
+      1.minute - 10.second,
+      "1.minute - 10.second",
+      "INTERVAL '1' MINUTE - INTERVAL '10' SECOND",
+      "+0 00:00:50.000")
+
+    testAllApis(
+      2.year - 12.month,
+      "2.year - 12.month",
+      "INTERVAL '2' YEAR - INTERVAL '12' MONTH",
+      "+1-00")
+
+    testAllApis(
+      -'f9.cast(Types.INTERVAL_MONTHS),
+      "-f9.cast(INTERVAL_MONTHS)",
+      "-CAST(f9 AS INTERVAL YEAR)",
+      "-2-00")
+
+    testAllApis(
+      'f0 + 2.day,
+      "f0 + 2.day",
+      "f0 + INTERVAL '2' DAY",
+      "1990-10-16")
+
+    testAllApis(
+      30.day + 'f0,
+      "30.day + f0",
+      "INTERVAL '30' DAY + f0",
+      "1990-11-13")
+
+    testAllApis(
+      'f1 + 12.hour,
+      "f1 + 12.hour",
+      "f1 + INTERVAL '12' HOUR",
+      "22:20:45")
+
+    testAllApis(
+      24.hour + 'f1,
+      "24.hour + f1",
+      "INTERVAL '24' HOUR + f1",
+      "10:20:45")
+
+    testAllApis(
+      'f2 + 10.day + 4.milli,
+      "f2 + 10.day + 4.milli",
+      "f2 + INTERVAL '10 00:00:00.004' DAY TO SECOND",
+      "1990-10-24 10:20:45.127")
+  }
+
+  // ----------------------------------------------------------------------------------------------
+
+  def testData = {
+    val testData = new Row(11)
+    testData.setField(0, Date.valueOf("1990-10-14"))
+    testData.setField(1, Time.valueOf("10:20:45"))
+    testData.setField(2, Timestamp.valueOf("1990-10-14 10:20:45.123"))
+    testData.setField(3, Date.valueOf("1990-10-13"))
+    testData.setField(4, Date.valueOf("1990-10-15"))
+    testData.setField(5, Time.valueOf("00:00:00"))
+    testData.setField(6, Timestamp.valueOf("1990-10-14 00:00:00.0"))
+    testData.setField(7, 12000)
+    testData.setField(8, 1467012213000L)
+    testData.setField(9, 24)
+    testData.setField(10, 12000L)
+    testData
+  }
+
+  def typeInfo = {
+    new RowTypeInfo(Seq(
+      Types.DATE,
+      Types.TIME,
+      Types.TIMESTAMP,
+      Types.DATE,
+      Types.DATE,
+      Types.TIME,
+      Types.TIMESTAMP,
+      Types.INT,
+      Types.LONG,
+      Types.INTERVAL_MONTHS,
+      Types.INTERVAL_MILLIS)).asInstanceOf[TypeInformation[Any]]
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f170e04a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/expression/TimeTypesTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/expression/TimeTypesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/expression/TimeTypesTest.scala
deleted file mode 100644
index 532a3bd..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/expression/TimeTypesTest.scala
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.expression
-
-import java.sql.{Date, Time, Timestamp}
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.expressions.utils.ExpressionTestBase
-import org.apache.flink.api.table.typeutils.RowTypeInfo
-import org.apache.flink.api.table.{Row, Types}
-import org.junit.Test
-
-class TimeTypesTest extends ExpressionTestBase {
-
-  @Test
-  def testTimeLiterals(): Unit = {
-    testAllApis(
-      "1990-10-14".toDate,
-      "'1990-10-14'.toDate",
-      "DATE '1990-10-14'",
-      "1990-10-14")
-
-    testTableApi(
-      Date.valueOf("2040-09-11"),
-      "'2040-09-11'.toDate",
-      "2040-09-11")
-
-    testAllApis(
-      "1500-04-30".cast(Types.DATE),
-      "'1500-04-30'.cast(DATE)",
-      "CAST('1500-04-30' AS DATE)",
-      "1500-04-30")
-
-    testAllApis(
-      "15:45:59".toTime,
-      "'15:45:59'.toTime",
-      "TIME '15:45:59'",
-      "15:45:59")
-
-    testTableApi(
-      Time.valueOf("00:00:00"),
-      "'00:00:00'.toTime",
-      "00:00:00")
-
-    testAllApis(
-      "1:30:00".cast(Types.TIME),
-      "'1:30:00'.cast(TIME)",
-      "CAST('1:30:00' AS TIME)",
-      "01:30:00")
-
-    testAllApis(
-      "1990-10-14 23:00:00.123".toTimestamp,
-      "'1990-10-14 23:00:00.123'.toTimestamp",
-      "TIMESTAMP '1990-10-14 23:00:00.123'",
-      "1990-10-14 23:00:00.123")
-
-    testTableApi(
-      Timestamp.valueOf("2040-09-11 00:00:00.000"),
-      "'2040-09-11 00:00:00.000'.toTimestamp",
-      "2040-09-11 00:00:00.0")
-
-    testAllApis(
-      "1500-04-30 12:00:00".cast(Types.TIMESTAMP),
-      "'1500-04-30 12:00:00'.cast(TIMESTAMP)",
-      "CAST('1500-04-30 12:00:00' AS TIMESTAMP)",
-      "1500-04-30 12:00:00.0")
-  }
-
-  @Test
-  def testTimeInput(): Unit = {
-    testAllApis(
-      'f0,
-      "f0",
-      "f0",
-      "1990-10-14")
-
-    testAllApis(
-      'f1,
-      "f1",
-      "f1",
-      "10:20:45")
-
-    testAllApis(
-      'f2,
-      "f2",
-      "f2",
-      "1990-10-14 10:20:45.123")
-  }
-
-  @Test
-  def testTimeCasting(): Unit = {
-    testAllApis(
-      'f0.cast(Types.TIMESTAMP),
-      "f0.cast(TIMESTAMP)",
-      "CAST(f0 AS TIMESTAMP)",
-      "1990-10-14 00:00:00.0")
-
-    testAllApis(
-      'f1.cast(Types.TIMESTAMP),
-      "f1.cast(TIMESTAMP)",
-      "CAST(f1 AS TIMESTAMP)",
-      "1970-01-01 10:20:45.0")
-
-    testAllApis(
-      'f2.cast(Types.DATE),
-      "f2.cast(DATE)",
-      "CAST(f2 AS DATE)",
-      "1990-10-14")
-
-    testAllApis(
-      'f2.cast(Types.TIME),
-      "f2.cast(TIME)",
-      "CAST(f2 AS TIME)",
-      "10:20:45")
-
-    testAllApis(
-      'f2.cast(Types.TIME),
-      "f2.cast(TIME)",
-      "CAST(f2 AS TIME)",
-      "10:20:45")
-
-    testTableApi(
-      'f7.cast(Types.DATE),
-      "f7.cast(DATE)",
-      "2002-11-09")
-
-    testTableApi(
-      'f7.cast(Types.DATE).cast(Types.INT),
-      "f7.cast(DATE).cast(INT)",
-      "12000")
-
-    testTableApi(
-      'f7.cast(Types.TIME),
-      "f7.cast(TIME)",
-      "00:00:12")
-
-    testTableApi(
-      'f7.cast(Types.TIME).cast(Types.INT),
-      "f7.cast(TIME).cast(INT)",
-      "12000")
-
-    testTableApi(
-      'f8.cast(Types.TIMESTAMP),
-      "f8.cast(TIMESTAMP)",
-      "2016-06-27 07:23:33.0")
-
-    testTableApi(
-      'f8.cast(Types.TIMESTAMP).cast(Types.LONG),
-      "f8.cast(TIMESTAMP).cast(LONG)",
-      "1467012213000")
-  }
-
-  @Test
-  def testTimeComparison(): Unit = {
-    testAllApis(
-      'f0 < 'f3,
-      "f0 < f3",
-      "f0 < f3",
-      "false")
-
-    testAllApis(
-      'f0 < 'f4,
-      "f0 < f4",
-      "f0 < f4",
-      "true")
-
-    testAllApis(
-      'f1 < 'f5,
-      "f1 < f5",
-      "f1 < f5",
-      "false")
-
-    testAllApis(
-      'f0.cast(Types.TIMESTAMP) !== 'f2,
-      "f0.cast(TIMESTAMP) !== f2",
-      "CAST(f0 AS TIMESTAMP) <> f2",
-      "true")
-
-    testAllApis(
-      'f0.cast(Types.TIMESTAMP) === 'f6,
-      "f0.cast(TIMESTAMP) === f6",
-      "CAST(f0 AS TIMESTAMP) = f6",
-      "true")
-  }
-
-  // ----------------------------------------------------------------------------------------------
-
-  def testData = {
-    val testData = new Row(9)
-    testData.setField(0, Date.valueOf("1990-10-14"))
-    testData.setField(1, Time.valueOf("10:20:45"))
-    testData.setField(2, Timestamp.valueOf("1990-10-14 10:20:45.123"))
-    testData.setField(3, Date.valueOf("1990-10-13"))
-    testData.setField(4, Date.valueOf("1990-10-15"))
-    testData.setField(5, Time.valueOf("00:00:00"))
-    testData.setField(6, Timestamp.valueOf("1990-10-14 00:00:00.0"))
-    testData.setField(7, 12000)
-    testData.setField(8, 1467012213000L)
-    testData
-  }
-
-  def typeInfo = {
-    new RowTypeInfo(Seq(
-      Types.DATE,
-      Types.TIME,
-      Types.TIMESTAMP,
-      Types.DATE,
-      Types.DATE,
-      Types.TIME,
-      Types.TIMESTAMP,
-      Types.INT,
-      Types.LONG)).asInstanceOf[TypeInformation[Any]]
-  }
-}
