I have also been struggling with reading Avro. Very glad to hear that there is a new Avro library coming in Spark 1.2 (which, by the way, seems to have a lot of other very useful improvements).
In the meanwhile, I have been able to piece together several snippets/tips that I found from various sources, and I am now able to read/write Avro correctly. From my understanding, you basically need 3 pieces:

1. Use the Kryo serializer.
2. Register your Avro classes. I have done this using Twitter chill 0.4.0.
3. Read/write Avro with a snippet of code like the one you posted.

Here is the relevant code (hopefully all of it):

// All of the following are needed in order to read/write AVRO files
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.fs.{ FileSystem, Path }
// Uncomment the following line if you want to use generic AVRO, I am using specific
//import org.apache.avro.generic.GenericData
import org.apache.avro.Schema
import org.apache.avro.mapreduce.{ AvroJob, AvroKeyInputFormat, AvroKeyOutputFormat }
import org.apache.avro.mapred.AvroKey
// Kryo/Avro serialization stuff
import com.esotericsoftware.kryo.Kryo
import com.twitter.chill.avro.AvroSerializer
import org.apache.spark.serializer.{ KryoSerializer, KryoRegistrator }
import org.apache.spark.{ SparkConf, SparkContext }

// MyAvroClass, inputPath and outputPath are placeholders for your generated
// Avro specific class and your input/output paths.
object MyApp {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("MyApp").setMaster("local[*]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "MyRegistrator")
    val sc = new SparkContext(conf)

    // Avro-generated specific classes expose their schema via getClassSchema
    val schema = MyAvroClass.getClassSchema

    // Read
    val readJob = new Job()
    AvroJob.setInputKeySchema(readJob, schema)
    val rdd = sc.newAPIHadoopFile(inputPath,
        classOf[AvroKeyInputFormat[MyAvroClass]],
        classOf[AvroKey[MyAvroClass]],
        classOf[NullWritable],
        readJob.getConfiguration)
      .map { t => t._1.datum }

    // Write
    val rddAvroWritable = rdd.map { s => (new AvroKey(s), NullWritable.get) }
    val writeJob = new Job()
    AvroJob.setOutputKeySchema(writeJob, schema)
    writeJob.setOutputFormatClass(classOf[AvroKeyOutputFormat[MyAvroClass]])
    rddAvroWritable.saveAsNewAPIHadoopFile(outputPath,
      classOf[AvroKey[MyAvroClass]],
      classOf[NullWritable],
      classOf[AvroKeyOutputFormat[MyAvroClass]],
      writeJob.getConfiguration)
  }
}

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    // Put a line like the following for each of your Avro classes if you use specific Avro
    // If you use generic Avro, chill also has a function for that: GenericRecordSerializer
    kryo.register(classOf[MyAvroClass], AvroSerializer.SpecificRecordBinarySerializer[MyAvroClass])
  }
}

Simone Franzini, PhD
http://www.linkedin.com/in/simonefranzini

On Fri, Nov 21, 2014 at 7:04 AM, thomas j <beanb...@googlemail.com> wrote:

> I've been able to load a different avro file based on GenericRecord with:
>
> val person = sqlContext.avroFile("/tmp/person.avro")
>
> When I try to call `first()` on it, I get "NotSerializableException"
> exceptions again:
>
> person.first()
>
> ...
> 14/11/21 12:59:17 ERROR Executor: Exception in task 0.0 in stage 14.0 (TID 20)
> java.io.NotSerializableException: org.apache.avro.generic.GenericData$Record
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>     at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> ...
>
> Apart from this I want to transform the records into pairs of (user_id, record).
> I can do this by specifying the offset of the user_id column with
> something like this:
>
> person.map(r => (r.getInt(2), r)).take(4).collect()
>
> Is there any way to be able to specify the column name ("user_id") instead
> of needing to know/calculate the offset somehow?
>
> Thanks again
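On the column-name question: in Spark 1.2 the SchemaRDD carries its schema, so the position can be looked up by name instead of being hard-coded. A minimal sketch, assuming a SchemaRDD named person with an int user_id column (names here are illustrative):

// Look the field position up once from the schema, then index rows by it.
val userIdIndex = person.schema.fields.map(_.name).indexOf("user_id")
val pairs = person.map(r => (r.getInt(userIdIndex), r))
pairs.take(4)

// Or register the SchemaRDD as a temporary table and select by name in SQL:
person.registerTempTable("person")
val ids = sqlContext.sql("SELECT user_id FROM person")

As a side note, take(4) already returns a local Array, so the trailing collect() in the snippet above is not needed.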
> On Fri, Nov 21, 2014 at 11:48 AM, thomas j <beanb...@googlemail.com> wrote:
>
>> Thanks for the pointer Michael.
>>
>> I've downloaded Spark 1.2.0 from
>> https://people.apache.org/~pwendell/spark-1.2.0-snapshot1/ and cloned and
>> built the spark-avro repo you linked to.
>>
>> When I run it against the example avro file linked to in the
>> documentation it works. However, when I try to load my avro file (linked to
>> in my original question) I receive the following error:
>>
>> java.lang.RuntimeException: Unsupported type LONG
>>     at scala.sys.package$.error(package.scala:27)
>>     at com.databricks.spark.avro.AvroRelation.com$databricks$spark$avro$AvroRelation$$toSqlType(AvroRelation.scala:116)
>>     at com.databricks.spark.avro.AvroRelation$$anonfun$5.apply(AvroRelation.scala:97)
>>     at com.databricks.spark.avro.AvroRelation$$anonfun$5.apply(AvroRelation.scala:96)
>>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> ...
>>
>> If this is useful I'm happy to try loading the various different avro
>> files I have to try to battle-test spark-avro.
>>
>> Thanks
>>
>> On Thu, Nov 20, 2014 at 6:30 PM, Michael Armbrust <mich...@databricks.com> wrote:
>>
>>> One option (starting with Spark 1.2, which is currently in preview) is
>>> to use the Avro library for Spark SQL. This is very new, but we would love
>>> to get feedback: https://github.com/databricks/spark-avro
>>>
>>> On Thu, Nov 20, 2014 at 10:19 AM, al b <beanb...@googlemail.com> wrote:
>>>
>>>> I've read several posts of people struggling to read Avro in Spark. The
>>>> examples I've tried don't work. When I try this solution (
>>>> https://stackoverflow.com/questions/23944615/how-can-i-load-avros-in-spark-using-the-schema-on-board-the-avro-files)
>>>> I get errors:
>>>>
>>>> spark java.io.NotSerializableException: org.apache.avro.mapred.AvroWrapper
>>>>
>>>> How can I read the following sample file in Spark using Scala?
>>>>
>>>> http://www.4shared.com/file/SxnYcdgJce/sample.html
>>>>
>>>> Thomas
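Closing the loop on the NotSerializableException on org.apache.avro.generic.GenericData$Record quoted earlier: it is the same Java-serialization problem that the Kryo registrator above solves for specific records. For generic records, chill's GenericRecordSerializer can be registered instead. A sketch, assuming chill-avro 0.4.0 is on the classpath (e.g. "com.twitter" %% "chill-avro" % "0.4.0" in sbt, based on the version mentioned above) and that GenericRecordSerializer's schema argument is optional in that version; check the exact signature in the chill release you use:

import com.esotericsoftware.kryo.Kryo
import com.twitter.chill.avro.AvroSerializer
import org.apache.avro.generic.GenericData
import org.apache.spark.serializer.KryoRegistrator

class MyGenericRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    // GenericData.Record is the concrete class behind the GenericRecord
    // interface, so it is the class Kryo actually needs a serializer for.
    kryo.register(classOf[GenericData.Record],
      AvroSerializer.GenericRecordSerializer[GenericData.Record]())
  }
}

This registrator would then be set via spark.kryo.registrator, exactly as in the specific-record example above.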