from pymongo import MongoClient

db = MongoClient()['spark_test_db']
collec = db['programs']

def mapper(val):
    asc = val.encode('ascii', 'ignore')
    json = convertToJSON(asc, indexMap)
    collec.insert(json)  # this is not working
def convertToJSON(string, indexMap):
    values = string.strip().split(",")
    json = {}
    for i in range(len(values)):
        json[indexMap[i]] = values[i]
    return json

jsons = data.map(mapper)

The last line does the mapping. I am very new to Spark; can you explain what explicit serialization means in the context of Spark? The error I am getting is:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/spark-0.9.1/python/pyspark/rdd.py", line 712, in saveAsTextFile
    keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
  File "/usr/local/spark-0.9.1/python/pyspark/rdd.py", line 1178, in _jrdd
    pickled_command = CloudPickleSerializer().dumps(command)
  File "/usr/local/spark-0.9.1/python/pyspark/serializers.py", line 275, in dumps
    def dumps(self, obj): return cloudpickle.dumps(obj, 2)
  File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 801, in dumps
    cp.dump(obj)
  File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 140, in dump
    return pickle.Pickler.dump(self, obj)
  File "/usr/lib/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
    save(element)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 259, in save_function
    self.save_function_tuple(obj, [themodule])
  File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 316, in save_function_tuple
    save(closure)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 600, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib/python2.7/pickle.py", line 633, in _batch_appends
    save(x)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 259, in save_function
    self.save_function_tuple(obj, [themodule])
  File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 316, in save_function_tuple
    save(closure)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 600, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib/python2.7/pickle.py", line 636, in _batch_appends
    save(tmp[0])
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 254, in save_function
    self.save_function_tuple(obj, modList)
  File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 314, in save_function_tuple
    save(f_globals)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 181, in save_dict
    pickle.Pickler.save_dict(self, obj)
  File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 306, in save
    rv = reduce(self.proto)
  File "/usr/local/lib/python2.7/dist-packages/pymongo/collection.py", line 1489, in __call__
    self.__name.split(".")[-1])
TypeError: 'Collection' object is not callable. If you meant to call the '__getnewargs__' method on a 'Collection' object it is failing because no such method exists.
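The traceback gives a hint about where the serialization happens: the failure surfaces at saveAsTextFile, the first action that makes PySpark pickle the mapper (with cloudpickle, pickle protocol 2) and ship it to the executors, and the pickler chokes on the pymongo Collection that mapper closes over. A minimal sketch that reproduces roughly the same error outside Spark, assuming a mongod reachable from the machine running it (the exact wording can vary between pymongo versions):

import pickle
from pymongo import MongoClient

# Illustrative names; assumes a reachable mongod.
collec = MongoClient()['spark_test_db']['programs']

# PySpark pickles a mapper, plus everything it closes over, with protocol 2
# (the dumps(obj, 2) call in the traceback above). A Collection wraps a live
# connection, so pickling it fails.
try:
    pickle.dumps(collec, 2)
except TypeError as e:
    print(e)  # 'Collection' object is not callable ...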
On Sat, May 17, 2014 at 9:30 PM, Mayur Rustagi <mayur.rust...@gmail.com> wrote:

> Ideally you have to pass the MongoClient object along with your data into
> the mapper (Python will try to serialize your MongoClient, but explicit is
> better). If the client is serializable, then all should end well. If not,
> you are better off using mapPartitions, initializing the driver in each
> partition, and loading the data of each partition. There is a similar
> discussion in the list in the past.
>
> Regards,
> Mayur
>
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoidanalytics.com
> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>
>
> On Sat, May 17, 2014 at 8:58 PM, Nicholas Chammas
> <nicholas.cham...@gmail.com> wrote:
>
>> Where's your driver code (the code interacting with the RDDs)? Are you
>> getting serialization errors?
>>
>> On Saturday, May 17, 2014, Samarth Mailinglist
>> <mailinglistsama...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> I am trying to store the results of a reduce into Mongo.
>>> I want to share the variable "collection" in the mappers.
>>>
>>> Here's what I have so far (I'm using pymongo):
>>>
>>> db = MongoClient()['spark_test_db']
>>> collec = db['programs']
>>>
>>> def mapper(val):
>>>     asc = val.encode('ascii', 'ignore')
>>>     json = convertToJSON(asc, indexMap)
>>>     collec.insert(json)  # this is not working
>>>
>>> def convertToJSON(string, indexMap):
>>>     values = string.strip().split(",")
>>>     json = {}
>>>     for i in range(len(values)):
>>>         json[indexMap[i]] = values[i]
>>>     return json
>>>
>>> How do I do this?
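Following Mayur's suggestion, here is a rough sketch of the mapPartitions version, reusing convertToJSON and indexMap from the code above. It assumes pymongo 2.x (hence the insert call, as in the original code) and a mongod reachable from the worker nodes; the database and collection names are just the ones used in the thread:

from pymongo import MongoClient

def insert_partition(lines):
    # One MongoClient per partition, created on the worker itself, so the
    # closure no longer captures an unpicklable Collection. convertToJSON
    # and indexMap are plain Python objects and pickle fine.
    client = MongoClient()  # connects to localhost:27017 by default; point this at your mongod host
    collec = client['spark_test_db']['programs']
    inserted = 0
    for line in lines:
        doc = convertToJSON(line.encode('ascii', 'ignore'), indexMap)
        collec.insert(doc)
        inserted += 1
    client.close()
    yield inserted  # yield something so mapPartitions still produces an RDD

# mapPartitions is lazy; an action such as sum() forces the inserts to run.
total_inserted = data.mapPartitions(insert_partition).sum()

If your Spark version has RDD.foreachPartition, that is an even closer fit since no return value is needed; the mapPartitions form above works with 0.9.1.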