Hi Guys, I used applySchema to store a set of nested dictionaries and lists in a Parquet file.
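For reference, the write side looked roughly like this. This is a condensed sketch rather than the exact code, and the sample row and path are made up, but the field names match the schema further down:

    from pyspark import SparkContext
    from pyspark.sql import (SQLContext, StructType, StructField, StringType,
                             IntegerType, MapType, ArrayType)

    sc = SparkContext(appName="write-profiles")
    sqc = SQLContext(sc)
    path = "hdfs:///tmp/profiles.parquet"  # placeholder path

    # schema mirroring the printSchema() output shown further down
    schema = StructType([
        StructField("field1", StringType(), True),
        StructField("field2", IntegerType(), True),
        StructField("field3", MapType(IntegerType(), IntegerType()), True),
        StructField("field4", MapType(StringType(), IntegerType()), True),
        StructField("field5", ArrayType(StringType()), True),
        StructField("field6", ArrayType(StructType([
            StructField("field61", StringType(), True),
            StructField("field62", StringType(), True),
            StructField("field63", IntegerType(), True),
        ])), True),
    ])

    # one made-up row: (string, int, dict, dict, list of strings, list of structs)
    rows = sc.parallelize([
        ("id-1", 42, {1: 2}, {"a": 3}, ["x", "y"], [("p", "q", 7)]),
    ])

    profiles = sqc.applySchema(rows, schema)
    profiles.saveAsParquetFile(path)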
(The earlier thread is here: http://apache-spark-user-list.1001560.n3.nabble.com/Using-sparkSQL-to-convert-a-collection-of-python-dictionary-of-dictionaries-to-schma-RDD-td20228.html#a20461)

The write was successful and I could load the data back as well. Now I'm trying to convert this SchemaRDD into an RDD of dictionaries so that I can run some reduces on them. The schema of my RDD is as follows:

 |-- field1: string (nullable = true)
 |-- field2: integer (nullable = true)
 |-- field3: map (nullable = true)
 |    |-- key: integer
 |    |-- value: integer (valueContainsNull = true)
 |-- field4: map (nullable = true)
 |    |-- key: string
 |    |-- value: integer (valueContainsNull = true)
 |-- field5: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- field6: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- field61: string (nullable = true)
 |    |    |-- field62: string (nullable = true)
 |    |    |-- field63: integer (nullable = true)

I'm using the following mapper to turn each row into a record I can reduce later:

    def generateRecords(line):
        # input : a row loaded from the Parquet file
        # output: a (key, dictionary) pair with all the field values
        field1 = line.field1
        summary = {}
        summary['field2'] = line.field2
        summary['field3'] = line.field3
        summary['field4'] = line.field4
        summary['field5'] = line.field5
        summary['field6'] = line.field6
        return (field1, summary)

    profiles = sqc.parquetFile(path)
    profileRecords = profiles.map(lambda line: generateRecords(line))

This code works perfectly well when field6 is not mapped: if the line that maps field6 in generateRecords is commented out, the RDD is generated perfectly, and even field5 gets mapped. The key difference between field5 and field6 is that field5 is a list of strings while field6 is a list of tuples of the form (String, String, Int). But when field6 is mapped, it throws:

    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/root/spark/python/pyspark/rdd.py", line 847, in count
        return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
      File "/root/spark/python/pyspark/rdd.py", line 838, in sum
        return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
      File "/root/spark/python/pyspark/rdd.py", line 759, in reduce
        vals = self.mapPartitions(func).collect()
      File "/root/spark/python/pyspark/rdd.py", line 723, in collect
        bytesInJava = self._jrdd.collect().iterator()
      File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
      File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling o88.collect.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 32 in stage 3.0 failed 4 times, most recent failure: Lost task 32.3 in stage 3.0 (TID 1829, ip-172-31-18-36.ec2.internal): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/root/spark/python/pyspark/worker.py", line 79, in main
        serializer.dump_stream(func(split_index, iterator), outfile)
      File "/root/spark/python/pyspark/serializers.py", line 196, in dump_stream
        self.serializer.dump_stream(self._batched(iterator), stream)
      File "/root/spark/python/pyspark/serializers.py", line 128, in dump_stream
        self._write_with_length(obj, stream)
      File "/root/spark/python/pyspark/serializers.py", line 138, in _write_with_length
        serialized = self.dumps(obj)
      File "/root/spark/python/pyspark/serializers.py", line 356, in dumps
        return cPickle.dumps(obj, 2)
    PicklingError: Can't pickle <class 'pyspark.sql.List'>: attribute lookup pyspark.sql.List failed

Can someone help me understand what is going wrong here?

Many thanks,
SahanB
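P.S. One thing I am considering, though I have not verified it and the helper name below is just for illustration, is to coerce the nested pyspark.sql containers into plain Python types inside the mapper, before the results get pickled:

    def generateRecordsPlain(line):
        # same idea as generateRecords, but force the nested pyspark.sql
        # containers (List / nested rows) into plain Python dicts, lists and tuples
        summary = {}
        summary['field2'] = line.field2
        summary['field3'] = dict(line.field3) if line.field3 is not None else None
        summary['field4'] = dict(line.field4) if line.field4 is not None else None
        summary['field5'] = list(line.field5) if line.field5 is not None else None
        # field6 is an array of structs; turn each struct into a plain tuple
        summary['field6'] = ([tuple(item) for item in line.field6]
                             if line.field6 is not None else None)
        return (line.field1, summary)

    profileRecords = profiles.map(generateRecordsPlain)

Would something like that help with the pickling error, or is there a cleaner way to handle arrays of structs?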