Github user MLnick commented on the pull request:
https://github.com/apache/spark/pull/1338#issuecomment-50068945
OK, I've been through it in detail and everything looks good, with the
exception of one issue that I picked up. Saving currently assumes that the
`serializer` on the `PythonRDD` is `PickleSerializer`. If that is not the
case, for whatever reason, the save fails.
This happens for `RDD`s created from `cartesian` and `zip`:
```python
In [1]: rdd = sc.parallelize(range(10))
In [2]: cart = rdd.cartesian(rdd)
In [3]: cart.saveAsSequenceFile("/tmp/cart.sf")
---------------------------------------------------------------------------
Py4JError Traceback (most recent call last)
<ipython-input-3-06070310df19> in <module>()
----> 1 cart.saveAsSequenceFile("/tmp/cart.sf")
...
Py4JError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.saveAsSequenceFile. Trace:
py4j.Py4JException: Method saveAsSequenceFile([class
org.apache.spark.api.java.JavaPairRDD, class java.lang.String, null]) does not
exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:358)
at py4j.Gateway.invoke(Gateway.java:254)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
In [4]: x = sc.parallelize(range(0,5))
In [5]: y = sc.parallelize(range(1000, 1005))
In [6]: x.zip(y).saveAsSequenceFile("/tmp/zip.sf")
---------------------------------------------------------------------------
Py4JError Traceback (most recent call last)
<ipython-input-6-ad82cb2565a3> in <module>()
----> 1 x.zip(y).saveAsSequenceFile("/tmp/zip.sf")
...
/Users/Nick/workspace/scala/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py
in get_return_value(answer, gateway_client, target_id, name)
302 raise Py4JError(
303 'An error occurred while calling {0}{1}{2}.
Trace:\n{3}\n'.
--> 304 format(target_id, '.', name, value))
305 else:
306 raise Py4JError(
Py4JError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.saveAsSequenceFile. Trace:
py4j.Py4JException: Method saveAsSequenceFile([class
org.apache.spark.api.java.JavaPairRDD, class java.lang.String, null]) does not
exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:358)
at py4j.Gateway.invoke(Gateway.java:254)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
```
The fix is simple - use what `saveAsPickleFile` does:
```python
self._reserialize(BatchedSerializer(PickleSerializer(), batchSize))._jrdd
```
I confirmed that something like the following works in `saveAsSequenceFile`:
```python
reser_rdd = self._reserialize(
    BatchedSerializer(PickleSerializer(), batchSize))._jrdd
self.ctx._jvm.PythonRDD.saveAsSequenceFile(
    reser_rdd, path, compressionCodecClass)
```
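To illustrate why re-serializing before the save avoids the failure, here is a
toy, Spark-free sketch. `ToyPickle`, `ToyPair`, `ToyRDD` and `save` are
hypothetical stand-ins that I made up for illustration, not PySpark classes,
and the real `_reserialize` may well differ in detail. The only point is the
pattern: convert to the format the writer expects, and skip the conversion
when the data is already in that format.

```python
import pickle


class ToyPickle:
    """Hypothetical stand-in for a pickle serializer (not PySpark's class)."""

    def dumps(self, obj):
        return pickle.dumps(obj)

    def loads(self, data):
        return pickle.loads(data)

    def __eq__(self, other):
        # Two serializers are interchangeable only if they are the same type.
        return type(self) is type(other)


class ToyPair(ToyPickle):
    """Hypothetical pair format: key and value are pickled separately."""

    def dumps(self, obj):
        key, value = obj
        return pickle.dumps((pickle.dumps(key), pickle.dumps(value)))

    def loads(self, data):
        key, value = pickle.loads(data)
        return pickle.loads(key), pickle.loads(value)


class ToyRDD:
    """Hypothetical dataset: serialized records plus the format they use."""

    def __init__(self, records, serializer):
        self.serializer = serializer
        self.blobs = [serializer.dumps(r) for r in records]

    def reserialize(self, target):
        # Re-encode only when the current format differs from the target.
        if self.serializer == target:
            return self
        records = [self.serializer.loads(b) for b in self.blobs]
        return ToyRDD(records, target)


def save(rdd):
    """Toy writer that assumes every record is a plain pickle."""
    if rdd.serializer != ToyPickle():
        raise TypeError("writer expects pickle-serialized records")
    return [pickle.loads(b) for b in rdd.blobs]


# A "zipped" dataset uses the pair format, so it must be converted first.
zipped = ToyRDD([(0, 1000), (1, 1001)], ToyPair())
saved = save(zipped.reserialize(ToyPickle()))
```

Calling `save(zipped)` directly raises in this sketch, while the re-serialized
dataset goes through, which mirrors the behaviour I saw with `cartesian` and
`zip`.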
Apart from this, I think this looks good for merge, subject to @mateiz's call
on the covariance point.
@kanzhang it would be great to try to get this finalised for the merge window
tomorrow. Sorry for the delay on my side!