Repository: flink Updated Branches: refs/heads/master 78f2a1586 -> dc3337a93
[FLINK-3497] [table] Add IS (NOT) TRUE/IS (NOT) FALSE functions Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dc3337a9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dc3337a9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dc3337a9 Branch: refs/heads/master Commit: dc3337a93b20dfc8dc8cdc0ec8c2bc4843e76a69 Parents: 78f2a15 Author: twalthr <twal...@apache.org> Authored: Fri Sep 2 16:23:04 2016 +0200 Committer: twalthr <twal...@apache.org> Committed: Fri Sep 2 16:30:08 2016 +0200 ---------------------------------------------------------------------- docs/dev/table_api.md | 94 +++++++++++++++++++- .../flink/api/scala/table/expressionDsl.scala | 10 +++ .../flink/api/table/codegen/CodeGenerator.scala | 20 +++++ .../table/codegen/calls/ScalarOperators.scala | 32 +++++++ .../flink/api/table/codegen/generated.scala | 6 ++ .../table/expressions/ExpressionParser.scala | 33 +++---- .../api/table/expressions/comparison.scala | 20 +++++ .../api/table/validate/FunctionCatalog.scala | 6 ++ .../table/expressions/ScalarFunctionsTest.scala | 57 +++++++++++- 9 files changed, 253 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/dc3337a9/docs/dev/table_api.md ---------------------------------------------------------------------- diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md index 543945c..68a2b95 100644 --- a/docs/dev/table_api.md +++ b/docs/dev/table_api.md @@ -963,7 +963,7 @@ unary = [ "!" | "-" ] , composite ; composite = suffixed | atom ; -suffixed = interval | cast | as | aggregation | nullCheck | if | functionCall ; +suffixed = interval | cast | as | aggregation | if | functionCall ; interval = composite , "." 
, ("year" | "month" | "day" | "hour" | "minute" | "second" | "milli") ; @@ -975,11 +975,9 @@ as = composite , ".as(" , fieldReference , ")" ; aggregation = composite , ( ".sum" | ".min" | ".max" | ".count" | ".avg" ) , [ "()" ] ; -nullCheck = composite , ( ".isNull" | ".isNotNull" ) , [ "()" ] ; - if = composite , ".?(" , expression , "," , expression , ")" ; -functionCall = composite , "." , functionIdentifier , "(" , [ expression , { "," , expression } ] , ")" ; +functionCall = composite , "." , functionIdentifier , [ "(" , [ expression , { "," , expression } ] , ")" ] ; atom = ( "(" , expression , ")" ) | literal | nullLiteral | fieldReference ; @@ -1234,6 +1232,50 @@ Both the Table API and SQL come with a set of built-in scalar functions for data <tr> <td> {% highlight java %} +ANY.isNull +{% endhighlight %} + </td> + <td> + <p>Returns true if the given expression is null.</p> + </td> + </tr> + + <tr> + <td> + {% highlight java %} +ANY.isNotNull +{% endhighlight %} + </td> + <td> + <p>Returns true if the given expression is not null.</p> + </td> + </tr> + + <tr> + <td> + {% highlight java %} +BOOLEAN.isTrue +{% endhighlight %} + </td> + <td> + <p>Returns true if the given boolean expression is true. False otherwise (for null and false).</p> + </td> + </tr> + + <tr> + <td> + {% highlight java %} +BOOLEAN.isFalse +{% endhighlight %} + </td> + <td> + <p>Returns true if the given boolean expression is false. 
False otherwise (for null and true).</p> + </td> + </tr> + + <tr> + <td> + {% highlight java %} NUMERIC.exp() {% endhighlight %} </td> @@ -1498,6 +1540,50 @@ TIMEPOINT.ceil(TIMEINTERVALUNIT) <tr> <td> {% highlight scala %} +ANY.isNull +{% endhighlight %} + </td> + <td> + <p>Returns true if the given expression is null.</p> + </td> + </tr> + + <tr> + <td> + {% highlight scala %} +ANY.isNotNull +{% endhighlight %} + </td> + <td> + <p>Returns true if the given expression is not null.</p> + </td> + </tr> + + <tr> + <td> + {% highlight scala %} +BOOLEAN.isTrue +{% endhighlight %} + </td> + <td> + <p>Returns true if the given boolean expression is true. False otherwise (for null and false).</p> + </td> + </tr> + + <tr> + <td> + {% highlight scala %} +BOOLEAN.isFalse +{% endhighlight %} + </td> + <td> + <p>Returns true if the given boolean expression is false. False otherwise (for null and true).</p> + </td> + </tr> + + <tr> + <td> + {% highlight scala %} NUMERIC.exp() {% endhighlight %} </td> http://git-wip-us.apache.org/repos/asf/flink/blob/dc3337a9/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala index b14ca88..9bfe6c3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala @@ -63,6 +63,16 @@ trait ImplicitExpressionOperations { def isNull = IsNull(expr) def isNotNull = IsNotNull(expr) + /** + * Returns true if the given boolean expression is true. False otherwise (for null and false). + */ + def isTrue = IsTrue(expr) + + /** + * Returns true if the given boolean expression is false. 
False otherwise (for null and true). + */ + def isFalse = IsFalse(expr) + def + (other: Expression) = Plus(expr, other) def - (other: Expression) = Minus(expr, other) def / (other: Expression) = Div(expr, other) http://git-wip-us.apache.org/repos/asf/flink/blob/dc3337a9/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala index 4a3865f..6463ff9 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala @@ -797,6 +797,26 @@ class CodeGenerator( case CASE => generateIfElse(nullCheck, operands, resultType) + case IS_TRUE => + val operand = operands.head + requireBoolean(operand) + generateIsTrue(operand) + + case IS_NOT_TRUE => + val operand = operands.head + requireBoolean(operand) + generateIsNotTrue(operand) + + case IS_FALSE => + val operand = operands.head + requireBoolean(operand) + generateIsFalse(operand) + + case IS_NOT_FALSE => + val operand = operands.head + requireBoolean(operand) + generateIsNotFalse(operand) + // casting case CAST | REINTERPRET => val operand = operands.head http://git-wip-us.apache.org/repos/asf/flink/blob/dc3337a9/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala index afe69ed..094a224 100644 --- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala @@ -384,6 +384,38 @@ object ScalarOperators { } } + def generateIsTrue(operand: GeneratedExpression): GeneratedExpression = { + GeneratedExpression( + operand.resultTerm, // unknown is always false by default + GeneratedExpression.NEVER_NULL, + operand.code, + BOOLEAN_TYPE_INFO) + } + + def generateIsNotTrue(operand: GeneratedExpression): GeneratedExpression = { + GeneratedExpression( + s"(!${operand.resultTerm})", // unknown is always false by default + GeneratedExpression.NEVER_NULL, + operand.code, + BOOLEAN_TYPE_INFO) + } + + def generateIsFalse(operand: GeneratedExpression): GeneratedExpression = { + GeneratedExpression( + s"(!${operand.resultTerm} && !${operand.nullTerm})", + GeneratedExpression.NEVER_NULL, + operand.code, + BOOLEAN_TYPE_INFO) + } + + def generateIsNotFalse(operand: GeneratedExpression): GeneratedExpression = { + GeneratedExpression( + s"(${operand.resultTerm} || ${operand.nullTerm})", + GeneratedExpression.NEVER_NULL, + operand.code, + BOOLEAN_TYPE_INFO) + } + def generateCast( nullCheck: Boolean, operand: GeneratedExpression, http://git-wip-us.apache.org/repos/asf/flink/blob/dc3337a9/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/generated.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/generated.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/generated.scala index 26c6696..bb52ad8 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/generated.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/generated.scala @@ -34,4 +34,10 @@ case class GeneratedExpression( 
code: String, resultType: TypeInformation[_]) +object GeneratedExpression { + val ALWAYS_NULL = "true" + val NEVER_NULL = "false" + val NO_CODE = "" +} + case class GeneratedFunction[T](name: String, returnType: TypeInformation[Any], code: String) http://git-wip-us.apache.org/repos/asf/flink/blob/dc3337a9/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala index cb92573..ae027e9 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala @@ -54,8 +54,6 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { lazy val MIN: Keyword = Keyword("min") lazy val MAX: Keyword = Keyword("max") lazy val SUM: Keyword = Keyword("sum") - lazy val IS_NULL: Keyword = Keyword("isNull") - lazy val IS_NOT_NULL: Keyword = Keyword("isNotNull") lazy val CAST: Keyword = Keyword("cast") lazy val NULL: Keyword = Keyword("Null") lazy val IF: Keyword = Keyword("?") @@ -79,7 +77,7 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { def functionIdent: ExpressionParser.Parser[String] = not(AS) ~ not(COUNT) ~ not(AVG) ~ not(MIN) ~ not(MAX) ~ - not(SUM) ~ not(IS_NULL) ~ not(IS_NOT_NULL) ~ not(CAST) ~ not(NULL) ~ + not(SUM) ~ not(CAST) ~ not(NULL) ~ not(IF) ~> super.ident // symbols @@ -169,12 +167,6 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { // suffix operators - lazy val suffixIsNull: PackratParser[Expression] = - composite <~ "." 
~ IS_NULL ~ opt("()") ^^ { e => IsNull(e) } - - lazy val suffixIsNotNull: PackratParser[Expression] = - composite <~ "." ~ IS_NOT_NULL ~ opt("()") ^^ { e => IsNotNull(e) } - lazy val suffixSum: PackratParser[Expression] = composite <~ "." ~ SUM ~ opt("()") ^^ { e => Sum(e) } @@ -230,6 +222,10 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { case operand ~ _ ~ name ~ _ ~ args ~ _ => Call(name.toUpperCase, operand :: args) } + lazy val suffixFunctionCallOneArg = composite ~ "." ~ functionIdent ^^ { + case operand ~ _ ~ name => Call(name.toUpperCase, Seq(operand)) + } + lazy val suffixAsc : PackratParser[Expression] = atom <~ "." ~ ASC ~ opt("()") ^^ { e => Asc(e) } @@ -264,20 +260,14 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { } lazy val suffixed: PackratParser[Expression] = - suffixTimeInterval | suffixIsNull | suffixIsNotNull | suffixSum | suffixMin | suffixMax | + suffixTimeInterval | suffixSum | suffixMin | suffixMax | suffixCount | suffixAvg | suffixCast | suffixAs | suffixTrim | suffixTrimWithoutArgs | suffixIf | suffixAsc | suffixDesc | suffixToDate | suffixToTimestamp | suffixToTime | suffixExtract | suffixFloor | suffixCeil | - suffixFunctionCall // function call must always be at the end + suffixFunctionCall | suffixFunctionCallOneArg // function call must always be at the end // prefix operators - lazy val prefixIsNull: PackratParser[Expression] = - IS_NULL ~ "(" ~> expression <~ ")" ^^ { e => IsNull(e) } - - lazy val prefixIsNotNull: PackratParser[Expression] = - IS_NOT_NULL ~ "(" ~> expression <~ ")" ^^ { e => IsNotNull(e) } - lazy val prefixSum: PackratParser[Expression] = SUM ~ "(" ~> expression <~ ")" ^^ { e => Sum(e) } @@ -312,6 +302,10 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { case name ~ _ ~ args ~ _ => Call(name.toUpperCase, args) } + lazy val prefixFunctionCallOneArg = functionIdent ~ "(" ~ expression ~ ")" ^^ { + case name ~ _ ~ arg ~ _ => 
Call(name.toUpperCase, Seq(arg)) + } + lazy val prefixTrim = TRIM ~ "(" ~ trimMode ~ "," ~ expression ~ "," ~ expression ~ ")" ^^ { case _ ~ _ ~ mode ~ _ ~ trimCharacter ~ _ ~ operand ~ _ => Trim(mode, trimCharacter, operand) } @@ -333,9 +327,10 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { } lazy val prefixed: PackratParser[Expression] = - prefixIsNull | prefixIsNotNull | prefixSum | prefixMin | prefixMax | prefixCount | prefixAvg | + prefixSum | prefixMin | prefixMax | prefixCount | prefixAvg | prefixCast | prefixAs | prefixTrim | prefixTrimWithoutArgs | prefixIf | prefixExtract | - prefixFloor | prefixCeil | prefixFunctionCall // function call must always be at the end + prefixFloor | prefixCeil | + prefixFunctionCall | prefixFunctionCallOneArg // function call must always be at the end // suffix/prefix composite http://git-wip-us.apache.org/repos/asf/flink/blob/dc3337a9/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala index 0acfbf1..ad01674 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala @@ -121,3 +121,23 @@ case class IsNotNull(child: Expression) extends UnaryExpression { override private[flink] def resultType = BOOLEAN_TYPE_INFO } + +case class IsTrue(child: Expression) extends UnaryExpression { + override def toString = s"($child).isTrue" + + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { + relBuilder.call(SqlStdOperatorTable.IS_TRUE, child.toRexNode) + } + + override private[flink] def resultType = 
BOOLEAN_TYPE_INFO +} + +case class IsFalse(child: Expression) extends UnaryExpression { + override def toString = s"($child).isFalse" + + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { + relBuilder.call(SqlStdOperatorTable.IS_FALSE, child.toRexNode) + } + + override private[flink] def resultType = BOOLEAN_TYPE_INFO +} http://git-wip-us.apache.org/repos/asf/flink/blob/dc3337a9/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala index fb38dde..b9a3f71 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala @@ -117,6 +117,12 @@ class FunctionCatalog { object FunctionCatalog { val buildInFunctions: Map[String, Class[_]] = Map( + // logic + "isNull" -> classOf[IsNull], + "isNotNull" -> classOf[IsNotNull], + "isTrue" -> classOf[IsTrue], + "isFalse" -> classOf[IsFalse], + // aggregate functions "avg" -> classOf[Avg], "count" -> classOf[Count], http://git-wip-us.apache.org/repos/asf/flink/blob/dc3337a9/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala index 7162a04..7ab0c7d 100644 --- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala @@ -702,10 +702,61 @@ class ScalarFunctionsTest extends ExpressionTestBase { "1996-11-01") } + @Test + def testIsTrueIsFalse(): Unit = { + testAllApis( + 'f1.isTrue, + "f1.isTrue", + "f1 IS TRUE", + "true") + + testAllApis( + 'f21.isTrue, + "f21.isTrue", + "f21 IS TRUE", + "false") + + testAllApis( + false.isFalse, + "false.isFalse", + "FALSE IS FALSE", + "true") + + testAllApis( + 'f21.isFalse, + "f21.isFalse", + "f21 IS FALSE", + "false") + + testAllApis( + !'f1.isTrue, + "!f1.isTrue", + "f1 IS NOT TRUE", + "false") + + testAllApis( + !'f21.isTrue, + "!f21.isTrue", + "f21 IS NOT TRUE", + "true") + + testAllApis( + !false.isFalse, + "!false.isFalse", + "FALSE IS NOT FALSE", + "false") + + testAllApis( + !'f21.isFalse, + "!f21.isFalse", + "f21 IS NOT FALSE", + "true") + } + // ---------------------------------------------------------------------------------------------- def testData = { - val testData = new Row(21) + val testData = new Row(22) testData.setField(0, "This is a test String.") testData.setField(1, true) testData.setField(2, 42.toByte) @@ -727,6 +778,7 @@ class ScalarFunctionsTest extends ExpressionTestBase { testData.setField(18, Timestamp.valueOf("1996-11-10 06:55:44.333")) testData.setField(19, 1467012213000L) // +16979 07:23:33.000 testData.setField(20, 25) // +2-01 + testData.setField(21, null) testData } @@ -752,6 +804,7 @@ class ScalarFunctionsTest extends ExpressionTestBase { Types.TIME, Types.TIMESTAMP, Types.INTERVAL_MILLIS, - Types.INTERVAL_MONTHS)).asInstanceOf[TypeInformation[Any]] + Types.INTERVAL_MONTHS, + Types.BOOLEAN)).asInstanceOf[TypeInformation[Any]] } }