Hi all, I am new to this list. Before sending this mail I searched the archive, but I did not find a solution to my problem.
I am using Spark to process user locations based on RSSI. My Spark script looks like this:

    text_files = sc.textFile(','.join(files[startime]))
    result = text_files.flatMap(
        lambda x: s3_files_to_records(x, startime)  # Load from JSON converted CSV logs
    ).filter(
        lambda x: x['mac_time']  # Filter empty fusids
    ).map(
        lambda x: (x['mac_time'], x)  # Generate K-V store
    ).combineByKey(
        preprocess_create_combinator,
        preprocess_merge_value,
        preprocess_merge_combiners
    )

Using combineByKey I bilaterate/trilaterate to get device positions, so if I do result.collect() I get something like this:

    [(u'B4:3A:28:2A:AF:84_1444299600', {'latitude': 43.348926, 'iters': 1, 'longitude': -3.011294, 'error': 388.82514299844314}),
     (u'30:A8:DB:9F:A0:35_1444299600', {'latitude': 43.348926, 'iters': 2, 'longitude': -3.011294, 'error': 61.62463221959518}),
     (u'00:18:DE:94:F2:DF_1444299600', {'latitude': 43.348679, 'iters': 1, 'longitude': -3.010883, 'error': 436.2689859408533}),
     (u'98:03:D8:65:E5:94_1444299600', {'latitude': 43.348722, 'iters': 1, 'longitude': -3.011031, 'error': 346.54077346735033}),
     (u'68:DF:DD:EF:ED:21_1444299600', {'latitude': 43.348722, 'iters': 1, 'longitude': -3.011031, 'error': 436.2689859408533})]

I want to convert each RDD record back into an object so that I can start another map/filter/map/combineByKey pass from that result. For that I created a new function that converts every record into a new tuple:

    def convert_rdd_to_object(record):
        geohash = Geohash()
        result_object = record[1]
        result_object["mac"] = record[0].split("_")[0]
        result_object["timestamp"] = record[0].split("_")[1]
        result_object["geohash"] = str(geohash.encode(result_object["latitude"], result_object["longitude"]))
        return (result_object["mac"], result_object)

The idea was to get something like this:

    [(u'B4:3A:28:2A:AF:84', {'latitude': 43.348926, 'iters': 1, 'longitude': -3.011294, 'error': 388.82514299844314, 'mac': 'B4:3A:28:2A:AF:84', 'timestamp': 1444299600, 'geohash': 'rwqerqwerw'}),
     (u'30:A8:DB:9F:A0:35', {'latitude': 43.348926, 'iters': 2, 'longitude': -3.011294, 'error': 61.62463221959518, 'mac': '30:A8:DB:9F:A0:35', 'timestamp': 1444299600, 'geohash': 'rwqerqwerw'}),
     (u'00:18:DE:94:F2:DF', {'latitude': 43.348679, 'iters': 1, 'longitude': -3.010883, 'error': 436.2689859408533, 'mac': '00:18:DE:94:F2:DF', 'timestamp': 1444299600, 'geohash': 'rwqerqwerw'}),
     (u'98:03:D8:65:E5:94', {'latitude': 43.348722, 'iters': 1, 'longitude': -3.011031, 'error': 346.54077346735033, 'mac': '98:03:D8:65:E5:94', 'timestamp': 1444299600, 'geohash': 'rwqerqwerw'}),
     (u'68:DF:DD:EF:ED:21', {'latitude': 43.348722, 'iters': 1, 'longitude': -3.011031, 'error': 436.2689859408533, 'mac': '68:DF:DD:EF:ED:21', 'timestamp': 1444299600, 'geohash': 'rwqerqwerw'})]

So my attempt was to add another map at the end:

    result = text_files.flatMap(
        lambda x: s3_files_to_records(x, startime)  # Load from JSON converted CSV logs
    ).filter(
        lambda x: x['mac_time']  # Filter empty fusids
    ).map(
        lambda x: (x['mac_time'], x)  # Generate K-V store
    ).combineByKey(
        preprocess_create_combinator,
        preprocess_merge_value,
        preprocess_merge_combiners
    ).map(
        lambda x: convert_rdd_to_object(x)
    )
    print result.collect()

But I am getting this error:

    Traceback (most recent call last):
      File "/home/iker/Workspace/scripts/test_bilaterate_2.py", line 758, in <module>
        main()
      File "/home/iker/Workspace/scripts/test_bilaterate_2.py", line 754, in main
        spark_pipeline("/home/iker/Workspace/scripts/1443901500CSV", "/home/iker/Workspace/scripts/")
      File "/home/iker/Workspace/scripts/test_bilaterate_2.py", line 707, in spark_pipeline
        print result.collect()
      File "/home/iker/Workspace/spark-1.4.1-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/rdd.py", line 757, in collect
      File "/home/iker/Workspace/spark-1.4.1-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/rdd.py", line 2363, in _jrdd
      File "/home/iker/Workspace/spark-1.4.1-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/rdd.py", line 2283, in _prepare_for_python_RDD
      File "/home/iker/Workspace/spark-1.4.1-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/serializers.py", line 427, in dumps
      File "/home/iker/Workspace/spark-1.4.1-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 622, in dumps
      File "/home/iker/Workspace/spark-1.4.1-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 107, in dump
      File "/usr/lib/python2.7/pickle.py", line 224, in dump
        self.save(obj)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj)  # Call unbound method with explicit self
      File "/usr/lib/python2.7/pickle.py", line 562, in save_tuple
        save(element)
      File "/usr/lib/python2.7/pickle.py", line 286, in save
        f(self, obj)  # Call unbound method with explicit self
      File "/home/iker/Workspace/spark-1.4.1-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 199, in save_function
      File "/home/iker/Workspace/spark-1.4.1-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 236, in save_function_tuple

I do not know if the problem is that I am not understanding how Spark works, or something else, but I do not see how to make it work so that I can continue to map/filter/reduce the data in several concatenated steps.

Regards,

Iker Perez de Albeniz
Senior R&D Engineer / Technical Lead, Fon
+34 946545843 | Skype: iker.perez.fon
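P.S. To make the intended transformation concrete, here is a minimal, Spark-free sketch of what I expect convert_rdd_to_object to do to a single collected record. The geohash step is stubbed out here (the Geohash class is from my own environment and is not needed to show the key splitting), and note that splitting the key string yields the timestamp as a string rather than an int:

```python
def convert_record(record):
    # record is a (key, value) pair as produced by combineByKey,
    # where key looks like "<mac>_<timestamp>".
    key, value = record
    mac, timestamp = key.split("_")
    value["mac"] = mac
    value["timestamp"] = timestamp  # string, since it comes from split()
    # The real function also adds value["geohash"] here; omitted in this sketch.
    return (mac, value)

sample = (u'B4:3A:28:2A:AF:84_1444299600',
          {'latitude': 43.348926, 'iters': 1,
           'longitude': -3.011294, 'error': 388.82514299844314})
mac, obj = convert_record(sample)
print(mac)               # B4:3A:28:2A:AF:84
print(obj['timestamp'])  # 1444299600
```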