[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator

2017-04-05 Thread NickolayVasilishin
Github user NickolayVasilishin closed the pull request at:

https://github.com/apache/flink/pull/2870


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---



[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator

2017-03-09 Thread DmytroShkvyra
Github user DmytroShkvyra commented on a diff in the pull request:

https://github.com/apache/flink/pull/2870#discussion_r105244663
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
 ---
@@ -1101,6 +1101,45 @@ class ScalarFunctionsTest extends ExpressionTestBase {
   "true")
   }
 
+  @Test
+  def testInExpressions(): Unit = {
+testTableApi(
--- End diff --

@twalthr Unfortunately, adding support for POJOs and tuples as standard data types in the Table API is too complicated a task and out of scope here. Introducing new composite types into the API is a wider change than adding the IN operator; it would affect the whole validation and execution process for SQL statements. It would be better to create an epic for introducing the new types.
I have researched it and I think it would be better to commit the IN operator functionality without support for tuples and POJOs. Otherwise it would become a never-ending task.




[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator

2017-03-09 Thread DmytroShkvyra
Github user DmytroShkvyra commented on a diff in the pull request:

https://github.com/apache/flink/pull/2870#discussion_r105241841
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
 ---
@@ -150,7 +150,35 @@ class Table(
 * }}}
 */
   def filter(predicate: Expression): Table = {
-new Table(tableEnv, Filter(predicate, logicalPlan).validate(tableEnv))
+
+predicate match {
--- End diff --

@twalthr I have added tests for this case in new PR




[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator

2017-03-09 Thread DmytroShkvyra
Github user DmytroShkvyra commented on a diff in the pull request:

https://github.com/apache/flink/pull/2870#discussion_r105238764
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
 ---
@@ -870,6 +870,16 @@ class CodeGenerator(
 requireTimeInterval(operand)
 generateUnaryIntervalPlusMinus(plus = true, nullCheck, operand)
 
+  case IN =>
+val left = operands.head
+val right = operands.tail
+val addReusableCodeCallback = (declaration: String, initialization: String) => {
--- End diff --

@twalthr `generateIn` can take quite long to execute. This solution works fine.




[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator

2017-03-06 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2870#discussion_r104393386
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
 ---
@@ -1101,6 +1101,45 @@ class ScalarFunctionsTest extends ExpressionTestBase {
   "true")
   }
 
+  @Test
+  def testInExpressions(): Unit = {
+testTableApi(
--- End diff --

Yes, we can also create a follow-up issue for that. But since we are not 
only building a SQL API but also a Table API, POJOs will occur very frequently 
and everywhere. We should support them as well as we can.




[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator

2017-03-06 Thread DmytroShkvyra
Github user DmytroShkvyra commented on a diff in the pull request:

https://github.com/apache/flink/pull/2870#discussion_r104389045
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
 ---
@@ -1101,6 +1101,45 @@ class ScalarFunctionsTest extends ExpressionTestBase {
   "true")
   }
 
+  @Test
+  def testInExpressions(): Unit = {
+testTableApi(
--- End diff --

@twalthr Let's create another JIRA for supporting POJOs in the IN operator.
To be honest, I think this is not necessary, because we can use something 
like this:
`SELECT a, b FROM T WHERE a.prop1.IN[1,2,3] AND a.prop2.IN[4,5,6] AND `
I don't remember any case in standard SQL where `IN` can be used with anything 
other than primitives and their wrappers.




[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator

2017-03-06 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2870#discussion_r104387054
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
 ---
@@ -1101,6 +1101,45 @@ class ScalarFunctionsTest extends ExpressionTestBase {
   "true")
   }
 
+  @Test
+  def testInExpressions(): Unit = {
+testTableApi(
--- End diff --

All types on the right side of the operator still have only one type 
information. We would only support POJOs that implement `Comparable`. We could 
use a set for simple types and a regular array list for POJOs.
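A minimal sketch of that strategy (my illustration, not code from the PR): hash-set membership for simple types with well-defined `equals()`/`hashCode()`, and a linear `compareTo`-based scan for `Comparable` POJOs:

```scala
import java.util.HashSet

// Simple types: O(1) lookup in a hash set built from the right-hand operands.
def inSimple[T](left: T, right: Seq[T]): Boolean = {
  val set = new HashSet[T]()
  right.foreach(set.add)
  set.contains(left)
}

// Comparable POJOs: linear scan, matching on compareTo == 0 so that values
// that are equal in value but not in representation still match.
def inComparable[T <: Comparable[T]](left: T, right: Seq[T]): Boolean =
  right.exists(_.compareTo(left) == 0)
```

Note that the two strategies differ in semantics, not just cost: the scan matches `BigDecimal("2.00")` against `BigDecimal("2")` (via `compareTo`), while the hash set would not (via `equals`).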




[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator

2017-03-06 Thread DmytroShkvyra
Github user DmytroShkvyra commented on a diff in the pull request:

https://github.com/apache/flink/pull/2870#discussion_r104379120
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
 ---
@@ -1101,6 +1101,45 @@ class ScalarFunctionsTest extends ExpressionTestBase {
   "true")
   }
 
+  @Test
+  def testInExpressions(): Unit = {
+testTableApi(
--- End diff --

@twalthr It's a big problem. Actually, two problems:
1. We assume that all types on the right side of the IN operator are the same, 
because we use a `Set` as the collection and we check that the right operand has 
only one `TypeInfo`. Otherwise we would have to check each item one by one or 
use something like a Bloom filter.
2. The compilation of the expression: each field is compiled down to a simpler 
type (the biggest problem).





[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator

2017-03-06 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2870#discussion_r104377221
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
 ---
@@ -150,7 +150,35 @@ class Table(
 * }}}
 */
   def filter(predicate: Expression): Table = {
-new Table(tableEnv, Filter(predicate, logicalPlan).validate(tableEnv))
+
+predicate match {
--- End diff --

As far as I know, `IN` is a general expression, so it can be used wherever 
expressions are allowed. I quickly checked it in MySQL and `SELECT field1 IN ( 
22, 23 ) FROM Table1` is not a problem.
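The distinction matters because `IN` as an expression yields a boolean value per row, rather than only filtering rows. A plain-Scala sketch of the two uses (my illustration; `field1` stands in for the column from the MySQL example):

```scala
val field1 = Seq(21, 22, 23)   // one column of Table1
val inList = Set(22, 23)

// WHERE field1 IN (22, 23): rows are filtered out
val filtered = field1.filter(inList.contains)

// SELECT field1 IN (22, 23): every row yields a Boolean
val projected = field1.map(inList.contains)
```

A translation hooked only into `filter()` covers the first form but not the second.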




[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator

2017-03-06 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2870#discussion_r104375934
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
 ---
@@ -1101,6 +1101,45 @@ class ScalarFunctionsTest extends ExpressionTestBase {
   "true")
   }
 
+  @Test
+  def testInExpressions(): Unit = {
+testTableApi(
--- End diff --

You are right, working with POJOs and case classes does not always give you the 
best performance, but limiting the number of supported types results in a bad user 
experience. Types that are supported in general should also be supported by 
individual features. Why can't we use POJOs in SQL? Is `f4 IN (f3, f2, f0)` not possible?




[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator

2017-03-02 Thread DmytroShkvyra
Github user DmytroShkvyra commented on a diff in the pull request:

https://github.com/apache/flink/pull/2870#discussion_r103987812
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
 ---
@@ -1101,6 +1101,45 @@ class ScalarFunctionsTest extends ExpressionTestBase {
   "true")
   }
 
+  @Test
+  def testInExpressions(): Unit = {
+testTableApi(
--- End diff --

@twalthr Are you sure that we need to support IN with POJOs/tuples/case classes?
First of all, it will hurt performance, because comparing these types is too 
complicated. It is easier to get a subset of ids and use IN on them than to compare 
POJOs and case classes. Moreover, we can't use it in SQL string statements (we 
would need to parse POJOs and case classes from strings).
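The trade-off can be sketched in plain Scala (my illustration; `User` and the data are made up): membership tests over primitive ids are simpler than comparing whole composite values, where every field has to match exactly.

```scala
case class User(id: Int, name: String)

val users = Seq(User(1, "a"), User(2, "b"), User(3, "c"))

// "get a subset of ids and use IN": cheap primitive membership test
val wantedIds = Set(1, 3)
val byId = users.filter(u => wantedIds.contains(u.id))

// comparing whole case-class instances: all fields participate in equality
val wanted = Set(User(1, "a"), User(3, "c"))
val byValue = users.filter(wanted.contains)
```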




[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator

2017-03-02 Thread DmytroShkvyra
Github user DmytroShkvyra commented on a diff in the pull request:

https://github.com/apache/flink/pull/2870#discussion_r103976547
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
 ---
@@ -150,7 +150,35 @@ class Table(
 * }}}
 */
   def filter(predicate: Expression): Table = {
-new Table(tableEnv, Filter(predicate, logicalPlan).validate(tableEnv))
+
+predicate match {
--- End diff --

Did you mean something like this:
`SELECT b.IN[1,2,3] as a FROM T as b` ?




[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator

2017-03-02 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2870#discussion_r103963098
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
 ---
@@ -79,6 +79,72 @@ object ScalarOperators {
 }
   }
 
+  def generateIn(
+  nullCheck: Boolean,
+  left: GeneratedExpression,
+  right: scala.collection.mutable.Buffer[GeneratedExpression],
+  addReusableCodeCallback: (String, String) => Any)
+: GeneratedExpression = {
+val resultTerm = newName("result")
+val isNull = newName("isNull")
+
+val topNumericalType: Option[TypeInformation[_]] = {
--- End diff --

IMHO, calculating the top numerical type seems not too specific to me.




[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator

2017-03-02 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2870#discussion_r103963525
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
 ---
@@ -1002,12 +1068,17 @@ object ScalarOperators {
 val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
 // no casting necessary
 if (operandType == resultType) {
-  (operandTerm) => s"$operandTerm"
+  if (isDecimal(operandType)) {
+(operandTerm) => s"$operandTerm.stripTrailingZeros()"
--- End diff --

Ok, I'm fine with this change.




[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator

2017-03-02 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2870#discussion_r103962015
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
 ---
@@ -150,7 +150,35 @@ class Table(
 * }}}
 */
   def filter(predicate: Expression): Table = {
-new Table(tableEnv, Filter(predicate, logicalPlan).validate(tableEnv))
+
+predicate match {
--- End diff --

The IN operator can also be used in a `select()` statement but if we do the 
translation here, only `filter()` is supported.




[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator

2017-03-01 Thread DmytroShkvyra
Github user DmytroShkvyra commented on a diff in the pull request:

https://github.com/apache/flink/pull/2870#discussion_r103746147
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
 ---
@@ -150,7 +150,35 @@ class Table(
 * }}}
 */
   def filter(predicate: Expression): Table = {
-new Table(tableEnv, Filter(predicate, logicalPlan).validate(tableEnv))
+
+predicate match {
--- End diff --

What did you mean? Please clarify.




[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator

2017-03-01 Thread DmytroShkvyra
Github user DmytroShkvyra commented on a diff in the pull request:

https://github.com/apache/flink/pull/2870#discussion_r103742289
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
 ---
@@ -79,6 +79,72 @@ object ScalarOperators {
 }
   }
 
+  def generateIn(
+  nullCheck: Boolean,
+  left: GeneratedExpression,
+  right: scala.collection.mutable.Buffer[GeneratedExpression],
+  addReusableCodeCallback: (String, String) => Any)
+: GeneratedExpression = {
+val resultTerm = newName("result")
+val isNull = newName("isNull")
+
+val topNumericalType: Option[TypeInformation[_]] = {
--- End diff --

@twalthr I think it is not necessary, because the function would have too specific 
input parameters.




[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator

2017-03-01 Thread DmytroShkvyra
Github user DmytroShkvyra commented on a diff in the pull request:

https://github.com/apache/flink/pull/2870#discussion_r103736499
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
 ---
@@ -1002,12 +1068,17 @@ object ScalarOperators {
 val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
 // no casting necessary
 if (operandType == resultType) {
-  (operandTerm) => s"$operandTerm"
+  if (isDecimal(operandType)) {
+(operandTerm) => s"$operandTerm.stripTrailingZeros()"
+  } else {
+(operandTerm) => s"$operandTerm"
+  }
 }
 // result type is decimal but numeric operand is not
else if (isDecimal(resultType) && !isDecimal(operandType) && isNumeric(operandType)) {
  (operandTerm) =>
-s"java.math.BigDecimal.valueOf((${superPrimitive(operandType)}) $operandTerm)"
+s"java.math.BigDecimal.valueOf((${superPrimitive(operandType)}) $operandTerm)" +
--- End diff --

@twalthr See my comment above




[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator

2017-03-01 Thread DmytroShkvyra
Github user DmytroShkvyra commented on a diff in the pull request:

https://github.com/apache/flink/pull/2870#discussion_r103734557
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
 ---
@@ -1002,12 +1068,17 @@ object ScalarOperators {
 val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
 // no casting necessary
 if (operandType == resultType) {
-  (operandTerm) => s"$operandTerm"
+  if (isDecimal(operandType)) {
+(operandTerm) => s"$operandTerm.stripTrailingZeros()"
--- End diff --

@twalthr I continue the work of @NickolayVasilishin and found in 
5WD-01-Framework-2003-09.pdf (the SQL:2003 standard) the following text:

> There are two classes of numeric type: exact numeric, which includes 
integer types and types with specified precision and scale; and approximate 
numeric, which is essentially floating point, and for which a precision 
may optionally be specified. Every number has a precision (number of digits), 
and exact numeric types also have a scale (digits after the radix point). 
Arithmetic operations may be performed on operands of different or the same 
numeric type, and the result is of a numeric type that depends only on the 
numeric type of the operands. If the result cannot be represented exactly in 
the result type, then whether it is rounded or truncated is 
implementation-defined. An exception condition is raised if the result is 
outside the range of numeric values of the result type, or if the arithmetic 
operation is not defined for the operands.

So numeric values can have different scale, precision and rounding modes. 
I think that in this case we have to strip trailing zeros to avoid possible 
problems with parsing decimal values from strings.

BTW, the standard does not say that 2.00 can be unequal to 2.
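A minimal, runnable illustration of the point (my sketch, not code from the PR): `java.math.BigDecimal.equals` compares scale as well as value, while `compareTo` ignores scale, so stripping trailing zeros is what lets hash-based set lookups treat 2.00 and 2 as equal.

```scala
import java.math.BigDecimal

val a = new BigDecimal("2.00")
val b = new BigDecimal("2")

// equals() also compares scale, so a HashSet built on equals() misses 2.00 vs 2
val sameByEquals  = a == b                       // false
val sameByCompare = a.compareTo(b) == 0          // true
// stripTrailingZeros() normalizes the scale, restoring set-based lookups
val sameStripped  = a.stripTrailingZeros() == b  // true
```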




[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator

2017-01-06 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2870#discussion_r94806152
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
 ---
@@ -870,6 +870,16 @@ class CodeGenerator(
 requireTimeInterval(operand)
 generateUnaryIntervalPlusMinus(plus = true, nullCheck, operand)
 
+  case IN =>
+val left = operands.head
+val right = operands.tail
+val addReusableCodeCallback = (declaration: String, initialization: String) => {
--- End diff --

Why do we need this callback here? Couldn't we just call those methods from 
the `generateIn` method?
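For context, a sketch of the callback pattern under discussion (names and bodies are my assumptions, not the PR's code): the generator passes a function into the helper so the helper can register reusable member declarations without depending on the generator class itself.

```scala
// A generator that collects reusable member declarations and initializations.
class MiniCodeGen {
  private val members = scala.collection.mutable.ListBuffer[(String, String)]()

  def addReusableCode(declaration: String, initialization: String): Unit =
    members += ((declaration, initialization))

  def reusableCode: Seq[(String, String)] = members.toList
}

// The helper receives only the callback, not the whole generator.
def generateInSketch(addReusableCodeCallback: (String, String) => Any): String = {
  addReusableCodeCallback(
    "java.util.Set set;",
    "set = new java.util.HashSet();")
  "set.contains(left)"
}

val gen  = new MiniCodeGen
val expr = generateInSketch(gen.addReusableCode)
```

The alternative raised in the question would be to call the generator's registration methods directly from `generateIn`, avoiding the indirection.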




[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator

2017-01-06 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2870#discussion_r94801864
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
 ---
@@ -110,6 +111,9 @@ trait ImplicitExpressionOperations {
   def asc = Asc(expr)
   def desc = Desc(expr)
 
+
+  def in(subquery: Expression*) = In(expr, subquery)
+  def in(table: Table) = InSub(expr, table)
--- End diff --

Documentation is missing.




[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator

2017-01-06 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2870#discussion_r94802194
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/In.scala
 ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.SqlOperator
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.typeutils.TypeCheckUtils._
+import org.apache.flink.api.table.validate.{ValidationResult, ValidationFailure, ValidationSuccess}
+
+case class In(expression: Expression, subquery: Seq[Expression]) extends Expression {
--- End diff --

I think we can put `In` and `InSub` in a common file `in.scala`.




[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator

2017-01-06 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2870#discussion_r94801637
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
 ---
@@ -110,6 +111,9 @@ trait ImplicitExpressionOperations {
   def asc = Asc(expr)
   def desc = Desc(expr)
 
+
+  def in(subquery: Expression*) = In(expr, subquery)
--- End diff --

Documentation is missing.




[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator

2017-01-06 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2870#discussion_r94804831
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/In.scala
 ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.SqlOperator
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.typeutils.TypeCheckUtils._
+import org.apache.flink.api.table.validate.{ValidationResult, ValidationFailure, ValidationSuccess}
+
+case class In(expression: Expression, subquery: Seq[Expression]) extends Expression {
+
+  def this(expressions: Seq[Expression]) = this(expressions.head, expressions.tail)
+
+  override def toString = s"$expression.in(${subquery.mkString(", ")})"
+
+  /**
+    * List of child nodes that should be considered when doing transformations. Other values
+    * in the Product will not be transformed, only handed through.
+    */
+  override private[flink] def children: Seq[Expression] = expression +: subquery.distinct
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.IN, children.map(_.toRexNode): _*)
+  }
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (children.tail.contains(null)) {
+      ValidationFailure("Operands on right side of IN operator must be not null")
+    } else {
+      val types = children.tail.map(_.resultType)
+      if (types.distinct.length != 1) {
+        ValidationFailure(
+          s"Types on the right side of IN operator must be the same, got ${types.mkString(", ")}."
+        )
+      } else {
+        (children.head.resultType, children.tail.head.resultType) match {
+          case (lType, rType) if isNumeric(lType) && isNumeric(rType) => ValidationSuccess
+          case (lType, rType) if isComparable(lType) && lType == rType => ValidationSuccess
+          case (lType, rType) =>
+            ValidationFailure(
+              s"Types on the both side of IN operator must be the same, got $lType and $rType"
+            )
+        }
+      }
+
+    }
+  }
+
+  /**
--- End diff --

Same here.




[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator

2017-01-06 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2870#discussion_r94923365
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
 ---
@@ -150,7 +150,35 @@ class Table(
 * }}}
 */
   def filter(predicate: Expression): Table = {
-new Table(tableEnv, Filter(predicate, logicalPlan).validate(tableEnv))
+
+predicate match {
--- End diff --

This is the wrong position for doing the translation as `in` would not work 
in select statements etc.




[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator

2017-01-06 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2870#discussion_r94804605
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/In.scala
 ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.SqlOperator
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.typeutils.TypeCheckUtils._
+import org.apache.flink.api.table.validate.{ValidationResult, 
ValidationFailure, ValidationSuccess}
+
+case class In(expression: Expression, subquery: Seq[Expression]) extends 
Expression {
+
+  def this(expressions: Seq[Expression]) = this(expressions.head, 
expressions.tail)
+
+  override def toString = s"$expression.in(${subquery.mkString(", ")})"
+
+  /**
+* List of child nodes that should be considered when doing 
transformations. Other values
+* in the Product will not be transformed, only handed through.
+*/
+  override private[flink] def children: Seq[Expression] = expression +: 
subquery.distinct
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
+relBuilder.call(SqlStdOperatorTable.IN, children.map(_.toRexNode): _*)
+  }
+
+  override private[flink] def validateInput(): ValidationResult = {
+if (children.tail.contains(null)) {
+  ValidationFailure("Operands on right side of IN operator must be not 
null")
+} else {
+  val types = children.tail.map(_.resultType)
+  if (types.distinct.length != 1) {
+ValidationFailure(
+  s"Types on the right side of IN operator must be the same, got 
${types.mkString(", ")}."
+)
+  } else {
+(children.head.resultType, children.tail.head.resultType) match {
--- End diff --

`children.last` instead of `children.tail.head`




[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator

2017-01-06 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2870#discussion_r94808082
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
 ---
@@ -79,6 +79,72 @@ object ScalarOperators {
 }
   }
 
+  def generateIn(
+  nullCheck: Boolean,
+  left: GeneratedExpression,
--- End diff --

Could you come up with a name for the parameters `left` and `right` (e.g. 
`needle`, `haystack`)? It is difficult to read otherwise; also at other 
locations in the code.




[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator

2017-01-06 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2870#discussion_r94803910
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/In.scala
 ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.SqlOperator
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.typeutils.TypeCheckUtils._
+import org.apache.flink.api.table.validate.{ValidationResult, 
ValidationFailure, ValidationSuccess}
+
+case class In(expression: Expression, subquery: Seq[Expression]) extends 
Expression {
+
+  def this(expressions: Seq[Expression]) = this(expressions.head, 
expressions.tail)
+
+  override def toString = s"$expression.in(${subquery.mkString(", ")})"
+
+  /**
+* List of child nodes that should be considered when doing 
transformations. Other values
+* in the Product will not be transformed, only handed through.
+*/
+  override private[flink] def children: Seq[Expression] = expression +: 
subquery.distinct
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
+relBuilder.call(SqlStdOperatorTable.IN, children.map(_.toRexNode): _*)
+  }
+
+  override private[flink] def validateInput(): ValidationResult = {
+if (children.tail.contains(null)) {
+  ValidationFailure("Operands on right side of IN operator must be not 
null")
+} else {
+  val types = children.tail.map(_.resultType)
+  if (types.distinct.length != 1) {
+ValidationFailure(
+  s"Types on the right side of IN operator must be the same, got 
${types.mkString(", ")}."
+)
+  } else {
+(children.head.resultType, children.tail.head.resultType) match {
+  case (lType, rType) if isNumeric(lType) && isNumeric(rType) => 
ValidationSuccess
+  case (lType, rType) if isComparable(lType) && lType == rType => 
ValidationSuccess
+  case (lType, rType) =>
+ValidationFailure(
+  s"Types on the both side of IN operator must be the same, 
got $lType and $rType"
--- End diff --

"Types on both sides of the IN operator must be numeric or of the same 
comparable type"




[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator

2017-01-06 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2870#discussion_r94922812
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/InITCase.scala
 ---
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.batch.table
+
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
+import 
org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.api.table.{Row, Table, TableEnvironment, 
ValidationException}
+import 
org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class InITCase(
+mode: TestExecutionMode,
+configMode: TableConfigMode)
+  extends TableProgramsTestBase(mode, configMode) {
+
+
+  @Test
+  def testSqlInOperator(): Unit = {
--- End diff --

Can you convert these ITCases into tests extending `TableTestBase`? We are 
trying to reduce our build time. It is sufficient to test only the optimized plan.




[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator

2017-01-06 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2870#discussion_r94807323
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
 ---
@@ -79,6 +79,72 @@ object ScalarOperators {
 }
   }
 
+  def generateIn(
+  nullCheck: Boolean,
+  left: GeneratedExpression,
+  right: scala.collection.mutable.Buffer[GeneratedExpression],
+  addReusableCodeCallback: (String, String) => Any)
+: GeneratedExpression = {
+val resultTerm = newName("result")
+val isNull = newName("isNull")
+
+val topNumericalType: Option[TypeInformation[_]] = {
--- End diff --

You can extract this logic into a separate method (e.g. in 
`TypeCheckUtils`). Because it might be useful at other locations too.




[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator

2017-01-06 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2870#discussion_r94921085
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
 ---
@@ -1002,12 +1068,17 @@ object ScalarOperators {
 val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
 // no casting necessary
 if (operandType == resultType) {
-  (operandTerm) => s"$operandTerm"
+  if (isDecimal(operandType)) {
+(operandTerm) => s"$operandTerm.stripTrailingZeros()"
--- End diff --

We should not modify decimals here. What does the SQL standard say about 
this? Are 2.00 and 2 equal in SQL?

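The scale question is the crux here: `java.math.BigDecimal.equals()` (and `hashCode()`, which a `HashSet` relies on) compare scale as well as value, while `compareTo()` is purely numeric, which is closer to SQL equality semantics. A minimal illustration (plain Java, not the generated code):

```java
import java.math.BigDecimal;
import java.util.HashSet;

public class DecimalScale {
    public static void main(String[] args) {
        BigDecimal two = new BigDecimal("2");
        BigDecimal twoZero = new BigDecimal("2.00");

        // compareTo() ignores scale: 2 and 2.00 are numerically equal
        System.out.println(two.compareTo(twoZero) == 0);              // true
        // equals() also compares scale, so 2 != 2.00 for a HashSet
        System.out.println(two.equals(twoZero));                      // false
        // stripTrailingZeros() normalizes the scale away
        System.out.println(twoZero.stripTrailingZeros().equals(two)); // true

        // A HashSet probe only hits if both sides were normalized the same way:
        HashSet<BigDecimal> haystack = new HashSet<>();
        haystack.add(twoZero.stripTrailingZeros());
        System.out.println(haystack.contains(two));                   // true
    }
}
```

This is why the reviewed code strips trailing zeros before hashing; whether that normalization belongs inside the generic cast path is exactly the reviewer's objection.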



[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator

2017-01-06 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2870#discussion_r94922269
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
 ---
@@ -1101,6 +1101,45 @@ class ScalarFunctionsTest extends ExpressionTestBase 
{
   "true")
   }
 
+  @Test
+  def testInExpressions(): Unit = {
+testTableApi(
--- End diff --

Can you add some more tests? E.g.
- `Null(Types.INT).in(1,2,42, Null(Types.INT))` cannot be compiled.
- Date/Time/Timestamp types which are represented as int/long internally.
- POJOs/Tuples/Case classes
- Fields in the set `f2.in(f3, f4, 4)`




[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator

2017-01-06 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2870#discussion_r94922579
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/InSub.scala
 ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.Table
+
+/**
+  * Created by hadoop on 11/17/16.
+  */
+case class InSub(expression: Expression, table: Table) extends Expression {
+override def toString = s"$expression.in($table})"
--- End diff --

Additional `}`




[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator

2017-01-06 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2870#discussion_r94804779
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/In.scala
 ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.SqlOperator
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.typeutils.TypeCheckUtils._
+import org.apache.flink.api.table.validate.{ValidationResult, 
ValidationFailure, ValidationSuccess}
+
+case class In(expression: Expression, subquery: Seq[Expression]) extends 
Expression {
+
+  def this(expressions: Seq[Expression]) = this(expressions.head, 
expressions.tail)
+
+  override def toString = s"$expression.in(${subquery.mkString(", ")})"
+
+  /**
--- End diff --

Can you remove those auto-generated comments?




[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator

2017-01-06 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2870#discussion_r94921693
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
 ---
@@ -1002,12 +1068,17 @@ object ScalarOperators {
 val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
 // no casting necessary
 if (operandType == resultType) {
-  (operandTerm) => s"$operandTerm"
+  if (isDecimal(operandType)) {
+(operandTerm) => s"$operandTerm.stripTrailingZeros()"
+  } else {
+(operandTerm) => s"$operandTerm"
+  }
 }
 // result type is decimal but numeric operand is not
 else if (isDecimal(resultType) && !isDecimal(operandType) && 
isNumeric(operandType)) {
   (operandTerm) =>
-s"java.math.BigDecimal.valueOf((${superPrimitive(operandType)}) 
$operandTerm)"
+s"java.math.BigDecimal.valueOf((${superPrimitive(operandType)}) 
$operandTerm)" +
--- End diff --

If I remove this change no test fails. Is it necessary?




[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator

2016-12-01 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2870#discussion_r90497004
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
 ---
@@ -78,6 +78,69 @@ object ScalarOperators {
 }
   }
 
+  def generateIn(
+  nullCheck: Boolean,
+  left: GeneratedExpression,
+  right: scala.collection.mutable.Buffer[GeneratedExpression],
+  addReusableCodeCallback: (String, String) => Any)
+: GeneratedExpression = {
+val resultTerm = newName("result")
+val isNull = newName("isNull")
+
+val castNumeric = if (isNumeric(left.resultType) || 
isNumeric(right.head.resultType)) {
+  (value: String) => s"""new java.math.BigDecimal("" + 
$value).stripTrailingZeros()"""
+} else {
+  (value: String) => value
+}
+
+val valuesInitialization = if (right.size >= 20) {
--- End diff --

I think we can always use the `HashSet` approach. 




[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator

2016-12-01 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2870#discussion_r90495217
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/In.scala
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.SqlOperator
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.typeutils.TypeCheckUtils._
+import org.apache.flink.api.table.validate.{ValidationResult, 
ValidationFailure, ValidationSuccess}
+
+case class In(expressions: Seq[Expression]) extends Expression {
+  override def toString = 
s"${expressions.head}.in(${expressions.tail.mkString(", ")})"
+
+  /**
+* List of child nodes that should be considered when doing 
transformations. Other values
+* in the Product will not be transformed, only handed through.
+*/
+  override private[flink] def children: Seq[Expression] = expressions
+
+  private[flink] val sqlOperator: SqlOperator = SqlStdOperatorTable.IN
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
+relBuilder.call(SqlStdOperatorTable.IN, children.map(_.toRexNode): _*)
--- End diff --

Can we add a deduplication step for the literal values before translating 
them?

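Deduplication is cheap to do eagerly on the literal list before translating to `RexNode`s (the later revision of `In` indeed uses `subquery.distinct` in `children`). A sketch of the idea, using a `LinkedHashSet` to mirror Scala's order-preserving `Seq.distinct`:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;

public class DedupLiterals {
    public static void main(String[] args) {
        // Duplicate literals in `x.in(1, 2, 2, 3, 1)` add nothing to the
        // predicate, so they can be dropped before translation.
        List<Integer> literals = Arrays.asList(1, 2, 2, 3, 1);

        // LinkedHashSet deduplicates while keeping first-occurrence order.
        List<Integer> distinct = new ArrayList<>(new LinkedHashSet<>(literals));
        System.out.println(distinct);   // [1, 2, 3]
    }
}
```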



[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator

2016-12-01 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2870#discussion_r90499247
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/InITCase.java
 ---
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.batch.table;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.BatchTableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.TableEnvironment;
+import org.apache.flink.api.table.ValidationException;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class InITCase extends TableProgramsTestBase {
--- End diff --

These `ITCases` are very expensive and significantly blow up the build 
time. 
We test expressions like `IN` using the `ExpressionTestBase`. 

Please see `ScalarFunctionsTest` to check how it works and move all tests 
to a test class that extends `ExpressionTestBase`. We would only need to check 
the Scala and Java Table APIs, because `IN`s in SQL queries are executed 
differently.




[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator

2016-12-01 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2870#discussion_r90496100
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
 ---
@@ -78,6 +78,69 @@ object ScalarOperators {
 }
   }
 
+  def generateIn(
+  nullCheck: Boolean,
+  left: GeneratedExpression,
+  right: scala.collection.mutable.Buffer[GeneratedExpression],
+  addReusableCodeCallback: (String, String) => Any)
+: GeneratedExpression = {
+val resultTerm = newName("result")
+val isNull = newName("isNull")
+
+val castNumeric = if (isNumeric(left.resultType) || 
isNumeric(right.head.resultType)) {
+  (value: String) => s"""new java.math.BigDecimal("" + 
$value).stripTrailingZeros()"""
--- End diff --

Casting all numeric values to `BigDecimal` is quite expensive. Please check 
how numeric casting is done for binary operations, e.g., in 
`generateArithmeticOperator()`.

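A hedged sketch of the cost difference being pointed out (illustrative only, not the actual generated code): binary arithmetic code-gen widens primitives to a common numeric type with a plain cast, whereas the reviewed code round-trips every value through a string into a `BigDecimal`, paying an allocation and a parse per record:

```java
import java.math.BigDecimal;

public class NumericWidening {
    public static void main(String[] args) {
        int needle = 42;

        // Primitive widening, as arithmetic code-gen does: essentially free.
        long widened = (long) needle;

        // String round-trip into BigDecimal, as the reviewed code does:
        // object allocation plus decimal string parsing.
        BigDecimal boxed = new BigDecimal("" + needle);

        System.out.println(widened);               // 42
        System.out.println(boxed.intValueExact()); // 42
    }
}
```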



[GitHub] flink pull request #2870: [FLINK-4565] Support for SQL IN operator

2016-11-25 Thread NickolayVasilishin
GitHub user NickolayVasilishin opened a pull request:

https://github.com/apache/flink/pull/2870

[FLINK-4565] Support for SQL IN operator

[FLINK-4565] Support for SQL IN operator

This PR is part of the work on the SQL IN operator in the Table API; it 
implements IN for literals.
Two cases are covered: fewer than and more than 20 literals.

Also I have some questions:
- Is converting all numeric types to BigDecimal OK? I decided to do so 
to simplify the use of a hashset.
- The validation isn't really good. It forces the operator to be used with 
literals of the same type. Should I rework it, or maybe just add more cases?

expressionDsl.scala:
entry point for the IN operator in the Scala API
ScalarOperators.scala:
1) All numeric types are upcast to BigDecimal for use in the hashset; 
other types are unchanged in castNumeric
2) valuesInitialization is used for 2 cases: when we have more than 20 
operands (then we use a hashset, initialized in the constructor, described below) and 
fewer than 20 operands (then we initialize the operands in the method's body and use 
them in a conjunction)
3) comparison also covers the cases described above. In the first case we use a 
callback to declare and initialize the hashset with all operands. Otherwise we just 
put all operands in a conjunction.
4) The final code is built up from these code snippets.
CodeGenerator.scala:
passes arguments and a callback to declare and init the hashset
FunctionCatalog.scala:
registers "in" as a method
InITCase:
some use cases

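The two code paths outlined above can be sketched roughly as follows (plain Java standing in for the generated code; the threshold of 20 and all names are illustrative):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class InStrategies {
    // Few operands: an inline chain of equality comparisons,
    // evaluated per record.
    static boolean inSmall(int needle, List<Integer> operands) {
        boolean result = false;
        for (int op : operands) {
            result = result || needle == op;
        }
        return result;
    }

    // Many operands: a set built once (in generated code, in the operator's
    // constructor via the reusable-code callback) and probed per record.
    static final Set<Integer> HAYSTACK =
            new HashSet<>(Arrays.asList(1, 5, 9, 12, 42));

    static boolean inLarge(int needle) {
        return HAYSTACK.contains(needle);
    }

    public static void main(String[] args) {
        System.out.println(inSmall(5, Arrays.asList(1, 5, 9)));  // true
        System.out.println(inSmall(7, Arrays.asList(1, 5, 9)));  // false
        System.out.println(inLarge(42) + " " + inLarge(7));      // true false
    }
}
```

The per-record cost of the chain grows linearly with the literal count, while the set probe stays constant, which is why the reviewers suggest always using the `HashSet` approach.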

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NickolayVasilishin/flink FLINK-4565

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2870.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2870


commit e495f96c0016f53195398d78f757857eb8546917
Author: nikolay_vasilishin 
Date:   2016-11-25T14:18:20Z

[FLINK-4565] Support for SQL IN operator

expressionDsl.scala:
entry point for the IN operator in the Scala API
ScalarOperators.scala:
1) All numeric types are upcast to BigDecimal for use in the hashset, 
other types are unchanged in castNumeric
2) valuesInitialization is used for 2 cases: when we have more than 20 
operands (then we use a hashset, initialized in the constructor, described below) and 
fewer than 20 operands (then we initialize the operands in the method's body and use 
them in a conjunction)
3) comparison also covers the cases described above. In the first case we use a 
callback to declare and initialize the hashset with all operands. Otherwise we just 
put all operands in a conjunction.
4) The final code is built up from these code snippets.
CodeGenerator.scala:
passes arguments and a callback to declare and init the hashset
FunctionCatalog.scala:
registers "in" as a method
InITCase:
some use cases



