[FLINK-5481] [core] Add utility method to easily generate RowTypeInfos This closes #3127
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/eb4db536 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/eb4db536 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/eb4db536 Branch: refs/heads/master Commit: eb4db536dd7b03f33849adb4b5f1926fcc57efd8 Parents: 97ca03a Author: tonycox <anton_solo...@epam.com> Authored: Fri Jan 27 12:06:09 2017 +0400 Committer: Stephan Ewen <se...@apache.org> Committed: Thu Apr 20 10:52:37 2017 +0200 ---------------------------------------------------------------------- .../apache/flink/api/java/typeutils/Types.java | 72 ++++++++++++++++++++ .../org/apache/flink/table/api/Types.scala | 50 ++++++++++---- .../stringexpr/CalcStringExpressionTest.scala | 4 +- .../flink/table/expressions/ArrayTypeTest.scala | 2 +- .../table/expressions/ScalarFunctionsTest.scala | 6 +- .../table/expressions/TemporalTypesTest.scala | 46 ++++++------- .../UserDefinedScalarFunctionTest.scala | 8 +-- .../utils/UserDefinedScalarFunctions.scala | 2 +- 8 files changed, 142 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/eb4db536/flink-core/src/main/java/org/apache/flink/api/java/typeutils/Types.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/Types.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/Types.java new file mode 100644 index 0000000..5159cde --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/Types.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; + +/** + * This class enumerates all supported types of + * the BasicTypeInfo, SqlTimeTypeInfo and RowTypeInfo for creation simplifying + */ +public class Types { + + public static final BasicTypeInfo<String> STRING = BasicTypeInfo.STRING_TYPE_INFO; + public static final BasicTypeInfo<Boolean> BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO; + public static final BasicTypeInfo<Byte> BYTE = BasicTypeInfo.BYTE_TYPE_INFO; + public static final BasicTypeInfo<Short> SHORT = BasicTypeInfo.SHORT_TYPE_INFO; + public static final BasicTypeInfo<Integer> INT = BasicTypeInfo.INT_TYPE_INFO; + public static final BasicTypeInfo<Long> LONG = BasicTypeInfo.LONG_TYPE_INFO; + public static final BasicTypeInfo<Float> FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO; + public static final BasicTypeInfo<Double> DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO; + public static final BasicTypeInfo<BigDecimal> DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO; + + public static final SqlTimeTypeInfo<Date> SQL_DATE = SqlTimeTypeInfo.DATE; + public static final SqlTimeTypeInfo<Time> SQL_TIME = 
SqlTimeTypeInfo.TIME; + public static final SqlTimeTypeInfo<Timestamp> SQL_TIMESTAMP = SqlTimeTypeInfo.TIMESTAMP; + + /** + * Generates RowTypeInfo with default names (f0, f1 ..). + * same as new RowTypeInfo(types) + * + * @param types of Row fields. e.g. ROW(Types.STRING, Types.INT) + */ + public static RowTypeInfo ROW(TypeInformation<?>... types) { + return new RowTypeInfo(types); + } + + /** + * Generates RowTypeInfo. + * same as new RowTypeInfo(types, names) + * + * e.g. ROW(new String[]{"name", "number"}, Types.STRING, Types.INT) + * + * @param fieldNames array of field names + * @param types array of field types + */ + public static RowTypeInfo ROW(String[] fieldNames, TypeInformation<?>... types) { + return new RowTypeInfo(types, fieldNames); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/eb4db536/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala index 939cb67..b0cb338 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala @@ -17,29 +17,51 @@ */ package org.apache.flink.table.api -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo} +import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.table.typeutils.TimeIntervalTypeInfo +import org.apache.flink.api.java.typeutils.{Types => JTypes} /** * This class enumerates all supported types of the Table API. 
*/ -object Types { +object Types extends JTypes { - val STRING = BasicTypeInfo.STRING_TYPE_INFO - val BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO + val STRING = JTypes.STRING + val BOOLEAN = JTypes.BOOLEAN - val BYTE = BasicTypeInfo.BYTE_TYPE_INFO - val SHORT = BasicTypeInfo.SHORT_TYPE_INFO - val INT = BasicTypeInfo.INT_TYPE_INFO - val LONG = BasicTypeInfo.LONG_TYPE_INFO - val FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO - val DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO - val DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO + val BYTE = JTypes.BYTE + val SHORT = JTypes.SHORT + val INT = JTypes.INT + val LONG = JTypes.LONG + val FLOAT = JTypes.FLOAT + val DOUBLE = JTypes.DOUBLE + val DECIMAL = JTypes.DECIMAL - val DATE = SqlTimeTypeInfo.DATE - val TIME = SqlTimeTypeInfo.TIME - val TIMESTAMP = SqlTimeTypeInfo.TIMESTAMP + val SQL_DATE = JTypes.SQL_DATE + val SQL_TIME = JTypes.SQL_TIME + val SQL_TIMESTAMP = JTypes.SQL_TIMESTAMP val INTERVAL_MONTHS = TimeIntervalTypeInfo.INTERVAL_MONTHS val INTERVAL_MILLIS = TimeIntervalTypeInfo.INTERVAL_MILLIS + /** + * Generates RowTypeInfo with default names (f0, f1 ..). + * same as ``new RowTypeInfo(types)`` + * + * @param types of Row fields. e.g. ROW(Types.STRING, Types.INT) + */ + def ROW[T: Manifest](types: TypeInformation[_]*) = { + JTypes.ROW(types: _*) + } + + /** + * Generates RowTypeInfo. + * same as ``new RowTypeInfo(types, names)`` + * + * @param fields of Row. e.g. 
ROW(("name", Types.STRING), ("number", Types.INT)) + */ + def ROW_NAMED(fields: (String, TypeInformation[_])*) = { + val names = fields.toList.map(_._1).toArray + val types = fields.toList.map(_._2) + JTypes.ROW(names, types: _*) + } } http://git-wip-us.apache.org/repos/asf/flink/blob/eb4db536/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CalcStringExpressionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CalcStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CalcStringExpressionTest.scala index a5a5241..819d6d4 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CalcStringExpressionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CalcStringExpressionTest.scala @@ -357,8 +357,8 @@ class CalcStringExpressionTest { .toTable(tEnv, 'a, 'b, 'c, 'd, 'e) val t1 = t.select('a, 'b, 'c, 'd, 'e, BigDecimal("11.2"), BigDecimal("11.2").bigDecimal, - "1984-07-12".cast(Types.DATE), "14:34:24".cast(Types.TIME), - "1984-07-12 14:34:24".cast(Types.TIMESTAMP)) + "1984-07-12".cast(Types.SQL_DATE), "14:34:24".cast(Types.SQL_TIME), + "1984-07-12 14:34:24".cast(Types.SQL_TIMESTAMP)) val t2 = t.select("a, b, c, d, e, 11.2, 11.2," + "'1984-07-12'.toDate, '14:34:24'.toTime," + "'1984-07-12 14:34:24'.toTimestamp") http://git-wip-us.apache.org/repos/asf/flink/blob/eb4db536/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala index 49cf572..72b5ab8 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala @@ -347,7 +347,7 @@ class ArrayTypeTest extends ExpressionTestBase { Types.INT, Types.INT, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO, - ObjectArrayTypeInfo.getInfoFor(Types.DATE), + ObjectArrayTypeInfo.getInfoFor(Types.SQL_DATE), ObjectArrayTypeInfo.getInfoFor(ObjectArrayTypeInfo.getInfoFor(Types.INT)), ObjectArrayTypeInfo.getInfoFor(PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO), ObjectArrayTypeInfo.getInfoFor(Types.INT), http://git-wip-us.apache.org/repos/asf/flink/blob/eb4db536/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala index 9258c02..474043a 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala @@ -1150,9 +1150,9 @@ class ScalarFunctionsTest extends ExpressionTestBase { Types.DOUBLE, Types.INT, Types.DECIMAL, - Types.DATE, - Types.TIME, - Types.TIMESTAMP, + Types.SQL_DATE, + Types.SQL_TIME, + Types.SQL_TIMESTAMP, Types.INTERVAL_MILLIS, Types.INTERVAL_MONTHS, Types.BOOLEAN, http://git-wip-us.apache.org/repos/asf/flink/blob/eb4db536/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/TemporalTypesTest.scala 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/TemporalTypesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/TemporalTypesTest.scala index 840bec1..58577be 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/TemporalTypesTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/TemporalTypesTest.scala @@ -44,7 +44,7 @@ class TemporalTypesTest extends ExpressionTestBase { "2040-09-11") testAllApis( - "1500-04-30".cast(Types.DATE), + "1500-04-30".cast(Types.SQL_DATE), "'1500-04-30'.cast(DATE)", "CAST('1500-04-30' AS DATE)", "1500-04-30") @@ -61,7 +61,7 @@ class TemporalTypesTest extends ExpressionTestBase { "00:00:00") testAllApis( - "1:30:00".cast(Types.TIME), + "1:30:00".cast(Types.SQL_TIME), "'1:30:00'.cast(TIME)", "CAST('1:30:00' AS TIME)", "01:30:00") @@ -78,7 +78,7 @@ class TemporalTypesTest extends ExpressionTestBase { "2040-09-11 00:00:00.0") testAllApis( - "1500-04-30 12:00:00".cast(Types.TIMESTAMP), + "1500-04-30 12:00:00".cast(Types.SQL_TIMESTAMP), "'1500-04-30 12:00:00'.cast(TIMESTAMP)", "CAST('1500-04-30 12:00:00' AS TIMESTAMP)", "1500-04-30 12:00:00.0") @@ -168,62 +168,62 @@ class TemporalTypesTest extends ExpressionTestBase { @Test def testTimePointCasting(): Unit = { testAllApis( - 'f0.cast(Types.TIMESTAMP), + 'f0.cast(Types.SQL_TIMESTAMP), "f0.cast(TIMESTAMP)", "CAST(f0 AS TIMESTAMP)", "1990-10-14 00:00:00.0") testAllApis( - 'f1.cast(Types.TIMESTAMP), + 'f1.cast(Types.SQL_TIMESTAMP), "f1.cast(TIMESTAMP)", "CAST(f1 AS TIMESTAMP)", "1970-01-01 10:20:45.0") testAllApis( - 'f2.cast(Types.DATE), + 'f2.cast(Types.SQL_DATE), "f2.cast(DATE)", "CAST(f2 AS DATE)", "1990-10-14") testAllApis( - 'f2.cast(Types.TIME), + 'f2.cast(Types.SQL_TIME), "f2.cast(TIME)", "CAST(f2 AS TIME)", "10:20:45") testAllApis( - 'f2.cast(Types.TIME), + 
'f2.cast(Types.SQL_TIME), "f2.cast(TIME)", "CAST(f2 AS TIME)", "10:20:45") testTableApi( - 'f7.cast(Types.DATE), + 'f7.cast(Types.SQL_DATE), "f7.cast(DATE)", "2002-11-09") testTableApi( - 'f7.cast(Types.DATE).cast(Types.INT), + 'f7.cast(Types.SQL_DATE).cast(Types.INT), "f7.cast(DATE).cast(INT)", "12000") testTableApi( - 'f7.cast(Types.TIME), + 'f7.cast(Types.SQL_TIME), "f7.cast(TIME)", "00:00:12") testTableApi( - 'f7.cast(Types.TIME).cast(Types.INT), + 'f7.cast(Types.SQL_TIME).cast(Types.INT), "f7.cast(TIME).cast(INT)", "12000") testTableApi( - 'f8.cast(Types.TIMESTAMP), + 'f8.cast(Types.SQL_TIMESTAMP), "f8.cast(TIMESTAMP)", "2016-06-27 07:23:33.0") testTableApi( - 'f8.cast(Types.TIMESTAMP).cast(Types.LONG), + 'f8.cast(Types.SQL_TIMESTAMP).cast(Types.LONG), "f8.cast(TIMESTAMP).cast(LONG)", "1467012213000") } @@ -262,13 +262,13 @@ class TemporalTypesTest extends ExpressionTestBase { "false") testAllApis( - 'f0.cast(Types.TIMESTAMP) !== 'f2, + 'f0.cast(Types.SQL_TIMESTAMP) !== 'f2, "f0.cast(TIMESTAMP) !== f2", "CAST(f0 AS TIMESTAMP) <> f2", "true") testAllApis( - 'f0.cast(Types.TIMESTAMP) === 'f6, + 'f0.cast(Types.SQL_TIMESTAMP) === 'f6, "f0.cast(TIMESTAMP) === f6", "CAST(f0 AS TIMESTAMP) = f6", "true") @@ -558,13 +558,13 @@ class TemporalTypesTest extends ExpressionTestBase { def typeInfo = { new RowTypeInfo( - Types.DATE, - Types.TIME, - Types.TIMESTAMP, - Types.DATE, - Types.DATE, - Types.TIME, - Types.TIMESTAMP, + Types.SQL_DATE, + Types.SQL_TIME, + Types.SQL_TIMESTAMP, + Types.SQL_DATE, + Types.SQL_DATE, + Types.SQL_TIME, + Types.SQL_TIMESTAMP, Types.INT, Types.LONG, Types.INTERVAL_MONTHS, http://git-wip-us.apache.org/repos/asf/flink/blob/eb4db536/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala index 51583c3..91cce0c 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala @@ -284,7 +284,7 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase { "7591 and 43810000 and 655906210000") testAllApis( - JavaFunc1(Null(Types.TIME), 15, Null(Types.TIMESTAMP)), + JavaFunc1(Null(Types.SQL_TIME), 15, Null(Types.SQL_TIMESTAMP)), "JavaFunc1(Null(TIME), 15, Null(TIMESTAMP))", "JavaFunc1(NULL, 15, NULL)", "null and 15 and null") @@ -337,9 +337,9 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase { Types.STRING, Types.BOOLEAN, TypeInformation.of(classOf[SimplePojo]), - Types.DATE, - Types.TIME, - Types.TIMESTAMP, + Types.SQL_DATE, + Types.SQL_TIME, + Types.SQL_TIMESTAMP, Types.INTERVAL_MONTHS, Types.INTERVAL_MILLIS, TypeInformation.of(classOf[Seq[String]]) http://git-wip-us.apache.org/repos/asf/flink/blob/eb4db536/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/UserDefinedScalarFunctions.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/UserDefinedScalarFunctions.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/UserDefinedScalarFunctions.scala index e858187..8972a77 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/UserDefinedScalarFunctions.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/UserDefinedScalarFunctions.scala @@ -106,7 +106,7 @@ object Func10 extends ScalarFunction { } override def getResultType(signature: Array[Class[_]]): 
TypeInformation[_] = { - Types.TIMESTAMP + Types.SQL_TIMESTAMP } }