btw, they can't be saved to BSON either. This seems like a generic issue; can anyone else reproduce it?
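The only workaround I can think of, and I haven't verified it yet, is to map each Row back to a plain dict before saving, so the pickled records no longer reference pyspark.sql.types._create_row (the symbol the error below complains about). A sketch, reusing the dataframe from the thread below:

# Untested idea: Row.asDict() turns each pyspark.sql Row into a plain dict,
# which pickles without the _create_row constructor that the JVM-side
# unpickler chokes on.
as_dicts = on_time_dataframe.rdd.map(lambda row: row.asDict())
as_dicts.saveToMongoDB('mongodb://localhost:27017/agile_data_science.on_time_performance')

If that works, the same map() should unblock the saveAsNewAPIHadoopFile and Elasticsearch paths too.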
On Mon, Mar 28, 2016 at 8:02 PM, Russell Jurney <russell.jur...@gmail.com> wrote:

> I created a JIRA: https://issues.apache.org/jira/browse/SPARK-14229
>
> On Mon, Mar 28, 2016 at 7:43 PM, Russell Jurney <russell.jur...@gmail.com> wrote:
>
>> Ted, I am using the .rdd method, see above, but for some reason these
>> RDDs can't be saved to MongoDB or Elasticsearch.
>>
>> I think this is a bug in PySpark/DataFrame. I can't think of another
>> explanation: somehow the RDDs returned by DataFrame.rdd can't be stored
>> to an arbitrary Hadoop OutputFormat. When I do this:
>>
>> on_time_lines = sc.textFile("../data/On_Time_On_Time_Performance_2015.jsonl.gz")
>> on_time_performance = on_time_lines.map(lambda x: json.loads(x))
>>
>> on_time_performance.saveToMongoDB('mongodb://localhost:27017/agile_data_science.on_time_performance')
>>
>> it works: same data, but loaded with textFile instead of through the
>> DataFrame reader (json/parquet).
>>
>> It is the DataFrame.rdd bit that is broken. I will file a JIRA.
>>
>> Does anyone know a workaround?
>>
>> On Mon, Mar 28, 2016 at 7:28 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> See this method:
>>>
>>> lazy val rdd: RDD[T] = {
>>>
>>> On Mon, Mar 28, 2016 at 6:30 PM, Russell Jurney <russell.jur...@gmail.com> wrote:
>>>
>>>> Ok, I'm also unable to save to Elasticsearch using a dataframe's RDD.
>>>> This seems related to DataFrames. Is there a way to convert a
>>>> DataFrame's RDD to a 'normal' RDD?
>>>>
>>>> On Mon, Mar 28, 2016 at 6:20 PM, Russell Jurney <russell.jur...@gmail.com> wrote:
>>>>
>>>>> I filed a JIRA <https://jira.mongodb.org/browse/HADOOP-276> in the
>>>>> mongo-hadoop project, but I'm curious whether anyone else has seen
>>>>> this issue. Anyone have any idea what to do? I can't save to Mongo
>>>>> from PySpark: a contrived example works, but a dataframe does not.
>>>>>
>>>>> I activate pymongo_spark and load a dataframe:
>>>>>
>>>>> import pymongo
>>>>> import pymongo_spark
>>>>> # Important: activate pymongo_spark.
>>>>> pymongo_spark.activate()
>>>>>
>>>>> on_time_dataframe = sqlContext.read.parquet('../data/on_time_performance.parquet')
>>>>>
>>>>> Then I try saving to MongoDB in two ways:
>>>>>
>>>>> on_time_dataframe.rdd.saveToMongoDB('mongodb://localhost:27017/agile_data_science.on_time_performance')
>>>>>
>>>>> on_time_dataframe.rdd.saveAsNewAPIHadoopFile(
>>>>>     path='file://unused',
>>>>>     outputFormatClass='com.mongodb.hadoop.MongoOutputFormat',
>>>>>     keyClass='org.apache.hadoop.io.Text',
>>>>>     valueClass='org.apache.hadoop.io.MapWritable',
>>>>>     conf={"mongo.output.uri": "mongodb://localhost:27017/agile_data_science.on_time_performance"}
>>>>> )
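>>>>> (A note in case it helps anyone reproduce: the elements of
>>>>> on_time_dataframe.rdd are pyspark.sql Row objects, and if I read
>>>>> pyspark/sql/types.py right, Row pickles itself through
>>>>> pyspark.sql.types._create_row, which is exactly the symbol named in
>>>>> the error below:
>>>>>
>>>>> type(on_time_dataframe.rdd.first())
>>>>> # expect <class 'pyspark.sql.types.Row'>; a plain-dict RDD from
>>>>> # textFile + json.loads carries no such constructor, which may be
>>>>> # why that path works.)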
>>>>> But I always get this error:
>>>>>
>>>>> In [7]: on_time_rdd.saveToMongoDB('mongodb://localhost:27017/agile_data_science.on_time_performance')
>>>>>
>>>>> 16/03/28 18:04:06 INFO mapred.FileInputFormat: Total input paths to process : 1
>>>>> 16/03/28 18:04:06 INFO spark.SparkContext: Starting job: runJob at PythonRDD.scala:393
>>>>> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Got job 2 (runJob at PythonRDD.scala:393) with 1 output partitions
>>>>> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Final stage: ResultStage 2 (runJob at PythonRDD.scala:393)
>>>>> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Parents of final stage: List()
>>>>> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Missing parents: List()
>>>>> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Submitting ResultStage 2 (PythonRDD[13] at RDD at PythonRDD.scala:43), which has no missing parents
>>>>> 16/03/28 18:04:06 INFO storage.MemoryStore: Block broadcast_5 stored as values in memory (estimated size 19.3 KB, free 249.2 KB)
>>>>> 16/03/28 18:04:06 INFO storage.MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 9.7 KB, free 258.9 KB)
>>>>> 16/03/28 18:04:06 INFO storage.BlockManagerInfo: Added broadcast_5_piece0 in memory on localhost:59881 (size: 9.7 KB, free: 511.1 MB)
>>>>> 16/03/28 18:04:06 INFO spark.SparkContext: Created broadcast 5 from broadcast at DAGScheduler.scala:1006
>>>>> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (PythonRDD[13] at RDD at PythonRDD.scala:43)
>>>>> 16/03/28 18:04:06 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
>>>>> 16/03/28 18:04:06 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2, localhost, partition 0,PROCESS_LOCAL, 2666 bytes)
>>>>> 16/03/28 18:04:06 INFO executor.Executor: Running task 0.0 in stage 2.0 (TID 2)
>>>>> 16/03/28 18:04:06 INFO rdd.HadoopRDD: Input split: file:/Users/rjurney/Software/Agile_Data_Code_2/data/On_Time_On_Time_Performance_2015.csv.gz:0+312456777
>>>>> 16/03/28 18:04:06 INFO compress.CodecPool: Got brand-new decompressor [.gz]
>>>>> 16/03/28 18:04:07 INFO python.PythonRunner: Times: total = 1310, boot = 1249, init = 58, finish = 3
>>>>> 16/03/28 18:04:07 INFO executor.Executor: Finished task 0.0 in stage 2.0 (TID 2). 4475 bytes result sent to driver
>>>>> 16/03/28 18:04:07 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 1345 ms on localhost (1/1)
>>>>> 16/03/28 18:04:07 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
>>>>> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: ResultStage 2 (runJob at PythonRDD.scala:393) finished in 1.346 s
>>>>> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Job 2 finished: runJob at PythonRDD.scala:393, took 1.361003 s
>>>>> 16/03/28 18:04:07 INFO spark.SparkContext: Starting job: take at SerDeUtil.scala:231
>>>>> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Got job 3 (take at SerDeUtil.scala:231) with 1 output partitions
>>>>> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Final stage: ResultStage 3 (take at SerDeUtil.scala:231)
>>>>> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Parents of final stage: List()
>>>>> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Missing parents: List()
>>>>> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Submitting ResultStage 3 (MapPartitionsRDD[15] at mapPartitions at SerDeUtil.scala:146), which has no missing parents
>>>>> 16/03/28 18:04:07 INFO storage.MemoryStore: Block broadcast_6 stored as values in memory (estimated size 19.6 KB, free 278.4 KB)
>>>>> 16/03/28 18:04:07 INFO storage.MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 9.8 KB, free 288.2 KB)
>>>>> 16/03/28 18:04:07 INFO storage.BlockManagerInfo: Added broadcast_6_piece0 in memory on localhost:59881 (size: 9.8 KB, free: 511.1 MB)
>>>>> 16/03/28 18:04:07 INFO spark.SparkContext: Created broadcast 6 from broadcast at DAGScheduler.scala:1006
>>>>> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 3 (MapPartitionsRDD[15] at mapPartitions at SerDeUtil.scala:146)
>>>>> 16/03/28 18:04:07 INFO scheduler.TaskSchedulerImpl: Adding task set 3.0 with 1 tasks
>>>>> 16/03/28 18:04:07 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 3.0 (TID 3, localhost, partition 0,PROCESS_LOCAL, 2666 bytes)
>>>>> 16/03/28 18:04:07 INFO executor.Executor: Running task 0.0 in stage 3.0 (TID 3)
>>>>> 16/03/28 18:04:07 INFO rdd.HadoopRDD: Input split: file:/Users/rjurney/Software/Agile_Data_Code_2/data/On_Time_On_Time_Performance_2015.csv.gz:0+312456777
>>>>> 16/03/28 18:04:07 INFO compress.CodecPool: Got brand-new decompressor [.gz]
>>>>> 16/03/28 18:04:07 ERROR executor.Executor: Exception in task 0.0 in stage 3.0 (TID 3)
>>>>> net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row)
>>>>>     at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
>>>>>     at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
>>>>>     at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
>>>>>     at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
>>>>>     at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
>>>>>     at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:150)
>>>>>     at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:149)
>>>>>     at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>     at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
>>>>>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>>     at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>>>     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>>>     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>>>>>     at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>>>>>     at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>>>>>     at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>>>>>     at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>>>>>     at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>>>>>     at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>>>>>     at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328)
>>>>>     at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328)
>>>>>     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>>>>>     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>>>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>> 16/03/28 18:04:07 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 3.0 (TID 3, localhost): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row)
>>>>>     [same PickleException stack trace as above]
>>>>> 16/03/28 18:04:07 ERROR scheduler.TaskSetManager: Task 0 in stage 3.0 failed 1 times; aborting job
>>>>> 16/03/28 18:04:07 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool
>>>>> 16/03/28 18:04:07 INFO scheduler.TaskSchedulerImpl: Cancelling stage 3
>>>>> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: ResultStage 3 (take at SerDeUtil.scala:231) failed in 0.117 s
>>>>> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Job 3 failed: take at SerDeUtil.scala:231, took 0.134593 s
>>>>>
>>>>> ---------------------------------------------------------------------------
>>>>> Py4JJavaError                             Traceback (most recent call last)
>>>>> <ipython-input-7-d1f984f17e27> in <module>()
>>>>> ----> 1 on_time_rdd.saveToMongoDB('mongodb://localhost:27017/agile_data_science.on_time_performance')
>>>>>
>>>>> /Users/rjurney/Software/Agile_Data_Code_2/lib/pymongo_spark.pyc in saveToMongoDB(self, connection_string, config)
>>>>>     104             keyConverter='com.mongodb.spark.pickle.NoopConverter',
>>>>>     105             valueConverter='com.mongodb.spark.pickle.NoopConverter',
>>>>> --> 106             conf=conf)
>>>>>     107
>>>>>     108
>>>>>
>>>>> /Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/rdd.pyc in saveAsNewAPIHadoopFile(self, path, outputFormatClass, keyClass, valueClass, keyConverter, valueConverter, conf)
>>>>>    1372                                                    outputFormatClass,
>>>>>    1373                                                    keyClass, valueClass,
>>>>> -> 1374                                                    keyConverter, valueConverter, jconf)
>>>>>    1375
>>>>>    1376     def saveAsHadoopDataset(self, conf, keyConverter=None, valueConverter=None):
>>>>>
>>>>> /Users/rjurney/Software/Agile_Data_Code_2/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
>>>>>     811         answer = self.gateway_client.send_command(command)
>>>>>     812         return_value = get_return_value(
>>>>> --> 813             answer, self.gateway_client, self.target_id, self.name)
>>>>>     814
>>>>>     815         for temp_arg in temp_args:
>>>>>
>>>>> /Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/sql/utils.pyc in deco(*a, **kw)
>>>>>      43     def deco(*a, **kw):
>>>>>      44         try:
>>>>> ---> 45             return f(*a, **kw)
>>>>>      46         except py4j.protocol.Py4JJavaError as e:
>>>>>      47             s = e.java_exception.toString()
>>>>>
>>>>> /Users/rjurney/Software/Agile_Data_Code_2/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
>>>>>     306                 raise Py4JJavaError(
>>>>>     307                     "An error occurred while calling {0}{1}{2}.\n".
>>>>> --> 308                     format(target_id, ".", name), value)
>>>>>     309             else:
>>>>>     310                 raise Py4JError(
>>>>>
>>>>> Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile.
>>>>> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 3, localhost): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row)
>>>>>     [same PickleException stack trace as above]
>>>>>
>>>>> Driver stacktrace:
>>>>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
>>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
>>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
>>>>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
>>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>>>>>     at scala.Option.foreach(Option.scala:236)
>>>>>     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
>>>>>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
>>>>>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
>>>>>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
>>>>>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>>     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
>>>>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
>>>>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
>>>>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
>>>>>     at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1328)
>>>>>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>>>>>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>>>>>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>>>>>     at org.apache.spark.rdd.RDD.take(RDD.scala:1302)
>>>>>     at org.apache.spark.api.python.SerDeUtil$.pythonToPairRDD(SerDeUtil.scala:231)
>>>>>     at org.apache.spark.api.python.PythonRDD$.saveAsNewAPIHadoopFile(PythonRDD.scala:775)
>>>>>     at org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile(PythonRDD.scala)
>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>     at java.lang.reflect.Method.invoke(Method.java:497)
>>>>>     at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
>>>>>     at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
>>>>>     at py4j.Gateway.invoke(Gateway.java:259)
>>>>>     at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>>>>>     at py4j.commands.CallCommand.execute(CallCommand.java:79)
>>>>>     at py4j.GatewayConnection.run(GatewayConnection.java:209)
>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>> Caused by: net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row)
>>>>>     [same PickleException stack trace as above]
>>>>>     ... 1 more

--
Russell Jurney twitter.com/rjurney russell.jur...@gmail.com relato.io
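P.S. If anyone wants to try reproducing this without my data, here is a minimal, self-contained sketch. It assumes the mongo-hadoop connector jars are on the classpath and a throwaway local mongod (the test.repro collection name is arbitrary); if the issue really is generic to DataFrame RDDs, any other OutputFormat should trip it too:

# Build a tiny dataframe; df.rdd is an RDD of pyspark.sql Row objects.
df = sqlContext.createDataFrame([('a', 1), ('b', 2)], ['k', 'v'])
# If the problem is generic, this should fail with the same PickleException
# as the full dataset above.
df.rdd.saveAsNewAPIHadoopFile(
    path='file://unused',
    outputFormatClass='com.mongodb.hadoop.MongoOutputFormat',
    keyClass='org.apache.hadoop.io.Text',
    valueClass='org.apache.hadoop.io.MapWritable',
    conf={'mongo.output.uri': 'mongodb://localhost:27017/test.repro'}
)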