GitHub user HyukjinKwon opened a pull request:
https://github.com/apache/spark/pull/21007
[SPARK-23942][PYTHON][SQL] Makes collect in PySpark as action for a query executor listener
## What changes were proposed in this pull request?
This PR proposes to make `collect` an action for the query execution listener.
It seems `collect`, and `collect` with Arrow, are not recognised by
`QueryExecutionListener` as actions. For example, if we have a custom
listener as below:
```scala
package org.apache.spark.sql

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

class TestQueryExecutionListener extends QueryExecutionListener with Logging {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
    logError("Look at me! I'm 'onSuccess'")
  }

  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { }
}
```
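As a side note, a listener like the one above can be attached when launching PySpark via the `spark.sql.queryExecutionListeners` configuration (a sketch; it assumes the compiled listener class is already on the driver classpath):

```shell
# Attach the listener at launch time (assumes the compiled
# TestQueryExecutionListener class is on the driver classpath).
./bin/pyspark \
  --conf spark.sql.queryExecutionListeners=org.apache.spark.sql.TestQueryExecutionListener
```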
**Before**
```python
>>> sql("SELECT * FROM range(1)").collect()
```
```
[Row(id=0)]
```
```python
>>> spark.conf.set("spark.sql.execution.arrow.enabled", "true")
>>> sql("SELECT * FROM range(1)").toPandas()
```
```
id
0 0
```
**After**
```python
>>> sql("SELECT * FROM range(1)").collect()
```
```
18/04/09 16:57:58 ERROR TestQueryExecutionListener: Look at me! I'm
'onSuccess'
[Row(id=0)]
```
```python
>>> spark.conf.set("spark.sql.execution.arrow.enabled", "true")
>>> sql("SELECT * FROM range(1)").toPandas()
```
```
18/04/09 17:53:26 ERROR TestQueryExecutionListener: Look at me! I'm
'onSuccess'
id
0 0
```
Other operations on the PySpark and Scala sides seem fine:
```python
>>> sql("SELECT * FROM range(1)").show()
```
```
18/04/09 17:02:04 ERROR TestQueryExecutionListener: Look at me! I'm
'onSuccess'
+---+
| id|
+---+
| 0|
+---+
```
```scala
scala> sql("SELECT * FROM range(1)").collect()
```
```
18/04/09 16:58:41 ERROR TestQueryExecutionListener: Look at me! I'm
'onSuccess'
res1: Array[org.apache.spark.sql.Row] = Array([0])
```
## How was this patch tested?
I have manually tested as described above.
It's possible to add a test, but it would require a mock
`QueryExecutionListener`, a static object with a variable updated by the mock
listener, and checking that variable via Py4J. It would also need a manual
skip condition on the PySpark side.
I can add this test, but I usually try to avoid tests that need JVM access.
Let me know if anyone feels it is required.
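To make the shape of that test concrete, here is a self-contained Python sketch of the idea, with no JVM access; the names and helper are illustrative, not from this PR. A mock listener records whether `onSuccess` fired in a class-level flag (playing the role of the static object checked via Py4J), and the test asserts on that flag after running the action:

```python
# Illustrative sketch only: a plain-Python stand-in for the JVM-side mock
# listener described above. No Spark or Py4J is involved.

class MockQueryExecutionListener:
    """Stands in for the JVM-side listener; records onSuccess calls."""
    on_success_called = False  # the "static variable" the test would check

    def on_success(self, func_name, duration_ns):
        MockQueryExecutionListener.on_success_called = True


def collect_with_listener(rows, listener):
    # Mimics the behaviour under test: run the work, then notify the
    # listener that the "collect" action succeeded.
    result = list(rows)
    listener.on_success("collect", duration_ns=0)
    return result


listener = MockQueryExecutionListener()
result = collect_with_listener([{"id": 0}], listener)
assert result == [{"id": 0}]
assert MockQueryExecutionListener.on_success_called
```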
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/HyukjinKwon/spark SPARK-23942
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/21007.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #21007
----
commit db1987f63370c6c2f9434aea76da7d326565be5a
Author: hyukjinkwon <gurwls223@...>
Date: 2018-04-09T09:54:44Z
Makes collect in PySpark as action for a query executor listener
----