Repository: flink
Updated Branches:
  refs/heads/master 494212b37 -> 50d8797bb


[FLINK-3739] [table] Add a null literal to Table API

This closes #1880.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/50d8797b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/50d8797b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/50d8797b

Branch: refs/heads/master
Commit: 50d8797bba926d14b0873be3972e7f97b306f675
Parents: 494212b
Author: twalthr <twal...@apache.org>
Authored: Wed Apr 13 12:18:16 2016 +0200
Committer: twalthr <twal...@apache.org>
Committed: Fri Apr 15 10:30:12 2016 +0200

----------------------------------------------------------------------
 docs/apis/batch/libs/table.md                   |  7 +++
 .../flink/api/scala/table/expressionDsl.scala   |  2 +-
 .../flink/api/table/codegen/CodeGenerator.scala | 21 ++++---
 .../table/expressions/ExpressionParser.scala    | 16 ++++-
 .../flink/api/table/expressions/literals.scala  | 12 ++++
 .../flink/api/table/expressions/package.scala   |  4 +-
 .../api/table/typeutils/TypeConverter.scala     |  4 ++
 .../api/java/table/test/ExpressionsITCase.java  | 25 ++++++++
 .../api/scala/sql/test/ExpressionsITCase.scala  | 64 ++++++++++++++++++++
 .../flink/api/scala/sql/test/FilterITCase.scala |  1 -
 .../scala/table/test/ExpressionsITCase.scala    | 25 +++++++-
 11 files changed, 163 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/50d8797b/docs/apis/batch/libs/table.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/libs/table.md b/docs/apis/batch/libs/table.md
index 527d10d..56e2b6b 100644
--- a/docs/apis/batch/libs/table.md
+++ b/docs/apis/batch/libs/table.md
@@ -560,3 +560,10 @@ val result = tableEnv.sql("SELECT * FROM MyTable")
 
 {% top %}
 
+Runtime Configuration
+----
+The Table API provides a configuration (the so-called `TableConfig`) to modify runtime behavior. It can be accessed either through `TableEnvironment` or passed to the `toDataSet`/`toDataStream` method when using Scala implicit conversion.
+
+### Null Handling
+By default, the Table API does not support `null` values at runtime for efficiency purposes. Null handling can be enabled by setting the `nullCheck` property in the `TableConfig` to `true`.
+

http://git-wip-us.apache.org/repos/asf/flink/blob/50d8797b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
index 5aa8a51..505d872 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
@@ -27,7 +27,7 @@ import scala.language.implicitConversions
  * operations.
  *
  * These operations must be kept in sync with the parser in
