GitHub user dbtsai opened a pull request:

    https://github.com/apache/spark/pull/21416

    [SPARK-24371] [SQL] Added isinSet in DataFrame API for Scala and Java.

    ## What changes were proposed in this pull request?
    
    Implemented **`isinSet`** in the DataFrame API for both Scala and Java, so users can do
    
    ```scala
    val profileDF = Seq(
      Some(1), Some(2), Some(3), Some(4),
      Some(5), Some(6), Some(7), None
    ).toDF("profileID")

    val validUsers: Set[Any] = Set(6, 7.toShort, 8L, "3")

    val result = profileDF.withColumn("isValid", $"profileID".isinSet(validUsers))

    result.show(10)
    ```

    ```
    +---------+-------+
    |profileID|isValid|
    +---------+-------+
    |        1|  false|
    |        2|  false|
    |        3|   true|
    |        4|  false|
    |        5|  false|
    |        6|   true|
    |        7|   true|
    |     null|   null|
    +---------+-------+
    ```
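
    For comparison, the existing **`isin`** method takes varargs, so a collection has to be splatted at the call site, while **`isinSet`** accepts the **`Set`** directly. A minimal sketch reusing `validUsers` from above (`isinSet` is the method proposed in this PR):

    ```scala
    // Existing varargs API: the collection must be expanded with `: _*`.
    val viaVarargs = profileDF.withColumn(
      "isValid", $"profileID".isin(validUsers.toSeq: _*))

    // Proposed API: pass the Set as a single argument.
    val viaSet = profileDF.withColumn(
      "isValid", $"profileID".isinSet(validUsers))
    ```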
    
    Two new rules are added to the logical plan optimizer:
     
    1. When there is only one element in the **`Set`**, the predicate is rewritten to **`EqualTo`**, so predicate pushdown can be used.
    
    ```scala
    profileDF.filter($"profileID".isinSet(Set(6))).explain(true)
    ```

    ```
    == Physical Plan ==
    *(1) Project [profileID#0]
    +- *(1) Filter (isnotnull(profileID#0) && (profileID#0 = 6))
       +- *(1) FileScan parquet [profileID#0] Batched: true, Format: Parquet,
          PartitionFilters: [],
          PushedFilters: [IsNotNull(profileID), EqualTo(profileID,6)],
          ReadSchema: struct<profileID:int>
    ```
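
    A minimal sketch of what this rewrite could look like as a Catalyst expression rewrite (hypothetical code for illustration; the rule in this PR may differ in detail):

    ```scala
    import org.apache.spark.sql.catalyst.expressions.{EqualTo, Expression, InSet, Literal}

    // Hypothetical sketch: an InSet over exactly one value behaves like an
    // equality check, which data sources can push down as an EqualTo filter.
    def rewriteSingleElementInSet(e: Expression): Expression = e match {
      case InSet(child, set) if set.size == 1 =>
        EqualTo(child, Literal(set.head, child.dataType))
      case other => other
    }
    ```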
    
    2. When the **`Set`** is empty and the input is nullable, the logical plan is simplified to
    
    ```scala
    profileDF.filter($"profileID".isinSet(Set())).explain(true)
    ```

    ```
    == Optimized Logical Plan ==
    Filter if (isnull(profileID#0)) null else false
    +- Relation[profileID#0] parquet
    ```
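
    The `if (isnull(...)) null else false` shape follows SQL three-valued logic: `x IN ()` is null when `x` is null and false otherwise. A hypothetical sketch of this rewrite (illustration only, not the PR's exact code):

    ```scala
    import org.apache.spark.sql.catalyst.expressions.{Expression, If, InSet, IsNull, Literal}
    import org.apache.spark.sql.types.BooleanType

    // Hypothetical sketch: an empty InSet can never be true, so it reduces to
    // null (for a null input) or false (for everything else).
    def rewriteEmptyInSet(e: Expression): Expression = e match {
      case InSet(child, set) if set.isEmpty && child.nullable =>
        If(IsNull(child), Literal(null, BooleanType), Literal(false))
      case InSet(child, set) if set.isEmpty =>
        Literal(false)
      case other => other
    }
    ```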
    
    TODO:
    
    1. When the **`Set`** contains multiple elements but fewer than a certain threshold, we should still allow predicate pushdown.
    2. Optimize **`In`** using the **`tableswitch`** or **`lookupswitch`** bytecode instruction when the number of categories is low and the values are **`Int`** or **`Long`**.
    3. The default immutable hash trie **`Set`** is slow to query; we should benchmark different set implementations for faster lookups.
    4. **`filter(if (condition) null else false)`** can be optimized to `false` (see the sketch below).
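
    For item 4, a `Filter` keeps only rows whose predicate evaluates to true and treats null as false, so a predicate that can only yield null or false never passes. A hypothetical sketch of that simplification (illustration only):

    ```scala
    import org.apache.spark.sql.catalyst.expressions.{Expression, If, Literal}
    import org.apache.spark.sql.types.BooleanType

    // Hypothetical sketch: inside a Filter, `if (cond) null else false` can only
    // yield null or false, and Filter drops both, so it is equivalent to false.
    def simplifyNullOrFalsePredicate(e: Expression): Expression = e match {
      case If(_, Literal(null, BooleanType), Literal(false, BooleanType)) =>
        Literal(false, BooleanType)
      case other => other
    }
    ```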
    
    ## How was this patch tested?
    
    Several unit tests are added.
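
    One of the new tests could look roughly like this (the name and the `checkAnswer` helper follow existing Spark SQL test conventions; the actual tests in the PR may differ):

    ```scala
    test("isinSet: Scala Set") {
      val df = Seq((1, "x"), (2, "y"), (3, "z")).toDF("a", "b")
      // Rows where column `a` is a member of the given set are kept.
      checkAnswer(
        df.filter($"a".isinSet(Set(1, 3))),
        Seq(Row(1, "x"), Row(3, "z")))
    }
    ```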

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dbtsai/spark optimize-set

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21416.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21416
    
----
commit ec91220d5e8658500be6d049f8ab3496fc8a914e
Author: DB Tsai <d_tsai@...>
Date:   2018-05-17T00:21:14Z

    Added isinSet in DataFrame API for Scala and Java.

----

