Repository: flink Updated Branches: refs/heads/master f0992aa13 -> f170e04ab
[FLINK-4359] [table] Add INTERVAL type This closes #2348. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f170e04a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f170e04a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f170e04a Branch: refs/heads/master Commit: f170e04ab93fba75c517b0f9184587d7b306113e Parents: f0992aa Author: twalthr <twal...@apache.org> Authored: Mon Aug 8 10:15:49 2016 +0200 Committer: twalthr <twal...@apache.org> Committed: Thu Aug 11 14:39:24 2016 +0200 ---------------------------------------------------------------------- docs/apis/table.md | 42 +- .../flink/api/scala/table/expressionDsl.scala | 59 ++- .../flink/api/table/FlinkTypeFactory.scala | 39 +- .../org/apache/flink/api/table/Types.scala | 3 + .../flink/api/table/codegen/CodeGenUtils.scala | 46 ++- .../flink/api/table/codegen/CodeGenerator.scala | 50 ++- .../table/codegen/calls/ScalarOperators.scala | 113 +++++- .../table/expressions/ExpressionParser.scala | 62 ++- .../api/table/expressions/ExpressionUtils.scala | 40 ++ .../api/table/expressions/arithmetic.scala | 48 ++- .../flink/api/table/expressions/cast.scala | 3 +- .../flink/api/table/expressions/literals.scala | 20 + .../api/table/typeutils/IntervalTypeInfo.scala | 109 ++++++ .../api/table/typeutils/TypeCheckUtils.scala | 10 +- .../api/table/typeutils/TypeCoercion.scala | 8 + .../api/java/batch/table/ExpressionsITCase.java | 5 +- .../scala/expression/TemporalTypesTest.scala | 389 +++++++++++++++++++ .../api/scala/expression/TimeTypesTest.scala | 231 ----------- 18 files changed, 947 insertions(+), 330 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f170e04a/docs/apis/table.md ---------------------------------------------------------------------- diff --git a/docs/apis/table.md b/docs/apis/table.md index 57252d9..ea8e343 100644 --- a/docs/apis/table.md +++ 
b/docs/apis/table.md @@ -957,11 +957,13 @@ unary = [ "!" | "-" ] , composite ; composite = suffixed | atom ; -suffixed = cast | as | aggregation | nullCheck | if | functionCall ; +suffixed = interval | cast | as | aggregation | nullCheck | if | functionCall ; + +interval = composite , "." , ("year" | "month" | "day" | "hour" | "minute" | "second" | "milli") ; cast = composite , ".cast(" , dataType , ")" ; -dataType = "BYTE" | "SHORT" | "INT" | "LONG" | "FLOAT" | "DOUBLE" | "BOOLEAN" | "STRING" | "DECIMAL" | "DATE" | "TIME" | "TIMESTAMP"; +dataType = "BYTE" | "SHORT" | "INT" | "LONG" | "FLOAT" | "DOUBLE" | "BOOLEAN" | "STRING" | "DECIMAL" | "DATE" | "TIME" | "TIMESTAMP" | "INTERVAL_MONTHS" | "INTERVAL_MILLIS" ; as = composite , ".as(" , fieldReference , ")" ; @@ -986,6 +988,8 @@ If working with exact numeric values or large decimals is required, the Table AP In order to work with temporal values the Table API supports Java SQL's Date, Time, and Timestamp types. In the Scala Table API literals can be defined by using `java.sql.Date.valueOf("2016-06-27")`, `java.sql.Time.valueOf("10:10:42")`, or `java.sql.Timestamp.valueOf("2016-06-27 10:10:42.123")`. The Java and Scala Table API also support calling `"2016-06-27".toDate()`, `"10:10:42".toTime()`, and `"2016-06-27 10:10:42.123".toTimestamp()` for converting Strings into temporal types. *Note:* Since Java's temporal SQL types are time zone dependent, please make sure that the Flink Client and all TaskManagers use the same time zone. +Temporal intervals can be represented as number of months (`Types.INTERVAL_MONTHS`) or number of milliseconds (`Types.INTERVAL_MILLIS`). Intervals of the same type can be added or subtracted (e.g. `2.hour + 10.minute`). Intervals of milliseconds can be added to time points (e.g. `"2016-08-10".toDate + 5.day`). 
+ {% top %} @@ -1037,8 +1041,8 @@ The current version supports selection (filter), projection, inner equi-joins, g Among others, the following SQL features are not supported, yet: -- Time interval data type (`INTERVAL`) -- Timestamps are limited to milliseconds precision +- Timestamps and intervals are limited to milliseconds precision +- Interval arithmetic is currently limited - Distinct aggregates (e.g., `COUNT(DISTINCT name)`) - Non-equi joins and Cartesian products - Grouping sets @@ -1171,20 +1175,22 @@ Data Types The Table API is built on top of Flink's DataSet and DataStream API. Internally, it also uses Flink's `TypeInformation` to distinguish between types. The Table API does not support all Flink types so far. All supported simple types are listed in `org.apache.flink.api.table.Types`. The following table summarizes the relation between Table API types, SQL types, and the resulting Java class. -| Table API | SQL | Java type | -| :--------------------- | :-------------- | :--------------------- | -| `Types.STRING` | `VARCHAR` | `java.lang.String` | -| `Types.BOOLEAN` | `BOOLEAN` | `java.lang.Boolean` | -| `Types.BYTE` | `TINYINT` | `java.lang.Byte` | -| `Types.SHORT` | `SMALLINT` | `java.lang.Short` | -| `Types.INT` | `INTEGER, INT` | `java.lang.Integer` | -| `Types.LONG` | `BIGINT` | `java.lang.Long` | -| `Types.FLOAT` | `REAL, FLOAT` | `java.lang.Float` | -| `Types.DOUBLE` | `DOUBLE` | `java.lang.Double` | -| `Types.DECIMAL` | `DECIMAL` | `java.math.BigDecimal` | -| `Types.DATE` | `DATE` | `java.sql.Date` | -| `Types.TIME` | `TIME` | `java.sql.Time` | -| `Types.TIMESTAMP` | `TIMESTAMP` | `java.sql.Timestamp` | +| Table API | SQL | Java type | +| :--------------------- | :-------------------------- | :--------------------- | +| `Types.STRING` | `VARCHAR` | `java.lang.String` | +| `Types.BOOLEAN` | `BOOLEAN` | `java.lang.Boolean` | +| `Types.BYTE` | `TINYINT` | `java.lang.Byte` | +| `Types.SHORT` | `SMALLINT` | `java.lang.Short` | +| `Types.INT` | 
`INTEGER, INT` | `java.lang.Integer` | +| `Types.LONG` | `BIGINT` | `java.lang.Long` | +| `Types.FLOAT` | `REAL, FLOAT` | `java.lang.Float` | +| `Types.DOUBLE` | `DOUBLE` | `java.lang.Double` | +| `Types.DECIMAL` | `DECIMAL` | `java.math.BigDecimal` | +| `Types.DATE` | `DATE` | `java.sql.Date` | +| `Types.TIME` | `TIME` | `java.sql.Time` | +| `Types.TIMESTAMP` | `TIMESTAMP(3)` | `java.sql.Timestamp` | +| `Types.INTERVAL_MONTHS`| `INTERVAL YEAR TO MONTH` | `java.lang.Integer` | +| `Types.INTERVAL_MILLIS`| `INTERVAL DAY TO SECOND(3)` | `java.lang.Long` | Advanced types such as generic types, composite types (e.g. POJOs or Tuples), and arrays can be fields of a row but can not be accessed yet. They are treated like a black box within Table API and SQL. http://git-wip-us.apache.org/repos/asf/flink/blob/f170e04a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala index cb91066..9ca9c8a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala @@ -17,13 +17,15 @@ */ package org.apache.flink.api.scala.table -import java.sql.{Timestamp, Time, Date} - -import scala.language.implicitConversions +import java.sql.{Date, Time, Timestamp} +import org.apache.calcite.avatica.util.DateTimeUtils._ import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation} +import org.apache.flink.api.table.expressions.ExpressionUtils.{toMilliInterval, toMonthInterval} import org.apache.flink.api.table.expressions._ +import scala.language.implicitConversions + /** * These are all the operations that can be used to 
construct an [[Expression]] AST for expression * operations. @@ -227,6 +229,57 @@ trait ImplicitExpressionOperations { * Parses a timestamp String in the form "yy-mm-dd hh:mm:ss.fff" to a SQL Timestamp. */ def toTimestamp = Cast(expr, SqlTimeTypeInfo.TIMESTAMP) + + // interval types + + /** + * Creates an interval of the given number of years. + * + * @return interval of months + */ + def year = toMonthInterval(expr, 12) + + /** + * Creates an interval of the given number of months. + * + * @return interval of months + */ + def month = toMonthInterval(expr, 1) + + /** + * Creates an interval of the given number of days. + * + * @return interval of milliseconds + */ + def day = toMilliInterval(expr, MILLIS_PER_DAY) + + /** + * Creates an interval of the given number of hours. + * + * @return interval of milliseconds + */ + def hour = toMilliInterval(expr, MILLIS_PER_HOUR) + + /** + * Creates an interval of the given number of minutes. + * + * @return interval of milliseconds + */ + def minute = toMilliInterval(expr, MILLIS_PER_MINUTE) + + /** + * Creates an interval of the given number of seconds. + * + * @return interval of milliseconds + */ + def second = toMilliInterval(expr, MILLIS_PER_SECOND) + + /** + * Creates an interval of the given number of milliseconds. 
+ * + * @return interval of milliseconds + */ + def milli = toMilliInterval(expr, 1) } /** http://git-wip-us.apache.org/repos/asf/flink/blob/f170e04a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala index 6a31487..5a116db 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala @@ -18,15 +18,19 @@ package org.apache.flink.api.table +import org.apache.calcite.avatica.util.TimeUnit import org.apache.calcite.jdbc.JavaTypeFactoryImpl import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeSystem} +import org.apache.calcite.sql.SqlIntervalQualifier import org.apache.calcite.sql.`type`.SqlTypeName import org.apache.calcite.sql.`type`.SqlTypeName._ +import org.apache.calcite.sql.parser.SqlParserPos import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation} import org.apache.flink.api.java.typeutils.ValueTypeInfo._ import org.apache.flink.api.table.FlinkTypeFactory.typeInfoToSqlTypeName import org.apache.flink.api.table.plan.schema.GenericRelDataType +import org.apache.flink.api.table.typeutils.IntervalTypeInfo import org.apache.flink.api.table.typeutils.TypeCheckUtils.isSimple import scala.collection.mutable @@ -42,7 +46,20 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp def createTypeFromTypeInfo(typeInfo: TypeInformation[_]): RelDataType = { // simple type can be converted to SQL types and vice versa if (isSimple(typeInfo)) { - createSqlType(typeInfoToSqlTypeName(typeInfo)) + val sqlType = 
typeInfoToSqlTypeName(typeInfo) + sqlType match { + + case INTERVAL_YEAR_MONTH => + createSqlIntervalType( + new SqlIntervalQualifier(TimeUnit.YEAR, TimeUnit.MONTH, SqlParserPos.ZERO)) + + case INTERVAL_DAY_TIME => + createSqlIntervalType( + new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO)) + + case _ => + createSqlType(sqlType) + } } // advanced types require specific RelDataType // for storing the original TypeInformation @@ -58,7 +75,7 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp new GenericRelDataType(typeInfo, getTypeSystem.asInstanceOf[FlinkTypeSystem]) case ti@_ => - throw new TableException(s"Unsupported type information: $ti") + throw TableException(s"Unsupported type information: $ti") } } @@ -75,16 +92,18 @@ object FlinkTypeFactory { case STRING_TYPE_INFO => VARCHAR case BIG_DEC_TYPE_INFO => DECIMAL - // date/time types + // temporal types case SqlTimeTypeInfo.DATE => DATE case SqlTimeTypeInfo.TIME => TIME case SqlTimeTypeInfo.TIMESTAMP => TIMESTAMP + case IntervalTypeInfo.INTERVAL_MONTHS => INTERVAL_YEAR_MONTH + case IntervalTypeInfo.INTERVAL_MILLIS => INTERVAL_DAY_TIME case CHAR_TYPE_INFO | CHAR_VALUE_TYPE_INFO => - throw new TableException("Character type is not supported.") + throw TableException("Character type is not supported.") case _@t => - throw new TableException(s"Type is not supported: $t") + throw TableException(s"Type is not supported: $t") } def toTypeInfo(relDataType: RelDataType): TypeInformation[_] = relDataType.getSqlTypeName match { @@ -98,15 +117,15 @@ object FlinkTypeFactory { case VARCHAR | CHAR => STRING_TYPE_INFO case DECIMAL => BIG_DEC_TYPE_INFO - // date/time types + // temporal types case DATE => SqlTimeTypeInfo.DATE case TIME => SqlTimeTypeInfo.TIME case TIMESTAMP => SqlTimeTypeInfo.TIMESTAMP - case INTERVAL_DAY_TIME | INTERVAL_YEAR_MONTH => - throw new TableException("Intervals are not supported yet.") + case INTERVAL_YEAR_MONTH => 
IntervalTypeInfo.INTERVAL_MONTHS + case INTERVAL_DAY_TIME => IntervalTypeInfo.INTERVAL_MILLIS case NULL => - throw new TableException("Type NULL is not supported. " + + throw TableException("Type NULL is not supported. " + "Null values must have a supported type.") // symbol for special flags e.g. TRIM's BOTH, LEADING, TRAILING @@ -119,6 +138,6 @@ object FlinkTypeFactory { genericRelDataType.typeInfo case _@t => - throw new TableException(s"Type is not supported: $t") + throw TableException(s"Type is not supported: $t") } } http://git-wip-us.apache.org/repos/asf/flink/blob/f170e04a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Types.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Types.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Types.scala index d63683e..64d4612 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Types.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Types.scala @@ -18,6 +18,7 @@ package org.apache.flink.api.table import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo} +import org.apache.flink.api.table.typeutils.IntervalTypeInfo /** * This class enumerates all supported types of the Table API. 
@@ -38,5 +39,7 @@ object Types { val DATE = SqlTimeTypeInfo.DATE val TIME = SqlTimeTypeInfo.TIME val TIMESTAMP = SqlTimeTypeInfo.TIMESTAMP + val INTERVAL_MONTHS = IntervalTypeInfo.INTERVAL_MONTHS + val INTERVAL_MILLIS = IntervalTypeInfo.INTERVAL_MILLIS } http://git-wip-us.apache.org/repos/asf/flink/blob/f170e04a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala index c0f4fc8..18bf49f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala @@ -28,7 +28,7 @@ import org.apache.flink.api.common.typeinfo.{FractionalTypeInfo, SqlTimeTypeInfo import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo, TypeExtractor} import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo -import org.apache.flink.api.table.typeutils.{RowTypeInfo, TypeCheckUtils} +import org.apache.flink.api.table.typeutils.{IntervalTypeInfo, RowTypeInfo, TypeCheckUtils} object CodeGenUtils { @@ -65,11 +65,15 @@ object CodeGenUtils { case BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO => "boolean[]" case CHAR_PRIMITIVE_ARRAY_TYPE_INFO => "char[]" - // internal primitive representation of Date/Time/Timestamp + // internal primitive representation of time points case SqlTimeTypeInfo.DATE => "int" case SqlTimeTypeInfo.TIME => "int" case SqlTimeTypeInfo.TIMESTAMP => "long" + // internal primitive representation of time intervals + case IntervalTypeInfo.INTERVAL_MONTHS => "int" + case IntervalTypeInfo.INTERVAL_MILLIS => "long" + case _ => tpe.getTypeClass.getCanonicalName } 
@@ -100,7 +104,10 @@ object CodeGenUtils { case BOOLEAN_TYPE_INFO => "false" case STRING_TYPE_INFO => "\"\"" case CHAR_TYPE_INFO => "'\\0'" - case SqlTimeTypeInfo.DATE | SqlTimeTypeInfo.TIME | SqlTimeTypeInfo.TIMESTAMP => "-1" + case SqlTimeTypeInfo.DATE | SqlTimeTypeInfo.TIME => "-1" + case SqlTimeTypeInfo.TIMESTAMP => "-1L" + case IntervalTypeInfo.INTERVAL_MONTHS => "-1" + case IntervalTypeInfo.INTERVAL_MILLIS => "-1L" case _ => "null" } @@ -113,7 +120,10 @@ object CodeGenUtils { def qualifyMethod(method: Method): String = method.getDeclaringClass.getCanonicalName + "." + method.getName - def internalToTemporalCode(resultType: TypeInformation[_], resultTerm: String) = + def qualifyEnum(enum: Enum[_]): String = + enum.getClass.getCanonicalName + "." + enum.name() + + def internalToTimePointCode(resultType: TypeInformation[_], resultTerm: String) = resultType match { case SqlTimeTypeInfo.DATE => s"${qualifyMethod(BuiltInMethod.INTERNAL_TO_DATE.method)}($resultTerm)" @@ -123,7 +133,7 @@ object CodeGenUtils { s"${qualifyMethod(BuiltInMethod.INTERNAL_TO_TIMESTAMP.method)}($resultTerm)" } - def temporalToInternalCode(resultType: TypeInformation[_], resultTerm: String) = + def timePointToInternalCode(resultType: TypeInformation[_], resultTerm: String) = resultType match { case SqlTimeTypeInfo.DATE => s"${qualifyMethod(BuiltInMethod.DATE_TO_INT.method)}($resultTerm)" @@ -146,15 +156,25 @@ object CodeGenUtils { throw new CodeGenException(s"Comparable type expected, but was '${genExpr.resultType}'.") } - def requireString(genExpr: GeneratedExpression) = genExpr.resultType match { - case STRING_TYPE_INFO => // ok - case _ => throw new CodeGenException("String expression type expected.") - } + def requireString(genExpr: GeneratedExpression) = + if (!TypeCheckUtils.isString(genExpr.resultType)) { + throw new CodeGenException("String expression type expected.") + } - def requireBoolean(genExpr: GeneratedExpression) = genExpr.resultType match { - case BOOLEAN_TYPE_INFO => // ok 
- case _ => throw new CodeGenException("Boolean expression type expected.") - } + def requireBoolean(genExpr: GeneratedExpression) = + if (!TypeCheckUtils.isBoolean(genExpr.resultType)) { + throw new CodeGenException("Boolean expression type expected.") + } + + def requireTemporal(genExpr: GeneratedExpression) = + if (!TypeCheckUtils.isTemporal(genExpr.resultType)) { + throw new CodeGenException("Temporal expression type expected.") + } + + def requireTimeInterval(genExpr: GeneratedExpression) = + if (!TypeCheckUtils.isTimeInterval(genExpr.resultType)) { + throw new CodeGenException("Interval expression type expected.") + } // ---------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f170e04a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala index c1508d8..92f9761 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala @@ -29,13 +29,13 @@ import org.apache.flink.api.common.typeinfo.{AtomicType, SqlTimeTypeInfo, TypeIn import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo} import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo -import org.apache.flink.api.table.{FlinkTypeFactory, TableConfig} import org.apache.flink.api.table.codegen.CodeGenUtils._ import org.apache.flink.api.table.codegen.Indenter.toISC import org.apache.flink.api.table.codegen.calls.ScalarFunctions import 
org.apache.flink.api.table.codegen.calls.ScalarOperators._ import org.apache.flink.api.table.typeutils.RowTypeInfo -import org.apache.flink.api.table.typeutils.TypeCheckUtils.{isNumeric, isString, isTemporal} +import org.apache.flink.api.table.typeutils.TypeCheckUtils._ +import org.apache.flink.api.table.{FlinkTypeFactory, TableConfig} import scala.collection.JavaConversions._ import scala.collection.mutable @@ -618,6 +618,20 @@ class CodeGenerator( generateNonNullLiteral(resultType, value.toString) case TIMESTAMP => generateNonNullLiteral(resultType, value.toString + "L") + case INTERVAL_YEAR_MONTH => + val decimal = BigDecimal(value.asInstanceOf[JBigDecimal]) + if (decimal.isValidInt) { + generateNonNullLiteral(resultType, decimal.intValue().toString) + } else { + throw new CodeGenException("Decimal can not be converted to interval of months.") + } + case INTERVAL_DAY_TIME => + val decimal = BigDecimal(value.asInstanceOf[JBigDecimal]) + if (decimal.isValidLong) { + generateNonNullLiteral(resultType, decimal.longValue().toString + "L") + } else { + throw new CodeGenException("Decimal can not be converted to interval of milliseconds.") + } case t@_ => throw new CodeGenException(s"Type not supported: $t") @@ -645,11 +659,12 @@ class CodeGenerator( requireNumeric(right) generateArithmeticOperator("+", nullCheck, resultType, left, right) - case PLUS if isString(resultType) => + case PLUS | DATETIME_PLUS if isTemporal(resultType) => val left = operands.head val right = operands(1) - requireString(left) - generateStringConcatOperator(nullCheck, left, right) + requireTemporal(left) + requireTemporal(right) + generateTemporalPlusMinus(plus = true, nullCheck, left, right) case MINUS if isNumeric(resultType) => val left = operands.head @@ -658,6 +673,13 @@ class CodeGenerator( requireNumeric(right) generateArithmeticOperator("-", nullCheck, resultType, left, right) + case MINUS if isTemporal(resultType) => + val left = operands.head + val right = operands(1) + 
requireTemporal(left) + requireTemporal(right) + generateTemporalPlusMinus(plus = false, nullCheck, left, right) + case MULTIPLY if isNumeric(resultType) => val left = operands.head val right = operands(1) @@ -684,11 +706,21 @@ class CodeGenerator( requireNumeric(operand) generateUnaryArithmeticOperator("-", nullCheck, resultType, operand) + case UNARY_MINUS if isTimeInterval(resultType) => + val operand = operands.head + requireTimeInterval(operand) + generateUnaryIntervalPlusMinus(plus = false, nullCheck, operand) + case UNARY_PLUS if isNumeric(resultType) => val operand = operands.head requireNumeric(operand) generateUnaryArithmeticOperator("+", nullCheck, resultType, operand) + case UNARY_PLUS if isTimeInterval(resultType) => + val operand = operands.head + requireTimeInterval(operand) + generateUnaryIntervalPlusMinus(plus = true, nullCheck, operand) + // comparison case EQUALS => val left = operands.head @@ -760,7 +792,7 @@ class CodeGenerator( generateIfElse(nullCheck, operands, resultType) // casting - case CAST => + case CAST | REINTERPRET => val operand = operands.head generateCast(nullCheck, operand, resultType) @@ -946,8 +978,8 @@ class CodeGenerator( val defaultValue = primitiveDefaultValue(literalType) // explicit unboxing - val unboxedLiteralCode = if (isTemporal(literalType)) { - temporalToInternalCode(literalType, literalCode) + val unboxedLiteralCode = if (isTimePoint(literalType)) { + timePointToInternalCode(literalType, literalCode) } else { literalCode } @@ -1023,7 +1055,7 @@ class CodeGenerator( case SqlTimeTypeInfo.DATE | SqlTimeTypeInfo.TIME | SqlTimeTypeInfo.TIMESTAMP => val resultTerm = newName("result") val resultTypeTerm = boxedTypeTermForTypeInfo(expr.resultType) - val convMethod = internalToTemporalCode(expr.resultType, expr.resultTerm) + val convMethod = internalToTimePointCode(expr.resultType, expr.resultTerm) val resultCode = if (nullCheck) { s""" 
http://git-wip-us.apache.org/repos/asf/flink/blob/f170e04a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala index 28c66b6..81187a5 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala @@ -17,12 +17,14 @@ */ package org.apache.flink.api.table.codegen.calls -import org.apache.calcite.avatica.util.DateTimeUtils +import org.apache.calcite.avatica.util.DateTimeUtils.MILLIS_PER_DAY +import org.apache.calcite.avatica.util.{DateTimeUtils, TimeUnitRange} import org.apache.calcite.util.BuiltInMethod import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, SqlTimeTypeInfo, TypeInformation} import org.apache.flink.api.table.codegen.CodeGenUtils._ import org.apache.flink.api.table.codegen.{CodeGenException, GeneratedExpression} +import org.apache.flink.api.table.typeutils.IntervalTypeInfo import org.apache.flink.api.table.typeutils.TypeCheckUtils._ object ScalarOperators { @@ -70,10 +72,8 @@ object ScalarOperators { s"$operandTerm.negate()" } else if (isDecimal(operand.resultType) && operator == "+") { s"$operandTerm" - } else if (isNumeric(operand.resultType)) { + } else { s"$operator($operandTerm)" - } else { - throw new CodeGenException("Unsupported unary operator.") } } } @@ -396,7 +396,23 @@ object ScalarOperators { // Date/Time/Timestamp -> String case (dtt: SqlTimeTypeInfo[_], STRING_TYPE_INFO) => generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) { - (operandTerm) => 
s"""${internalToTemporalCode(dtt, operandTerm)}.toString()""" + (operandTerm) => s"""${internalToTimePointCode(dtt, operandTerm)}.toString()""" + } + + // Interval Months -> String + case (IntervalTypeInfo.INTERVAL_MONTHS, STRING_TYPE_INFO) => + val method = qualifyMethod(BuiltInMethod.INTERVAL_YEAR_MONTH_TO_STRING.method) + val timeUnitRange = qualifyEnum(TimeUnitRange.YEAR_TO_MONTH) + generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) { + (operandTerm) => s"$method($operandTerm, $timeUnitRange)" + } + + // Interval Millis -> String + case (IntervalTypeInfo.INTERVAL_MILLIS, STRING_TYPE_INFO) => + val method = qualifyMethod(BuiltInMethod.INTERVAL_DAY_TIME_TO_STRING.method) + val timeUnitRange = qualifyEnum(TimeUnitRange.DAY_TO_SECOND) + generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) { + (operandTerm) => s"$method($operandTerm, $timeUnitRange, 3)" // milli second precision } // * (not Date/Time/Timestamp) -> String @@ -508,25 +524,28 @@ object ScalarOperators { s"${classOf[DateTimeUtils].getCanonicalName}.MILLIS_PER_DAY)" } - // Date -> Integer, Time -> Integer - case (SqlTimeTypeInfo.DATE, INT_TYPE_INFO) | (SqlTimeTypeInfo.TIME, INT_TYPE_INFO) => - internalExprCasting(operand, INT_TYPE_INFO) - + // internal temporal casting + // Date -> Integer + // Time -> Integer // Timestamp -> Long - case (SqlTimeTypeInfo.TIMESTAMP, LONG_TYPE_INFO) => - internalExprCasting(operand, LONG_TYPE_INFO) - // Integer -> Date - case (INT_TYPE_INFO, SqlTimeTypeInfo.DATE) => - internalExprCasting(operand, SqlTimeTypeInfo.DATE) - // Integer -> Time - case (INT_TYPE_INFO, SqlTimeTypeInfo.TIME) => - internalExprCasting(operand, SqlTimeTypeInfo.TIME) - // Long -> Timestamp - case (LONG_TYPE_INFO, SqlTimeTypeInfo.TIMESTAMP) => - internalExprCasting(operand, SqlTimeTypeInfo.TIMESTAMP) + // Integer -> Interval Months + // Long -> Interval Millis + // Interval Months -> Integer + // Interval Millis -> Long + case (SqlTimeTypeInfo.DATE, INT_TYPE_INFO) | + 
(SqlTimeTypeInfo.TIME, INT_TYPE_INFO) | + (SqlTimeTypeInfo.TIMESTAMP, LONG_TYPE_INFO) | + (INT_TYPE_INFO, SqlTimeTypeInfo.DATE) | + (INT_TYPE_INFO, SqlTimeTypeInfo.TIME) | + (LONG_TYPE_INFO, SqlTimeTypeInfo.TIMESTAMP) | + (INT_TYPE_INFO, IntervalTypeInfo.INTERVAL_MONTHS) | + (LONG_TYPE_INFO, IntervalTypeInfo.INTERVAL_MILLIS) | + (IntervalTypeInfo.INTERVAL_MONTHS, INT_TYPE_INFO) | + (IntervalTypeInfo.INTERVAL_MONTHS, LONG_TYPE_INFO) => + internalExprCasting(operand, targetType) case (from, to) => throw new CodeGenException(s"Unsupported cast from '$from' to '$to'.") @@ -591,6 +610,60 @@ object ScalarOperators { } } + def generateTemporalPlusMinus( + plus: Boolean, + nullCheck: Boolean, + left: GeneratedExpression, + right: GeneratedExpression) + : GeneratedExpression = { + + val operator = if (plus) "+" else "-" + + (left.resultType, right.resultType) match { + case (l: IntervalTypeInfo[_], r: IntervalTypeInfo[_]) if l == r => + generateArithmeticOperator(operator, nullCheck, l, left, right) + + case (SqlTimeTypeInfo.DATE, IntervalTypeInfo.INTERVAL_MILLIS) | + (IntervalTypeInfo.INTERVAL_MILLIS, SqlTimeTypeInfo.DATE) => + generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.DATE, left, right) { + if (isTimePoint(left.resultType)) { + (leftTerm, rightTerm) => s"$leftTerm + ((int) ($rightTerm / ${MILLIS_PER_DAY}L))" + } else { + (leftTerm, rightTerm) => s"((int) ($leftTerm / ${MILLIS_PER_DAY}L)) + $rightTerm" + } + } + + case (SqlTimeTypeInfo.TIME, IntervalTypeInfo.INTERVAL_MILLIS) | + (IntervalTypeInfo.INTERVAL_MILLIS, SqlTimeTypeInfo.TIME) => + generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.TIME, left, right) { + if (isTimePoint(left.resultType)) { + (leftTerm, rightTerm) => s"$leftTerm + ((int) ($rightTerm))" + } else { + (leftTerm, rightTerm) => s"((int) ($leftTerm)) + $rightTerm" + } + } + + case (SqlTimeTypeInfo.TIMESTAMP, IntervalTypeInfo.INTERVAL_MILLIS) => + generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.TIMESTAMP, left, right) { + (leftTerm, 
rightTerm) => s"$leftTerm + $rightTerm" + } + + // TODO more operations when CALCITE-308 is fixed + + case _ => + throw new CodeGenException("Unsupported temporal arithmetic.") + } + } + + def generateUnaryIntervalPlusMinus( + plus: Boolean, + nullCheck: Boolean, + operand: GeneratedExpression) + : GeneratedExpression = { + val operator = if (plus) "+" else "-" + generateUnaryArithmeticOperator(operator, nullCheck, operand.resultType, operand) + } + // ---------------------------------------------------------------------------------------------- private def generateUnaryOperatorIfNotNull( http://git-wip-us.apache.org/repos/asf/flink/blob/f170e04a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala index 6aa4f89..41222c8 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala @@ -17,8 +17,11 @@ */ package org.apache.flink.api.table.expressions -import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation, BasicTypeInfo} +import org.apache.calcite.avatica.util.DateTimeUtils.{MILLIS_PER_DAY, MILLIS_PER_HOUR, MILLIS_PER_MINUTE, MILLIS_PER_SECOND} +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation} import org.apache.flink.api.table.ExpressionParserException +import org.apache.flink.api.table.expressions.ExpressionUtils.{toMilliInterval, toMonthInterval} +import org.apache.flink.api.table.typeutils.IntervalTypeInfo import scala.util.parsing.combinator.{JavaTokenParsers, PackratParsers} @@ -57,6 
+60,13 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { lazy val TO_DATE: Keyword = Keyword("toDate") lazy val TO_TIME: Keyword = Keyword("toTime") lazy val TO_TIMESTAMP: Keyword = Keyword("toTimestamp") + lazy val YEAR: Keyword = Keyword("year") + lazy val MONTH: Keyword = Keyword("month") + lazy val DAY: Keyword = Keyword("day") + lazy val HOUR: Keyword = Keyword("hour") + lazy val MINUTE: Keyword = Keyword("minute") + lazy val SECOND: Keyword = Keyword("second") + lazy val MILLI: Keyword = Keyword("milli") def functionIdent: ExpressionParser.Parser[String] = not(AS) ~ not(COUNT) ~ not(AVG) ~ not(MIN) ~ not(MAX) ~ @@ -68,6 +78,12 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { lazy val dataType: PackratParser[TypeInformation[_]] = "BYTE" ^^ { ti => BasicTypeInfo.BYTE_TYPE_INFO } | "SHORT" ^^ { ti => BasicTypeInfo.SHORT_TYPE_INFO } | + "INTERVAL_MONTHS" ^^ { + ti => IntervalTypeInfo.INTERVAL_MONTHS.asInstanceOf[TypeInformation[_]] + } | + "INTERVAL_MILLIS" ^^ { + ti => IntervalTypeInfo.INTERVAL_MILLIS.asInstanceOf[TypeInformation[_]] + } | "INT" ^^ { ti => BasicTypeInfo.INT_TYPE_INFO } | "LONG" ^^ { ti => BasicTypeInfo.LONG_TYPE_INFO } | "FLOAT" ^^ { ti => BasicTypeInfo.FLOAT_TYPE_INFO } | @@ -81,10 +97,14 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { // Literals + // same as floatingPointNumber but we do not allow trailing dot "12.d" or "2." 
+ lazy val floatingPointNumberFlink: Parser[String] = + """-?(\d+(\.\d+)?|\d*\.\d+)([eE][+-]?\d+)?[fFdD]?""".r + lazy val numberLiteral: PackratParser[Expression] = (wholeNumber <~ ("l" | "L")) ^^ { n => Literal(n.toLong) } | (decimalNumber <~ ("p" | "P")) ^^ { n => Literal(BigDecimal(n)) } | - (floatingPointNumber | decimalNumber) ^^ { + (floatingPointNumberFlink | decimalNumber) ^^ { n => if (n.matches("""-?\d+""")) { Literal(n.toInt) @@ -109,7 +129,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { } lazy val nullLiteral: PackratParser[Expression] = NULL ~ "(" ~> dataType <~ ")" ^^ { - case dt => Null(dt) + dt => Null(dt) } lazy val literalExpr: PackratParser[Expression] = @@ -169,7 +189,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { } lazy val suffixTrimWithoutArgs = composite <~ ".trim" ~ opt("()") ^^ { - case e => + e => Trim(TrimConstants.TRIM_BOTH, TrimConstants.TRIM_DEFAULT_CHAR, e) } @@ -198,10 +218,29 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { lazy val suffixToTime: PackratParser[Expression] = composite <~ "." ~ TO_TIME ~ opt("()") ^^ { e => Cast(e, SqlTimeTypeInfo.TIME) } + lazy val suffixTimeInterval : PackratParser[Expression] = + composite ~ "." 
~ (YEAR | MONTH | DAY | HOUR | MINUTE | SECOND | MILLI) ^^ { + + case expr ~ _ ~ YEAR.key => toMonthInterval(expr, 12) + + case expr ~ _ ~ MONTH.key => toMonthInterval(expr, 1) + + case expr ~ _ ~ DAY.key => toMilliInterval(expr, MILLIS_PER_DAY) + + case expr ~ _ ~ HOUR.key => toMilliInterval(expr, MILLIS_PER_HOUR) + + case expr ~ _ ~ MINUTE.key => toMilliInterval(expr, MILLIS_PER_MINUTE) + + case expr ~ _ ~ SECOND.key => toMilliInterval(expr, MILLIS_PER_SECOND) + + case expr ~ _ ~ MILLI.key => toMilliInterval(expr, 1) + } + lazy val suffixed: PackratParser[Expression] = - suffixIsNull | suffixIsNotNull | suffixSum | suffixMin | suffixMax | suffixCount | suffixAvg | - suffixCast | suffixAs | suffixTrim | suffixTrimWithoutArgs | suffixIf | suffixFunctionCall | - suffixAsc | suffixDesc | suffixToDate | suffixToTimestamp | suffixToTime + suffixTimeInterval | suffixIsNull | suffixIsNotNull | suffixSum | suffixMin | suffixMax | + suffixCount | suffixAvg | suffixCast | suffixAs | suffixTrim | suffixTrimWithoutArgs | + suffixIf | suffixAsc | suffixDesc | suffixToDate | suffixToTimestamp | suffixToTime | + suffixFunctionCall // function call must always be at the end // prefix operators @@ -263,7 +302,8 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { lazy val prefixed: PackratParser[Expression] = prefixIsNull | prefixIsNotNull | prefixSum | prefixMin | prefixMax | prefixCount | prefixAvg | - prefixCast | prefixAs | prefixTrim | prefixTrimWithoutArgs | prefixIf | prefixFunctionCall + prefixCast | prefixAs | prefixTrim | prefixTrimWithoutArgs | prefixIf | + prefixFunctionCall // function call must always be at the end // suffix/prefix composite @@ -339,10 +379,10 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { parseAll(expressionList, expression) match { case Success(lst, _) => lst - case Failure(msg, _) => throw new ExpressionParserException( + case Failure(msg, _) => throw ExpressionParserException( "Could not parse 
expression: " + msg) - case Error(msg, _) => throw new ExpressionParserException( + case Error(msg, _) => throw ExpressionParserException( "Could not parse expression: " + msg) } } @@ -352,7 +392,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { case Success(lst, _) => lst case fail => - throw new ExpressionParserException("Could not parse expression: " + fail.toString) + throw ExpressionParserException("Could not parse expression: " + fail.toString) } } } http://git-wip-us.apache.org/repos/asf/flink/blob/f170e04a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionUtils.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionUtils.scala new file mode 100644 index 0000000..89671de --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionUtils.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.expressions + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.api.table.typeutils.IntervalTypeInfo + +object ExpressionUtils { + + private[flink] def toMonthInterval(expr: Expression, multiplier: Int): Expression = expr match { + case Literal(value: Int, BasicTypeInfo.INT_TYPE_INFO) => + Literal(value * multiplier, IntervalTypeInfo.INTERVAL_MONTHS) + case _ => + Cast(Mul(expr, Literal(multiplier)), IntervalTypeInfo.INTERVAL_MONTHS) + } + + private[flink] def toMilliInterval(expr: Expression, multiplier: Long): Expression = expr match { + case Literal(value: Int, BasicTypeInfo.INT_TYPE_INFO) => + Literal(value * multiplier, IntervalTypeInfo.INTERVAL_MILLIS) + case _ => + Cast(Mul(expr, Literal(multiplier)), IntervalTypeInfo.INTERVAL_MILLIS) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f170e04a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala index e4ed08e..b301f22 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala @@ -22,8 +22,8 @@ import org.apache.calcite.sql.SqlOperator import org.apache.calcite.sql.fun.SqlStdOperatorTable import org.apache.calcite.tools.RelBuilder import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} -import org.apache.flink.api.table.typeutils.TypeCheckUtils.{isNumeric, isString} -import org.apache.flink.api.table.typeutils.{TypeCheckUtils, TypeCoercion} +import org.apache.flink.api.table.typeutils.TypeCheckUtils._ +import 
org.apache.flink.api.table.typeutils.TypeCoercion import org.apache.flink.api.table.validate._ import scala.collection.JavaConversions._ @@ -61,10 +61,14 @@ case class Plus(left: Expression, right: Expression) extends BinaryArithmetic { override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { if(isString(left.resultType)) { val castedRight = Cast(right, BasicTypeInfo.STRING_TYPE_INFO) - relBuilder.call(SqlStdOperatorTable.PLUS, left.toRexNode, castedRight.toRexNode) + relBuilder.call(SqlStdOperatorTable.CONCAT, left.toRexNode, castedRight.toRexNode) } else if(isString(right.resultType)) { val castedLeft = Cast(left, BasicTypeInfo.STRING_TYPE_INFO) - relBuilder.call(SqlStdOperatorTable.PLUS, castedLeft.toRexNode, right.toRexNode) + relBuilder.call(SqlStdOperatorTable.CONCAT, castedLeft.toRexNode, right.toRexNode) + } else if (isTimeInterval(left.resultType) && left.resultType == right.resultType) { + relBuilder.call(SqlStdOperatorTable.PLUS, left.toRexNode, right.toRexNode) + } else if (isTemporal(left.resultType) && isTemporal(right.resultType)) { + relBuilder.call(SqlStdOperatorTable.DATETIME_PLUS, left.toRexNode, right.toRexNode) } else { val castedLeft = Cast(left, resultType) val castedRight = Cast(right, resultType) @@ -72,15 +76,22 @@ case class Plus(left: Expression, right: Expression) extends BinaryArithmetic { } } - // TODO: tighten this rule once we implemented type coercion rules during validation override private[flink] def validateInput(): ExprValidationResult = { if (isString(left.resultType) || isString(right.resultType)) { ValidationSuccess - } else if (!isNumeric(left.resultType) || !isNumeric(right.resultType)) { - ValidationFailure(s"$this requires Numeric or String input," + - s" get $left : ${left.resultType} and $right : ${right.resultType}") - } else { + } else if (isTimeInterval(left.resultType) && left.resultType == right.resultType) { + ValidationSuccess + } else if (isTimePoint(left.resultType) && 
isTimeInterval(right.resultType)) { + ValidationSuccess + } else if (isTimeInterval(left.resultType) && isTimePoint(right.resultType)) { + ValidationSuccess + } else if (isNumeric(left.resultType) && isNumeric(right.resultType)) { ValidationSuccess + } else { + ValidationFailure( + s"$this requires Numeric, String, Intervals of same type, " + + s"or Interval and a time point input, " + + s"get $left : ${left.resultType} and $right : ${right.resultType}") } } } @@ -94,14 +105,29 @@ case class UnaryMinus(child: Expression) extends UnaryExpression { override private[flink] def resultType = child.resultType - override private[flink] def validateInput(): ExprValidationResult = - TypeCheckUtils.assertNumericExpr(child.resultType, "unary minus") + override private[flink] def validateInput(): ExprValidationResult = { + if (isNumeric(child.resultType)) { + ValidationSuccess + } else if (isTimeInterval(child.resultType)) { + ValidationSuccess + } else { + ValidationFailure(s"$this requires Numeric, or Interval input, get ${child.resultType}") + } + } } case class Minus(left: Expression, right: Expression) extends BinaryArithmetic { override def toString = s"($left - $right)" private[flink] val sqlOperator = SqlStdOperatorTable.MINUS + + override private[flink] def validateInput(): ExprValidationResult = { + if (isTimeInterval(left.resultType) && left.resultType == right.resultType) { + ValidationSuccess + } else { + super.validateInput() + } + } } case class Div(left: Expression, right: Expression) extends BinaryArithmetic { http://git-wip-us.apache.org/repos/asf/flink/blob/f170e04a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala index 525d010..f65dd5b 100644 --- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala @@ -32,7 +32,8 @@ case class Cast(child: Expression, resultType: TypeInformation[_]) extends Unary val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory] relBuilder .getRexBuilder - .makeCast( + // we use abstract cast here because RelBuilder.cast() has to many side effects + .makeAbstractCast( typeFactory.createTypeFromTypeInfo(resultType), child.toRexNode) } http://git-wip-us.apache.org/repos/asf/flink/blob/f170e04a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala index cd3de60..677160a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala @@ -20,11 +20,15 @@ package org.apache.flink.api.table.expressions import java.sql.{Date, Time, Timestamp} import java.util.{Calendar, TimeZone} +import org.apache.calcite.avatica.util.TimeUnit import org.apache.calcite.rex.RexNode +import org.apache.calcite.sql.SqlIntervalQualifier import org.apache.calcite.sql.`type`.SqlTypeName +import org.apache.calcite.sql.parser.SqlParserPos import org.apache.calcite.tools.RelBuilder import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation} import org.apache.flink.api.table.FlinkTypeFactory +import org.apache.flink.api.table.typeutils.IntervalTypeInfo object Literal { private[flink] def apply(l: Any): Literal = l match { @@ -63,6 +67,22 @@ case class Literal(value: 
Any, resultType: TypeInformation[_]) extends LeafExpre case SqlTimeTypeInfo.TIMESTAMP => relBuilder.getRexBuilder.makeTimestampLiteral(dateToCalendar, 3) + case IntervalTypeInfo.INTERVAL_MONTHS => + val interval = java.math.BigDecimal.valueOf(value.asInstanceOf[Int]) + val intervalQualifier = new SqlIntervalQualifier( + TimeUnit.YEAR, + TimeUnit.MONTH, + SqlParserPos.ZERO) + relBuilder.getRexBuilder.makeIntervalLiteral(interval, intervalQualifier) + + case IntervalTypeInfo.INTERVAL_MILLIS => + val interval = java.math.BigDecimal.valueOf(value.asInstanceOf[Long]) + val intervalQualifier = new SqlIntervalQualifier( + TimeUnit.DAY, + TimeUnit.SECOND, + SqlParserPos.ZERO) + relBuilder.getRexBuilder.makeIntervalLiteral(interval, intervalQualifier) + case _ => relBuilder.literal(value) } } http://git-wip-us.apache.org/repos/asf/flink/blob/f170e04a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/IntervalTypeInfo.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/IntervalTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/IntervalTypeInfo.scala new file mode 100644 index 0000000..85524fb --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/IntervalTypeInfo.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.typeutils + +import java.util.Objects + +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} +import org.apache.flink.api.common.typeutils.base.{IntComparator, IntSerializer, LongComparator, LongSerializer} +import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer} +import org.apache.flink.api.table.typeutils.IntervalTypeInfo.instantiateComparator +import org.apache.flink.util.Preconditions._ + +/** + * TypeInformation for SQL INTERVAL types. 
+ */ +@SerialVersionUID(-1816179424364825258L) +class IntervalTypeInfo[T]( + val clazz: Class[T], + val serializer: TypeSerializer[T], + val comparatorClass: Class[_ <: TypeComparator[T]]) + extends TypeInformation[T] + with AtomicType[T] { + + checkNotNull(clazz) + checkNotNull(serializer) + checkNotNull(comparatorClass) + + override def isBasicType: Boolean = false + + override def isTupleType: Boolean = false + + override def getArity: Int = 1 + + override def getTotalFields: Int = 1 + + override def getTypeClass: Class[T] = clazz + + override def isKeyType: Boolean = true + + override def createSerializer(config: ExecutionConfig): TypeSerializer[T] = serializer + + override def createComparator( + sortOrderAscending: Boolean, + executionConfig: ExecutionConfig) + : TypeComparator[T] = instantiateComparator(comparatorClass, sortOrderAscending) + + // ---------------------------------------------------------------------------------------------- + + override def hashCode: Int = Objects.hash(clazz, serializer, comparatorClass) + + def canEqual(obj: Any): Boolean = obj.isInstanceOf[IntervalTypeInfo[_]] + + override def equals(obj: Any): Boolean = { + obj match { + case other: IntervalTypeInfo[_] => + other.canEqual(this) && + (this.clazz eq other.clazz) && + serializer == other.serializer && + (this.comparatorClass eq other.comparatorClass) + case _ => + false + } + } + + override def toString: String = s"IntervalTypeInfo(${clazz.getSimpleName})" +} + +object IntervalTypeInfo { + + val INTERVAL_MONTHS = + new IntervalTypeInfo(classOf[java.lang.Integer], IntSerializer.INSTANCE, classOf[IntComparator]) + + val INTERVAL_MILLIS = + new IntervalTypeInfo(classOf[java.lang.Long], LongSerializer.INSTANCE, classOf[LongComparator]) + + // ---------------------------------------------------------------------------------------------- + + private def instantiateComparator[X]( + comparatorClass: Class[_ <: TypeComparator[X]], + ascendingOrder: java.lang.Boolean) + : 
TypeComparator[X] = { + try { + val constructor = comparatorClass.getConstructor(classOf[java.lang.Boolean]) + constructor.newInstance(ascendingOrder) + } catch { + case e: Exception => + throw new RuntimeException( + s"Could not initialize comparator ${comparatorClass.getName}", e) + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f170e04a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCheckUtils.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCheckUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCheckUtils.scala index c19deec..4ff7888 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCheckUtils.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCheckUtils.scala @@ -30,6 +30,7 @@ object TypeCheckUtils { def isAdvanced(dataType: TypeInformation[_]): Boolean = dataType match { case _: BasicTypeInfo[_] => false case _: SqlTimeTypeInfo[_] => false + case _: IntervalTypeInfo[_] => false case _ => true } @@ -45,7 +46,14 @@ object TypeCheckUtils { case _ => false } - def isTemporal(dataType: TypeInformation[_]): Boolean = dataType.isInstanceOf[SqlTimeTypeInfo[_]] + def isTemporal(dataType: TypeInformation[_]): Boolean = + isTimePoint(dataType) || isTimeInterval(dataType) + + def isTimePoint(dataType: TypeInformation[_]): Boolean = + dataType.isInstanceOf[SqlTimeTypeInfo[_]] + + def isTimeInterval(dataType: TypeInformation[_]): Boolean = + dataType.isInstanceOf[IntervalTypeInfo[_]] def isString(dataType: TypeInformation[_]): Boolean = dataType == STRING_TYPE_INFO http://git-wip-us.apache.org/repos/asf/flink/blob/f170e04a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCoercion.scala 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCoercion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCoercion.scala index dad13fb..bb3d060 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCoercion.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeCoercion.scala @@ -45,6 +45,9 @@ object TypeCoercion { case (_, BIG_DEC_TYPE_INFO) => Some(BIG_DEC_TYPE_INFO) case (BIG_DEC_TYPE_INFO, _) => Some(BIG_DEC_TYPE_INFO) + case (stti: SqlTimeTypeInfo[_], _: IntervalTypeInfo[_]) => Some(stti) + case (_: IntervalTypeInfo[_], stti: SqlTimeTypeInfo[_]) => Some(stti) + case tuple if tuple.productIterator.forall(numericWideningPrecedence.contains) => val higherIndex = numericWideningPrecedence.lastIndexWhere(t => t == tp1 || t == tp2) Some(numericWideningPrecedence(higherIndex)) @@ -100,6 +103,8 @@ object TypeCoercion { case (INT_TYPE_INFO, SqlTimeTypeInfo.DATE) => true case (INT_TYPE_INFO, SqlTimeTypeInfo.TIME) => true case (LONG_TYPE_INFO, SqlTimeTypeInfo.TIMESTAMP) => true + case (INT_TYPE_INFO, IntervalTypeInfo.INTERVAL_MONTHS) => true + case (LONG_TYPE_INFO, IntervalTypeInfo.INTERVAL_MILLIS) => true case (SqlTimeTypeInfo.DATE, SqlTimeTypeInfo.TIME) => false case (SqlTimeTypeInfo.TIME, SqlTimeTypeInfo.DATE) => false @@ -108,6 +113,9 @@ object TypeCoercion { case (SqlTimeTypeInfo.TIME, INT_TYPE_INFO) => true case (SqlTimeTypeInfo.TIMESTAMP, LONG_TYPE_INFO) => true + case (IntervalTypeInfo.INTERVAL_MONTHS, INT_TYPE_INFO) => true + case (IntervalTypeInfo.INTERVAL_MILLIS, LONG_TYPE_INFO) => true + case _ => false } } http://git-wip-us.apache.org/repos/asf/flink/blob/f170e04a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/ExpressionsITCase.java 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/ExpressionsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/ExpressionsITCase.java index 7d225f3..b060540 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/ExpressionsITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/ExpressionsITCase.java @@ -192,11 +192,12 @@ public class ExpressionsITCase extends TableProgramsTestBase { "a.cast(STRING) + a.cast(STRING)," + "CAST(ISNULL(b), INT)," + "ISNULL(CAST(b, INT).abs()) === false," + - "((((true) === true) || false).cast(STRING) + 'X ').trim"); + "((((true) === true) || false).cast(STRING) + 'X ').trim," + + "12.isNull"); DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); List<Row> results = ds.collect(); - String expected = "false,10,55,0,true,trueX"; + String expected = "false,10,55,0,true,trueX,false"; compareResultAsText(results, expected); } http://git-wip-us.apache.org/repos/asf/flink/blob/f170e04a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/expression/TemporalTypesTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/expression/TemporalTypesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/expression/TemporalTypesTest.scala new file mode 100644 index 0000000..63d6346 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/expression/TemporalTypesTest.scala @@ -0,0 +1,389 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.expression + +import java.sql.{Date, Time, Timestamp} + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.table.expressions.utils.ExpressionTestBase +import org.apache.flink.api.table.typeutils.RowTypeInfo +import org.apache.flink.api.table.{Row, Types} +import org.junit.Test + +class TemporalTypesTest extends ExpressionTestBase { + + @Test + def testTimePointLiterals(): Unit = { + testAllApis( + "1990-10-14".toDate, + "'1990-10-14'.toDate", + "DATE '1990-10-14'", + "1990-10-14") + + testTableApi( + Date.valueOf("2040-09-11"), + "'2040-09-11'.toDate", + "2040-09-11") + + testAllApis( + "1500-04-30".cast(Types.DATE), + "'1500-04-30'.cast(DATE)", + "CAST('1500-04-30' AS DATE)", + "1500-04-30") + + testAllApis( + "15:45:59".toTime, + "'15:45:59'.toTime", + "TIME '15:45:59'", + "15:45:59") + + testTableApi( + Time.valueOf("00:00:00"), + "'00:00:00'.toTime", + "00:00:00") + + testAllApis( + "1:30:00".cast(Types.TIME), + "'1:30:00'.cast(TIME)", + "CAST('1:30:00' AS TIME)", + "01:30:00") + + testAllApis( + "1990-10-14 23:00:00.123".toTimestamp, + "'1990-10-14 23:00:00.123'.toTimestamp", + "TIMESTAMP '1990-10-14 23:00:00.123'", + "1990-10-14 23:00:00.123") + + testTableApi( + Timestamp.valueOf("2040-09-11 00:00:00.000"), + "'2040-09-11 00:00:00.000'.toTimestamp", + 
"2040-09-11 00:00:00.0") + + testAllApis( + "1500-04-30 12:00:00".cast(Types.TIMESTAMP), + "'1500-04-30 12:00:00'.cast(TIMESTAMP)", + "CAST('1500-04-30 12:00:00' AS TIMESTAMP)", + "1500-04-30 12:00:00.0") + } + + @Test + def testTimeIntervalLiterals(): Unit = { + testAllApis( + 1.year, + "1.year", + "INTERVAL '1' YEAR", + "+1-00") + + testAllApis( + 1.month, + "1.month", + "INTERVAL '1' MONTH", + "+0-01") + + testAllApis( + 12.day, + "12.day", + "INTERVAL '12' DAY", + "+12 00:00:00.000") + + testAllApis( + 1.hour, + "1.hour", + "INTERVAL '1' HOUR", + "+0 01:00:00.000") + + testAllApis( + 3.minute, + "3.minute", + "INTERVAL '3' MINUTE", + "+0 00:03:00.000") + + testAllApis( + 3.second, + "3.second", + "INTERVAL '3' SECOND", + "+0 00:00:03.000") + + testAllApis( + 3.milli, + "3.milli", + "INTERVAL '0.003' SECOND", + "+0 00:00:00.003") + } + + @Test + def testTimePointInput(): Unit = { + testAllApis( + 'f0, + "f0", + "f0", + "1990-10-14") + + testAllApis( + 'f1, + "f1", + "f1", + "10:20:45") + + testAllApis( + 'f2, + "f2", + "f2", + "1990-10-14 10:20:45.123") + } + + @Test + def testTimeIntervalInput(): Unit = { + testAllApis( + 'f9, + "f9", + "f9", + "+2-00") + + testAllApis( + 'f10, + "f10", + "f10", + "+0 00:00:12.000") + } + + @Test + def testTimePointCasting(): Unit = { + testAllApis( + 'f0.cast(Types.TIMESTAMP), + "f0.cast(TIMESTAMP)", + "CAST(f0 AS TIMESTAMP)", + "1990-10-14 00:00:00.0") + + testAllApis( + 'f1.cast(Types.TIMESTAMP), + "f1.cast(TIMESTAMP)", + "CAST(f1 AS TIMESTAMP)", + "1970-01-01 10:20:45.0") + + testAllApis( + 'f2.cast(Types.DATE), + "f2.cast(DATE)", + "CAST(f2 AS DATE)", + "1990-10-14") + + testAllApis( + 'f2.cast(Types.TIME), + "f2.cast(TIME)", + "CAST(f2 AS TIME)", + "10:20:45") + + testAllApis( + 'f2.cast(Types.TIME), + "f2.cast(TIME)", + "CAST(f2 AS TIME)", + "10:20:45") + + testTableApi( + 'f7.cast(Types.DATE), + "f7.cast(DATE)", + "2002-11-09") + + testTableApi( + 'f7.cast(Types.DATE).cast(Types.INT), + "f7.cast(DATE).cast(INT)", + 
"12000") + + testTableApi( + 'f7.cast(Types.TIME), + "f7.cast(TIME)", + "00:00:12") + + testTableApi( + 'f7.cast(Types.TIME).cast(Types.INT), + "f7.cast(TIME).cast(INT)", + "12000") + + testTableApi( + 'f8.cast(Types.TIMESTAMP), + "f8.cast(TIMESTAMP)", + "2016-06-27 07:23:33.0") + + testTableApi( + 'f8.cast(Types.TIMESTAMP).cast(Types.LONG), + "f8.cast(TIMESTAMP).cast(LONG)", + "1467012213000") + } + + @Test + def testTimeIntervalCasting(): Unit = { + testTableApi( + 'f7.cast(Types.INTERVAL_MONTHS), + "f7.cast(INTERVAL_MONTHS)", + "+1000-00") + + testTableApi( + 'f8.cast(Types.INTERVAL_MILLIS), + "f8.cast(INTERVAL_MILLIS)", + "+16979 07:23:33.000") + } + + @Test + def testTimePointComparison(): Unit = { + testAllApis( + 'f0 < 'f3, + "f0 < f3", + "f0 < f3", + "false") + + testAllApis( + 'f0 < 'f4, + "f0 < f4", + "f0 < f4", + "true") + + testAllApis( + 'f1 < 'f5, + "f1 < f5", + "f1 < f5", + "false") + + testAllApis( + 'f0.cast(Types.TIMESTAMP) !== 'f2, + "f0.cast(TIMESTAMP) !== f2", + "CAST(f0 AS TIMESTAMP) <> f2", + "true") + + testAllApis( + 'f0.cast(Types.TIMESTAMP) === 'f6, + "f0.cast(TIMESTAMP) === f6", + "CAST(f0 AS TIMESTAMP) = f6", + "true") + } + + @Test + def testTimeIntervalArithmetic(): Unit = { + testAllApis( + 12.month < 24.month, + "12.month < 24.month", + "INTERVAL '12' MONTH < INTERVAL '24' MONTH", + "true") + + testAllApis( + 8.milli > 10.milli, + "8.milli > 10.milli", + "INTERVAL '0.008' SECOND > INTERVAL '0.010' SECOND", + "false") + + testAllApis( + 8.year === 8.year, + "8.year === 8.year", + "INTERVAL '8' YEAR = INTERVAL '8' YEAR", + "true") + + testAllApis( + 8.year + 10.month, + "8.year + 10.month", + "INTERVAL '8' YEAR + INTERVAL '10' MONTH", + "+8-10") + + testAllApis( + 8.hour + 10.minute + 12.second + 5.milli, + "8.hour + 10.minute + 12.second + 5.milli", + "INTERVAL '8' HOUR + INTERVAL '10' MINUTE + INTERVAL '12.005' SECOND", + "+0 08:10:12.005") + + testAllApis( + 1.minute - 10.second, + "1.minute - 10.second", + "INTERVAL '1' MINUTE - 
INTERVAL '10' SECOND", + "+0 00:00:50.000") + + testAllApis( + 2.year - 12.month, + "2.year - 12.month", + "INTERVAL '2' YEAR - INTERVAL '12' MONTH", + "+1-00") + + testAllApis( + -'f9.cast(Types.INTERVAL_MONTHS), + "-f9.cast(INTERVAL_MONTHS)", + "-CAST(f9 AS INTERVAL YEAR)", + "-2-00") + + testAllApis( + 'f0 + 2.day, + "f0 + 2.day", + "f0 + INTERVAL '2' DAY", + "1990-10-16") + + testAllApis( + 30.day + 'f0, + "30.day + f0", + "INTERVAL '30' DAY + f0", + "1990-11-13") + + testAllApis( + 'f1 + 12.hour, + "f1 + 12.hour", + "f1 + INTERVAL '12' HOUR", + "22:20:45") + + testAllApis( + 24.hour + 'f1, + "24.hour + f1", + "INTERVAL '24' HOUR + f1", + "10:20:45") + + testAllApis( + 'f2 + 10.day + 4.milli, + "f2 + 10.day + 4.milli", + "f2 + INTERVAL '10 00:00:00.004' DAY TO SECOND", + "1990-10-24 10:20:45.127") + } + + // ---------------------------------------------------------------------------------------------- + + def testData = { + val testData = new Row(11) + testData.setField(0, Date.valueOf("1990-10-14")) + testData.setField(1, Time.valueOf("10:20:45")) + testData.setField(2, Timestamp.valueOf("1990-10-14 10:20:45.123")) + testData.setField(3, Date.valueOf("1990-10-13")) + testData.setField(4, Date.valueOf("1990-10-15")) + testData.setField(5, Time.valueOf("00:00:00")) + testData.setField(6, Timestamp.valueOf("1990-10-14 00:00:00.0")) + testData.setField(7, 12000) + testData.setField(8, 1467012213000L) + testData.setField(9, 24) + testData.setField(10, 12000L) + testData + } + + def typeInfo = { + new RowTypeInfo(Seq( + Types.DATE, + Types.TIME, + Types.TIMESTAMP, + Types.DATE, + Types.DATE, + Types.TIME, + Types.TIMESTAMP, + Types.INT, + Types.LONG, + Types.INTERVAL_MONTHS, + Types.INTERVAL_MILLIS)).asInstanceOf[TypeInformation[Any]] + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f170e04a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/expression/TimeTypesTest.scala 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/expression/TimeTypesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/expression/TimeTypesTest.scala deleted file mode 100644 index 532a3bd..0000000 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/expression/TimeTypesTest.scala +++ /dev/null @@ -1,231 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.scala.expression - -import java.sql.{Date, Time, Timestamp} - -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.scala.table._ -import org.apache.flink.api.table.expressions.utils.ExpressionTestBase -import org.apache.flink.api.table.typeutils.RowTypeInfo -import org.apache.flink.api.table.{Row, Types} -import org.junit.Test - -class TimeTypesTest extends ExpressionTestBase { - - @Test - def testTimeLiterals(): Unit = { - testAllApis( - "1990-10-14".toDate, - "'1990-10-14'.toDate", - "DATE '1990-10-14'", - "1990-10-14") - - testTableApi( - Date.valueOf("2040-09-11"), - "'2040-09-11'.toDate", - "2040-09-11") - - testAllApis( - "1500-04-30".cast(Types.DATE), - "'1500-04-30'.cast(DATE)", - "CAST('1500-04-30' AS DATE)", - "1500-04-30") - - testAllApis( - "15:45:59".toTime, - "'15:45:59'.toTime", - "TIME '15:45:59'", - "15:45:59") - - testTableApi( - Time.valueOf("00:00:00"), - "'00:00:00'.toTime", - "00:00:00") - - testAllApis( - "1:30:00".cast(Types.TIME), - "'1:30:00'.cast(TIME)", - "CAST('1:30:00' AS TIME)", - "01:30:00") - - testAllApis( - "1990-10-14 23:00:00.123".toTimestamp, - "'1990-10-14 23:00:00.123'.toTimestamp", - "TIMESTAMP '1990-10-14 23:00:00.123'", - "1990-10-14 23:00:00.123") - - testTableApi( - Timestamp.valueOf("2040-09-11 00:00:00.000"), - "'2040-09-11 00:00:00.000'.toTimestamp", - "2040-09-11 00:00:00.0") - - testAllApis( - "1500-04-30 12:00:00".cast(Types.TIMESTAMP), - "'1500-04-30 12:00:00'.cast(TIMESTAMP)", - "CAST('1500-04-30 12:00:00' AS TIMESTAMP)", - "1500-04-30 12:00:00.0") - } - - @Test - def testTimeInput(): Unit = { - testAllApis( - 'f0, - "f0", - "f0", - "1990-10-14") - - testAllApis( - 'f1, - "f1", - "f1", - "10:20:45") - - testAllApis( - 'f2, - "f2", - "f2", - "1990-10-14 10:20:45.123") - } - - @Test - def testTimeCasting(): Unit = { - testAllApis( - 'f0.cast(Types.TIMESTAMP), - "f0.cast(TIMESTAMP)", - "CAST(f0 AS TIMESTAMP)", - "1990-10-14 00:00:00.0") - - 
testAllApis( - 'f1.cast(Types.TIMESTAMP), - "f1.cast(TIMESTAMP)", - "CAST(f1 AS TIMESTAMP)", - "1970-01-01 10:20:45.0") - - testAllApis( - 'f2.cast(Types.DATE), - "f2.cast(DATE)", - "CAST(f2 AS DATE)", - "1990-10-14") - - testAllApis( - 'f2.cast(Types.TIME), - "f2.cast(TIME)", - "CAST(f2 AS TIME)", - "10:20:45") - - testAllApis( - 'f2.cast(Types.TIME), - "f2.cast(TIME)", - "CAST(f2 AS TIME)", - "10:20:45") - - testTableApi( - 'f7.cast(Types.DATE), - "f7.cast(DATE)", - "2002-11-09") - - testTableApi( - 'f7.cast(Types.DATE).cast(Types.INT), - "f7.cast(DATE).cast(INT)", - "12000") - - testTableApi( - 'f7.cast(Types.TIME), - "f7.cast(TIME)", - "00:00:12") - - testTableApi( - 'f7.cast(Types.TIME).cast(Types.INT), - "f7.cast(TIME).cast(INT)", - "12000") - - testTableApi( - 'f8.cast(Types.TIMESTAMP), - "f8.cast(TIMESTAMP)", - "2016-06-27 07:23:33.0") - - testTableApi( - 'f8.cast(Types.TIMESTAMP).cast(Types.LONG), - "f8.cast(TIMESTAMP).cast(LONG)", - "1467012213000") - } - - @Test - def testTimeComparison(): Unit = { - testAllApis( - 'f0 < 'f3, - "f0 < f3", - "f0 < f3", - "false") - - testAllApis( - 'f0 < 'f4, - "f0 < f4", - "f0 < f4", - "true") - - testAllApis( - 'f1 < 'f5, - "f1 < f5", - "f1 < f5", - "false") - - testAllApis( - 'f0.cast(Types.TIMESTAMP) !== 'f2, - "f0.cast(TIMESTAMP) !== f2", - "CAST(f0 AS TIMESTAMP) <> f2", - "true") - - testAllApis( - 'f0.cast(Types.TIMESTAMP) === 'f6, - "f0.cast(TIMESTAMP) === f6", - "CAST(f0 AS TIMESTAMP) = f6", - "true") - } - - // ---------------------------------------------------------------------------------------------- - - def testData = { - val testData = new Row(9) - testData.setField(0, Date.valueOf("1990-10-14")) - testData.setField(1, Time.valueOf("10:20:45")) - testData.setField(2, Timestamp.valueOf("1990-10-14 10:20:45.123")) - testData.setField(3, Date.valueOf("1990-10-13")) - testData.setField(4, Date.valueOf("1990-10-15")) - testData.setField(5, Time.valueOf("00:00:00")) - testData.setField(6, 
Timestamp.valueOf("1990-10-14 00:00:00.0")) - testData.setField(7, 12000) - testData.setField(8, 1467012213000L) - testData - } - - def typeInfo = { - new RowTypeInfo(Seq( - Types.DATE, - Types.TIME, - Types.TIMESTAMP, - Types.DATE, - Types.DATE, - Types.TIME, - Types.TIMESTAMP, - Types.INT, - Types.LONG)).asInstanceOf[TypeInformation[Any]] - } -}