GitHub user HyukjinKwon opened a pull request:
https://github.com/apache/spark/pull/19498
[SPARK-17756][PYTHON][STREAMING] Workaround to avoid return type mismatch
in PythonTransformFunction
## What changes were proposed in this pull request?
This PR proposes to wrap the transformed rdd within `TransformFunction`.
`PythonTransformFunction` looks requiring to return `JavaRDD` in `_jrdd`.
https://github.com/apache/spark/blob/39e2bad6a866d27c3ca594d15e574a1da3ee84cc/python/pyspark/streaming/util.py#L67
https://github.com/apache/spark/blob/6ee28423ad1b2e6089b82af64a31d77d3552bb38/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala#L43
However, this could be `JavaPairRDD` by some APIs, for example, `zip` in
PySpark's RDD API.
`_jrdd` could be checked as below:
```python
>>> rdd.zip(rdd)._jrdd.getClass().toString()
u'class org.apache.spark.api.java.JavaPairRDD'
```
So, here, I wrapped it with `map` so that it ensures returning `JavaRDD`.
```python
>>> rdd.zip(rdd).map(lambda x: x)._jrdd.getClass().toString()
u'class org.apache.spark.api.java.JavaRDD'
```
I tried to elaborate some failure cases as below:
```python
from pyspark.streaming import StreamingContext
ssc = StreamingContext(spark.sparkContext, 10)
ssc.queueStream([sc.range(10)]) \
.transform(lambda rdd: rdd.cartesian(rdd)) \
.pprint()
ssc.start()
```
```python
from pyspark.streaming import StreamingContext
ssc = StreamingContext(spark.sparkContext, 10)
ssc.queueStream([sc.range(10)]).foreachRDD(lambda rdd: rdd.cartesian(rdd))
ssc.start()
```
```python
from pyspark.streaming import StreamingContext
ssc = StreamingContext(spark.sparkContext, 10)
ssc.queueStream([sc.range(10)]).foreachRDD(lambda rdd: rdd.zip(rdd))
ssc.start()
```
```python
from pyspark.streaming import StreamingContext
ssc = StreamingContext(spark.sparkContext, 10)
ssc.queueStream([sc.range(10)]).foreachRDD(lambda rdd:
rdd.zip(rdd).union(rdd.zip(rdd)))
ssc.start()
```
```python
from pyspark.streaming import StreamingContext
ssc = StreamingContext(spark.sparkContext, 10)
ssc.queueStream([sc.range(10)]).foreachRDD(lambda rdd:
rdd.zip(rdd).coalesce(1))
ssc.start()
```
## How was this patch tested?
Unit tests were added in `python/pyspark/streaming/tests.py` and manually
tested.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/HyukjinKwon/spark SPARK-17756
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/19498.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #19498
----
commit f5a2a884d860e9c8b3f98fc4ae5f10eaf3c1a0a4
Author: hyukjinkwon <[email protected]>
Date: 2017-10-14T13:50:49Z
Workaround to avoid return type mispatch in PythonTransformFunction
----
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]