This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new dd68edd [FLINK-11785][table-api] Replace case class Null(type) by nullOf(type) expression dd68edd is described below commit dd68edd8e6519013d2ab3bd2d4d815f1997ac0fe Author: Timo Walther <twal...@apache.org> AuthorDate: Thu Feb 28 15:33:09 2019 +0100 [FLINK-11785][table-api] Replace case class Null(type) by nullOf(type) expression This introduces `nullOf(type)` for representing typed nulls in Table API. It allows to uncouple API from expression case classes and enables us to have `nullOf(type)` and `null` in the future, once we introduced a NULL type and proper type inference. Furthermore, it also integrates better in existing expressions that all start with lower case characters. This closes #7864. --- docs/dev/table/tableApi.md | 18 ++++---- .../flink/table/api/scala/expressionDsl.scala | 17 +++++++ .../flink/table/expressions/ExpressionParser.scala | 3 +- .../apache/flink/table/expressions/literals.scala | 3 ++ .../table/api/batch/table/SetOperatorsTest.scala | 3 +- .../flink/table/api/stream/sql/JoinTest.scala | 3 +- .../flink/table/expressions/ArrayTypeTest.scala | 8 ++-- .../flink/table/expressions/MapTypeTest.scala | 4 +- .../flink/table/expressions/RowTypeTest.scala | 4 +- .../table/expressions/ScalarFunctionsTest.scala | 52 +++++++++++----------- .../table/expressions/ScalarOperatorsTest.scala | 34 +++++++------- .../UserDefinedScalarFunctionTest.scala | 20 ++++----- .../table/runtime/stream/sql/JoinITCase.scala | 11 +++-- .../runtime/stream/table/AggregateITCase.scala | 5 +-- .../table/runtime/stream/table/JoinITCase.scala | 18 ++++---- 15 files changed, 111 insertions(+), 92 deletions(-) diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md index 08a5a8a..a509d85 100644 --- a/docs/dev/table/tableApi.md +++ b/docs/dev/table/tableApi.md @@ -1780,7 +1780,7 @@ atom = ( "(" , expression , ")" ) | literal | fieldReference ; fieldReference = "*" | identifier ; -nullLiteral = "Null(" , dataType , ")" ; +nullLiteral = "nullOf(" , dataType , ")" ; timeIntervalUnit = "YEAR" | "YEAR_TO_MONTH" | "MONTH" | "QUARTER" | "WEEK" | "DAY" | "DAY_TO_HOUR" | "DAY_TO_MINUTE" | "DAY_TO_SECOND" | "HOUR" | "HOUR_TO_MINUTE" | "HOUR_TO_SECOND" | "MINUTE" | "MINUTE_TO_SECOND" | "SECOND" ; @@ -1794,18 +1794,20 @@ timeIndicator = fieldReference , "." , ( "proctime" | "rowtime" ) ; {% endhighlight %} -Here, `literal` is a valid Java literal. String literals can be specified using single or double quotes. Duplicate the quote for escaping (e.g. `'It''s me.'` or `"I ""like"" dogs."`). +**Literals:** Here, `literal` is a valid Java literal. String literals can be specified using single or double quotes. Duplicate the quote for escaping (e.g. `'It''s me.'` or `"I ""like"" dogs."`). -The `fieldReference` specifies a column in the data (or all columns if `*` is used), and `functionIdentifier` specifies a supported scalar function. The column names and function names follow Java identifier syntax. +**Null literals:** Null literals must have a type attached. Use `nullOf(type)` (e.g. `nullOf(INT)`) for creating a null value. -Expressions specified as strings can also use prefix notation instead of suffix notation to call operators and functions. +**Field references:** The `fieldReference` specifies a column in the data (or all columns if `*` is used), and `functionIdentifier` specifies a supported scalar function. The column names and function names follow Java identifier syntax. -If working with exact numeric values or large decimals is required, the Table API also supports Java's BigDecimal type. In the Scala Table API decimals can be defined by `BigDecimal("123456")` and in Java by appending a "p" for precise e.g. `123456p`. +**Function calls:** Expressions specified as strings can also use prefix notation instead of suffix notation to call operators and functions. -In order to work with temporal values the Table API supports Java SQL's Date, Time, and Timestamp types. In the Scala Table API literals can be defined by using `java.sql.Date.valueOf("2016-06-27")`, `java.sql.Time.valueOf("10:10:42")`, or `java.sql.Timestamp.valueOf("2016-06-27 10:10:42.123")`. The Java and Scala Table API also support calling `"2016-06-27".toDate()`, `"10:10:42".toTime()`, and `"2016-06-27 10:10:42.123".toTimestamp()` for converting Strings into temporal types. *Note:* [...] +**Decimals:** If working with exact numeric values or large decimals is required, the Table API also supports Java's BigDecimal type. In the Scala Table API decimals can be defined by `BigDecimal("123456")` and in Java by appending a "p" for precise e.g. `123456p`. -Temporal intervals can be represented as number of months (`Types.INTERVAL_MONTHS`) or number of milliseconds (`Types.INTERVAL_MILLIS`). Intervals of same type can be added or subtracted (e.g. `1.hour + 10.minutes`). Intervals of milliseconds can be added to time points (e.g. `"2016-08-10".toDate + 5.days`). +**Time representation:** In order to work with temporal values the Table API supports Java SQL's Date, Time, and Timestamp types. In the Scala Table API literals can be defined by using `java.sql.Date.valueOf("2016-06-27")`, `java.sql.Time.valueOf("10:10:42")`, or `java.sql.Timestamp.valueOf("2016-06-27 10:10:42.123")`. The Java and Scala Table API also support calling `"2016-06-27".toDate()`, `"10:10:42".toTime()`, and `"2016-06-27 10:10:42.123".toTimestamp()` for converting Strings int [...] -Scala expressions use implicit conversions. Therefore, make sure to add the wildcard import `org.apache.flink.table.api.scala._` to your programs. In case a literal is not treated as an expression, use `.toExpr` such as `3.toExpr` to force a literal to be converted. +**Temporal intervals:** Temporal intervals can be represented as number of months (`Types.INTERVAL_MONTHS`) or number of milliseconds (`Types.INTERVAL_MILLIS`). Intervals of same type can be added or subtracted (e.g. `1.hour + 10.minutes`). Intervals of milliseconds can be added to time points (e.g. `"2016-08-10".toDate + 5.days`). + +**Scala expressions:** Scala expressions use implicit conversions. Therefore, make sure to add the wildcard import `org.apache.flink.table.api.scala._` to your programs. In case a literal is not treated as an expression, use `.toExpr` such as `3.toExpr` to force a literal to be converted. {% top %} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala index d101717..5e6549d 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala @@ -1414,4 +1414,21 @@ object uuid { } } +/** + * Returns a null literal value of a given type. + * + * e.g. nullOf(Types.INT) + */ +object nullOf { + + /** + * Returns a null literal value of a given type. + * + * e.g. nullOf(Types.INT) + */ + def apply(typeInfo: TypeInformation[_]): Expression = { + Null(typeInfo) + } +} + // scalastyle:on object.name diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala index b45ce80..b502827 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala @@ -53,6 +53,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { lazy val ASC: Keyword = Keyword("asc") lazy val DESC: Keyword = Keyword("desc") lazy val NULL: Keyword = Keyword("Null") + lazy val NULL_OF: Keyword = Keyword("nullOf") lazy val IF: Keyword = Keyword("?") lazy val TO_DATE: Keyword = Keyword("toDate") lazy val TO_TIME: Keyword = Keyword("toTime") @@ -207,7 +208,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { str => Literal(str.toBoolean) } - lazy val nullLiteral: PackratParser[Expression] = NULL ~ "(" ~> dataType <~ ")" ^^ { + lazy val nullLiteral: PackratParser[Expression] = (NULL | NULL_OF) ~ "(" ~> dataType <~ ")" ^^ { dt => Null(dt) } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/literals.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/literals.scala index 24bae90..bf39d99 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/literals.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/literals.scala @@ -122,6 +122,9 @@ case class Literal(value: Any, resultType: TypeInformation[_]) extends LeafExpre } } +@deprecated( + "Use nullOf(TypeInformation) instead. It is available through the implicit Scala DSL.", + "1.8.0") case class Null(resultType: TypeInformation[_]) extends LeafExpression { override def toString = s"null" diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala index 9226200..f0f1ca3 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala @@ -24,7 +24,6 @@ import org.apache.flink.api.java.typeutils.GenericTypeInfo import org.apache.flink.api.scala._ import org.apache.flink.table.api.Types import org.apache.flink.table.api.scala._ -import org.apache.flink.table.expressions.Null import org.apache.flink.table.runtime.utils.CommonTestData.NonPojo import org.apache.flink.table.utils.TableTestBase import org.apache.flink.table.utils.TableTestUtil._ @@ -88,7 +87,7 @@ class SetOperatorsTest extends TableTestBase { val in = t.select('a) .unionAll( - t.select(('c > 0) ? ('b, Null(createTypeInformation[(Int, String)])))) + t.select(('c > 0) ? ('b, nullOf(createTypeInformation[(Int, String)])))) val expected = binaryNode( "DataSetUnion", diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala index 37d5bc1..8e24413 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala @@ -22,7 +22,6 @@ import org.apache.flink.api.scala._ import org.apache.flink.table.api.Types import org.apache.flink.table.api.scala._ import org.apache.flink.table.calcite.RelTimeIndicatorConverter -import org.apache.flink.table.expressions.Null import org.apache.flink.table.plan.logical.TumblingGroupWindow import org.apache.flink.table.runtime.join.WindowJoinUtil import org.apache.flink.table.utils.TableTestUtil.{term, _} @@ -256,7 +255,7 @@ class JoinTest extends TableTestBase { val streamUtil: StreamTableTestUtil = streamTestUtil() val t1 = streamUtil.addTable[(Int, Long, String)]("Table1", 'a, 'b, 'c, 'proctime.proctime) - .select('a, 'b, 'c, 'proctime, Null(Types.LONG) as 'nullField) + .select('a, 'b, 'c, 'proctime, nullOf(Types.LONG) as 'nullField) val t2 = streamUtil.addTable[(Int, Long, String)]("Table2", 'a, 'b, 'c, 'proctime.proctime) .select('a, 'b, 'c, 'proctime, 12L as 'nullField) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala index e0292b2..a7aae1c 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala @@ -54,14 +54,14 @@ class ArrayTypeTest extends ArrayTypeTestBase { "[2, 9]") testAllApis( - array(Null(Types.INT), 1), - "array(Null(INT), 1)", + array(nullOf(Types.INT), 1), + "array(nullOf(INT), 1)", "ARRAY[NULLIF(1,1), 1]", "[null, 1]") testAllApis( - array(array(Null(Types.INT), 1)), - "array(array(Null(INT), 1))", + array(array(nullOf(Types.INT), 1)), + "array(array(nullOf(INT), 1))", "ARRAY[ARRAY[NULLIF(1,1), 1]]", "[[null, 1]]") diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala index 0a30eb0..56cfd0f 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala @@ -54,8 +54,8 @@ class MapTypeTest extends MapTypeTestBase { "{2=2, 3=9}") testAllApis( - map(1, Null(Types.INT)), - "map(1, Null(INT))", + map(1, nullOf(Types.INT)), + "map(1, nullOf(INT))", "map[1, NULLIF(1,1)]", "{1=null}") diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/RowTypeTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/RowTypeTest.scala index df84a84..7893e05 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/RowTypeTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/RowTypeTest.scala @@ -52,8 +52,8 @@ class RowTypeTest extends RowTypeTestBase { "1985-04-11,0.1,[1, 2, 3],{foo=bar},1,true") // string flatten testAllApis( - row(1 + 1, 2 * 3, Null(Types.STRING)), - "row(1 + 1, 2 * 3, Null(STRING))", + row(1 + 1, 2 * 3, nullOf(Types.STRING)), + "row(1 + 1, 2 * 3, nullOf(STRING))", "ROW(1 + 1, 2 * 3, NULLIF(1,1))", "2,6,null" ) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala index 799f636..e5e8dc2 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala @@ -115,14 +115,14 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { "null") testAllApis( - 'f0.replace(Null(Types.STRING), ""), - "f0.replace(Null(STRING), '')", + 'f0.replace(nullOf(Types.STRING), ""), + "f0.replace(nullOf(STRING), '')", "REPLACE(f0, NULLIF('', ''), '')", "null") testAllApis( - 'f0.replace(" ", Null(Types.STRING)), - "f0.replace(' ', Null(STRING))", + 'f0.replace(" ", nullOf(Types.STRING)), + "f0.replace(' ', nullOf(STRING))", "REPLACE(f0, ' ', NULLIF('', ''))", "null") } @@ -440,8 +440,8 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { "2A") testAllApis( - Null(Types.BYTE).hex(), - "hex(Null(BYTE))", + nullOf(Types.BYTE).hex(), + "hex(nullOf(BYTE))", "HEX(CAST(NULL AS TINYINT))", "null") @@ -529,8 +529,8 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { def testBin(): Unit = { testAllApis( - Null(Types.BYTE).bin(), - "bin(Null(BYTE))", + nullOf(Types.BYTE).bin(), + "bin(nullOf(BYTE))", "BIN((CAST(NULL AS TINYINT)))", "null") @@ -648,8 +648,8 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { // This test was added for the null literal problem in string expression parsing (FLINK-10463). testAllApis( - Null(Types.STRING).regexpReplace("oo|ar", 'f33), - "Null(STRING).regexpReplace('oo|ar', f33)", + nullOf(Types.STRING).regexpReplace("oo|ar", 'f33), + "nullOf(STRING).regexpReplace('oo|ar', f33)", "REGEXP_REPLACE(CAST(NULL AS VARCHAR), 'oo|ar', f33)", "null") } @@ -2645,17 +2645,17 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { } testAllApis( - timestampDiff(TimePointUnit.DAY, Null(Types.SQL_TIMESTAMP), + timestampDiff(TimePointUnit.DAY, nullOf(Types.SQL_TIMESTAMP), "2016-02-24 12:42:25".toTimestamp), - "timestampDiff(DAY, Null(SQL_TIMESTAMP), '2016-02-24 12:42:25'.toTimestamp)", + "timestampDiff(DAY, nullOf(SQL_TIMESTAMP), '2016-02-24 12:42:25'.toTimestamp)", "TIMESTAMPDIFF(DAY, CAST(NULL AS TIMESTAMP), TIMESTAMP '2016-02-24 12:42:25')", "null" ) testAllApis( timestampDiff(TimePointUnit.DAY, "2016-02-24 12:42:25".toTimestamp, - Null(Types.SQL_TIMESTAMP)), - "timestampDiff(DAY, '2016-02-24 12:42:25'.toTimestamp, Null(SQL_TIMESTAMP))", + nullOf(Types.SQL_TIMESTAMP)), + "timestampDiff(DAY, '2016-02-24 12:42:25'.toTimestamp, nullOf(SQL_TIMESTAMP))", "TIMESTAMPDIFF(DAY, TIMESTAMP '2016-02-24 12:42:25', CAST(NULL AS TIMESTAMP))", "null" ) @@ -2779,20 +2779,20 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { } testAllApis( - "2016-02-24 12:42:25".toTimestamp + Null(Types.INTERVAL_MILLIS), - "'2016-02-24 12:42:25'.toTimestamp + Null(INTERVAL_MILLIS)", + "2016-02-24 12:42:25".toTimestamp + nullOf(Types.INTERVAL_MILLIS), + "'2016-02-24 12:42:25'.toTimestamp + nullOf(INTERVAL_MILLIS)", "TIMESTAMPADD(HOUR, CAST(NULL AS INTEGER), TIMESTAMP '2016-02-24 12:42:25')", "null") testAllApis( - Null(Types.SQL_TIMESTAMP) + -200.hours, - "Null(SQL_TIMESTAMP) + -200.hours", + nullOf(Types.SQL_TIMESTAMP) + -200.hours, + "nullOf(SQL_TIMESTAMP) + -200.hours", "TIMESTAMPADD(HOUR, -200, CAST(NULL AS TIMESTAMP))", "null") testAllApis( - Null(Types.SQL_TIMESTAMP) + 3.months, - "Null(SQL_TIMESTAMP) + 3.months", + nullOf(Types.SQL_TIMESTAMP) + 3.months, + "nullOf(SQL_TIMESTAMP) + 3.months", "TIMESTAMPADD(MONTH, 3, CAST(NULL AS TIMESTAMP))", "null") @@ -2827,13 +2827,13 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { "timestampadd(SECOND, 1, date '2016-06-15')", "2016-06-15 00:00:01.0") - testAllApis(Null(Types.SQL_TIMESTAMP) + 1.second, - "Null(SQL_TIMESTAMP) + 1.second", + testAllApis(nullOf(Types.SQL_TIMESTAMP) + 1.second, + "nullOf(SQL_TIMESTAMP) + 1.second", "timestampadd(SECOND, 1, cast(null as date))", "null") - testAllApis(Null(Types.SQL_TIMESTAMP) + 1.day, - "Null(SQL_TIMESTAMP) + 1.day", + testAllApis(nullOf(Types.SQL_TIMESTAMP) + 1.day, + "nullOf(SQL_TIMESTAMP) + 1.day", "timestampadd(DAY, 1, cast(null as date))", "null") @@ -2986,8 +2986,8 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { "null") testAllApis( - "test".sha2(Null(Types.INT)), - "sha2('test', Null(INT))", + "test".sha2(nullOf(Types.INT)), + "sha2('test', nullOf(INT))", "SHA2('test', CAST(NULL AS INT))", "null") diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala index d61627b..088ba65 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala @@ -296,8 +296,8 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase { ) testTableApi( - 'f10.in("This is a test String.", "String", "Hello world", "Comment#1", Null(Types.STRING)), - "f10.in('This is a test String.', 'String', 'Hello world', 'Comment#1', Null(STRING))", + 'f10.in("This is a test String.", "String", "Hello world", "Comment#1", nullOf(Types.STRING)), + "f10.in('This is a test String.', 'String', 'Hello world', 'Comment#1', nullOf(STRING))", "true" ) @@ -308,8 +308,8 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase { ) testTableApi( - 'f10.in("FAIL", "FAIL", Null(Types.STRING)), - "f10.in('FAIL', 'FAIL', Null(STRING))", + 'f10.in("FAIL", "FAIL", nullOf(Types.STRING)), + "f10.in('FAIL', 'FAIL', nullOf(STRING))", "null" ) } @@ -350,10 +350,10 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase { "true") // null - testAllApis(Null(Types.INT), "Null(INT)", "CAST(NULL AS INT)", "null") + testAllApis(nullOf(Types.INT), "nullOf(INT)", "CAST(NULL AS INT)", "null") testAllApis( - Null(Types.STRING) === "", - "Null(STRING) === ''", + nullOf(Types.STRING) === "", + "nullOf(STRING) === ''", "CAST(NULL AS VARCHAR) = ''", "null") @@ -416,26 +416,26 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase { def testBetween(): Unit = { // between testAllApis( - 4.between(Null(Types.INT), 3), - "4.between(Null(INT), 3)", + 4.between(nullOf(Types.INT), 3), + "4.between(nullOf(INT), 3)", "4 BETWEEN NULL AND 3", "false" ) testAllApis( - 4.between(Null(Types.INT), 12), - "4.between(Null(INT), 12)", + 4.between(nullOf(Types.INT), 12), + "4.between(nullOf(INT), 12)", "4 BETWEEN NULL AND 12", "null" ) testAllApis( - 4.between(Null(Types.INT), 3), - "4.between(Null(INT), 3)", + 4.between(nullOf(Types.INT), 3), + "4.between(nullOf(INT), 3)", "4 BETWEEN 5 AND NULL", "false" ) testAllApis( - 4.between(Null(Types.INT), 12), - "4.between(Null(INT), 12)", + 4.between(nullOf(Types.INT), 12), + "4.between(nullOf(INT), 12)", "4 BETWEEN 0 AND NULL", "null" ) @@ -490,8 +490,8 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase { // not between testAllApis( - 2.notBetween(Null(Types.INT), 3), - "2.notBetween(Null(INT), 3)", + 2.notBetween(nullOf(Types.INT), 3), + "2.notBetween(nullOf(INT), 3)", "2 NOT BETWEEN NULL AND 3", "null" ) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala index 1534344..35faa2b 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala @@ -121,26 +121,26 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase { @Test def testNullableParameters(): Unit = { testAllApis( - Func3(Null(INT_TYPE_INFO), Null(STRING_TYPE_INFO)), - "Func3(Null(INT), Null(STRING))", + Func3(nullOf(INT_TYPE_INFO), nullOf(STRING_TYPE_INFO)), + "Func3(nullOf(INT), nullOf(STRING))", "Func3(NULL, NULL)", "null and null") testAllApis( - Func3(Null(INT_TYPE_INFO), "Test"), - "Func3(Null(INT), 'Test')", + Func3(nullOf(INT_TYPE_INFO), "Test"), + "Func3(nullOf(INT), 'Test')", "Func3(NULL, 'Test')", "null and Test") testAllApis( - Func3(42, Null(STRING_TYPE_INFO)), - "Func3(42, Null(STRING))", + Func3(42, nullOf(STRING_TYPE_INFO)), + "Func3(42, nullOf(STRING))", "Func3(42, NULL)", "42 and null") testAllApis( - Func0(Null(INT_TYPE_INFO)), - "Func0(Null(INT))", + Func0(nullOf(INT_TYPE_INFO)), + "Func0(nullOf(INT))", "Func0(NULL)", "-1") } @@ -349,8 +349,8 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase { "7591 and 43810000 and 655906210000") testAllApis( - JavaFunc1(Null(Types.SQL_TIME), 15, Null(Types.SQL_TIMESTAMP)), - "JavaFunc1(Null(SQL_TIME), 15, Null(SQL_TIMESTAMP))", + JavaFunc1(nullOf(Types.SQL_TIME), 15, nullOf(Types.SQL_TIMESTAMP)), + "JavaFunc1(nullOf(SQL_TIME), 15, nullOf(SQL_TIMESTAMP))", "JavaFunc1(NULL, 15, NULL)", "null and 15 and null") diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala index c5787c2..ddaf7fc 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala @@ -25,9 +25,8 @@ import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.watermark.Watermark -import org.apache.flink.table.api.scala._ import org.apache.flink.table.api.Types -import org.apache.flink.table.expressions.Null +import org.apache.flink.table.api.scala._ import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase} import org.apache.flink.types.Row import org.junit.Assert.assertEquals @@ -69,9 +68,9 @@ class JoinITCase extends StreamingWithStateTestBase { data2.+=((2, 2L, "HeHe")) val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime) - .select(('a === 1)?(Null(Types.INT), 'a) as 'a, 'b, 'c, 'proctime) // test null values + .select(('a === 1)?(nullOf(Types.INT), 'a) as 'a, 'b, 'c, 'proctime) // test null values val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime) - .select(('a === 1)?(Null(Types.INT), 'a) as 'a, 'b, 'c, 'proctime) // test null values + .select(('a === 1)?(nullOf(Types.INT), 'a) as 'a, 'b, 'c, 'proctime) // test null values tEnv.registerTable("T1", t1) tEnv.registerTable("T2", t2) @@ -947,9 +946,9 @@ class JoinITCase extends StreamingWithStateTestBase { data2.+=((3, 2L, "HeHe")) val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c) - .select(('a === 3) ? (Null(Types.INT), 'a) as 'a, 'b, 'c) + .select(('a === 3) ? (nullOf(Types.INT), 'a) as 'a, 'b, 'c) val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c) - .select(('a === 3) ? (Null(Types.INT), 'a) as 'a, 'b, 'c) + .select(('a === 3) ? (nullOf(Types.INT), 'a) as 'a, 'b, 'c) tEnv.registerTable("T1", t1) tEnv.registerTable("T2", t2) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala index 3b2268c..c0a3e24 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala @@ -23,10 +23,9 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.scala._ -import org.apache.flink.table.runtime.utils.StreamITCase.RetractingSink import org.apache.flink.table.api.{StreamQueryConfig, Types} -import org.apache.flink.table.expressions.Null import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{CountDistinct, DataViewTestAgg, WeightedAvg} +import org.apache.flink.table.runtime.utils.StreamITCase.RetractingSink import org.apache.flink.table.runtime.utils.{JavaUserDefinedAggFunctions, StreamITCase, StreamTestData, StreamingWithStateTestBase} import org.apache.flink.types.Row import org.junit.Assert.assertEquals @@ -142,7 +141,7 @@ class AggregateITCase extends StreamingWithStateTestBase { StreamITCase.clear val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) - .select('b, Null(Types.LONG)).distinct() + .select('b, nullOf(Types.LONG)).distinct() val results = t.toRetractStream[Row](queryConfig) results.addSink(new StreamITCase.RetractingSink).setParallelism(1) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala index 4a69ec5..926319f 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala @@ -18,20 +18,20 @@ package org.apache.flink.table.runtime.stream.table +import org.apache.flink.api.common.time.Time +import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment -import org.apache.flink.table.api.{StreamQueryConfig, Types} import org.apache.flink.table.api.scala._ -import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase} -import org.junit.Assert._ -import org.junit.Test -import org.apache.flink.api.common.time.Time -import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.expressions.Literal import org.apache.flink.table.expressions.utils.Func20 -import org.apache.flink.table.expressions.{Literal, Null} import org.apache.flink.table.functions.aggfunctions.CountAggFunction import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{CountDistinct, WeightedAvg} +import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase} import org.apache.flink.types.Row +import org.junit.Assert._ +import org.junit.Test import scala.collection.mutable @@ -441,9 +441,9 @@ class JoinITCase extends StreamingWithStateTestBase { env.setStateBackend(getStateBackend) val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) - .select(('a === 21) ? (Null(Types.INT), 'a) as 'a, 'b, 'c) + .select(('a === 21) ? (nullOf(Types.INT), 'a) as 'a, 'b, 'c) val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) - .select(('e === 15) ? (Null(Types.INT), 'd) as 'd, 'e, 'f, 'g, 'h) + .select(('e === 15) ? (nullOf(Types.INT), 'd) as 'd, 'e, 'f, 'g, 'h) val joinT = ds1.leftOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g)