[ https://issues.apache.org/jira/browse/SPARK-1011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Josh Rosen resolved SPARK-1011. ------------------------------- Resolution: Fixed Fix Version/s: 0.9.1 Assignee: Hossein Falaki This was fixed in Spark 0.9.1 with the addition of a new {{predictAll}} method for performing bulk predictions. This was added in commit https://github.com/apache/spark/commit/b2e690f839e7ee47f405135d35170173386c5d13. > MatrixFactorizationModel in pyspark throws serialization error > -------------------------------------------------------------- > > Key: SPARK-1011 > URL: https://issues.apache.org/jira/browse/SPARK-1011 > Project: Spark > Issue Type: Bug > Components: PySpark > Affects Versions: 0.9.0 > Reporter: Hossein Falaki > Assignee: Hossein Falaki > Fix For: 0.9.1 > > > When running the following sample code in pyspark, > {code} > from pyspark.mllib.recommendation import ALS > from numpy import array > # Load and parse the data > data = sc.textFile("mllib/data/als/test.data") > ratings = data.map(lambda line: array([float(x) for x in line.split(',')])) > # Build the recommendation model using Alternating Least Squares > model = ALS.train(sc, ratings, 1, 20) > # Evaluate the model on training data > ratesAndPreds = ratings.map(lambda p: (p[2], model.predict(int(p[0]), > int(p[1])))) > ratesAndPreds.take(1) > {code} > I get: > {code} > >>> ratesAndPreds.take(1) > Traceback (most recent call last): > File "<stdin>", line 1, in <module> > File "/Users/hossein/Projects/incubator-spark/python/pyspark/rdd.py", line > 585, in take > for partition in range(mapped._jrdd.splits().size()): > File "/Users/hossein/Projects/incubator-spark/python/pyspark/rdd.py", line > 984, in _jrdd > pickled_command = CloudPickleSerializer().dumps(command) > File > "/Users/hossein/Projects/incubator-spark/python/pyspark/serializers.py", line > 248, in dumps > def dumps(self, obj): return cloudpickle.dumps(obj, 2) > File > "/Users/hossein/Projects/incubator-spark/python/pyspark/cloudpickle.py", line > 801, in dumps > cp.dump(obj) > File > "/Users/hossein/Projects/incubator-spark/python/pyspark/cloudpickle.py", line > 140, in dump > return pickle.Pickler.dump(self, obj) > File > "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", > line 224, in dump > self.save(obj) > File > "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", > line 286, in save > f(self, obj) # Call unbound method with explicit self > File > "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", > line 548, in save_tuple > save(element) > File > "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", > line 286, in save > f(self, obj) # Call unbound method with explicit self > File > "/Users/hossein/Projects/incubator-spark/python/pyspark/cloudpickle.py", line > 259, in save_function > self.save_function_tuple(obj, [themodule]) > File > "/Users/hossein/Projects/incubator-spark/python/pyspark/cloudpickle.py", line > 316, in save_function_tuple > save(closure) > File > "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", > line 286, in save > f(self, obj) # Call unbound method with explicit self > File > "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", > line 600, in save_list > self._batch_appends(iter(obj)) > File > "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", > line 633, in _batch_appends > save(x) > File > "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", > line 286, in save > f(self, obj) # Call unbound method with explicit self > File > "/Users/hossein/Projects/incubator-spark/python/pyspark/cloudpickle.py", line > 259, in save_function > self.save_function_tuple(obj, [themodule]) > File > "/Users/hossein/Projects/incubator-spark/python/pyspark/cloudpickle.py", line > 316, in save_function_tuple > save(closure) > File > "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", > line 286, in save > f(self, obj) # Call unbound method with explicit self > File > "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", > line 600, in save_list > self._batch_appends(iter(obj)) > File > "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", > line 633, in _batch_appends > save(x) > File > "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", > line 286, in save > f(self, obj) # Call unbound method with explicit self > File > "/Users/hossein/Projects/incubator-spark/python/pyspark/cloudpickle.py", line > 259, in save_function > self.save_function_tuple(obj, [themodule]) > File > "/Users/hossein/Projects/incubator-spark/python/pyspark/cloudpickle.py", line > 316, in save_function_tuple > save(closure) > File > "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", > line 286, in save > f(self, obj) # Call unbound method with explicit self > File > "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", > line 600, in save_list > self._batch_appends(iter(obj)) > File > "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", > line 636, in _batch_appends > save(tmp[0]) > File > "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", > line 286, in save > f(self, obj) # Call unbound method with explicit self > File > "/Users/hossein/Projects/incubator-spark/python/pyspark/cloudpickle.py", line > 254, in save_function > self.save_function_tuple(obj, modList) > File > "/Users/hossein/Projects/incubator-spark/python/pyspark/cloudpickle.py", line > 314, in save_function_tuple > save(f_globals) > File > "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", > line 286, in save > f(self, obj) # Call unbound method with explicit self > File > "/Users/hossein/Projects/incubator-spark/python/pyspark/cloudpickle.py", line > 181, in save_dict > pickle.Pickler.save_dict(self, obj) > File > "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", > line 649, in save_dict > self._batch_setitems(obj.iteritems()) > File > "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", > line 686, in _batch_setitems > save(v) > File > "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", > line 331, in save > self.save_reduce(obj=obj, *rv) > File > "/Users/hossein/Projects/incubator-spark/python/pyspark/cloudpickle.py", line > 631, in save_reduce > save(state) > File > "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", > line 286, in save > f(self, obj) # Call unbound method with explicit self > File > "/Users/hossein/Projects/incubator-spark/python/pyspark/cloudpickle.py", line > 181, in save_dict > pickle.Pickler.save_dict(self, obj) > File > "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", > line 649, in save_dict > self._batch_setitems(obj.iteritems()) > File > "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", > line 681, in _batch_setitems > save(v) > File > "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", > line 306, in save > rv = reduce(self.proto) > File "build/bdist.macosx-10.8-intel/egg/py4j/java_gateway.py", line 500, in > __call__ > File "build/bdist.macosx-10.8-intel/egg/py4j/protocol.py", line 304, in > get_return_value > py4j.protocol.Py4JError: An error occurred while calling o37.__getnewargs__. > Trace: > py4j.Py4JException: Method __getnewargs__([]) does not exist > at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333) > at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342) > at py4j.Gateway.invoke(Gateway.java:251) > at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) > at py4j.commands.CallCommand.execute(CallCommand.java:79) > at py4j.GatewayConnection.run(GatewayConnection.java:207) > at java.lang.Thread.run(Thread.java:695) > {code} -- This message was sent by Atlassian JIRA (v6.2#6252)