[GitHub] spark pull request #21416: [SPARK-24371] [SQL] Added isinSet in DataFrame AP...

2018-05-25 Thread dbtsai
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/21416#discussion_r190966442
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Column.scala ---
@@ -786,6 +787,24 @@ class Column(val expr: Expression) extends Logging {
   @scala.annotation.varargs
   def isin(list: Any*): Column = withExpr { In(expr, list.map(lit(_).expr)) }
 
+  /**
+   * A boolean expression that is evaluated to true if the value of this expression is contained
+   * by the provided Set.
+   *
+   * @group expr_ops
+   * @since 2.4.0
+   */
+  def isinSet(values: scala.collection.Set[_]): Column = isin(values.toSeq: _*)
--- End diff --

Sounds good.

How do we want to do the naming? `def isin` has a lowercase `i`. If we go with `isInCollection`, it will be slightly inconsistent.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21416: [SPARK-24371] [SQL] Added isinSet in DataFrame AP...

2018-05-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21416#discussion_r190507907
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Column.scala ---
@@ -786,6 +787,24 @@ class Column(val expr: Expression) extends Logging {
   @scala.annotation.varargs
   def isin(list: Any*): Column = withExpr { In(expr, list.map(lit(_).expr)) }
 
+  /**
+   * A boolean expression that is evaluated to true if the value of this expression is contained
+   * by the provided Set.
+   *
+   * @group expr_ops
+   * @since 2.4.0
+   */
+  def isinSet(values: scala.collection.Set[_]): Column = isin(values.toSeq: _*)
--- End diff --

the name can be `isInCollection`


---




[GitHub] spark pull request #21416: [SPARK-24371] [SQL] Added isinSet in DataFrame AP...

2018-05-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21416#discussion_r190496061
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---
@@ -220,6 +219,7 @@ object OptimizeIn extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case q: LogicalPlan => q transformExpressionsDown {
       case In(v, list) if list.isEmpty && !v.nullable => FalseLiteral
+      case In(v, list) if list.length == 1 => EqualTo(v, list.head)
--- End diff --

can we update the comment to mention it?


---




[GitHub] spark pull request #21416: [SPARK-24371] [SQL] Added isinSet in DataFrame AP...

2018-05-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21416#discussion_r190507754
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Column.scala ---
@@ -786,6 +787,24 @@ class Column(val expr: Expression) extends Logging {
   @scala.annotation.varargs
   def isin(list: Any*): Column = withExpr { In(expr, list.map(lit(_).expr)) }
 
+  /**
+   * A boolean expression that is evaluated to true if the value of this expression is contained
+   * by the provided Set.
+   *
+   * @group expr_ops
+   * @since 2.4.0
+   */
+  def isinSet(values: scala.collection.Set[_]): Column = isin(values.toSeq: _*)
--- End diff --

Shall we be more generic here and accept `Iterable`? Then `Set`, `Seq`, and `Array` can all be accepted.
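For illustration, here is a minimal, self-contained sketch (plain Scala, no Spark; the object and the `String`-returning stand-in methods are hypothetical) of how an `Iterable`-taking method can forward to an existing varargs method the way `isin` would be reused:

```scala
// Hypothetical stand-ins for the Column API, used only to show the
// varargs-forwarding pattern discussed above.
object CollectionForwarding {
  // Stand-in for Column.isin's varargs signature.
  def isin(list: Any*): String = list.mkString("IN(", ", ", ")")

  // A generic variant: any Iterable (Set, Seq, Array, ...) is expanded
  // into the varargs call via `toSeq: _*`.
  def isInCollection(values: Iterable[_]): String = isin(values.toSeq: _*)
}

println(CollectionForwarding.isInCollection(List(1, 2, 3)))  // prints IN(1, 2, 3)
```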


---




[GitHub] spark pull request #21416: [SPARK-24371] [SQL] Added isinSet in DataFrame AP...

2018-05-25 Thread dbtsai
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/21416#discussion_r190799418
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala ---
@@ -149,7 +149,7 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
       newPlan = dedupJoin(
         Join(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And)))
       exists
-    case In(value, Seq(ListQuery(sub, conditions, _, _))) =>
+    case EqualTo(value, ListQuery(sub, conditions, _, _)) =>
--- End diff --

@dongjoon-hyun this should fix the test. I'll add one test for this.


---




[GitHub] spark pull request #21416: [SPARK-24371] [SQL] Added isinSet in DataFrame AP...

2018-05-24 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/21416#discussion_r190759741
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---
@@ -220,6 +219,7 @@ object OptimizeIn extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case q: LogicalPlan => q transformExpressionsDown {
       case In(v, list) if list.isEmpty && !v.nullable => FalseLiteral
+      case In(v, list) if list.length == 1 => EqualTo(v, list.head)
--- End diff --

Yep. This is that one.


---




[GitHub] spark pull request #21416: [SPARK-24371] [SQL] Added isinSet in DataFrame AP...

2018-05-24 Thread dbtsai
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/21416#discussion_r190754229
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---
@@ -220,6 +219,7 @@ object OptimizeIn extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case q: LogicalPlan => q transformExpressionsDown {
       case In(v, list) if list.isEmpty && !v.nullable => FalseLiteral
+      case In(v, list) if list.length == 1 => EqualTo(v, list.head)
--- End diff --

In fact, I'm debugging the StackOverflowError issue shown in the Hive test. Thanks for this; it helps me a lot with local testing.


---




[GitHub] spark pull request #21416: [SPARK-24371] [SQL] Added isinSet in DataFrame AP...

2018-05-24 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/21416#discussion_r190722260
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---
@@ -220,6 +219,7 @@ object OptimizeIn extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case q: LogicalPlan => q transformExpressionsDown {
       case In(v, list) if list.isEmpty && !v.nullable => FalseLiteral
+      case In(v, list) if list.length == 1 => EqualTo(v, list.head)
--- End diff --

Could you add the following test case, too?
```scala
scala> sql("select * from t group by a having count(*) = (select count(*) from t)").explain
== Physical Plan ==
*(2) Project [a#2L]
+- *(2) Filter (count(1)#75L = Subquery subquery62)
   :  +- Subquery subquery62
   :     +- *(2) HashAggregate(keys=[], functions=[count(1)])
   :        +- Exchange SinglePartition
   :           +- *(1) HashAggregate(keys=[], functions=[partial_count(1)])
   :              +- *(1) Project
   :                 +- *(1) Range (0, 1, step=1, splits=8)
   +- *(2) HashAggregate(keys=[a#2L], functions=[count(1)])
      +- Exchange hashpartitioning(a#2L, 200)
         +- *(1) HashAggregate(keys=[a#2L], functions=[partial_count(1)])
            +- *(1) Project [id#0L AS a#2L]
               +- *(1) Range (0, 1, step=1, splits=8)

scala> sql("select * from t group by a having count(*) in (select count(*) from t)").explain
java.lang.StackOverflowError
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
```


---




[GitHub] spark pull request #21416: [SPARK-24371] [SQL] Added isinSet in DataFrame AP...

2018-05-24 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/21416#discussion_r190717637
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---
@@ -219,7 +219,11 @@ object ReorderAssociativeOperator extends Rule[LogicalPlan] {
 object OptimizeIn extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case q: LogicalPlan => q transformExpressionsDown {
-      case In(v, list) if list.isEmpty && !v.nullable => FalseLiteral
+      case In(v, list) if list.isEmpty =>
+        // When v is not nullable, the following expression will be optimized
+        // to FalseLiteral which is tested in OptimizeInSuite.scala
+        If(IsNotNull(v), FalseLiteral, Literal(null, BooleanType))
+      case In(v, list) if list.length == 1 => EqualTo(v, list.head)
--- End diff --

I got it. Thanks.


---




[GitHub] spark pull request #21416: [SPARK-24371] [SQL] Added isinSet in DataFrame AP...

2018-05-23 Thread dbtsai
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/21416#discussion_r190472138
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---
@@ -219,7 +219,11 @@ object ReorderAssociativeOperator extends Rule[LogicalPlan] {
 object OptimizeIn extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case q: LogicalPlan => q transformExpressionsDown {
-      case In(v, list) if list.isEmpty && !v.nullable => FalseLiteral
+      case In(v, list) if list.isEmpty =>
+        // When v is not nullable, the following expression will be optimized
+        // to FalseLiteral which is tested in OptimizeInSuite.scala
+        If(IsNotNull(v), FalseLiteral, Literal(null, BooleanType))
+      case In(v, list) if list.length == 1 => EqualTo(v, list.head)
--- End diff --

Why does it have any implication on typecasting? With this PR, it seems I 
get the correct result.

```scala
== Analyzed Logical Plan ==
(CAST(1.1 AS STRING) IN (CAST(1 AS STRING))): boolean, (CAST(1.1 AS INT) = 1): boolean
Project [cast(1.1 as string) IN (cast(1 as string)) AS (CAST(1.1 AS STRING) IN (CAST(1 AS STRING)))#484, (cast(1.1 as int) = 1) AS (CAST(1.1 AS INT) = 1)#485]
+- OneRowRelation

== Optimized Logical Plan ==
Project [false AS (CAST(1.1 AS STRING) IN (CAST(1 AS STRING)))#484, true AS (CAST(1.1 AS INT) = 1)#485]
+- OneRowRelation

== Physical Plan ==
*(1) Project [false AS (CAST(1.1 AS STRING) IN (CAST(1 AS STRING)))#484, true AS (CAST(1.1 AS INT) = 1)#485]
+- Scan OneRowRelation[]
```


---




[GitHub] spark pull request #21416: [SPARK-24371] [SQL] Added isinSet in DataFrame AP...

2018-05-23 Thread dbtsai
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/21416#discussion_r190470525
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala ---
@@ -397,6 +399,68 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  test("isinSet: Scala Set") {
+    val df = Seq((1, "x"), (2, "y"), (3, "z")).toDF("a", "b")
+    checkAnswer(df.filter($"a".isinSet(Set(1, 2))),
+      df.collect().toSeq.filter(r => r.getInt(0) == 1 || r.getInt(0) == 2))
+    checkAnswer(df.filter($"a".isinSet(Set(3, 2))),
+      df.collect().toSeq.filter(r => r.getInt(0) == 3 || r.getInt(0) == 2))
+    checkAnswer(df.filter($"a".isinSet(Set(3, 1))),
+      df.collect().toSeq.filter(r => r.getInt(0) == 3 || r.getInt(0) == 1))
+
+    // Auto casting should work with mixture of different types in Set
+    checkAnswer(df.filter($"a".isinSet(Set(1.toShort, "2"))),
+      df.collect().toSeq.filter(r => r.getInt(0) == 1 || r.getInt(0) == 2))
+    checkAnswer(df.filter($"a".isinSet(Set("3", 2.toLong))),
+      df.collect().toSeq.filter(r => r.getInt(0) == 3 || r.getInt(0) == 2))
+    checkAnswer(df.filter($"a".isinSet(Set(3, "1"))),
+      df.collect().toSeq.filter(r => r.getInt(0) == 3 || r.getInt(0) == 1))
+
+    checkAnswer(df.filter($"b".isinSet(Set("y", "x"))),
+      df.collect().toSeq.filter(r => r.getString(1) == "y" || r.getString(1) == "x"))
+    checkAnswer(df.filter($"b".isinSet(Set("z", "x"))),
+      df.collect().toSeq.filter(r => r.getString(1) == "z" || r.getString(1) == "x"))
+    checkAnswer(df.filter($"b".isinSet(Set("z", "y"))),
+      df.collect().toSeq.filter(r => r.getString(1) == "z" || r.getString(1) == "y"))
+
+    val df2 = Seq((1, Seq(1)), (2, Seq(2)), (3, Seq(3))).toDF("a", "b")
+
+    intercept[AnalysisException] {
+      df2.filter($"a".isinSet(Set($"b")))
+    }
--- End diff --

Addressed


---




[GitHub] spark pull request #21416: [SPARK-24371] [SQL] Added isinSet in DataFrame AP...

2018-05-23 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/21416#discussion_r190407851
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---
@@ -219,7 +219,11 @@ object ReorderAssociativeOperator extends Rule[LogicalPlan] {
 object OptimizeIn extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case q: LogicalPlan => q transformExpressionsDown {
-      case In(v, list) if list.isEmpty && !v.nullable => FalseLiteral
+      case In(v, list) if list.isEmpty =>
+        // When v is not nullable, the following expression will be optimized
+        // to FalseLiteral which is tested in OptimizeInSuite.scala
+        If(IsNotNull(v), FalseLiteral, Literal(null, BooleanType))
+      case In(v, list) if list.length == 1 => EqualTo(v, list.head)
--- End diff --

Ur, @dbtsai. This will cause side effects on typecasting; please see the following example. Could you add these kinds of test cases?
```scala
scala> sql("select '1.1' in (1), '1.1' = 1").collect()
res0: Array[org.apache.spark.sql.Row] = Array([false,true])
```


---




[GitHub] spark pull request #21416: [SPARK-24371] [SQL] Added isinSet in DataFrame AP...

2018-05-23 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/21416#discussion_r190399106
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala ---
@@ -397,6 +399,68 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  test("isinSet: Scala Set") {
+    val df = Seq((1, "x"), (2, "y"), (3, "z")).toDF("a", "b")
+    checkAnswer(df.filter($"a".isinSet(Set(1, 2))),
+      df.collect().toSeq.filter(r => r.getInt(0) == 1 || r.getInt(0) == 2))
+    checkAnswer(df.filter($"a".isinSet(Set(3, 2))),
+      df.collect().toSeq.filter(r => r.getInt(0) == 3 || r.getInt(0) == 2))
+    checkAnswer(df.filter($"a".isinSet(Set(3, 1))),
+      df.collect().toSeq.filter(r => r.getInt(0) == 3 || r.getInt(0) == 1))
+
+    // Auto casting should work with mixture of different types in Set
+    checkAnswer(df.filter($"a".isinSet(Set(1.toShort, "2"))),
+      df.collect().toSeq.filter(r => r.getInt(0) == 1 || r.getInt(0) == 2))
+    checkAnswer(df.filter($"a".isinSet(Set("3", 2.toLong))),
+      df.collect().toSeq.filter(r => r.getInt(0) == 3 || r.getInt(0) == 2))
+    checkAnswer(df.filter($"a".isinSet(Set(3, "1"))),
+      df.collect().toSeq.filter(r => r.getInt(0) == 3 || r.getInt(0) == 1))
+
+    checkAnswer(df.filter($"b".isinSet(Set("y", "x"))),
+      df.collect().toSeq.filter(r => r.getString(1) == "y" || r.getString(1) == "x"))
+    checkAnswer(df.filter($"b".isinSet(Set("z", "x"))),
+      df.collect().toSeq.filter(r => r.getString(1) == "z" || r.getString(1) == "x"))
+    checkAnswer(df.filter($"b".isinSet(Set("z", "y"))),
+      df.collect().toSeq.filter(r => r.getString(1) == "z" || r.getString(1) == "y"))
+
+    val df2 = Seq((1, Seq(1)), (2, Seq(2)), (3, Seq(3))).toDF("a", "b")
+
+    intercept[AnalysisException] {
+      df2.filter($"a".isinSet(Set($"b")))
+    }
--- End diff --

Let's check the error message to prevent future regressions, such as a different AnalysisException being raised.


---




[GitHub] spark pull request #21416: [SPARK-24371] [SQL] Added isinSet in DataFrame AP...

2018-05-23 Thread dbtsai
GitHub user dbtsai opened a pull request:

https://github.com/apache/spark/pull/21416

[SPARK-24371] [SQL] Added isinSet in DataFrame API for Scala and Java.

## What changes were proposed in this pull request?

Implemented **`isinSet`** in the DataFrame API for both Scala and Java, so users can do

```scala
val profileDF = Seq(
  Some(1), Some(2), Some(3), Some(4),
  Some(5), Some(6), Some(7), None
).toDF("profileID")

val validUsers: Set[Any] = Set(6, 7.toShort, 8L, "3")

val result = profileDF.withColumn("isValid", $"profileID".isinSet(validUsers))

result.show(10)
"""
+---------+-------+
|profileID|isValid|
+---------+-------+
|        1|  false|
|        2|  false|
|        3|   true|
|        4|  false|
|        5|  false|
|        6|   true|
|        7|   true|
|     null|   null|
+---------+-------+
 """.stripMargin
```

Two new rules are added to the logical plan optimizer:

1. When there is only one element in the **`Set`**, the physical plan will be optimized to **`EqualTo`**, so predicate pushdown can be used.

```scala
profileDF.filter($"profileID".isinSet(Set(6))).explain(true)
"""
  |== Physical Plan ==
  |*(1) Project [profileID#0]
  |+- *(1) Filter (isnotnull(profileID#0) && (profileID#0 = 6))
  |   +- *(1) FileScan parquet [profileID#0] Batched: true, Format: Parquet,
  |        PartitionFilters: [],
  |        PushedFilters: [IsNotNull(profileID), EqualTo(profileID,6)],
  |        ReadSchema: struct
""".stripMargin
```

2. When the **`Set`** is empty and the input is nullable, the logical plan will be simplified to

```scala
profileDF.filter( $"profileID".isinSet(Set())).explain(true)
"""
  |== Optimized Logical Plan ==
  |Filter if (isnull(profileID#0)) null else false
  |+- Relation[profileID#0] parquet
""".stripMargin
```
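The two rewrite rules above can be sketched with simplified stand-ins (a hedged toy model in plain Scala, not Spark's actual Catalyst classes): an empty IN list becomes `if (isnotnull(v)) false else null`, and a singleton list becomes an equality test.

```scala
// Toy expression tree; the class names mirror Catalyst's but are local stand-ins.
sealed trait Expr
case class Attr(name: String) extends Expr
case class Lit(v: Any) extends Expr
case class In(value: Expr, list: Seq[Expr]) extends Expr
case class EqualTo(left: Expr, right: Expr) extends Expr
case class If(cond: Expr, trueBranch: Expr, falseBranch: Expr) extends Expr
case class IsNotNull(e: Expr) extends Expr
case object FalseLiteral extends Expr
case object NullLiteral extends Expr

// Sketch of the OptimizeIn idea: simplify IN with zero or one element.
def optimizeIn(e: Expr): Expr = e match {
  case In(v, Nil)         => If(IsNotNull(v), FalseLiteral, NullLiteral)
  case In(v, Seq(single)) => EqualTo(v, single)  // enables predicate pushdown
  case other              => other
}
```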

TODO:

1. For multiple conditions whose count is below a certain threshold, we should still allow predicate pushdown.
2. Optimize the **`In`** using **`tableswitch`** or **`lookupswitch`** when the number of categories is low and they are **`Int`** or **`Long`**.
3. The default immutable hash set is slow to query; we should benchmark different set implementations for faster lookups.
4. **`filter(if (condition) null else false)`** can be optimized to false.
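As a hedged illustration of TODO item 2: on the JVM, an `Int` pattern match can compile to a `tableswitch` or `lookupswitch` instruction, which is the kind of constant-time dispatch the TODO refers to. The helper below is illustrative only, not the proposed implementation:

```scala
import scala.annotation.switch

// Illustrative only: membership in a small Int set written as a match,
// which scalac can compile to a JVM tableswitch/lookupswitch; @switch
// asks the compiler to warn if it cannot emit a switch instruction.
def inSmallIntSet(x: Int): Boolean = (x: @switch) match {
  case 3 | 6 | 7 => true
  case _         => false
}

println(inSmallIntSet(6))  // prints true
```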

## How was this patch tested?

Several unit tests are added.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dbtsai/spark optimize-set

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21416.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21416


commit ec91220d5e8658500be6d049f8ab3496fc8a914e
Author: DB Tsai 
Date:   2018-05-17T00:21:14Z

Added isinSet in DataFrame API for Scala and Java.




---
