GitHub user dbtsai opened a pull request:
https://github.com/apache/spark/pull/21416
[SPARK-24371] [SQL] Added isinSet in DataFrame API for Scala and Java.
## What changes were proposed in this pull request?
Implemented **`isinSet`** in the DataFrame API for both Scala and Java, so
users can write:
```scala
val profileDF = Seq(
  Some(1), Some(2), Some(3), Some(4),
  Some(5), Some(6), Some(7), None
).toDF("profileID")
val validUsers: Set[Any] = Set(6, 7.toShort, 8L, "3")
val result = profileDF.withColumn("isValid",
  $"profileID".isinSet(validUsers))
result.show(10)
"""
+---------+-------+
|profileID|isValid|
+---------+-------+
|        1|  false|
|        2|  false|
|        3|   true|
|        4|  false|
|        5|  false|
|        6|   true|
|        7|   true|
|     null|   null|
+---------+-------+
""".stripMargin
```
Two new rules are added to the logical plan optimizer.
1. When there is only one element in the **`Set`**, the physical plan will
be optimized to **`EqualTo`**, so predicate pushdown can be used.
```scala
profileDF.filter( $"profileID".isinSet(Set(6))).explain(true)
"""
|== Physical Plan ==
|*(1) Project [profileID#0]
|+- *(1) Filter (isnotnull(profileID#0) && (profileID#0 = 6))
| +- *(1) FileScan parquet [profileID#0] Batched: true, Format: Parquet,
| PartitionFilters: [],
| PushedFilters: [IsNotNull(profileID), EqualTo(profileID,6)],
| ReadSchema: struct<profileID:int>
""".stripMargin
```
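The single-element rewrite can be pictured with a toy expression tree. This is only a sketch under stated assumptions: the case classes below borrow Catalyst-style names for readability, but `SingletonInSetSketch`, `Attr`, and `optimize` are hypothetical and are not the classes or the rule touched by this patch.

```scala
object SingletonInSetSketch {
  // Toy expression tree (hypothetical; it only mimics Catalyst naming).
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class Literal(value: Any) extends Expr
  final case class InSet(child: Expr, values: Set[Any]) extends Expr
  final case class EqualTo(left: Expr, right: Expr) extends Expr

  // Rewrite a one-element InSet into an equality test;
  // every other expression is returned unchanged.
  def optimize(e: Expr): Expr = e match {
    case InSet(child, values) if values.size == 1 =>
      EqualTo(child, Literal(values.head))
    case other => other
  }
}
```

An equality predicate is a shape that data sources commonly accept as a pushed filter, which is why the plan above shows `EqualTo(profileID,6)` under `PushedFilters`.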
2. When the **`Set`** is empty and the input is nullable, the logical plan
is simplified to:
```scala
profileDF.filter( $"profileID".isinSet(Set())).explain(true)
"""
|== Optimized Logical Plan ==
|Filter if (isnull(profileID#0)) null else false
|+- Relation[profileID#0] parquet
""".stripMargin
```
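The empty-set rewrite can be sketched in the same toy style. The names here (`EmptyInSetSketch`, `Attr`, `optimize`) are hypothetical and only model the shape of the rule. The sketch handles just the nullable case described above and leaves every other input untouched, since the description does not say what happens for non-nullable input.

```scala
object EmptyInSetSketch {
  // Toy expression tree (hypothetical; not Catalyst's real classes).
  sealed trait Expr
  final case class Attr(name: String, nullable: Boolean) extends Expr
  // A boolean literal where None stands for SQL NULL.
  final case class Literal(value: Option[Boolean]) extends Expr
  final case class InSet(child: Attr, values: Set[Int]) extends Expr
  final case class IsNull(child: Expr) extends Expr
  final case class If(cond: Expr, whenTrue: Expr, whenFalse: Expr) extends Expr

  // Empty set + nullable column: NULL input stays NULL, anything else is false.
  def optimize(e: Expr): Expr = e match {
    case InSet(child, values) if values.isEmpty && child.nullable =>
      If(IsNull(child), Literal(None), Literal(Some(false)))
    case other => other
  }
}
```

The `If` keeps the null-propagating behaviour seen in the `isValid` column earlier, where a `null` profile ID yields `null` rather than `false`.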
TODO:
1. For sets with multiple values, where the number of values is below a
certain threshold, we should still allow predicate pushdown.
2. Optimize **`In`** using **`tableswitch`** or **`lookupswitch`** when
the number of categories is low and they are **`Int`** or **`Long`**.
3. The default immutable hash-tree set is slow to query; we should
benchmark alternative set implementations for faster lookups.
4. **`filter(if (condition) null else false)`** can be optimized to false.
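The reasoning behind item 4 is that a filter keeps a row only when its predicate evaluates to true, so a predicate that can only produce `null` or `false` never keeps anything. A minimal plain-Scala sketch of that argument, with SQL `NULL` modelled as `None` (the object and its helper names are hypothetical, not Spark APIs):

```scala
object FilterNullSketch {
  // SQL three-valued boolean: Some(true), Some(false), or None for NULL.
  type SqlBool = Option[Boolean]

  // Models `if (isnull(profileID)) null else false`.
  def predicate(profileId: Option[Int]): SqlBool =
    if (profileId.isEmpty) None else Some(false)

  // Models filter semantics: a row survives only if the predicate is true.
  def filter[A](rows: Seq[A])(p: A => SqlBool): Seq[A] =
    rows.filter(row => p(row).contains(true))
}
```

Because `predicate` never returns `Some(true)`, `filter` returns an empty result for any input, which is the same result a constant `false` predicate would give.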
## How was this patch tested?
Several unit tests are added.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/dbtsai/spark optimize-set
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/21416.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #21416
----
commit ec91220d5e8658500be6d049f8ab3496fc8a914e
Author: DB Tsai <d_tsai@...>
Date: 2018-05-17T00:21:14Z
Added isinSet in DataFrame API for Scala and Java.
----