[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator
Github user NickolayVasilishin closed the pull request at: https://github.com/apache/flink/pull/2870 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
Github user DmytroShkvyra commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r105244663 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala --- @@ -1101,6 +1101,45 @@ class ScalarFunctionsTest extends ExpressionTestBase { "true") } + @Test + def testInExpressions(): Unit = { +testTableApi( --- End diff -- @twalthr Unfortunately, adding support for POJOs and tuples as standard data types in the Table API is too complicated a task and out of scope here. Adding new composite types to the API is much broader than adding the IN operator; it would impact the whole validation and execution process for SQL statements. It would be better to create an epic for introducing new types. I have researched it, and I think it would be better to commit the IN operator functionality without support for tuples and POJOs. Otherwise, it would become an endless task.
Github user DmytroShkvyra commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r105241841 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala --- @@ -150,7 +150,35 @@ class Table( * }}} */ def filter(predicate: Expression): Table = { -new Table(tableEnv, Filter(predicate, logicalPlan).validate(tableEnv)) + +predicate match { --- End diff -- @twalthr I have added tests for this case in the new PR
Github user DmytroShkvyra commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r105238764 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala --- @@ -870,6 +870,16 @@ class CodeGenerator( requireTimeInterval(operand) generateUnaryIntervalPlusMinus(plus = true, nullCheck, operand) + case IN => +val left = operands.head +val right = operands.tail +val addReusableCodeCallback = (declaration: String, initialization: String) => { --- End diff -- @twalthr `generateIn` can take quite long to execute. This solution works fine.
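The callback pattern being debated here can be sketched in isolation: the expression generator hands member declarations and initialization statements back through a callback, and the code generator decides where they land in the generated class. The following is a minimal illustrative Java sketch under that assumption; the names (`SketchCodeGenerator`, `generateInSketch`) are hypothetical and do not mirror Flink's actual classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical sketch of a code generator collecting "reusable" member code.
class SketchCodeGenerator {
    final List<String> memberDeclarations = new ArrayList<>();
    final List<String> memberInitializations = new ArrayList<>();

    // The callback handed to expression generators: they report code strings,
    // the generator collects them into reusable sections of the final class.
    BiConsumer<String, String> reusableCodeCallback() {
        return (declaration, initialization) -> {
            memberDeclarations.add(declaration);
            memberInitializations.add(initialization);
        };
    }

    // An IN-style generator only needs the callback, not the whole generator,
    // which keeps the generator free of a hard dependency on CodeGenerator.
    static void generateInSketch(BiConsumer<String, String> addReusableCode) {
        addReusableCode.accept(
            "private java.util.HashSet set0;",
            "set0 = new java.util.HashSet(java.util.Arrays.asList(1, 2, 3));");
    }
}

public class CallbackDemo {
    public static void main(String[] args) {
        SketchCodeGenerator gen = new SketchCodeGenerator();
        SketchCodeGenerator.generateInSketch(gen.reusableCodeCallback());
        System.out.println(gen.memberDeclarations.size());   // 1
        System.out.println(gen.memberDeclarations.get(0));
    }
}
```

The alternative twalthr raises — calling the reusable-code methods directly from `generateIn` — would couple the two classes; the callback inverts that dependency.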
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r104393386 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala --- @@ -1101,6 +1101,45 @@ class ScalarFunctionsTest extends ExpressionTestBase { "true") } + @Test + def testInExpressions(): Unit = { +testTableApi( --- End diff -- Yes, we can also create a follow-up issue for that. But since we are not only building a SQL API but also a Table API, POJOs will occur very frequently and everywhere. We should support them as well as we can.
Github user DmytroShkvyra commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r104389045 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala --- @@ -1101,6 +1101,45 @@ class ScalarFunctionsTest extends ExpressionTestBase { "true") } + @Test + def testInExpressions(): Unit = { +testTableApi( --- End diff -- @twalthr Let's create another Jira issue for supporting POJOs in the IN operator. To be honest, I think this is not necessary because we can use something like this: `SELECT a, b FROM T WHERE a.prop1.IN[1,2,3] AND a.prop2.IN[4,5,6] AND ` I don't remember any case in standard SQL where `IN` can be used with anything other than primitives and their wrappers.
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r104387054 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala --- @@ -1101,6 +1101,45 @@ class ScalarFunctionsTest extends ExpressionTestBase { "true") } + @Test + def testInExpressions(): Unit = { +testTableApi( --- End diff -- All types on the right side of the operator still share a single type information. We would only support POJOs that implement `Comparable`. We could use a set for simple types and a regular array list for POJOs.
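The two lookup strategies suggested here can be sketched as plain Java (a hand-written illustration, not the generated code): a `HashSet` gives constant-time membership for simple types with reliable `hashCode`/`equals`, while a plain list scanned with `compareTo` covers `Comparable` POJOs whose `hashCode` cannot be trusted.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class InLookupSketch {
    // Simple types: constant-time membership via a hash set.
    static boolean inSet(int needle, Set<Integer> haystack) {
        return haystack.contains(needle);
    }

    // Comparable POJOs: a linear scan using compareTo, since a hash-based
    // lookup would additionally require a hashCode consistent with equals.
    static <T extends Comparable<T>> boolean inList(T needle, List<T> haystack) {
        for (T candidate : haystack) {
            if (needle.compareTo(candidate) == 0) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Set<Integer> simple = new HashSet<>(Arrays.asList(1, 2, 3));
        System.out.println(inSet(2, simple));                       // true
        System.out.println(inList("b", Arrays.asList("a", "b")));   // true
    }
}
```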
Github user DmytroShkvyra commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r104379120 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala --- @@ -1101,6 +1101,45 @@ class ScalarFunctionsTest extends ExpressionTestBase { "true") } + @Test + def testInExpressions(): Unit = { +testTableApi( --- End diff -- @twalthr It's a big problem. Actually, two problems: 1. We require that all types on the right side of the IN operator are the same because we use a Set as the collection, and we check that the right operand has only one `TypeInfo`. Otherwise, we would have to check each item one by one or use something like a Bloom filter. 2. A problem with the compilation of the expression: each field is compiled to a more primitive type (the biggest problem).
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r104377221 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala --- @@ -150,7 +150,35 @@ class Table( * }}} */ def filter(predicate: Expression): Table = { -new Table(tableEnv, Filter(predicate, logicalPlan).validate(tableEnv)) + +predicate match { --- End diff -- As far as I know, `IN` is a general expression, so it can be used wherever expressions are allowed. I quickly checked it in MySQL and `SELECT field1 IN ( 22, 23 ) FROM Table1` is not a problem.
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r104375934 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala --- @@ -1101,6 +1101,45 @@ class ScalarFunctionsTest extends ExpressionTestBase { "true") } + @Test + def testInExpressions(): Unit = { +testTableApi( --- End diff -- You are right, working with POJOs and case classes does not always give you the best performance, but limiting the number of supported types results in a bad user experience. Types that are supported in general should also be supported by features. Why can't we use POJOs in SQL? Is `f4 IN (f3, f2, f0)` not possible?
Github user DmytroShkvyra commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r103987812 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala --- @@ -1101,6 +1101,45 @@ class ScalarFunctionsTest extends ExpressionTestBase { "true") } + @Test + def testInExpressions(): Unit = { +testTableApi( --- End diff -- @twalthr Are you sure that we need to use IN with POJOs/tuples/case classes? First of all, it will hit performance because comparing these types is too complicated; it's easier to get a subset of ids and use IN than to compare POJOs and case classes. Moreover, we can't use it in SQL string statements (we would need to parse POJOs and case classes from strings).
Github user DmytroShkvyra commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r103976547 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala --- @@ -150,7 +150,35 @@ class Table( * }}} */ def filter(predicate: Expression): Table = { -new Table(tableEnv, Filter(predicate, logicalPlan).validate(tableEnv)) + +predicate match { --- End diff -- Did you mean something like this: `SELECT b.IN[1,2,3] as a FROM T as b` ?
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r103963098 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala --- @@ -79,6 +79,72 @@ object ScalarOperators { } } + def generateIn( + nullCheck: Boolean, + left: GeneratedExpression, + right: scala.collection.mutable.Buffer[GeneratedExpression], + addReusableCodeCallback: (String, String) => Any) +: GeneratedExpression = { +val resultTerm = newName("result") +val isNull = newName("isNull") + +val topNumericalType: Option[TypeInformation[_]] = { --- End diff -- IMHO, calculating the top numerical type seems not too specific to me.
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r103963525 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala --- @@ -1002,12 +1068,17 @@ object ScalarOperators { val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType) // no casting necessary if (operandType == resultType) { - (operandTerm) => s"$operandTerm" + if (isDecimal(operandType)) { +(operandTerm) => s"$operandTerm.stripTrailingZeros()" --- End diff -- Ok, I'm fine with this change.
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r103962015 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala --- @@ -150,7 +150,35 @@ class Table( * }}} */ def filter(predicate: Expression): Table = { -new Table(tableEnv, Filter(predicate, logicalPlan).validate(tableEnv)) + +predicate match { --- End diff -- The IN operator can also be used in a `select()` statement, but if we do the translation here, only `filter()` is supported.
Github user DmytroShkvyra commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r103746147 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala --- @@ -150,7 +150,35 @@ class Table( * }}} */ def filter(predicate: Expression): Table = { -new Table(tableEnv, Filter(predicate, logicalPlan).validate(tableEnv)) + +predicate match { --- End diff -- What did you mean? Please clarify.
Github user DmytroShkvyra commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r103742289 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala --- @@ -79,6 +79,72 @@ object ScalarOperators { } } + def generateIn( + nullCheck: Boolean, + left: GeneratedExpression, + right: scala.collection.mutable.Buffer[GeneratedExpression], + addReusableCodeCallback: (String, String) => Any) +: GeneratedExpression = { +val resultTerm = newName("result") +val isNull = newName("isNull") + +val topNumericalType: Option[TypeInformation[_]] = { --- End diff -- @twalthr I think it is not necessary because the function would have too specific input parameters.
Github user DmytroShkvyra commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r103736499 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala --- @@ -1002,12 +1068,17 @@ object ScalarOperators { val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType) // no casting necessary if (operandType == resultType) { - (operandTerm) => s"$operandTerm" + if (isDecimal(operandType)) { +(operandTerm) => s"$operandTerm.stripTrailingZeros()" + } else { +(operandTerm) => s"$operandTerm" + } } // result type is decimal but numeric operand is not else if (isDecimal(resultType) && !isDecimal(operandType) && isNumeric(operandType)) { (operandTerm) => -s"java.math.BigDecimal.valueOf((${superPrimitive(operandType)}) $operandTerm)" +s"java.math.BigDecimal.valueOf((${superPrimitive(operandType)}) $operandTerm)" + --- End diff -- @twalthr See my comment above
Github user DmytroShkvyra commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r103734557 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala --- @@ -1002,12 +1068,17 @@ object ScalarOperators { val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType) // no casting necessary if (operandType == resultType) { - (operandTerm) => s"$operandTerm" + if (isDecimal(operandType)) { +(operandTerm) => s"$operandTerm.stripTrailingZeros()" --- End diff -- @twalthr I am continuing the work of @NickolayVasilishin and found the following text in 5WD-01-Framework-2003-09.pdf (the SQL:2003 standard): > There are two classes of numeric type: exact numeric, which includes integer types and types with specified precision and scale; and approximate numeric, which is essentially floating point, and for which a precision may optionally be specified. Every number has a precision (number of digits), and exact numeric types also have a scale (digits after the radix point). Arithmetic operations may be performed on operands of different or the same numeric type, and the result is of a numeric type that depends only on the numeric type of the operands. If the result cannot be represented exactly in the result type, then whether it is rounded or truncated is implementation-defined. An exception condition is raised if the result is outside the range of numeric values of the result type, or if the arithmetic operation is not defined for the operands. So values can have different scale, precision, and rounding modes. I think we have to strip trailing zeros to avoid possible problems with parsing values from strings. BTW, the standard does not say that 2.00 can be unequal to 2.
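The scale issue behind the `stripTrailingZeros()` change is easy to reproduce with plain `java.math.BigDecimal`: `equals()` takes the scale into account, so `2.00` and `2` are unequal (which breaks hash-based set membership), while `compareTo()` ignores scale. A small self-contained illustration:

```java
import java.math.BigDecimal;

public class DecimalScaleDemo {
    public static void main(String[] args) {
        BigDecimal a = new BigDecimal("2.00");
        BigDecimal b = new BigDecimal("2");

        // equals() considers scale, so 2.00 is NOT equal to 2
        System.out.println(a.equals(b));                        // false
        // compareTo() ignores scale: numerically equal
        System.out.println(a.compareTo(b) == 0);                // true
        // stripTrailingZeros() normalizes the scale, so a lookup in a
        // HashSet (which relies on equals/hashCode) behaves as expected
        System.out.println(a.stripTrailingZeros().equals(b));   // true
    }
}
```

This is why normalizing decimals before putting them into (or probing) the generated lookup set matters for the IN operator.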
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r94806152 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala --- @@ -870,6 +870,16 @@ class CodeGenerator( requireTimeInterval(operand) generateUnaryIntervalPlusMinus(plus = true, nullCheck, operand) + case IN => +val left = operands.head +val right = operands.tail +val addReusableCodeCallback = (declaration: String, initialization: String) => { --- End diff -- Why do we need this callback here? Couldn't we just call those methods from the `generateIn` method?
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r94801864 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala --- @@ -110,6 +111,9 @@ trait ImplicitExpressionOperations { def asc = Asc(expr) def desc = Desc(expr) + + def in(subquery: Expression*) = In(expr, subquery) + def in(table: Table) = InSub(expr, table) --- End diff -- Documentation is missing.
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r94802194 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/In.scala --- @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.expressions + +import org.apache.calcite.rex.RexNode +import org.apache.calcite.sql.SqlOperator +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.calcite.tools.RelBuilder +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.typeutils.TypeCheckUtils._ +import org.apache.flink.api.table.validate.{ValidationResult, ValidationFailure, ValidationSuccess} + +case class In(expression: Expression, subquery: Seq[Expression]) extends Expression { --- End diff -- I think we can put `In` and `InSub` in a common file `in.scala`.
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r94923365 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala --- @@ -150,7 +150,35 @@ class Table( * }}} */ def filter(predicate: Expression): Table = { -new Table(tableEnv, Filter(predicate, logicalPlan).validate(tableEnv)) + +predicate match { --- End diff -- This is the wrong position for doing the translation as `in` would not work in select statements etc.
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r94804605 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/In.scala --- @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.expressions + +import org.apache.calcite.rex.RexNode +import org.apache.calcite.sql.SqlOperator +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.calcite.tools.RelBuilder +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.typeutils.TypeCheckUtils._ +import org.apache.flink.api.table.validate.{ValidationResult, ValidationFailure, ValidationSuccess} + +case class In(expression: Expression, subquery: Seq[Expression]) extends Expression { + + def this(expressions: Seq[Expression]) = this(expressions.head, expressions.tail) + + override def toString = s"$expression.in(${subquery.mkString(", ")})" + + /** +* List of child nodes that should be considered when doing transformations. Other values +* in the Product will not be transformed, only handed through. 
+*/ + override private[flink] def children: Seq[Expression] = expression +: subquery.distinct + + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { +relBuilder.call(SqlStdOperatorTable.IN, children.map(_.toRexNode): _*) + } + + override private[flink] def validateInput(): ValidationResult = { +if (children.tail.contains(null)) { + ValidationFailure("Operands on right side of IN operator must be not null") +} else { + val types = children.tail.map(_.resultType) + if (types.distinct.length != 1) { +ValidationFailure( + s"Types on the right side of IN operator must be the same, got ${types.mkString(", ")}." +) + } else { +(children.head.resultType, children.tail.head.resultType) match { --- End diff -- `children.last` instead of `children.tail.head`
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r94808082 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala --- @@ -79,6 +79,72 @@ object ScalarOperators { } } + def generateIn( + nullCheck: Boolean, + left: GeneratedExpression, --- End diff -- Could you come up with a name for the parameters `left` and `right` (e.g. `needle`, `haystack`)? It is difficult to read otherwise; also at other locations in the code.
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r94803910 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/In.scala --- @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.expressions + +import org.apache.calcite.rex.RexNode +import org.apache.calcite.sql.SqlOperator +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.calcite.tools.RelBuilder +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.typeutils.TypeCheckUtils._ +import org.apache.flink.api.table.validate.{ValidationResult, ValidationFailure, ValidationSuccess} + +case class In(expression: Expression, subquery: Seq[Expression]) extends Expression { + + def this(expressions: Seq[Expression]) = this(expressions.head, expressions.tail) + + override def toString = s"$expression.in(${subquery.mkString(", ")})" + + /** +* List of child nodes that should be considered when doing transformations. Other values +* in the Product will not be transformed, only handed through. 
+*/ + override private[flink] def children: Seq[Expression] = expression +: subquery.distinct + + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { +relBuilder.call(SqlStdOperatorTable.IN, children.map(_.toRexNode): _*) + } + + override private[flink] def validateInput(): ValidationResult = { +if (children.tail.contains(null)) { + ValidationFailure("Operands on right side of IN operator must be not null") +} else { + val types = children.tail.map(_.resultType) + if (types.distinct.length != 1) { +ValidationFailure( + s"Types on the right side of IN operator must be the same, got ${types.mkString(", ")}." +) + } else { +(children.head.resultType, children.tail.head.resultType) match { + case (lType, rType) if isNumeric(lType) && isNumeric(rType) => ValidationSuccess + case (lType, rType) if isComparable(lType) && lType == rType => ValidationSuccess + case (lType, rType) => +ValidationFailure( + s"Types on the both side of IN operator must be the same, got $lType and $rType" --- End diff -- "Types on both sides of the IN operator must be numeric or of the same comparable type"
[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r94922812 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/InITCase.scala --- @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.scala.batch.table + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase +import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.scala.util.CollectionDataSets +import org.apache.flink.api.table.{Row, Table, TableEnvironment, ValidationException} +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.apache.flink.test.util.TestBaseUtils +import org.junit._ +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +import scala.collection.JavaConverters._ + +@RunWith(classOf[Parameterized]) +class InITCase( +mode: TestExecutionMode, +configMode: TableConfigMode) + extends TableProgramsTestBase(mode, configMode) { + + + @Test + def testSqlInOperator(): Unit = { --- End diff -- Can you convert these ITCases into tests extending `TableTestBase`? We try to reduce our build time. It is sufficient to test only the optimized plan.
[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r94807323 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala --- @@ -79,6 +79,72 @@ object ScalarOperators { } } + def generateIn( + nullCheck: Boolean, + left: GeneratedExpression, + right: scala.collection.mutable.Buffer[GeneratedExpression], + addReusableCodeCallback: (String, String) => Any) +: GeneratedExpression = { +val resultTerm = newName("result") +val isNull = newName("isNull") + +val topNumericalType: Option[TypeInformation[_]] = { --- End diff -- You can extract this logic into a separate method (e.g. in `TypeCheckUtils`), because it might be useful in other locations too.
[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r94801637 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala --- @@ -110,6 +111,9 @@ trait ImplicitExpressionOperations { def asc = Asc(expr) def desc = Desc(expr) + + def in(subquery: Expression*) = In(expr, subquery) --- End diff -- Documentation is missing.
[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r94804831 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/In.scala --- @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.expressions + +import org.apache.calcite.rex.RexNode +import org.apache.calcite.sql.SqlOperator +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.calcite.tools.RelBuilder +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.typeutils.TypeCheckUtils._ +import org.apache.flink.api.table.validate.{ValidationResult, ValidationFailure, ValidationSuccess} + +case class In(expression: Expression, subquery: Seq[Expression]) extends Expression { + + def this(expressions: Seq[Expression]) = this(expressions.head, expressions.tail) + + override def toString = s"$expression.in(${subquery.mkString(", ")})" + + /** +* List of child nodes that should be considered when doing transformations. Other values +* in the Product will not be transformed, only handed through. 
+*/ + override private[flink] def children: Seq[Expression] = expression +: subquery.distinct + + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { +relBuilder.call(SqlStdOperatorTable.IN, children.map(_.toRexNode): _*) + } + + override private[flink] def validateInput(): ValidationResult = { +if (children.tail.contains(null)) { + ValidationFailure("Operands on right side of IN operator must be not null") +} else { + val types = children.tail.map(_.resultType) + if (types.distinct.length != 1) { +ValidationFailure( + s"Types on the right side of IN operator must be the same, got ${types.mkString(", ")}." +) + } else { +(children.head.resultType, children.tail.head.resultType) match { + case (lType, rType) if isNumeric(lType) && isNumeric(rType) => ValidationSuccess + case (lType, rType) if isComparable(lType) && lType == rType => ValidationSuccess + case (lType, rType) => +ValidationFailure( + s"Types on the both side of IN operator must be the same, got $lType and $rType" +) +} + } + +} + } + + /** --- End diff -- Same here. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r94805002 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/InSub.scala --- @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.expressions + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.Table + +/** --- End diff -- Please remove all auto-generated comments in this class. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r94921085 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala --- @@ -1002,12 +1068,17 @@ object ScalarOperators { val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType) // no casting necessary if (operandType == resultType) { - (operandTerm) => s"$operandTerm" + if (isDecimal(operandType)) { +(operandTerm) => s"$operandTerm.stripTrailingZeros()" --- End diff -- We should not modify decimals here. What does the SQL standard say about this? Are 2.00 and 2 equal in SQL?
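A small self-contained sketch may clarify the question: SQL numeric comparison treats 2.00 and 2 as equal, which corresponds to `BigDecimal.compareTo`, while `BigDecimal.equals` (relied upon by `HashSet` lookups) is scale-sensitive — which is what the `stripTrailingZeros()` call under discussion works around. The class name below is illustrative, not Flink code.

```java
import java.math.BigDecimal;

// Illustrative demo (not Flink code): scale-sensitive vs. numeric equality.
public class DecimalEqualityDemo {
    public static void main(String[] args) {
        BigDecimal a = new BigDecimal("2.00");
        BigDecimal b = new BigDecimal("2");

        // equals compares unscaled value AND scale: 2.00 is not equal to 2
        System.out.println(a.equals(b));         // false
        // compareTo compares numeric value only, matching SQL semantics
        System.out.println(a.compareTo(b) == 0); // true
        // stripTrailingZeros normalizes the scale so equals/hashCode agree,
        // which is what HashSet-based IN lookups depend on
        System.out.println(a.stripTrailingZeros().equals(b.stripTrailingZeros())); // true
    }
}
```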
[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r94922269 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala --- @@ -1101,6 +1101,45 @@ class ScalarFunctionsTest extends ExpressionTestBase { "true") } + @Test + def testInExpressions(): Unit = { +testTableApi( --- End diff -- Can you add some more tests? E.g.:
- `Null(Types.INT).in(1,2,42, Null(Types.INT))` cannot be compiled.
- Date/Time/Timestamp types, which are represented as int/long internally.
- POJOs/Tuples/case classes.
- Fields in the set: `f2.in(f3, f4, 4)`.
[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r94922579 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/InSub.scala --- @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.expressions + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.Table + +/** + * Created by hadoop on 11/17/16. + */ +case class InSub(expression: Expression, table: Table) extends Expression { +override def toString = s"$expression.in($table})" --- End diff -- Additional `}` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r94804779 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/In.scala --- @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.expressions + +import org.apache.calcite.rex.RexNode +import org.apache.calcite.sql.SqlOperator +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.calcite.tools.RelBuilder +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.typeutils.TypeCheckUtils._ +import org.apache.flink.api.table.validate.{ValidationResult, ValidationFailure, ValidationSuccess} + +case class In(expression: Expression, subquery: Seq[Expression]) extends Expression { + + def this(expressions: Seq[Expression]) = this(expressions.head, expressions.tail) + + override def toString = s"$expression.in(${subquery.mkString(", ")})" + + /** --- End diff -- Can you remove those auto-generated comments? 
[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r94921693 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala --- @@ -1002,12 +1068,17 @@ object ScalarOperators { val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType) // no casting necessary if (operandType == resultType) { - (operandTerm) => s"$operandTerm" + if (isDecimal(operandType)) { +(operandTerm) => s"$operandTerm.stripTrailingZeros()" + } else { +(operandTerm) => s"$operandTerm" + } } // result type is decimal but numeric operand is not else if (isDecimal(resultType) && !isDecimal(operandType) && isNumeric(operandType)) { (operandTerm) => -s"java.math.BigDecimal.valueOf((${superPrimitive(operandType)}) $operandTerm)" +s"java.math.BigDecimal.valueOf((${superPrimitive(operandType)}) $operandTerm)" + --- End diff -- If I remove this change, no tests fail. Is it necessary?
[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r90497004 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala --- @@ -78,6 +78,69 @@ object ScalarOperators { } } + def generateIn( + nullCheck: Boolean, + left: GeneratedExpression, + right: scala.collection.mutable.Buffer[GeneratedExpression], + addReusableCodeCallback: (String, String) => Any) +: GeneratedExpression = { +val resultTerm = newName("result") +val isNull = newName("isNull") + +val castNumeric = if (isNumeric(left.resultType) || isNumeric(right.head.resultType)) { + (value: String) => s"""new java.math.BigDecimal("" + $value).stripTrailingZeros()""" +} else { + (value: String) => value +} + +val valuesInitialization = if (right.size >= 20) { --- End diff -- I think we can always use the `HashSet` approach.
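To make the suggestion concrete, here is a hedged sketch (illustrative names, not Flink's actual generated code) of the HashSet-based evaluation: the literal operands are collected into a set once — in the real codegen this happens in the generated class's constructor via the reusable-code callback — and each row then does a single O(1) membership lookup instead of a chain of equality comparisons.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch of HashSet-based IN evaluation.
public class InHashSetDemo {
    // Built once, analogous to the reusable member the codegen callback declares.
    private static final Set<Integer> OPERANDS =
            new HashSet<>(Arrays.asList(1, 2, 42));

    public static boolean evalIn(int left) {
        return OPERANDS.contains(left); // O(1) per row
    }

    public static void main(String[] args) {
        System.out.println(evalIn(42)); // true
        System.out.println(evalIn(7));  // false
    }
}
```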
[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r90495217 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/In.scala --- @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.expressions + +import org.apache.calcite.rex.RexNode +import org.apache.calcite.sql.SqlOperator +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.calcite.tools.RelBuilder +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.typeutils.TypeCheckUtils._ +import org.apache.flink.api.table.validate.{ValidationResult, ValidationFailure, ValidationSuccess} + +case class In(expressions: Seq[Expression]) extends Expression { + override def toString = s"${expressions.head}.in(${expressions.tail.mkString(", ")})" + + /** +* List of child nodes that should be considered when doing transformations. Other values +* in the Product will not be transformed, only handed through. 
+*/ + override private[flink] def children: Seq[Expression] = expressions + + private[flink] val sqlOperator: SqlOperator = SqlStdOperatorTable.IN + + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { +relBuilder.call(SqlStdOperatorTable.IN, children.map(_.toRexNode): _*) --- End diff -- Can we add a deduplication step for the literal values before translating them?
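As a minimal illustration of the deduplication step being asked for (plain Java collections here, not the Table API's expression types), dropping duplicate operands before translation means `a IN (1, 1, 2)` translates the same operand list as `a IN (1, 2)`:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Illustrative deduplication of literal operands before translation.
public class DedupDemo {
    public static void main(String[] args) {
        List<Integer> operands = Arrays.asList(1, 1, 2, 42, 2);
        List<Integer> deduplicated = operands.stream()
                .distinct() // keeps the first occurrence, preserves encounter order
                .collect(Collectors.toList());
        System.out.println(deduplicated); // [1, 2, 42]
    }
}
```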
[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r90499247 --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/InITCase.java --- @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.batch.table; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.table.BatchTableEnvironment; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.table.Table; +import org.apache.flink.api.table.TableEnvironment; +import org.apache.flink.api.table.ValidationException; +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; +import org.apache.flink.test.util.TestBaseUtils; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.List; + +@RunWith(Parameterized.class) +public class InITCase extends TableProgramsTestBase { --- End diff -- These `ITCases` are very expensive and significantly blow up the build time. We test expressions like `IN` using the `ExpressionTestBase`. Please see `ScalarFunctionsTest` to check how it works, and move all tests to a test class that extends `ExpressionTestBase`. We only need to check the Scala and Java Table APIs, because `IN` in SQL queries is executed differently.
[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2870#discussion_r90496100 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala --- @@ -78,6 +78,69 @@ object ScalarOperators { } } + def generateIn( + nullCheck: Boolean, + left: GeneratedExpression, + right: scala.collection.mutable.Buffer[GeneratedExpression], + addReusableCodeCallback: (String, String) => Any) +: GeneratedExpression = { +val resultTerm = newName("result") +val isNull = newName("isNull") + +val castNumeric = if (isNumeric(left.resultType) || isNumeric(right.head.resultType)) { + (value: String) => s"""new java.math.BigDecimal("" + $value).stripTrailingZeros()""" --- End diff -- Casting all numeric values to `BigDecimal` is quite expensive. Please check how numeric casting is done for binary operations, e.g., in `generateArithmeticOperator()`.
[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator
GitHub user NickolayVasilishin opened a pull request: https://github.com/apache/flink/pull/2870 [FLINK-4565] Support for SQL IN operator

This PR is part of the work on the SQL IN operator in the Table API and implements IN for literals. Two cases are covered: fewer than and more than 20 literals. I also have some questions:
- Is converting all numeric types to BigDecimal OK? I did it that way to simplify the use of the HashSet.
- The validation isn't really good: it forces the operator to be used with literals of the same type. Should I rework it, or just add more cases?

expressionDsl.scala: entry point for the IN operator in the Scala API
ScalarOperators.scala:
1) All numeric types are upcast to BigDecimal for use in the HashSet; other types are left unchanged in castNumeric.
2) valuesInitialization is used for two cases: more than 20 operands (then we use a HashSet, initialized in the constructor, described below) and fewer than 20 operands (then we initialize the operands in the method body and combine them in a disjunction of equality checks).
3) The comparison also covers the cases described above. In the first case we use a callback to declare and initialize the HashSet with all operands; otherwise we just chain all operand comparisons.
4) The final code is built up from these code snippets.
CodeGenerator.scala: passes arguments and the callback to declare and initialize the HashSet
FunctionCatalog.scala: registers "in" as a method
InITCase: some use cases

You can merge this pull request into a Git repository by running: $ git pull https://github.com/NickolayVasilishin/flink FLINK-4565 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2870.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2870

commit e495f96c0016f53195398d78f757857eb8546917 Author: nikolay_vasilishin Date: 2016-11-25T14:18:20Z [FLINK-4565] Support for SQL IN operator (the commit message repeats the description above)
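The two evaluation strategies the description outlines can be sketched as follows — a hedged, self-contained illustration with the 20-operand threshold taken from the text; the names and the per-call set construction are illustrative (the real codegen builds the set once in the generated class's constructor), not Flink's generated code.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

// Illustrative sketch of the two IN-evaluation strategies described above.
public class InStrategyDemo {
    static final int HASH_SET_THRESHOLD = 20; // threshold from the PR description

    static <T> boolean evalIn(T left, List<T> operands) {
        if (operands.size() >= HASH_SET_THRESHOLD) {
            // Many operands: build a set and do an O(1) lookup.
            return new HashSet<>(operands).contains(left);
        }
        // Few operands: a chain of equality comparisons.
        for (T operand : operands) {
            if (operand.equals(left)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(evalIn(42, Arrays.asList(1, 2, 42))); // true
        System.out.println(evalIn(7, Arrays.asList(1, 2, 42)));  // false
    }
}
```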