Repository: flink Updated Branches: refs/heads/master 494212b37 -> 50d8797bb
[FLINK-3739] [table] Add a null literal to Table API This closes #1880. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/50d8797b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/50d8797b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/50d8797b Branch: refs/heads/master Commit: 50d8797bba926d14b0873be3972e7f97b306f675 Parents: 494212b Author: twalthr <twal...@apache.org> Authored: Wed Apr 13 12:18:16 2016 +0200 Committer: twalthr <twal...@apache.org> Committed: Fri Apr 15 10:30:12 2016 +0200 ---------------------------------------------------------------------- docs/apis/batch/libs/table.md | 7 +++ .../flink/api/scala/table/expressionDsl.scala | 2 +- .../flink/api/table/codegen/CodeGenerator.scala | 21 ++++--- .../table/expressions/ExpressionParser.scala | 16 ++++- .../flink/api/table/expressions/literals.scala | 12 ++++ .../flink/api/table/expressions/package.scala | 4 +- .../api/table/typeutils/TypeConverter.scala | 4 ++ .../api/java/table/test/ExpressionsITCase.java | 25 ++++++++ .../api/scala/sql/test/ExpressionsITCase.scala | 64 ++++++++++++++++++++ .../flink/api/scala/sql/test/FilterITCase.scala | 1 - .../scala/table/test/ExpressionsITCase.scala | 25 +++++++- 11 files changed, 163 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/50d8797b/docs/apis/batch/libs/table.md ---------------------------------------------------------------------- diff --git a/docs/apis/batch/libs/table.md b/docs/apis/batch/libs/table.md index 527d10d..56e2b6b 100644 --- a/docs/apis/batch/libs/table.md +++ b/docs/apis/batch/libs/table.md @@ -560,3 +560,10 @@ val result = tableEnv.sql("SELECT * FROM MyTable") {% top %} +Runtime Configuration +---- +The Table API provides a configuration (the so-called `TableConfig`) to modify runtime behavior. It can be accessed either through `TableEnvironment` or passed to the `toDataSet`/`toDataStream` method when using Scala implicit conversion. + +### Null Handling +By default, the Table API does not support `null` values at runtime for efficiency purposes. Null handling can be enabled by setting the `nullCheck` property in the `TableConfig` to `true`. + http://git-wip-us.apache.org/repos/asf/flink/blob/50d8797b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala index 5aa8a51..505d872 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala @@ -27,7 +27,7 @@ import scala.language.implicitConversions * operations. * * These operations must be kept in sync with the parser in - * [[org.apache.flink.api.table.parser.ExpressionParser]]. + * [[org.apache.flink.api.table.expressions.ExpressionParser]]. */ trait ImplicitExpressionOperations { def expr: Expression http://git-wip-us.apache.org/repos/asf/flink/blob/50d8797b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala index f213d4c..c336c82 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala @@ -521,6 +521,11 @@ class CodeGenerator( override def visitLiteral(literal: RexLiteral): GeneratedExpression = { val resultType = sqlTypeToTypeInfo(literal.getType.getSqlTypeName) val value = literal.getValue3 + // null value with type + if (value == null) { + return generateNullLiteral(resultType) + } + // non-null values literal.getType.getSqlTypeName match { case BOOLEAN => generateNonNullLiteral(resultType, literal.getValue3.toString) @@ -574,8 +579,6 @@ class CodeGenerator( } case VARCHAR | CHAR => generateNonNullLiteral(resultType, "\"" + value.toString + "\"") - case NULL => - generateNullLiteral(resultType) case SYMBOL => val symbolOrdinal = value.asInstanceOf[SqlLiteral.SqlSymbol].ordinal() generateNonNullLiteral(resultType, symbolOrdinal.toString) @@ -742,6 +745,12 @@ class CodeGenerator( } } + override def visitOver(over: RexOver): GeneratedExpression = ??? + + // ---------------------------------------------------------------------------------------------- + // generator helping methods + // ---------------------------------------------------------------------------------------------- + def checkNumericOrString(left: GeneratedExpression, right: GeneratedExpression): Unit = { if (isNumeric(left)) { requireNumeric(right) @@ -750,12 +759,6 @@ class CodeGenerator( } } - override def visitOver(over: RexOver): GeneratedExpression = ??? - - // ---------------------------------------------------------------------------------------------- - // generator helping methods - // ---------------------------------------------------------------------------------------------- - private def generateInputAccess( inputType: TypeInformation[Any], inputTerm: String, @@ -906,7 +909,7 @@ class CodeGenerator( val wrappedCode = if (nullCheck) { s""" - |$resultTypeTerm $resultTerm = null; + |$resultTypeTerm $resultTerm = $defaultValue; |boolean $nullTerm = true; |""".stripMargin } else { http://git-wip-us.apache.org/repos/asf/flink/blob/50d8797b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala index 4c88249..8a24d3c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala @@ -77,6 +77,18 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { str => Literal(str.toBoolean) } + lazy val nullLiteral: PackratParser[Expression] = + "Null(BYTE)" ^^ { e => Null(BasicTypeInfo.BYTE_TYPE_INFO) } | + "Null(SHORT)" ^^ { e => Null(BasicTypeInfo.SHORT_TYPE_INFO) } | + "Null(INT)" ^^ { e => Null(BasicTypeInfo.INT_TYPE_INFO) } | + "Null(LONG)" ^^ { e => Null(BasicTypeInfo.LONG_TYPE_INFO) } | + "Null(FLOAT)" ^^ { e => Null(BasicTypeInfo.FLOAT_TYPE_INFO) } | + "Null(DOUBLE)" ^^ { e => Null(BasicTypeInfo.DOUBLE_TYPE_INFO) } | + "Null(BOOL)" ^^ { e => Null(BasicTypeInfo.BOOLEAN_TYPE_INFO) } | + "Null(BOOLEAN)" ^^ { e => Null(BasicTypeInfo.BOOLEAN_TYPE_INFO) } | + "Null(STRING)" ^^ { e => Null(BasicTypeInfo.STRING_TYPE_INFO) } | + "Null(DATE)" ^^ { e => Null(BasicTypeInfo.DATE_TYPE_INFO) } + lazy val literalExpr: PackratParser[Expression] = numberLiteral | stringLiteralFlink | singleQuoteStringLiteral | @@ -188,8 +200,8 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { lazy val suffix = isNull | isNotNull | - sum | min | max | count | avg | cast | - specialFunctionCalls |functionCall | functionCallWithoutArgs | + sum | min | max | count | avg | cast | nullLiteral | + specialFunctionCalls | functionCall | functionCallWithoutArgs | specialSuffixFunctionCalls | suffixFunctionCall | suffixFunctionCallWithoutArgs | atom http://git-wip-us.apache.org/repos/asf/flink/blob/50d8797b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala index efaa96d..1fbe5a3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala @@ -23,6 +23,7 @@ import org.apache.calcite.rex.RexNode import org.apache.calcite.tools.RelBuilder import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.api.scala.table.ImplicitExpressionOperations +import org.apache.flink.api.table.typeutils.TypeConverter object Literal { def apply(l: Any): Literal = l match { @@ -49,3 +50,14 @@ case class Literal(value: Any, tpe: TypeInformation[_]) relBuilder.literal(value) } } + +case class Null(tpe: TypeInformation[_]) extends LeafExpression { + def expr = this + def typeInfo = tpe + + override def toString = s"null" + + override def toRexNode(implicit relBuilder: RelBuilder): RexNode = { + relBuilder.getRexBuilder.makeNullLiteral(TypeConverter.typeInfoToSqlType(tpe)) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/50d8797b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala index c5c8c94..2e5d0b2 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala @@ -21,8 +21,8 @@ package org.apache.flink.api.table * This package contains the base class of AST nodes and all the expression language AST classes. * Expression trees should not be manually constructed by users. They are implicitly constructed * from the implicit DSL conversions in - * [[org.apache.flink.api.scala.expressions.ImplicitExpressionConversions]] and - * [[org.apache.flink.api.scala.expressions.ImplicitExpressionOperations]]. For the Java API, + * [[org.apache.flink.api.scala.table.ImplicitExpressionConversions]] and + * [[org.apache.flink.api.scala.table.ImplicitExpressionOperations]]. For the Java API, * expression trees should be generated from a string parser that parses expressions and creates * AST nodes. */ http://git-wip-us.apache.org/repos/asf/flink/blob/50d8797b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeConverter.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeConverter.scala index dc3abb7..02fe21d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeConverter.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TypeConverter.scala @@ -75,6 +75,10 @@ object TypeConverter { case VARCHAR | CHAR => STRING_TYPE_INFO case DATE => DATE_TYPE_INFO + case NULL => + throw new TableException("Type NULL is not supported. " + + "Null values must have a supported type.") + // symbol for special flags e.g. TRIM's BOTH, LEADING, TRAILING // are represented as integer case SYMBOL => INT_TYPE_INFO http://git-wip-us.apache.org/repos/asf/flink/blob/50d8797b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java index 8c30163..996542d 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java @@ -100,5 +100,30 @@ public class ExpressionsITCase extends TableProgramsTestBase { compareResultAsText(results, expected); } + @Test + public void testNullLiteral() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = getJavaTableEnvironment(); + + DataSource<Tuple2<Integer, Integer>> input = + env.fromElements(new Tuple2<>(1, 0)); + + Table table = + tableEnv.fromDataSet(input, "a, b"); + + Table result = table.select("a, b, Null(INT), Null(STRING) === ''"); + + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected; + if (getConfig().getNullCheck()) { + expected = "1,0,null,null"; + } + else { + expected = "1,0,-1,true"; + } + compareResultAsText(results, expected); + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/50d8797b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/ExpressionsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/ExpressionsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/ExpressionsITCase.scala new file mode 100644 index 0000000..1d72c5d --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/ExpressionsITCase.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.sql.test + +import org.apache.flink.api.scala.{ExecutionEnvironment, _} +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.table.Row +import org.apache.flink.api.table.plan.TranslationContext +import org.apache.flink.api.table.test.utils.TableProgramsTestBase +import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.apache.flink.test.util.TestBaseUtils +import org.junit.Test +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +import scala.collection.JavaConverters._ + +@RunWith(classOf[Parameterized]) +class ExpressionsITCase( + mode: TestExecutionMode, + configMode: TableConfigMode) + extends TableProgramsTestBase(mode, configMode) { + + @Test + def testNullLiteral(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + val sqlQuery = "SELECT a, b, CAST(NULL AS INT), CAST(NULL AS VARCHAR) = '' FROM MyTable" + + val ds = env.fromElements((1, 0)) + tEnv.registerDataSet("MyTable", ds, 'a, 'b) + + val result = tEnv.sql(sqlQuery) + + val expected = if (getConfig.getNullCheck) { + "1,0,null,null" + } else { + "1,0,-1,true" + } + val results = result.toDataSet[Row](getConfig).collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/50d8797b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/FilterITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/FilterITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/FilterITCase.scala index c89e25a..171e200 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/FilterITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/FilterITCase.scala @@ -18,7 +18,6 @@ package org.apache.flink.api.scala.sql.test -import org.apache.calcite.tools.ValidationException import org.apache.flink.api.scala._ import org.apache.flink.api.scala.table._ import org.apache.flink.api.scala.util.CollectionDataSets http://git-wip-us.apache.org/repos/asf/flink/blob/50d8797b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala index ba0311a..29b3be4 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala @@ -24,10 +24,9 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo import org.apache.flink.api.scala._ import org.apache.flink.api.scala.table._ import org.apache.flink.api.table.Row -import org.apache.flink.api.table.expressions.Literal -import org.apache.flink.api.table.test.utils.TableProgramsTestBase -import TableProgramsTestBase.TableConfigMode +import org.apache.flink.api.table.expressions.{Literal, Null} import org.apache.flink.api.table.test.utils.TableProgramsTestBase +import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.apache.flink.test.util.TestBaseUtils import org.junit._ @@ -91,6 +90,26 @@ class ExpressionsITCase( TestBaseUtils.compareResultAsText(results.asJava, expected) } + @Test + def testNullLiteral(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + + val t = env.fromElements((1, 0)).as('a, 'b) + .select( + 'a, + 'b, + Null(BasicTypeInfo.INT_TYPE_INFO), + Null(BasicTypeInfo.STRING_TYPE_INFO) === "") + + val expected = if (getConfig.getNullCheck) { + "1,0,null,null" + } else { + "1,0,-1,true" + } + val results = t.toDataSet[Row](getConfig).collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + // Date literals not yet supported @Ignore @Test