Hi Robert,

The problem is that Spark uses Java serialization by default, which requires serialized objects to implement Serializable, and AvroKey doesn't. As a workaround you can try using Kryo <http://spark.incubator.apache.org/docs/latest/tuning.html#data-serialization> for the serialization.
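For reference, a minimal, untested sketch of what that might look like with the Spark 0.8 style of configuration (system properties set before the SparkContext is created); the AvroKryoRegistrator class name and its package are made up here just for illustration:

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

// Hypothetical registrator that tells Kryo about the non-Serializable key type
// and the generated Avro record it wraps.
class AvroKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[org.apache.avro.mapred.AvroKey[_]])
    kryo.register(classOf[com.avrotest.AvroTest])
  }
}

// Before "new SparkContext(...)":
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.kryo.registrator", "com.avrotest.AvroKryoRegistrator")

Registering the classes is optional; switching spark.serializer to KryoSerializer may already be enough to get past the NotSerializableException, but registering avoids writing full class names with every serialized object.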
Eugen

2013/11/11 Robert Fink <[email protected]>:

> Hi,
>
> I am trying to get the following minimal Scala example to work: using Spark
> to process Avro records. Here's my dummy Avro definition:
>
> {
>   "namespace": "com.avrotest",
>   "type": "record",
>   "name": "AvroTest",
>   "fields": [
>     {"name": "field1", "type": ["string", "null"]}
>   ]
> }
>
> I experiment with a simple job that creates three AvroTest objects, writes
> them out to a file through a SparkContext, and then reads in the thus
> generated Avro file and performs a simple grouping operation:
>
> // ---------------------------------------------------------------------------------------------------------
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkContext._
> import org.apache.avro.specific.SpecificDatumWriter
> import org.apache.avro.file.DataFileWriter
> import org.apache.avro._
> import org.apache.avro.generic._
> import org.apache.hadoop.mapreduce.Job
> import com.avrotest.AvroTest
> import java.io.File
>
> object SparkTest {
>   def main(args: Array[String]) {
>
>     def avrofile = "output.avro"
>     def sc = new SparkContext("local", "Simple App")
>     val job = new Job()
>
>     val record1 = new AvroTest()
>     record1.setField1("value1")
>     val record2 = new AvroTest()
>     record2.setField1("value1")
>     val record3 = new AvroTest()
>     record3.setField1("value2")
>
>     def userDatumWriter = new SpecificDatumWriter[AvroTest]()
>     val dataFileWriter = new DataFileWriter[AvroTest](userDatumWriter)
>     def file = new File(avrofile)
>     dataFileWriter.create(record1.getSchema(), file)
>     dataFileWriter.append(record1)
>     dataFileWriter.append(record2)
>     dataFileWriter.append(record3)
>     dataFileWriter.close()
>
>     def rdd = sc.newAPIHadoopFile(
>       avrofile,
>       classOf[org.apache.avro.mapreduce.AvroKeyInputFormat[AvroTest]],
>       classOf[org.apache.avro.mapred.AvroKey[AvroTest]],
>       classOf[org.apache.hadoop.io.NullWritable],
>       job.getConfiguration)
>     // rdd.foreach( x => println(x._1.datum.getField1) ) // Prints value1, value1, value2
>     val numGroups = rdd.groupBy(x => x._1.datum.getField1).count()
>   }
> }
> // ---------------------------------------------------------------------------------------------------------
>
> I would expect numGroups == 2 in the last step, because record1 and record2
> share getField1() == "value1", and record3 has getField1() == "value2".
> However, the script fails to execute with the following error (see below).
> Can anyone give me a hint what could be wrong in the above code, or post an
> example of reading from an Avro file and performing some simple
> computations on the retrieved objects? Thank you so much! Robert.
>
> 11650 [pool-109-thread-1] WARN org.apache.avro.mapreduce.AvroKeyInputFormat - Reader schema was not set. Use AvroJob.setInputKeySchema() if desired.
> 11661 [pool-109-thread-1] INFO org.apache.avro.mapreduce.AvroKeyInputFormat - Using a reader schema equal to the writer schema.
> 12293 [spark-akka.actor.default-dispatcher-5] INFO org.apache.spark.scheduler.local.LocalTaskSetManager - Loss was due to java.io.NotSerializableException
> java.io.NotSerializableException: org.apache.avro.mapred.AvroKey
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>         at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:27)
>         at org.apache.spark.storage.DiskStore$DiskBlockObjectWriter.write(DiskStore.scala:109)
>         at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$run$1.apply(ShuffleMapTask.scala:152)
>         at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$run$1.apply(ShuffleMapTask.scala:149)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>         at scala.collection.Iterator$$anon$19.foreach(Iterator.scala:399)
>         at org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:149)
>         at org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:88)
>         at org.apache.spark.scheduler.local.LocalScheduler.runTask(LocalScheduler.scala:198)
>         at org.apache.spark.scheduler.local.LocalActor$$anonfun$launchTask$1$$anon$1.run(LocalScheduler.scala:68)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:744)
> 12304 [spark-akka.actor.default-dispatcher-5] INFO org.apache.spark.scheduler.local.LocalScheduler - Remove TaskSet 1.0 from pool
> 12311 [run-main] INFO org.apache.spark.scheduler.DAGScheduler - Failed to run count at SparkTest.scala:41
> [error] (run-main) org.apache.spark.SparkException: Job failed: Task 1.0:0 failed more than 4 times; aborting job java.io.NotSerializableException: org.apache.avro.mapred.AvroKey
> org.apache.spark.SparkException: Job failed: Task 1.0:0 failed more than 4 times; aborting job java.io.NotSerializableException: org.apache.avro.mapred.AvroKey
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>         at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:379)
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>         at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
