Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19498#discussion_r147551966

    --- Diff: python/pyspark/streaming/util.py ---
    @@ -64,7 +64,11 @@ def call(self, milliseconds, jrdds):
                 t = datetime.fromtimestamp(milliseconds / 1000.0)
                 r = self.func(t, *rdds)
                 if r:
    -                return r._jrdd
    +                # Here, we work around to ensure `_jrdd` is `JavaRDD` by wrapping it by `map`.
    +                # org.apache.spark.streaming.api.python.PythonTransformFunction requires to return
    +                # `JavaRDD`; however, this could be `JavaPairRDD` by some APIs, for example, `zip`.
    +                # See SPARK-17756.
    +                return r.map(lambda x: x)._jrdd
    --- End diff --

    @tdas, I believe this is something we should fix. Would you like me to keep it this way, try the approach above, or find another way to avoid the extra map even if it is complex? Otherwise, I can run a benchmark on an RDD with the extra map to see the slight perf difference (although I am not sure yet whether I can show a meaningful difference).
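To illustrate the type issue the comment describes, here is a minimal pure-Python mock (not PySpark): some APIs such as `zip` hand back a pair-flavored RDD whose `_jrdd` is the wrong Java-side type, while `map` always produces a plain RDD, so an identity `map` coerces `_jrdd` back to the expected type. The classes below are hypothetical stand-ins for the real PySpark/Java classes, sketched only to show the shape of the workaround.

```python
class JavaRDD:
    """Stand-in for the Java-side type PythonTransformFunction expects."""


class JavaPairRDD:
    """Stand-in for the pair-flavored Java-side type produced by e.g. `zip`."""


class RDD:
    """Toy RDD that tracks which Java-side wrapper `_jrdd` would hold."""

    def __init__(self, data, jrdd_cls=JavaRDD):
        self._data = list(data)
        self._jrdd = jrdd_cls()

    def map(self, f):
        # `map` always yields a plain RDD, so `_jrdd` is a JavaRDD again.
        return RDD(f(x) for x in self._data)

    def zip(self, other):
        # Mirrors the reported behavior: `zip` yields a pair RDD.
        return RDD(zip(self._data, other._data), jrdd_cls=JavaPairRDD)


pairs = RDD([1, 2]).zip(RDD(["a", "b"]))
print(type(pairs._jrdd).__name__)                   # JavaPairRDD: wrong type for the Java API
print(type(pairs.map(lambda x: x)._jrdd).__name__)  # JavaRDD: identity map fixes it
```

This is the same trade-off discussed above: the identity `map` guarantees the right wrapper type at the cost of an extra (cheap, but nonzero) transformation on every batch.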