This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new dd68edd  [FLINK-11785][table-api] Replace case class Null(type) by nullOf(type) expression
dd68edd is described below

commit dd68edd8e6519013d2ab3bd2d4d815f1997ac0fe
Author: Timo Walther <twal...@apache.org>
AuthorDate: Thu Feb 28 15:33:09 2019 +0100

    [FLINK-11785][table-api] Replace case class Null(type) by nullOf(type) expression
    
    This introduces `nullOf(type)` for representing typed nulls in the Table API.
    It decouples the API from expression case classes and enables us to support
    both `nullOf(type)` and `null` in the future, once a NULL type and proper
    type inference have been introduced. Furthermore, it integrates better with
    existing expressions, which all start with lower-case characters.
    
    This closes #7864.
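
    For illustration, a minimal sketch of how the new expression reads in user
    code (the table and field names below are hypothetical; the `nullOf`
    spellings are the ones introduced by this commit):

        import org.apache.flink.table.api.Types
        import org.apache.flink.table.api.scala._

        // typed null literal in the Scala expression DSL
        val projected = table.select('name, nullOf(Types.INT) as 'score)

        // the same expression in the string-based (Java) Table API:
        // table.select("name, nullOf(INT) as score")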
---
 docs/dev/table/tableApi.md                         | 18 ++++----
 .../flink/table/api/scala/expressionDsl.scala      | 17 +++++++
 .../flink/table/expressions/ExpressionParser.scala |  3 +-
 .../apache/flink/table/expressions/literals.scala  |  3 ++
 .../table/api/batch/table/SetOperatorsTest.scala   |  3 +-
 .../flink/table/api/stream/sql/JoinTest.scala      |  3 +-
 .../flink/table/expressions/ArrayTypeTest.scala    |  8 ++--
 .../flink/table/expressions/MapTypeTest.scala      |  4 +-
 .../flink/table/expressions/RowTypeTest.scala      |  4 +-
 .../table/expressions/ScalarFunctionsTest.scala    | 52 +++++++++++-----------
 .../table/expressions/ScalarOperatorsTest.scala    | 34 +++++++-------
 .../UserDefinedScalarFunctionTest.scala            | 20 ++++-----
 .../table/runtime/stream/sql/JoinITCase.scala      | 11 +++--
 .../runtime/stream/table/AggregateITCase.scala     |  5 +--
 .../table/runtime/stream/table/JoinITCase.scala    | 18 ++++----
 15 files changed, 111 insertions(+), 92 deletions(-)

diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index 08a5a8a..a509d85 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -1780,7 +1780,7 @@ atom = ( "(" , expression , ")" ) | literal | fieldReference ;
 
 fieldReference = "*" | identifier ;
 
-nullLiteral = "Null(" , dataType , ")" ;
+nullLiteral = "nullOf(" , dataType , ")" ;
 
 timeIntervalUnit = "YEAR" | "YEAR_TO_MONTH" | "MONTH" | "QUARTER" | "WEEK" | "DAY" | "DAY_TO_HOUR" | "DAY_TO_MINUTE" | "DAY_TO_SECOND" | "HOUR" | "HOUR_TO_MINUTE" | "HOUR_TO_SECOND" | "MINUTE" | "MINUTE_TO_SECOND" | "SECOND" ;
 
@@ -1794,18 +1794,20 @@ timeIndicator = fieldReference , "." , ( "proctime" | "rowtime" ) ;
 
 {% endhighlight %}
 
-Here, `literal` is a valid Java literal. String literals can be specified using single or double quotes. Duplicate the quote for escaping (e.g. `'It''s me.'` or `"I ""like"" dogs."`).
+**Literals:** Here, `literal` is a valid Java literal. String literals can be specified using single or double quotes. Duplicate the quote for escaping (e.g. `'It''s me.'` or `"I ""like"" dogs."`).
 
-The `fieldReference` specifies a column in the data (or all columns if `*` is used), and `functionIdentifier` specifies a supported scalar function. The column names and function names follow Java identifier syntax.
+**Null literals:** Null literals must have a type attached. Use `nullOf(type)` (e.g. `nullOf(INT)`) for creating a null value.
 
-Expressions specified as strings can also use prefix notation instead of suffix notation to call operators and functions.
+**Field references:** The `fieldReference` specifies a column in the data (or all columns if `*` is used), and `functionIdentifier` specifies a supported scalar function. The column names and function names follow Java identifier syntax.
 
-If working with exact numeric values or large decimals is required, the Table API also supports Java's BigDecimal type. In the Scala Table API decimals can be defined by `BigDecimal("123456")` and in Java by appending a "p" for precise e.g. `123456p`.
+**Function calls:** Expressions specified as strings can also use prefix notation instead of suffix notation to call operators and functions.
 
-In order to work with temporal values the Table API supports Java SQL's Date, Time, and Timestamp types. In the Scala Table API literals can be defined by using `java.sql.Date.valueOf("2016-06-27")`, `java.sql.Time.valueOf("10:10:42")`, or `java.sql.Timestamp.valueOf("2016-06-27 10:10:42.123")`. The Java and Scala Table API also support calling `"2016-06-27".toDate()`, `"10:10:42".toTime()`, and `"2016-06-27 10:10:42.123".toTimestamp()` for converting Strings into temporal types. *Note:* [...]
+**Decimals:** If working with exact numeric values or large decimals is required, the Table API also supports Java's BigDecimal type. In the Scala Table API decimals can be defined by `BigDecimal("123456")` and in Java by appending a "p" for precise e.g. `123456p`.
 
-Temporal intervals can be represented as number of months (`Types.INTERVAL_MONTHS`) or number of milliseconds (`Types.INTERVAL_MILLIS`). Intervals of same type can be added or subtracted (e.g. `1.hour + 10.minutes`). Intervals of milliseconds can be added to time points (e.g. `"2016-08-10".toDate + 5.days`).
+**Time representation:** In order to work with temporal values the Table API supports Java SQL's Date, Time, and Timestamp types. In the Scala Table API literals can be defined by using `java.sql.Date.valueOf("2016-06-27")`, `java.sql.Time.valueOf("10:10:42")`, or `java.sql.Timestamp.valueOf("2016-06-27 10:10:42.123")`. The Java and Scala Table API also support calling `"2016-06-27".toDate()`, `"10:10:42".toTime()`, and `"2016-06-27 10:10:42.123".toTimestamp()` for converting Strings into temporal types. *Note:* [...]
 
-Scala expressions use implicit conversions. Therefore, make sure to add the wildcard import `org.apache.flink.table.api.scala._` to your programs. In case a literal is not treated as an expression, use `.toExpr` such as `3.toExpr` to force a literal to be converted.
+**Temporal intervals:** Temporal intervals can be represented as number of months (`Types.INTERVAL_MONTHS`) or number of milliseconds (`Types.INTERVAL_MILLIS`). Intervals of same type can be added or subtracted (e.g. `1.hour + 10.minutes`). Intervals of milliseconds can be added to time points (e.g. `"2016-08-10".toDate + 5.days`).
+
+**Scala expressions:** Scala expressions use implicit conversions. Therefore, make sure to add the wildcard import `org.apache.flink.table.api.scala._` to your programs. In case a literal is not treated as an expression, use `.toExpr` such as `3.toExpr` to force a literal to be converted.
 
 {% top %}
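
As a hedged aside, the literal forms documented above combine in the Scala Table
API roughly as follows (`orders` and the field names are hypothetical; each
literal form is taken from the documentation in this diff):

    import org.apache.flink.table.api.Types
    import org.apache.flink.table.api.scala._

    val result = orders.select(
      'amount + BigDecimal("0.5"),            // exact decimal arithmetic
      nullOf(Types.STRING) as 'comment,       // typed null literal
      "2016-06-27".toDate + 5.days as 'due)   // date plus millisecond interval
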
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index d101717..5e6549d 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -1414,4 +1414,21 @@ object uuid {
   }
 }
 
+/**
+  * Returns a null literal value of a given type.
+  *
+  * e.g. nullOf(Types.INT)
+  */
+object nullOf {
+
+  /**
+    * Returns a null literal value of a given type.
+    *
+    * e.g. nullOf(Types.INT)
+    */
+  def apply(typeInfo: TypeInformation[_]): Expression = {
+    Null(typeInfo)
+  }
+}
+
 // scalastyle:on object.name
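
A quick usage sketch of the new DSL object (`table` and the field names are
hypothetical): it is a thin factory around the existing case class.

    val e: Expression = nullOf(Types.STRING)  // internally still Null(Types.STRING)
    table.select('id, e as 'missing)
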
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
index b45ce80..b502827 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
@@ -53,6 +53,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   lazy val ASC: Keyword = Keyword("asc")
   lazy val DESC: Keyword = Keyword("desc")
   lazy val NULL: Keyword = Keyword("Null")
+  lazy val NULL_OF: Keyword = Keyword("nullOf")
   lazy val IF: Keyword = Keyword("?")
   lazy val TO_DATE: Keyword = Keyword("toDate")
   lazy val TO_TIME: Keyword = Keyword("toTime")
@@ -207,7 +208,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
     str => Literal(str.toBoolean)
   }
 
-  lazy val nullLiteral: PackratParser[Expression] = NULL ~ "(" ~> dataType <~ ")" ^^ {
+  lazy val nullLiteral: PackratParser[Expression] = (NULL | NULL_OF) ~ "(" ~> dataType <~ ")" ^^ {
     dt => Null(dt)
   }
 
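Both spellings are now accepted by the string-expression parser (the old
`Null(...)` keyword remains for backwards compatibility). A hedged sketch of
the round trip:

    // both strings parse to the same typed-null expression tree
    val a = ExpressionParser.parseExpression("nullOf(INT)")
    val b = ExpressionParser.parseExpression("Null(INT)")
    // a == b, since both yield Null(Types.INT) internally
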
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/literals.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/literals.scala
index 24bae90..bf39d99 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/literals.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/literals.scala
@@ -122,6 +122,9 @@ case class Literal(value: Any, resultType: TypeInformation[_]) extends LeafExpre
   }
 }
 
+@deprecated(
+  "Use nullOf(TypeInformation) instead. It is available through the implicit Scala DSL.",
+  "1.8.0")
 case class Null(resultType: TypeInformation[_]) extends LeafExpression {
   override def toString = s"null"
 
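Code that constructs `Null` directly keeps compiling but now triggers a
deprecation warning; the migration is a one-line change (illustrative sketch):

    val before: Expression = Null(Types.INT)   // deprecated as of 1.8.0
    val after: Expression = nullOf(Types.INT)  // preferred replacement
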
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
index 9226200..f0f1ca3 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
@@ -24,7 +24,6 @@ import org.apache.flink.api.java.typeutils.GenericTypeInfo
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.Types
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.expressions.Null
 import org.apache.flink.table.runtime.utils.CommonTestData.NonPojo
 import org.apache.flink.table.utils.TableTestBase
 import org.apache.flink.table.utils.TableTestUtil._
@@ -88,7 +87,7 @@ class SetOperatorsTest extends TableTestBase {
 
     val in = t.select('a)
       .unionAll(
-        t.select(('c > 0) ? ('b, Null(createTypeInformation[(Int, String)]))))
+        t.select(('c > 0) ? ('b, nullOf(createTypeInformation[(Int, String)]))))
 
     val expected = binaryNode(
       "DataSetUnion",
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
index 37d5bc1..8e24413 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
@@ -22,7 +22,6 @@ import org.apache.flink.api.scala._
 import org.apache.flink.table.api.Types
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.calcite.RelTimeIndicatorConverter
-import org.apache.flink.table.expressions.Null
 import org.apache.flink.table.plan.logical.TumblingGroupWindow
 import org.apache.flink.table.runtime.join.WindowJoinUtil
 import org.apache.flink.table.utils.TableTestUtil.{term, _}
@@ -256,7 +255,7 @@ class JoinTest extends TableTestBase {
     val streamUtil: StreamTableTestUtil = streamTestUtil()
 
     val t1 = streamUtil.addTable[(Int, Long, String)]("Table1", 'a, 'b, 'c, 'proctime.proctime)
-      .select('a, 'b, 'c, 'proctime, Null(Types.LONG) as 'nullField)
+      .select('a, 'b, 'c, 'proctime, nullOf(Types.LONG) as 'nullField)
 
     val t2 = streamUtil.addTable[(Int, Long, String)]("Table2", 'a, 'b, 'c, 'proctime.proctime)
       .select('a, 'b, 'c, 'proctime, 12L as 'nullField)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala
index e0292b2..a7aae1c 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala
@@ -54,14 +54,14 @@ class ArrayTypeTest extends ArrayTypeTestBase {
       "[2, 9]")
 
     testAllApis(
-      array(Null(Types.INT), 1),
-      "array(Null(INT), 1)",
+      array(nullOf(Types.INT), 1),
+      "array(nullOf(INT), 1)",
       "ARRAY[NULLIF(1,1), 1]",
       "[null, 1]")
 
     testAllApis(
-      array(array(Null(Types.INT), 1)),
-      "array(array(Null(INT), 1))",
+      array(array(nullOf(Types.INT), 1)),
+      "array(array(nullOf(INT), 1))",
       "ARRAY[ARRAY[NULLIF(1,1), 1]]",
       "[[null, 1]]")
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala
index 0a30eb0..56cfd0f 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala
@@ -54,8 +54,8 @@ class MapTypeTest extends MapTypeTestBase {
       "{2=2, 3=9}")
 
     testAllApis(
-      map(1, Null(Types.INT)),
-      "map(1, Null(INT))",
+      map(1, nullOf(Types.INT)),
+      "map(1, nullOf(INT))",
       "map[1, NULLIF(1,1)]",
       "{1=null}")
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/RowTypeTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/RowTypeTest.scala
index df84a84..7893e05 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/RowTypeTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/RowTypeTest.scala
@@ -52,8 +52,8 @@ class RowTypeTest extends RowTypeTestBase {
       "1985-04-11,0.1,[1, 2, 3],{foo=bar},1,true") // string flatten
 
     testAllApis(
-      row(1 + 1, 2 * 3, Null(Types.STRING)),
-      "row(1 + 1, 2 * 3, Null(STRING))",
+      row(1 + 1, 2 * 3, nullOf(Types.STRING)),
+      "row(1 + 1, 2 * 3, nullOf(STRING))",
       "ROW(1 + 1, 2 * 3, NULLIF(1,1))",
       "2,6,null"
     )
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
index 799f636..e5e8dc2 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
@@ -115,14 +115,14 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
       "null")
 
     testAllApis(
-      'f0.replace(Null(Types.STRING), ""),
-      "f0.replace(Null(STRING), '')",
+      'f0.replace(nullOf(Types.STRING), ""),
+      "f0.replace(nullOf(STRING), '')",
       "REPLACE(f0, NULLIF('', ''), '')",
       "null")
 
     testAllApis(
-      'f0.replace(" ", Null(Types.STRING)),
-      "f0.replace(' ', Null(STRING))",
+      'f0.replace(" ", nullOf(Types.STRING)),
+      "f0.replace(' ', nullOf(STRING))",
       "REPLACE(f0, ' ', NULLIF('', ''))",
       "null")
   }
@@ -440,8 +440,8 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
       "2A")
 
     testAllApis(
-      Null(Types.BYTE).hex(),
-      "hex(Null(BYTE))",
+      nullOf(Types.BYTE).hex(),
+      "hex(nullOf(BYTE))",
       "HEX(CAST(NULL AS TINYINT))",
       "null")
 
@@ -529,8 +529,8 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
   def testBin(): Unit = {
 
     testAllApis(
-      Null(Types.BYTE).bin(),
-      "bin(Null(BYTE))",
+      nullOf(Types.BYTE).bin(),
+      "bin(nullOf(BYTE))",
       "BIN((CAST(NULL AS TINYINT)))",
       "null")
 
@@ -648,8 +648,8 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
 
     // This test was added for the null literal problem in string expression parsing (FLINK-10463).
     testAllApis(
-      Null(Types.STRING).regexpReplace("oo|ar", 'f33),
-      "Null(STRING).regexpReplace('oo|ar', f33)",
+      nullOf(Types.STRING).regexpReplace("oo|ar", 'f33),
+      "nullOf(STRING).regexpReplace('oo|ar', f33)",
       "REGEXP_REPLACE(CAST(NULL AS VARCHAR), 'oo|ar', f33)",
       "null")
   }
@@ -2645,17 +2645,17 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
     }
 
     testAllApis(
-      timestampDiff(TimePointUnit.DAY, Null(Types.SQL_TIMESTAMP),
+      timestampDiff(TimePointUnit.DAY, nullOf(Types.SQL_TIMESTAMP),
         "2016-02-24 12:42:25".toTimestamp),
-      "timestampDiff(DAY, Null(SQL_TIMESTAMP), '2016-02-24 12:42:25'.toTimestamp)",
+      "timestampDiff(DAY, nullOf(SQL_TIMESTAMP), '2016-02-24 12:42:25'.toTimestamp)",
       "TIMESTAMPDIFF(DAY, CAST(NULL AS TIMESTAMP), TIMESTAMP '2016-02-24 12:42:25')",
       "null"
     )
 
     testAllApis(
       timestampDiff(TimePointUnit.DAY, "2016-02-24 12:42:25".toTimestamp,
-        Null(Types.SQL_TIMESTAMP)),
-      "timestampDiff(DAY, '2016-02-24 12:42:25'.toTimestamp,  Null(SQL_TIMESTAMP))",
+        nullOf(Types.SQL_TIMESTAMP)),
+      "timestampDiff(DAY, '2016-02-24 12:42:25'.toTimestamp,  nullOf(SQL_TIMESTAMP))",
       "TIMESTAMPDIFF(DAY, TIMESTAMP '2016-02-24 12:42:25',  CAST(NULL AS TIMESTAMP))",
       "null"
     )
@@ -2779,20 +2779,20 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
     }
 
     testAllApis(
-      "2016-02-24 12:42:25".toTimestamp + Null(Types.INTERVAL_MILLIS),
-      "'2016-02-24 12:42:25'.toTimestamp + Null(INTERVAL_MILLIS)",
+      "2016-02-24 12:42:25".toTimestamp + nullOf(Types.INTERVAL_MILLIS),
+      "'2016-02-24 12:42:25'.toTimestamp + nullOf(INTERVAL_MILLIS)",
       "TIMESTAMPADD(HOUR, CAST(NULL AS INTEGER), TIMESTAMP '2016-02-24 12:42:25')",
       "null")
 
     testAllApis(
-      Null(Types.SQL_TIMESTAMP) + -200.hours,
-      "Null(SQL_TIMESTAMP) + -200.hours",
+      nullOf(Types.SQL_TIMESTAMP) + -200.hours,
+      "nullOf(SQL_TIMESTAMP) + -200.hours",
       "TIMESTAMPADD(HOUR, -200, CAST(NULL AS TIMESTAMP))",
       "null")
 
     testAllApis(
-      Null(Types.SQL_TIMESTAMP) + 3.months,
-      "Null(SQL_TIMESTAMP) + 3.months",
+      nullOf(Types.SQL_TIMESTAMP) + 3.months,
+      "nullOf(SQL_TIMESTAMP) + 3.months",
       "TIMESTAMPADD(MONTH, 3, CAST(NULL AS TIMESTAMP))",
       "null")
 
@@ -2827,13 +2827,13 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
       "timestampadd(SECOND, 1, date '2016-06-15')",
       "2016-06-15 00:00:01.0")
 
-    testAllApis(Null(Types.SQL_TIMESTAMP) + 1.second,
-      "Null(SQL_TIMESTAMP) + 1.second",
+    testAllApis(nullOf(Types.SQL_TIMESTAMP) + 1.second,
+      "nullOf(SQL_TIMESTAMP) + 1.second",
       "timestampadd(SECOND, 1, cast(null as date))",
       "null")
 
-    testAllApis(Null(Types.SQL_TIMESTAMP) + 1.day,
-      "Null(SQL_TIMESTAMP) + 1.day",
+    testAllApis(nullOf(Types.SQL_TIMESTAMP) + 1.day,
+      "nullOf(SQL_TIMESTAMP) + 1.day",
       "timestampadd(DAY, 1, cast(null as date))",
       "null")
 
@@ -2986,8 +2986,8 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
       "null")
 
     testAllApis(
-      "test".sha2(Null(Types.INT)),
-      "sha2('test', Null(INT))",
+      "test".sha2(nullOf(Types.INT)),
+      "sha2('test', nullOf(INT))",
       "SHA2('test', CAST(NULL AS INT))",
       "null")
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
index d61627b..088ba65 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
@@ -296,8 +296,8 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase {
     )
 
     testTableApi(
-      'f10.in("This is a test String.", "String", "Hello world", "Comment#1", Null(Types.STRING)),
-      "f10.in('This is a test String.', 'String', 'Hello world', 'Comment#1', Null(STRING))",
+      'f10.in("This is a test String.", "String", "Hello world", "Comment#1", nullOf(Types.STRING)),
+      "f10.in('This is a test String.', 'String', 'Hello world', 'Comment#1', nullOf(STRING))",
       "true"
     )
 
@@ -308,8 +308,8 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase {
     )
 
     testTableApi(
-      'f10.in("FAIL", "FAIL", Null(Types.STRING)),
-      "f10.in('FAIL', 'FAIL', Null(STRING))",
+      'f10.in("FAIL", "FAIL", nullOf(Types.STRING)),
+      "f10.in('FAIL', 'FAIL', nullOf(STRING))",
       "null"
     )
   }
@@ -350,10 +350,10 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase {
       "true")
 
     // null
-    testAllApis(Null(Types.INT), "Null(INT)", "CAST(NULL AS INT)", "null")
+    testAllApis(nullOf(Types.INT), "nullOf(INT)", "CAST(NULL AS INT)", "null")
     testAllApis(
-      Null(Types.STRING) === "",
-      "Null(STRING) === ''",
+      nullOf(Types.STRING) === "",
+      "nullOf(STRING) === ''",
       "CAST(NULL AS VARCHAR) = ''",
       "null")
 
@@ -416,26 +416,26 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase {
   def testBetween(): Unit = {
     // between
     testAllApis(
-      4.between(Null(Types.INT), 3),
-      "4.between(Null(INT), 3)",
+      4.between(nullOf(Types.INT), 3),
+      "4.between(nullOf(INT), 3)",
       "4 BETWEEN NULL AND 3",
       "false"
     )
     testAllApis(
-      4.between(Null(Types.INT), 12),
-      "4.between(Null(INT), 12)",
+      4.between(nullOf(Types.INT), 12),
+      "4.between(nullOf(INT), 12)",
       "4 BETWEEN NULL AND 12",
       "null"
     )
     testAllApis(
-      4.between(Null(Types.INT), 3),
-      "4.between(Null(INT), 3)",
+      4.between(nullOf(Types.INT), 3),
+      "4.between(nullOf(INT), 3)",
       "4 BETWEEN 5 AND NULL",
       "false"
     )
     testAllApis(
-      4.between(Null(Types.INT), 12),
-      "4.between(Null(INT), 12)",
+      4.between(nullOf(Types.INT), 12),
+      "4.between(nullOf(INT), 12)",
       "4 BETWEEN 0 AND NULL",
       "null"
     )
@@ -490,8 +490,8 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase {
 
     // not between
     testAllApis(
-      2.notBetween(Null(Types.INT), 3),
-      "2.notBetween(Null(INT), 3)",
+      2.notBetween(nullOf(Types.INT), 3),
+      "2.notBetween(nullOf(INT), 3)",
       "2 NOT BETWEEN NULL AND 3",
       "null"
     )
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
index 1534344..35faa2b 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
@@ -121,26 +121,26 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
   @Test
   def testNullableParameters(): Unit = {
     testAllApis(
-      Func3(Null(INT_TYPE_INFO), Null(STRING_TYPE_INFO)),
-      "Func3(Null(INT), Null(STRING))",
+      Func3(nullOf(INT_TYPE_INFO), nullOf(STRING_TYPE_INFO)),
+      "Func3(nullOf(INT), nullOf(STRING))",
       "Func3(NULL, NULL)",
       "null and null")
 
     testAllApis(
-      Func3(Null(INT_TYPE_INFO), "Test"),
-      "Func3(Null(INT), 'Test')",
+      Func3(nullOf(INT_TYPE_INFO), "Test"),
+      "Func3(nullOf(INT), 'Test')",
       "Func3(NULL, 'Test')",
       "null and Test")
 
     testAllApis(
-      Func3(42, Null(STRING_TYPE_INFO)),
-      "Func3(42, Null(STRING))",
+      Func3(42, nullOf(STRING_TYPE_INFO)),
+      "Func3(42, nullOf(STRING))",
       "Func3(42, NULL)",
       "42 and null")
 
     testAllApis(
-      Func0(Null(INT_TYPE_INFO)),
-      "Func0(Null(INT))",
+      Func0(nullOf(INT_TYPE_INFO)),
+      "Func0(nullOf(INT))",
       "Func0(NULL)",
       "-1")
   }
@@ -349,8 +349,8 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
       "7591 and 43810000 and 655906210000")
 
     testAllApis(
-      JavaFunc1(Null(Types.SQL_TIME), 15, Null(Types.SQL_TIMESTAMP)),
-      "JavaFunc1(Null(SQL_TIME), 15, Null(SQL_TIMESTAMP))",
+      JavaFunc1(nullOf(Types.SQL_TIME), 15, nullOf(Types.SQL_TIMESTAMP)),
+      "JavaFunc1(nullOf(SQL_TIME), 15, nullOf(SQL_TIMESTAMP))",
       "JavaFunc1(NULL, 15, NULL)",
       "null and 15 and null")
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
index c5787c2..ddaf7fc 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
@@ -25,9 +25,8 @@ import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.Types
-import org.apache.flink.table.expressions.Null
+import org.apache.flink.table.api.scala._
 import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
 import org.apache.flink.types.Row
 import org.junit.Assert.assertEquals
@@ -69,9 +68,9 @@ class JoinITCase extends StreamingWithStateTestBase {
     data2.+=((2, 2L, "HeHe"))
 
     val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
-      .select(('a === 1)?(Null(Types.INT), 'a) as 'a, 'b, 'c, 'proctime) // test null values
+      .select(('a === 1)?(nullOf(Types.INT), 'a) as 'a, 'b, 'c, 'proctime) // test null values
     val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
-      .select(('a === 1)?(Null(Types.INT), 'a) as 'a, 'b, 'c, 'proctime) // test null values
+      .select(('a === 1)?(nullOf(Types.INT), 'a) as 'a, 'b, 'c, 'proctime) // test null values
 
     tEnv.registerTable("T1", t1)
     tEnv.registerTable("T2", t2)
@@ -947,9 +946,9 @@ class JoinITCase extends StreamingWithStateTestBase {
     data2.+=((3, 2L, "HeHe"))
 
     val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c)
-      .select(('a === 3) ? (Null(Types.INT), 'a) as 'a, 'b, 'c)
+      .select(('a === 3) ? (nullOf(Types.INT), 'a) as 'a, 'b, 'c)
     val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c)
-      .select(('a === 3) ? (Null(Types.INT), 'a) as 'a, 'b, 'c)
+      .select(('a === 3) ? (nullOf(Types.INT), 'a) as 'a, 'b, 'c)
 
     tEnv.registerTable("T1", t1)
     tEnv.registerTable("T2", t2)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala
index 3b2268c..c0a3e24 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala
@@ -23,10 +23,9 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.runtime.utils.StreamITCase.RetractingSink
 import org.apache.flink.table.api.{StreamQueryConfig, Types}
-import org.apache.flink.table.expressions.Null
 import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{CountDistinct, DataViewTestAgg, WeightedAvg}
+import org.apache.flink.table.runtime.utils.StreamITCase.RetractingSink
 import org.apache.flink.table.runtime.utils.{JavaUserDefinedAggFunctions, StreamITCase, StreamTestData, StreamingWithStateTestBase}
 import org.apache.flink.types.Row
 import org.junit.Assert.assertEquals
@@ -142,7 +141,7 @@ class AggregateITCase extends StreamingWithStateTestBase {
     StreamITCase.clear
 
     val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-      .select('b, Null(Types.LONG)).distinct()
+      .select('b, nullOf(Types.LONG)).distinct()
 
     val results = t.toRetractStream[Row](queryConfig)
     results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala
index 4a69ec5..926319f 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala
@@ -18,20 +18,20 @@
 
 package org.apache.flink.table.runtime.stream.table
 
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.{StreamQueryConfig, Types}
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
-import org.junit.Assert._
-import org.junit.Test
-import org.apache.flink.api.common.time.Time
-import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.expressions.Literal
 import org.apache.flink.table.expressions.utils.Func20
-import org.apache.flink.table.expressions.{Literal, Null}
 import org.apache.flink.table.functions.aggfunctions.CountAggFunction
 import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{CountDistinct, WeightedAvg}
+import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
 import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit.Test
 
 import scala.collection.mutable
 
@@ -441,9 +441,9 @@ class JoinITCase extends StreamingWithStateTestBase {
     env.setStateBackend(getStateBackend)
 
     val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-      .select(('a === 21) ? (Null(Types.INT), 'a) as 'a, 'b, 'c)
+      .select(('a === 21) ? (nullOf(Types.INT), 'a) as 'a, 'b, 'c)
     val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-      .select(('e === 15) ? (Null(Types.INT), 'd) as 'd,  'e, 'f, 'g, 'h)
+      .select(('e === 15) ? (nullOf(Types.INT), 'd) as 'd,  'e, 'f, 'g, 'h)
 
     val joinT = ds1.leftOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g)
 
