Hi All,

I am new to this list. Before sending this mail I searched the archive,
but I did not find a solution to my problem.

I am using Spark to process user locations based on RSSI. My Spark script
looks like this:

        text_files = sc.textFile(','.join(files[startime]))

        result = text_files.flatMap(
            # Load from JSON converted CSV logs
            lambda x: s3_files_to_records(x, startime)
        ).filter(
            lambda x: x['mac_time']  # Filter empty fusids
        ).map(
            lambda x: (x['mac_time'], x)  # Generate K-V store
        ).combineByKey(
            preprocess_create_combinator,
            preprocess_merge_value,
            preprocess_merge_combiners
        )

Using combineByKey I bilaterate/trilaterate the readings to get device positions.
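For readers who have not used combineByKey, here is a minimal sketch of how its three callbacks fit together, written in plain Python so it runs without Spark. The callbacks below are hypothetical stand-ins that only collect RSSI readings per key (they are not my preprocess_* functions, which do the actual positioning), and combine_by_key_local is an illustrative helper, not a Spark API.

```python
def create_combiner(value):
    # Called for the first value seen for a key inside a partition.
    return [value]


def merge_value(combiner, value):
    # Called for every further value of that key inside the same partition.
    combiner.append(value)
    return combiner


def merge_combiners(left, right):
    # Called to merge the partial results of two partitions for one key.
    return left + right


def combine_by_key_local(partitions, create, merge_val, merge_comb):
    """Emulate the combineByKey contract on a list of partitions."""
    partials = []
    for partition in partitions:
        acc = {}
        for key, value in partition:
            if key in acc:
                acc[key] = merge_val(acc[key], value)
            else:
                acc[key] = create(value)
        partials.append(acc)

    merged = {}
    for acc in partials:
        for key, combiner in acc.items():
            if key in merged:
                merged[key] = merge_comb(merged[key], combiner)
            else:
                merged[key] = combiner
    return merged


# Two made-up partitions of ("<mac>_<timestamp>", rssi) pairs.
partitions = [
    [("AA_1", -70), ("BB_1", -60), ("AA_1", -80)],
    [("AA_1", -75)],
]
combined = combine_by_key_local(
    partitions, create_combiner, merge_value, merge_combiners)
```

In the real job the combiner would hold whatever state the positioning needs instead of a plain list.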

So if I call

        result.collect()

I get something like this:

[
    (u'B4:3A:28:2A:AF:84_1444299600',
     {'latitude': 43.348926, 'iters': 1, 'longitude': -3.011294,
      'error': 388.82514299844314}),
    (u'30:A8:DB:9F:A0:35_1444299600',
     {'latitude': 43.348926, 'iters': 2, 'longitude': -3.011294,
      'error': 61.62463221959518}),
    (u'00:18:DE:94:F2:DF_1444299600',
     {'latitude': 43.348679, 'iters': 1, 'longitude': -3.010883,
      'error': 436.2689859408533}),
    (u'98:03:D8:65:E5:94_1444299600',
     {'latitude': 43.348722, 'iters': 1, 'longitude': -3.011031,
      'error': 346.54077346735033}),
    (u'68:DF:DD:EF:ED:21_1444299600',
     {'latitude': 43.348722, 'iters': 1, 'longitude': -3.011031,
      'error': 436.2689859408533})
]

I want to convert each RDD record back into an object so that I can start
another map/filter/map/combineByKey process from that result. For that I
have created a new function that converts every record into a new tuple:

def convert_rdd_to_object(rdd):
    # rdd is a single (key, value) record; the key is "<mac>_<timestamp>"
    geohash = Geohash()

    result_object = rdd[1]
    result_object["mac"] = rdd[0].split("_")[0]
    result_object["timestamp"] = rdd[0].split("_")[1]
    result_object["geohash"] = str(
        geohash.encode(result_object["latitude"], result_object["longitude"]))
    return (result_object["mac"], result_object)

The idea was to get something like this:

[
    (u'B4:3A:28:2A:AF:84',
     {'latitude': 43.348926, 'iters': 1, 'longitude': -3.011294,
      'error': 388.82514299844314, 'mac': 'B4:3A:28:2A:AF:84',
      'timestamp': 1444299600, 'geohash': rwqerqwerw}),
    (u'30:A8:DB:9F:A0:35',
     {'latitude': 43.348926, 'iters': 2, 'longitude': -3.011294,
      'error': 61.62463221959518, 'mac': '30:A8:DB:9F:A0:35',
      'timestamp': 1444299600, 'geohash': rwqerqwerw}),
    (u'00:18:DE:94:F2:DF',
     {'latitude': 43.348679, 'iters': 1, 'longitude': -3.010883,
      'error': 436.2689859408533, 'mac': '00:18:DE:94:F2:DF',
      'timestamp': 1444299600, 'geohash': rwqerqwerw}),
    (u'98:03:D8:65:E5:94',
     {'latitude': 43.348722, 'iters': 1, 'longitude': -3.011031,
      'error': 346.54077346735033, 'mac': '98:03:D8:65:E5:94',
      'timestamp': 1444299600, 'geohash': rwqerqwerw}),
    (u'68:DF:DD:EF:ED:21',
     {'latitude': 43.348722, 'iters': 1, 'longitude': -3.011031,
      'error': 436.2689859408533, 'mac': '68:DF:DD:EF:ED:21',
      'timestamp': 1444299600, 'geohash': rwqerqwerw})
]
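
The conversion itself can be checked on a single record without Spark. The sketch below is only an illustration under some assumptions: the Geohash class here is a hypothetical stand-in that implements the standard geohash encoding (it is not the library my script imports), the default precision of 9 is arbitrary, and convert_record is a variant of my function that copies the dict instead of modifying it in place.

```python
_BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz"


class Geohash(object):
    """Hypothetical stand-in for the Geohash class used in the script."""

    def encode(self, latitude, longitude, precision=9):
        lat_lo, lat_hi = -90.0, 90.0
        lon_lo, lon_hi = -180.0, 180.0
        chars = []
        bits = 0
        value = 0
        use_lon = True  # bits alternate, starting with longitude
        while len(chars) < precision:
            if use_lon:
                mid = (lon_lo + lon_hi) / 2
                if longitude >= mid:
                    value = (value << 1) | 1
                    lon_lo = mid
                else:
                    value = value << 1
                    lon_hi = mid
            else:
                mid = (lat_lo + lat_hi) / 2
                if latitude >= mid:
                    value = (value << 1) | 1
                    lat_lo = mid
                else:
                    value = value << 1
                    lat_hi = mid
            use_lon = not use_lon
            bits += 1
            if bits == 5:  # every 5 bits become one base32 character
                chars.append(_BASE32[value])
                bits = 0
                value = 0
        return "".join(chars)


def convert_record(record):
    # record is one (key, value) pair; the key is "<mac>_<timestamp>"
    key, value = record
    mac, timestamp = key.split("_")
    result_object = dict(value)  # copy, so the input record is untouched
    result_object["mac"] = mac
    result_object["timestamp"] = timestamp  # still a string at this point
    result_object["geohash"] = str(
        Geohash().encode(result_object["latitude"],
                         result_object["longitude"]))
    return (mac, result_object)


sample = (u'B4:3A:28:2A:AF:84_1444299600',
          {'latitude': 43.348926, 'iters': 1, 'longitude': -3.011294,
           'error': 388.82514299844314})
converted = convert_record(sample)
```

Note that split() returns the timestamp as a string, so an int() call would be needed to get the numeric timestamp shown in the expected output.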


So my attempt was to add another map at the end:

        result = text_files.flatMap(
            # Load from JSON converted CSV logs
            lambda x: s3_files_to_records(x, startime)
        ).filter(
            lambda x: x['mac_time']  # Filter empty fusids
        ).map(
            lambda x: (x['mac_time'], x)  # Generate K-V store
        ).combineByKey(
            preprocess_create_combinator,
            preprocess_merge_value,
            preprocess_merge_combiners
        ).map(
            lambda x: convert_rdd_to_object(x)
        )

        print result.collect()

But I am getting this error:

Traceback (most recent call last):
  File "/home/iker/Workspace/scripts/test_bilaterate_2.py", line 758, in <module>
    main()
  File "/home/iker/Workspace/scripts/test_bilaterate_2.py", line 754, in main
    spark_pipeline("/home/iker/Workspace/scripts/1443901500CSV", "/home/iker/Workspace/scripts/")
  File "/home/iker/Workspace/scripts/test_bilaterate_2.py", line 707, in spark_pipeline
    print result.collect()
  File "/home/iker/Workspace/spark-1.4.1-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/rdd.py", line 757, in collect
  File "/home/iker/Workspace/spark-1.4.1-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/rdd.py", line 2363, in _jrdd
  File "/home/iker/Workspace//spark-1.4.1-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/rdd.py", line 2283, in _prepare_for_python_RDD
  File "/home/iker/Workspace/spark-1.4.1-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/serializers.py", line 427, in dumps
  File "/home/iker/Workspace/spark-1.4.1-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 622, in dumps
  File "/home/iker/Workspace/spark-1.4.1-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 107, in dump
  File "/usr/lib/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/python2.7/pickle.py", line 562, in save_tuple
    save(element)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/home/iker/Workspace//spark-1.4.1-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 199, in save_function
  File "/home/iker/Workspace//spark-1.4.1-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 236, in save_function_tuple
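
The traceback above ends inside cloudpickle's save_function, that is, while the function passed to a transformation is being serialized on the driver, before any record is processed. One way to narrow this down might be to check which of the objects that function refers to cannot be pickled at all. This is only a sketch with the standard pickle module (Spark uses cloudpickle, which handles more cases, so a pass here proves little, but a failure is a strong hint). find_unpicklable and the candidate names are hypothetical and not part of my script.

```python
import pickle
import threading


def find_unpicklable(named_objects):
    """Return (name, exception type name) for each object pickle rejects."""
    failures = []
    for name, obj in sorted(named_objects.items()):
        try:
            pickle.dumps(obj)
        except Exception as exc:  # pickle raises several exception types
            failures.append((name, type(exc).__name__))
    return failures


# Made-up candidates: a builtin function, a plain list and a lock.
# Locks, open files and sockets are typical objects that cannot be pickled.
candidates = {
    "helper_function": len,
    "lookup_table": [1, 2, 3],
    "shared_lock": threading.Lock(),
}
failures = find_unpicklable(candidates)
failed_names = [name for name, _ in failures]
```

In my case the candidates would be the globals that convert_rdd_to_object touches, for example the Geohash class and whatever module it comes from.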


I do not know whether the problem is that I do not understand how Spark
works or something else, but I do not see how to make this work and keep
map/filter/reducing the data in several chained steps.


Regards,

-- 
Iker Perez de Albeniz
Senior R&D Engineer / Technical Lead
+34 946545843
Skype: iker.perez.fon
