Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/19498#discussion_r146531880
--- Diff: python/pyspark/streaming/util.py ---
@@ -64,7 +64,11 @@ def call(self, milliseconds, jrdds):
t = datetime.fromtimestamp(milliseconds / 1000.0)
r = self.func(t, *rdds)
if r:
- return r._jrdd
+ # Here, we work around to ensure `_jrdd` is `JavaRDD` by wrapping it by `map`.
+ # org.apache.spark.streaming.api.python.PythonTransformFunction requires to return
+ # `JavaRDD`; however, this could be `JavaPairRDD` by some APIs, for example, `zip`.
+ # See SPARK-17756.
+ return r.map(lambda x: x)._jrdd
--- End diff ---
Yup, I guess it would, but I could not think of a better or simpler way
(and I saw this workaround is already used above in
`python/pyspark/streaming/context.py`). If a slightly uglier (and possibly
flaky, imho) way is okay for this concern, I could do it, for example, as
below:
```python
if r._jrdd.getClass().getSimpleName() == "JavaRDD":
return r._jrdd
else:
return r.map(lambda x: x)._jrdd
```
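For context on why I call the class-name check flaky: a minimal, Spark-free sketch below. The `MockJavaRDD`/`MockJavaPairRDD` classes are hypothetical stand-ins for the py4j proxy objects, not real Spark classes. The point is that py4j's `toString()` returns the fully qualified class name plus an identity hash, so an exact comparison against a bare `"JavaRDD"` would never match; any check has to parse the name out first, which is fragile.

```python
# Hypothetical mocks of the py4j proxy objects; not real Spark classes.
class MockJavaRDD:
    def toString(self):
        # A real py4j toString() carries the package prefix and identity hash.
        return "org.apache.spark.api.java.JavaRDD@1a2b3c4d"


class MockJavaPairRDD:
    def toString(self):
        return "org.apache.spark.api.java.JavaPairRDD@5e6f7a8b"


def is_plain_java_rdd(jrdd):
    # An exact equality check against "JavaRDD" would always be False here;
    # stripping the hash and matching the trailing class name is sturdier,
    # but still string-based and brittle.
    return jrdd.toString().split("@")[0].endswith(".JavaRDD")


assert is_plain_java_rdd(MockJavaRDD())
assert not is_plain_java_rdd(MockJavaPairRDD())
```

This is why the unconditional `map(lambda x: x)` rewrap reads simpler to me, even if it adds a no-op stage.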
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]