- * [[org.apache.flink.api.table.parser.ExpressionParser]].
+ * [[org.apache.flink.api.table.expressions.ExpressionParser]].
  */
 trait ImplicitExpressionOperations {
   def expr: Expression

http://git-wip-us.apache.org/repos/asf/flink/blob/50d8797b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
index f213d4c..c336c82 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
@@ -521,6 +521,11 @@ class CodeGenerator(
   override def visitLiteral(literal: RexLiteral): GeneratedExpression = {
     val resultType = sqlTypeToTypeInfo(literal.getType.getSqlTypeName)
     val value = literal.getValue3
+    // null value with type
+    if (value == null) {
+      return generateNullLiteral(resultType)
+    }
+    // non-null values
     literal.getType.getSqlTypeName match {
       case BOOLEAN =>
         generateNonNullLiteral(resultType, literal.getValue3.toString)
@@ -574,8 +579,6 @@ class CodeGenerator(
         }
       case VARCHAR | CHAR =>
         generateNonNullLiteral(resultType, "\"" + value.toString + "\"")
-      case NULL =>
-        generateNullLiteral(resultType)
       case SYMBOL =>
         val symbolOrdinal = value.asInstanceOf[SqlLiteral.SqlSymbol].ordinal()
         generateNonNullLiteral(resultType, symbolOrdinal.toString)
@@ -742,6 +745,12 @@ class CodeGenerator(
     }
   }
 
+  override def visitOver(over: RexOver): GeneratedExpression = ???
+
+  // 
----------------------------------------------------------------------------------------------
+  // generator helping methods
+  // 
----------------------------------------------------------------------------------------------
+
   def checkNumericOrString(left: GeneratedExpression, right: 
GeneratedExpression): Unit = {
     if (isNumeric(left)) {
       requireNumeric(right)
@@ -750,12 +759,6 @@ class CodeGenerator(
     }
   }
 
-  override def visitOver(over: RexOver): GeneratedExpression = ???
-
-  // 
----------------------------------------------------------------------------------------------
-  // generator helping methods
-  // 
----------------------------------------------------------------------------------------------
-
   private def generateInputAccess(
       inputType: TypeInformation[Any],
       inputTerm: String,
@@ -906,7 +909,7 @@ class CodeGenerator(
 
     val wrappedCode = if (nullCheck) {
       s"""
-        |$resultTypeTerm $resultTerm = null;
+        |$resultTypeTerm $resultTerm = $defaultValue;
         |boolean $nullTerm = true;
         |""".stripMargin
     } else {

http://git-wip-us.apache.org/repos/asf/flink/blob/50d8797b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
index 4c88249..8a24d3c 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
@@ -77,6 +77,18 @@ object ExpressionParser extends JavaTokenParsers with 
PackratParsers {
     str => Literal(str.toBoolean)
   }
 
+  lazy val nullLiteral: PackratParser[Expression] =
+    "Null(BYTE)" ^^ { e => Null(BasicTypeInfo.BYTE_TYPE_INFO) } |
+    "Null(SHORT)" ^^ { e => Null(BasicTypeInfo.SHORT_TYPE_INFO) } |
+    "Null(INT)" ^^ { e => Null(BasicTypeInfo.INT_TYPE_INFO) } |
+    "Null(LONG)" ^^ { e => Null(BasicTypeInfo.LONG_TYPE_INFO) } |
+    "Null(FLOAT)" ^^ { e => Null(BasicTypeInfo.FLOAT_TYPE_INFO) } |
+    "Null(DOUBLE)" ^^ { e => Null(BasicTypeInfo.DOUBLE_TYPE_INFO) } |
+    "Null(BOOL)" ^^ { e => Null(BasicTypeInfo.BOOLEAN_TYPE_INFO) } |
+    "Null(BOOLEAN)" ^^ { e => Null(BasicTypeInfo.BOOLEAN_TYPE_INFO) } |
+    "Null(STRING)" ^^ { e => Null(BasicTypeInfo.STRING_TYPE_INFO) } |
+    "Null(DATE)" ^^ { e => Null(BasicTypeInfo.DATE_TYPE_INFO) }
+
   lazy val literalExpr: PackratParser[Expression] =
     numberLiteral |
       stringLiteralFlink | singleQuoteStringLiteral |
@@ -188,8 +200,8 @@ object ExpressionParser extends JavaTokenParsers with 
PackratParsers {
 
   lazy val suffix =
     isNull | isNotNull |
-      sum | min | max | count | avg | cast |
-      specialFunctionCalls |functionCall | functionCallWithoutArgs |
+      sum | min | max | count | avg | cast | nullLiteral |
+      specialFunctionCalls | functionCall | functionCallWithoutArgs |
       specialSuffixFunctionCalls | suffixFunctionCall | 
suffixFunctionCallWithoutArgs |
       atom
 

http://git-wip-us.apache.org/repos/asf/flink/blob/50d8797b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
index efaa96d..1fbe5a3 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
@@ -23,6 +23,7 @@ import org.apache.calcite.rex.RexNode
 import org.apache.calcite.tools.RelBuilder
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.scala.table.ImplicitExpressionOperations
+import org.apache.flink.api.table.typeutils.TypeConverter
 
 object Literal {
   def apply(l: Any): Literal = l match {
@@ -49,3 +50,14 @@ case class Literal(value: Any, tpe: TypeInformation[_])
     relBuilder.literal(value)
   }
 }
+
+case class Null(tpe: TypeInformation[_]) extends LeafExpression {
+  def expr = this
+  def typeInfo = tpe
+
+  override def toString = s"null"
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    
relBuilder.getRexBuilder.makeNullLiteral(TypeConverter.typeInfoToSqlType(tpe))
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/50d8797b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala
index c5c8c94..2e5d0b2 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala
@@ -21,8 +21,8 @@ package org.apache.flink.api.table
  * This package contains the base class of AST nodes and all the expression language AST classes.
  * Expression trees should not be manually constructed by users. They are implicitly constructed
  * from the implicit DSL conversions in
- * [[org.apache.flink.api.scala.expressions.ImplicitExpressionConversions]] and
- * [[org.apache.flink.api.scala.expressions.ImplicitExpressionOperations]]. For the Java API,
+ * [[org.apache.flink.api.scala.table.ImplicitExpressionConversions]] and
+ * [[org.apache.flink.api.scala.table.ImplicitExpressionOperations]]. For the Java API,
  * expression trees should be generated from a string parser that parses expressions and creates
  * AST nodes.
  */
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/50d8797b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeConverter.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeConverter.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeConverter.scala
index dc3abb7..02fe21d 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeConverter.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeConverter.scala
@@ -75,6 +75,10 @@ object TypeConverter {
     case VARCHAR | CHAR => STRING_TYPE_INFO
     case DATE => DATE_TYPE_INFO
 
+    case NULL =>
+      throw new TableException("Type NULL is not supported. " +
+        "Null values must have a supported type.")
+
     // symbol for special flags e.g. TRIM's BOTH, LEADING, TRAILING
     // are represented as integer
     case SYMBOL => INT_TYPE_INFO

http://git-wip-us.apache.org/repos/asf/flink/blob/50d8797b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
index 8c30163..996542d 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
@@ -100,5 +100,30 @@ public class ExpressionsITCase extends 
TableProgramsTestBase {
                compareResultAsText(results, expected);
        }
 
+       @Test
+       public void testNullLiteral() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = getJavaTableEnvironment();
+
+               DataSource<Tuple2<Integer, Integer>> input =
+                               env.fromElements(new Tuple2<>(1, 0));
+
+               Table table =
+                               tableEnv.fromDataSet(input, "a, b");
+
+               Table result = table.select("a, b, Null(INT), Null(STRING) === 
''");
+
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected;
+               if (getConfig().getNullCheck()) {
+                       expected = "1,0,null,null";
+               }
+               else {
+                       expected = "1,0,-1,true";
+               }
+               compareResultAsText(results, expected);
+       }
+
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/50d8797b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/ExpressionsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/ExpressionsITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/ExpressionsITCase.scala
new file mode 100644
index 0000000..1d72c5d
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/ExpressionsITCase.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.sql.test
+
+import org.apache.flink.api.scala.{ExecutionEnvironment, _}
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.plan.TranslationContext
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
+import 
org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class ExpressionsITCase(
+    mode: TestExecutionMode,
+    configMode: TableConfigMode)
+  extends TableProgramsTestBase(mode, configMode) {
+
+  @Test
+  def testNullLiteral(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = getScalaTableEnvironment
+    TranslationContext.reset()
+
+    val sqlQuery = "SELECT a, b, CAST(NULL AS INT), CAST(NULL AS VARCHAR) = '' 
FROM MyTable"
+
+    val ds = env.fromElements((1, 0))
+    tEnv.registerDataSet("MyTable", ds, 'a, 'b)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = if (getConfig.getNullCheck) {
+      "1,0,null,null"
+    } else {
+      "1,0,-1,true"
+    }
+    val results = result.toDataSet[Row](getConfig).collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/50d8797b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/FilterITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/FilterITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/FilterITCase.scala
index c89e25a..171e200 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/FilterITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/FilterITCase.scala
@@ -18,7 +18,6 @@
 
 package org.apache.flink.api.scala.sql.test
 
-import org.apache.calcite.tools.ValidationException
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets

http://git-wip-us.apache.org/repos/asf/flink/blob/50d8797b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
index ba0311a..29b3be4 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
@@ -24,10 +24,9 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.table.Row
-import org.apache.flink.api.table.expressions.Literal
-import org.apache.flink.api.table.test.utils.TableProgramsTestBase
-import TableProgramsTestBase.TableConfigMode
+import org.apache.flink.api.table.expressions.{Literal, Null}
 import org.apache.flink.api.table.test.utils.TableProgramsTestBase
+import 
org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._
@@ -91,6 +90,26 @@ class ExpressionsITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  @Test
+  def testNullLiteral(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val t = env.fromElements((1, 0)).as('a, 'b)
+      .select(
+        'a,
+        'b,
+        Null(BasicTypeInfo.INT_TYPE_INFO),
+        Null(BasicTypeInfo.STRING_TYPE_INFO) === "")
+
+    val expected = if (getConfig.getNullCheck) {
+      "1,0,null,null"
+    } else {
+      "1,0,-1,true"
+    }
+    val results = t.toDataSet[Row](getConfig).collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
   // Date literals not yet supported
   @Ignore
   @Test

Reply via email to