Github user MLnick commented on the pull request:

    https://github.com/apache/spark/pull/1338#issuecomment-50068945
  
    Ok, I've been through it in detail and everything looks good, with the exception of one issue that I picked up. The saving code currently assumes that the `serializer` on the `PythonRDD` is `PickleSerializer`; if that is not the case for whatever reason, saving will fail.
    
    This happens for `RDD`s created from `cartesian` and `zip`:
    
    ```python
    In [1]: rdd = sc.parallelize(range(10))
    In [2]: cart = rdd.cartesian(rdd)
    In [3]: cart.saveAsSequenceFile("/tmp/cart.sf")
    ---------------------------------------------------------------------------
    Py4JError                                 Traceback (most recent call last)
    <ipython-input-3-06070310df19> in <module>()
    ----> 1 cart.saveAsSequenceFile("/tmp/cart.sf")
    
    ...
    
    Py4JError: An error occurred while calling 
z:org.apache.spark.api.python.PythonRDD.saveAsSequenceFile. Trace:
    py4j.Py4JException: Method saveAsSequenceFile([class 
org.apache.spark.api.java.JavaPairRDD, class java.lang.String, null]) does not 
exist
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:358)
        at py4j.Gateway.invoke(Gateway.java:254)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Thread.java:745)
    
    In [4]: x = sc.parallelize(range(0,5))
    
    In [5]: y = sc.parallelize(range(1000, 1005))
    
    In [6]: x.zip(y).saveAsSequenceFile("/tmp/zip.sf")
    ---------------------------------------------------------------------------
    Py4JError                                 Traceback (most recent call last)
    <ipython-input-6-ad82cb2565a3> in <module>()
    ----> 1 x.zip(y).saveAsSequenceFile("/tmp/zip.sf")
    
    ...
    
    
/Users/Nick/workspace/scala/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py
 in get_return_value(answer, gateway_client, target_id, name)
        302                 raise Py4JError(
        303                     'An error occurred while calling {0}{1}{2}. 
Trace:\n{3}\n'.
    --> 304                     format(target_id, '.', name, value))
        305         else:
        306             raise Py4JError(
    
    Py4JError: An error occurred while calling 
z:org.apache.spark.api.python.PythonRDD.saveAsSequenceFile. Trace:
    py4j.Py4JException: Method saveAsSequenceFile([class 
org.apache.spark.api.java.JavaPairRDD, class java.lang.String, null]) does not 
exist
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:358)
        at py4j.Gateway.invoke(Gateway.java:254)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Thread.java:745)
    ```
    
    The fix is simple: reserialize the RDD the way `saveAsPickleFile` does:
    ```python
    self._reserialize(BatchedSerializer(PickleSerializer(), batchSize))._jrdd
    ```
    
    I confirmed that something like the following works in `saveAsSequenceFile`:
    ```python
    reser_rdd = self._reserialize(BatchedSerializer(PickleSerializer(), 
batchSize))._jrdd
    self.ctx._jvm.PythonRDD.saveAsSequenceFile(reser_rdd, path, 
compressionCodecClass)
    ```
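
    For anyone unfamiliar with what the reserialization step buys us, here is a minimal, self-contained sketch of the idea. It uses plain `pickle` from the standard library rather than Spark's actual `BatchedSerializer`/`PickleSerializer` classes, so the names and batching details are illustrative only, but it shows why forcing everything through one serializer makes the byte stream predictable for the JVM side:
    ```python
    import pickle

    def batched_pickle(pairs, batch_size):
        """Pickle key-value pairs in fixed-size batches, loosely
        mimicking BatchedSerializer(PickleSerializer(), batchSize).
        Whatever serializer produced the pairs upstream, the output
        here is always pickled batches, so a downstream consumer can
        rely on a single format."""
        batch = []
        for pair in pairs:
            batch.append(pair)
            if len(batch) == batch_size:
                yield pickle.dumps(batch)
                batch = []
        if batch:  # flush any trailing partial batch
            yield pickle.dumps(batch)

    chunks = list(batched_pickle([(i, i * i) for i in range(5)], batch_size=2))
    # Each chunk round-trips back to the original pairs.
    restored = [p for c in chunks for p in pickle.loads(c)]
    ```
    The real `_reserialize` call does this at the RDD level, returning a new RDD whose partitions are pickled batches regardless of how the input RDD (e.g. one produced by `cartesian` or `zip`) was serialized.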
    
    Apart from this, I think this looks good for merge, subject to @mateiz's call on the covariance point.
    
    @kanzhang it would be great to try to get this finalised for the merge window tomorrow. Sorry for the delay on my side!

