Re: Using mongo with PySpark
Yes, initializing the client on every call is costly. You seem to be using Python; another, riskier thing you can try is to serialize the MongoClient object with some serializer (like Kryo wrappers in Python) and pass it on to the mappers. Then, in each mapper, you just deserialize it and use it directly. This may or may not work for you, depending on the internals of the MongoDB client.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi <https://twitter.com/mayur_rustagi>

On Wed, Jun 4, 2014 at 10:27 PM, Samarth Mailinglist <mailinglistsama...@gmail.com> wrote:

Thanks a lot, and sorry for the really late reply! (Didn't have my laptop.) This is working, but it's dreadfully slow and seems not to run in parallel?
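A variation on this idea that avoids serializing the client object at all is to ship only the connection parameters (plain strings pickle cleanly) and construct the client inside each task. A rough, untested sketch; the URI, database and collection names are assumed, and json_rdd stands in for an RDD of already-converted documents:

from pymongo import MongoClient

# Assumed connection settings -- adjust to your deployment. Strings pickle
# cleanly, unlike the MongoClient object itself.
MONGO_URI = 'mongodb://localhost:27017'

def insert_partition(docs):
    # One client per task, built on the worker from the shipped URI.
    coll = MongoClient(MONGO_URI)['spark_test_db']['programs']
    for doc in docs:
        yield coll.insert(doc)  # insert_one() on pymongo >= 3.0

# doc_ids = json_rdd.mapPartitions(insert_partition)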
Re: Using mongo with PySpark
Thanks a lot, and sorry for the really late reply! (Didn't have my laptop.) This is working, but it's dreadfully slow and seems not to run in parallel?

On Mon, May 19, 2014 at 2:54 PM, Nick Pentreath <nick.pentre...@gmail.com> wrote:

You need to use mapPartitions (or foreachPartition) to instantiate your client in each partition, as it is not serializable by the pickle library. Something like:

def mapper(iter):
    db = MongoClient()['spark_test_db']
    collec = db['programs']
    for val in iter:
        asc = val.encode('ascii', 'ignore')
        json = convertToJSON(asc, indexMap)
        yield collec.insert(json)

def convertToJSON(string, indexMap):
    values = string.strip().split(",")
    json = {}
    for i in range(len(values)):
        json[indexMap[i]] = values[i]
    return json

doc_ids = data.mapPartitions(mapper)

On Mon, May 19, 2014 at 8:00 AM, Samarth Mailinglist <mailinglistsama...@gmail.com> wrote:

db = MongoClient()['spark_test_db']
collec = db['programs']

def mapper(val):
    asc = val.encode('ascii', 'ignore')
    json = convertToJSON(asc, indexMap)
    collec.insert(json)  # this is not working

def convertToJSON(string, indexMap):
    values = string.strip().split(",")
    json = {}
    for i in range(len(values)):
        json[indexMap[i]] = values[i]
    return json

jsons = data.map(mapper)

The last line does the mapping. I am very new to Spark; can you explain what explicit serialization, etc. is in the context of Spark? The error I am getting:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/spark-0.9.1/python/pyspark/rdd.py", line 712, in saveAsTextFile
    keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
  File "/usr/local/spark-0.9.1/python/pyspark/rdd.py", line 1178, in _jrdd
    pickled_command = CloudPickleSerializer().dumps(command)
  File "/usr/local/spark-0.9.1/python/pyspark/serializers.py", line 275, in dumps
    def dumps(self, obj): return cloudpickle.dumps(obj, 2)
  File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 801, in dumps
    cp.dump(obj)
  File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 140, in dump
    return pickle.Pickler.dump(self, obj)
  File "/usr/lib/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
    save(element)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 259, in save_function
    self.save_function_tuple(obj, [themodule])
  File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 316, in save_function_tuple
    save(closure)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 600, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib/python2.7/pickle.py", line 633, in _batch_appends
    save(x)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 259, in save_function
    self.save_function_tuple(obj, [themodule])
  File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 316, in save_function_tuple
    save(closure)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 600, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib/python2.7/pickle.py", line 636, in _batch_appends
    save(tmp[0])
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 254, in save_function
    self.save_function_tuple(obj, modList)
  File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 314, in save_function_tuple
    save(f_globals)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 181, in save_dict
    pickle.Pickler.save_dict(self, obj)
  File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 306, in save
    rv = reduce(self.proto)
  File "/usr/local/lib/python2.7/dist-packages/pymongo/collection.py", line 1489, in __call__
    self.__name.split(".")[-1])
TypeError: 'Collection' object is not callable. If you meant to call the '__getnewargs__' method on a 'Collection' object it is failing because no such method exists.

On Sat, May 17, 2014 at 9:30 PM,
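On the slowness: parallelism here is limited by the number of partitions in data, and inserting one document per call pays a round trip to Mongo for every record. A rough, untested sketch of a batched per-partition insert, reusing convertToJSON and indexMap from above and assuming a mongod on localhost:

from pymongo import MongoClient

def insert_partition(lines):
    # One client and one collection handle per partition, created on the worker.
    coll = MongoClient()['spark_test_db']['programs']
    batch, inserted = [], 0
    for line in lines:
        batch.append(convertToJSON(line.encode('ascii', 'ignore'), indexMap))
        if len(batch) >= 1000:
            coll.insert(batch)  # bulk form in pymongo 2.x; insert_many() on 3.x+
            inserted += len(batch)
            batch = []
    if batch:
        coll.insert(batch)
        inserted += len(batch)
    yield inserted

# Parallelism comes from the number of partitions; repartition if there are few.
# counts = data.repartition(8).mapPartitions(insert_partition)
# counts.sum()  # an action is needed before any work actually runs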
Re: Using mongo with PySpark
Where's your driver code (the code interacting with the RDDs)? Are you getting serialization errors?

On Saturday, May 17, 2014, Samarth Mailinglist <mailinglistsama...@gmail.com> wrote:

Hi all, I am trying to store the results of a reduce into Mongo. I want to share the collection variable in the mappers. Here's what I have so far (I'm using pymongo):

db = MongoClient()['spark_test_db']
collec = db['programs']

def mapper(val):
    asc = val.encode('ascii', 'ignore')
    json = convertToJSON(asc, indexMap)
    collec.insert(json)  # this is not working

def convertToJSON(string, indexMap):
    values = string.strip().split(",")
    json = {}
    for i in range(len(values)):
        json[indexMap[i]] = values[i]
    return json

How do I do this?
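One quick way to answer the serialization question is to try pickling the captured collection handle outside Spark, which is roughly what PySpark's cloudpickle has to do when it ships the mapper closure. A small check along these lines (it assumes a mongod on localhost):

import pickle
from pymongo import MongoClient

collec = MongoClient()['spark_test_db']['programs']

try:
    # cloudpickle must serialize everything the mapper closure captures,
    # including this Collection handle.
    pickle.dumps(collec, 2)
    print('collection pickled fine')
except Exception as exc:
    print('collection does not serialize:', exc)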
Re: Using mongo with PySpark
You ideally have to pass the MongoClient object along with your data into the mapper (Python should try to serialize your MongoClient, but explicit is better). If the client is serializable, all should end well; if not, you are better off using mapPartitions, initializing the driver in each partition and loading that partition's data. There is a similar discussion on the list from the past.

Regards,
Mayur

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi <https://twitter.com/mayur_rustagi>

On Sat, May 17, 2014 at 8:58 PM, Nicholas Chammas <nicholas.cham...@gmail.com> wrote:

Where's your driver code (the code interacting with the RDDs)? Are you getting serialization errors?
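If only the side effect (the insert) is wanted rather than an RDD of inserted ids, foreachPartition is the more natural fit than mapPartitions. A minimal sketch under the same assumptions as above (mongod on localhost, convertToJSON and indexMap as defined in the question):

from pymongo import MongoClient

def save_partition(lines):
    # The client is created on the worker, inside the partition, so nothing
    # unpicklable is captured in the closure.
    coll = MongoClient()['spark_test_db']['programs']
    for line in lines:
        coll.insert(convertToJSON(line.encode('ascii', 'ignore'), indexMap))

# foreachPartition is an action: it runs immediately and returns nothing.
# data.foreachPartition(save_partition)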