Re: Using mongo with PySpark

2014-06-06 Thread Mayur Rustagi
Yes, initializing the client each time is expensive.. you seem to be using
Python. Another, riskier thing you can try is to serialize the mongoclient
object using a serializer (like Kryo wrappers in Python) and pass it on to the
mappers; then in each mapper you just have to deserialize it and use it
directly. This may or may not work for you, depending on the internals of the
MongoDB client.
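
If serializing the client itself proves fragile, a minimal alternative sketch
(not from the original thread) is to broadcast only plain connection settings
and build the client lazily inside each partition. The host/port values and
the insert_partition name below are placeholders, and sc is assumed to be your
SparkContext:

    from pymongo import MongoClient

    # Plain connection settings pickle cleanly, unlike the client itself.
    conn = sc.broadcast({'host': 'localhost', 'port': 27017,
                         'db': 'spark_test_db', 'collection': 'programs'})

    def insert_partition(records):
        cfg = conn.value
        # One client per partition, created on the worker, never serialized.
        collec = MongoClient(cfg['host'], cfg['port'])[cfg['db']][cfg['collection']]
        for rec in records:
            yield collec.insert(rec)  # rec assumed to already be a JSON-style dict

    doc_ids = data.mapPartitions(insert_partition)

As with any transformation, nothing runs until an action (e.g. doc_ids.count())
is called on the result.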

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Wed, Jun 4, 2014 at 10:27 PM, Samarth Mailinglist 
mailinglistsama...@gmail.com wrote:

 Thanks a lot, sorry for the really late reply! (Didn't have my laptop)

 This is working, but it's dreadfully slow and seems to not run in
 parallel?


 On Mon, May 19, 2014 at 2:54 PM, Nick Pentreath nick.pentre...@gmail.com
 wrote:

 You need to use mapPartitions (or foreachPartition) to instantiate your
 client in each partition as it is not serializable by the pickle library.
 Something like

 def mapper(iter):
     db = MongoClient()['spark_test_db']  # client created once per partition, on the worker
     collec = db['programs']
     for val in iter:
         asc = val.encode('ascii', 'ignore')
         json = convertToJSON(asc, indexMap)
         yield collec.insert(json)

 def convertToJSON(string, indexMap):
     values = string.strip().split(',')
     json = {}
     for i in range(len(values)):
         json[indexMap[i]] = values[i]
     return json

 doc_ids = data.mapPartitions(mapper)




 On Mon, May 19, 2014 at 8:00 AM, Samarth Mailinglist 
 mailinglistsama...@gmail.com wrote:

 db = MongoClient()['spark_test_db']
 collec = db['programs']

 def mapper(val):
     asc = val.encode('ascii', 'ignore')
     json = convertToJSON(asc, indexMap)
     collec.insert(json)  # this is not working

 def convertToJSON(string, indexMap):
     values = string.strip().split(',')
     json = {}
     for i in range(len(values)):
         json[indexMap[i]] = values[i]
     return json

 jsons = data.map(mapper)



 The last line does the mapping. I am very new to Spark; can you explain what
 explicit serialization means in the context of Spark? The error I am getting
 ends with:

 TypeError: 'Collection' object is not callable. If you meant to call the
 '__getnewargs__' method on a 'Collection' object it is failing because no
 such method exists.

Re: Using mongo with PySpark

2014-06-04 Thread Samarth Mailinglist
Thanks a lot, sorry for the really late reply! (Didn't have my laptop)

This is working, but it's dreadfully slow and seems to not run in parallel?


On Mon, May 19, 2014 at 2:54 PM, Nick Pentreath nick.pentre...@gmail.com
wrote:

 You need to use mapPartitions (or foreachPartition) to instantiate your
 client in each partition as it is not serializable by the pickle library.
 Something like

 def mapper(iter):
     db = MongoClient()['spark_test_db']  # client created once per partition, on the worker
     collec = db['programs']
     for val in iter:
         asc = val.encode('ascii', 'ignore')
         json = convertToJSON(asc, indexMap)
         yield collec.insert(json)

 def convertToJSON(string, indexMap):
     values = string.strip().split(',')
     json = {}
     for i in range(len(values)):
         json[indexMap[i]] = values[i]
     return json

 doc_ids = data.mapPartitions(mapper)
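
 For the foreachPartition variant mentioned above, a minimal sketch (assuming
 the same convertToJSON and indexMap as above, and a pymongo 2.x-era insert()
 that accepts a list for a bulk write; the insert_partition name is just
 illustrative):

     from pymongo import MongoClient

     def insert_partition(records):
         collec = MongoClient()['spark_test_db']['programs']
         docs = [convertToJSON(val.encode('ascii', 'ignore'), indexMap)
                 for val in records]
         if docs:
             collec.insert(docs)  # bulk insert; pymongo 3+ would use insert_many()

     data.foreachPartition(insert_partition)

 Unlike mapPartitions, foreachPartition is an action, so the inserts run as
 soon as that line executes rather than waiting for an action on doc_ids.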




 On Mon, May 19, 2014 at 8:00 AM, Samarth Mailinglist 
 mailinglistsama...@gmail.com wrote:

 db = MongoClient()['spark_test_db']
 collec = db['programs']

 def mapper(val):
     asc = val.encode('ascii', 'ignore')
     json = convertToJSON(asc, indexMap)
     collec.insert(json)  # this is not working

 def convertToJSON(string, indexMap):
     values = string.strip().split(',')
     json = {}
     for i in range(len(values)):
         json[indexMap[i]] = values[i]
     return json

 jsons = data.map(mapper)



 The last line does the mapping. I am very new to Spark; can you explain what
 explicit serialization means in the context of Spark? The error I am getting:
 Traceback (most recent call last):
   File "<stdin>", line 1, in <module>
   File "/usr/local/spark-0.9.1/python/pyspark/rdd.py", line 712, in saveAsTextFile
     keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
   File "/usr/local/spark-0.9.1/python/pyspark/rdd.py", line 1178, in _jrdd
     pickled_command = CloudPickleSerializer().dumps(command)
   File "/usr/local/spark-0.9.1/python/pyspark/serializers.py", line 275, in dumps
     def dumps(self, obj): return cloudpickle.dumps(obj, 2)
   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 801, in dumps
     cp.dump(obj)
   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 140, in dump
     return pickle.Pickler.dump(self, obj)
   File "/usr/lib/python2.7/pickle.py", line 224, in dump
     self.save(obj)
   File "/usr/lib/python2.7/pickle.py", line 286, in save
     f(self, obj) # Call unbound method with explicit self
   File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
     save(element)
   File "/usr/lib/python2.7/pickle.py", line 286, in save
     f(self, obj) # Call unbound method with explicit self
   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 259, in save_function
     self.save_function_tuple(obj, [themodule])
   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 316, in save_function_tuple
     save(closure)
   File "/usr/lib/python2.7/pickle.py", line 286, in save
     f(self, obj) # Call unbound method with explicit self
   File "/usr/lib/python2.7/pickle.py", line 600, in save_list
     self._batch_appends(iter(obj))
   File "/usr/lib/python2.7/pickle.py", line 633, in _batch_appends
     save(x)
   File "/usr/lib/python2.7/pickle.py", line 286, in save
     f(self, obj) # Call unbound method with explicit self
   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 259, in save_function
     self.save_function_tuple(obj, [themodule])
   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 316, in save_function_tuple
     save(closure)
   File "/usr/lib/python2.7/pickle.py", line 286, in save
     f(self, obj) # Call unbound method with explicit self
   File "/usr/lib/python2.7/pickle.py", line 600, in save_list
     self._batch_appends(iter(obj))
   File "/usr/lib/python2.7/pickle.py", line 636, in _batch_appends
     save(tmp[0])
   File "/usr/lib/python2.7/pickle.py", line 286, in save
     f(self, obj) # Call unbound method with explicit self
   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 254, in save_function
     self.save_function_tuple(obj, modList)
   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 314, in save_function_tuple
     save(f_globals)
   File "/usr/lib/python2.7/pickle.py", line 286, in save
     f(self, obj) # Call unbound method with explicit self
   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 181, in save_dict
     pickle.Pickler.save_dict(self, obj)
   File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
     self._batch_setitems(obj.iteritems())
   File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
     save(v)
   File "/usr/lib/python2.7/pickle.py", line 306, in save
     rv = reduce(self.proto)
   File "/usr/local/lib/python2.7/dist-packages/pymongo/collection.py", line 1489, in __call__
     self.__name.split(".")[-1])
 TypeError: 'Collection' object is not callable. If you meant to call the
 '__getnewargs__' method on a 'Collection' object it is failing because no
 such method exists.


 On Sat, May 17, 2014 at 9:30 PM, 

Re: Using mongo with PySpark

2014-05-17 Thread Nicholas Chammas
Where's your driver code (the code interacting with the RDDs)? Are you
getting serialization errors?

On Saturday, 17 May 2014, Samarth Mailinglist mailinglistsama...@gmail.com
wrote:

 Hi all,

 I am trying to store the results of a reduce into mongo.
 I want to share the collection variable across the mappers.


 Here's what I have so far (I'm using pymongo):

 db = MongoClient()['spark_test_db']
 collec = db['programs']

 def mapper(val):
     asc = val.encode('ascii', 'ignore')
     json = convertToJSON(asc, indexMap)
     collec.insert(json)  # this is not working

 def convertToJSON(string, indexMap):
     values = string.strip().split(',')
     json = {}
     for i in range(len(values)):
         json[indexMap[i]] = values[i]
     return json

 How do I do this?



Re: Using mongo with PySpark

2014-05-17 Thread Mayur Rustagi
Ideally you have to pass the mongoclient object along with your data into the
mapper (Python will try to serialize your mongoclient for you, but being
explicit is better). If the client is serializable, then all should end well;
if not, you are better off using mapPartitions, initializing the database
driver once per partition and loading that partition's data through it. There
is a similar discussion on the list in the past.
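
As an illustration only (PySpark actually serializes closures with
cloudpickle, but plain pickle is a reasonable first check), you can test in
the driver whether the client survives serialization before shipping it:

    import pickle
    from pymongo import MongoClient

    client = MongoClient()
    try:
        pickle.dumps(client)
        print('client pickles; passing it into map() may work')
    except Exception as exc:
        # Typical outcome: the client (or its Collection objects) will not
        # pickle, so create it inside mapPartitions on each worker instead.
        print('client does not pickle: %s' % exc)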
Regards
Mayur

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Sat, May 17, 2014 at 8:58 PM, Nicholas Chammas 
nicholas.cham...@gmail.com wrote:

 Where's your driver code (the code interacting with the RDDs)? Are you
 getting serialization errors?

 On Saturday, 17 May 2014, Samarth Mailinglist mailinglistsama...@gmail.com
 wrote:

 Hi all,

 I am trying to store the results of a reduce into mongo.
 I want to share the collection variable across the mappers.


 Here's what I have so far (I'm using pymongo):

 db = MongoClient()['spark_test_db']
 collec = db['programs']

 def mapper(val):
     asc = val.encode('ascii', 'ignore')
     json = convertToJSON(asc, indexMap)
     collec.insert(json)  # this is not working

 def convertToJSON(string, indexMap):
     values = string.strip().split(',')
     json = {}
     for i in range(len(values)):
         json[indexMap[i]] = values[i]
     return json

 How do I do this?