GitHub user HyukjinKwon opened a pull request:

    https://github.com/apache/spark/pull/21007

    [SPARK-23942][PYTHON][SQL] Makes collect in PySpark an action for a query execution listener

    ## What changes were proposed in this pull request?
    
    This PR proposes to make PySpark's `collect` recognised as an action by a query execution listener.
    
    Currently, `collect` (and `collect` with Arrow, as used by `toPandas`) is not recognised as an action by `QueryExecutionListener`. For example, suppose we have a custom listener as below:
    
    ```scala
    package org.apache.spark.sql
    
    import org.apache.spark.internal.Logging
    import org.apache.spark.sql.execution.QueryExecution
    import org.apache.spark.sql.util.QueryExecutionListener
    
    
    class TestQueryExecutionListener extends QueryExecutionListener with Logging {
      override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
        logError("Look at me! I'm 'onSuccess'")
      }

      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { }
    }
    ```
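    
    (For reference, a listener like this can be attached at startup through the `spark.sql.queryExecutionListeners` configuration; the fully qualified class name below assumes the example class above is compiled onto the classpath.)
    
    ```
    # e.g. in conf/spark-defaults.conf
    spark.sql.queryExecutionListeners  org.apache.spark.sql.TestQueryExecutionListener
    ```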
    
    **Before**
    
    ```python
    >>> sql("SELECT * FROM range(1)").collect()
    ```
    ```
    [Row(id=0)]
    ```
    
    ```python
    >>> spark.conf.set("spark.sql.execution.arrow.enabled", "true")
    >>> sql("SELECT * FROM range(1)").toPandas()
    ```
    ```
       id
    0   0
    ```
    
    **After**
    
    ```python
    >>> sql("SELECT * FROM range(1)").collect()
    ```
    ```
    18/04/09 16:57:58 ERROR TestQueryExecutionListener: Look at me! I'm 'onSuccess'
    [Row(id=0)]
    ```
    
    ```python
    >>> spark.conf.set("spark.sql.execution.arrow.enabled", "true")
    >>> sql("SELECT * FROM range(1)").toPandas()
    ```
    ```
    18/04/09 17:53:26 ERROR TestQueryExecutionListener: Look at me! I'm 'onSuccess'
       id
    0   0
    ```
    
    
    Other operations on the PySpark and Scala sides seem fine:
    
    ```python
    >>> sql("SELECT * FROM range(1)").show()
    ```
    ```
    18/04/09 17:02:04 ERROR TestQueryExecutionListener: Look at me! I'm 'onSuccess'
    +---+
    | id|
    +---+
    |  0|
    +---+
    ```
    
    ```scala
    scala> sql("SELECT * FROM range(1)").collect()
    ```
    ```
    18/04/09 16:58:41 ERROR TestQueryExecutionListener: Look at me! I'm 'onSuccess'
    res1: Array[org.apache.spark.sql.Row] = Array([0])
    ```
    
    ## How was this patch tested?
    
    I have manually tested as described above.
    
    It's possible to add a test, but that would require a mock `QueryExecutionListener`, a static object with a variable updated by the mock listener, and a check on that variable via Py4J. It would also need a manual skip condition on the PySpark side.
    
    I can add this test, but I usually try to avoid tests that require JVM access; let me know if anyone feels it is required.
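    
    (A rough sketch of what such a test-side listener could look like, following the description above. The names are hypothetical, and the Py4J wiring on the PySpark side is omitted.)
    
    ```scala
    import org.apache.spark.sql.execution.QueryExecution
    import org.apache.spark.sql.util.QueryExecutionListener
    
    // Hypothetical static object: records whether onSuccess was called,
    // so the Python side could check it through Py4J.
    object OnSuccessCall {
      @volatile var isCalled: Boolean = false
    }
    
    class MockQueryExecutionListener extends QueryExecutionListener {
      override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
        OnSuccessCall.isCalled = true
      }
    
      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { }
    }
    ```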

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HyukjinKwon/spark SPARK-23942

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21007.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21007
    
----
commit db1987f63370c6c2f9434aea76da7d326565be5a
Author: hyukjinkwon <gurwls223@...>
Date:   2018-04-09T09:54:44Z

    Makes collect in PySpark as action for a query executor listener

----


---
