Hi Robert,

The problem is that spark uses java serialization requiring serialized
objects to implement Serializable, AvroKey doesn't.
As a workaround you can try using
kryo<http://spark.incubator.apache.org/docs/latest/tuning.html#data-serialization>for
the serialization.

Eugen


2013/11/11 Robert Fink <[email protected]>

> Hi,
>
> I am trying to get the following minimal Scala example work: Using Spark
> to process Avro records. Here's my dummy Avro definition:
>
> {
>   "namespace": "com.avrotest",
>   "type": "record",
>   "name": "AvroTest",
>   "fields": [
>     {"name": "field1", "type": ["string", "null"]}
>   ]
> }
>
> I experiment with a simple job that creates three AvroTest objects, writes
> them out to a file through a SparkContext, and then reads in the thus
> generated Avro file and performs a simple grouping operation:
>
> //
> ---------------------------------------------------------------------------------------------------------
> import org.apache.spark.SparkContext._
> import org.apache.avro.specific.SpecificDatumWriter
> import org.apache.avro.file.DataFileWriter
> import org.apache.avro._
> import org.apache.avro.generic._
> import org.apache.hadoop.mapreduce.Job
> import com.avrotest.AvroTest
> import java.io.File
>
> object SparkTest{
>   def main(args: Array[String]) {
>
>     def avrofile = "output.avro"
>     def sc = new SparkContext("local", "Simple App")
>     val job = new Job()
>
>     val record1 = new AvroTest()
>     record1.setField1("value1")
>     val record2 = new AvroTest()
>     record2.setField1("value1")
>     val record3 = new AvroTest()
>     record3.setField1("value2")
>
>     def userDatumWriter = new SpecificDatumWriter[AvroTest]()
>     val dataFileWriter = new DataFileWriter[AvroTest](userDatumWriter)
>     def file = new File(avrofile)
>     dataFileWriter.create(record1.getSchema(), file)
>     dataFileWriter.append(record1)
>     dataFileWriter.append(record2)
>     dataFileWriter.append(record3)
>     dataFileWriter.close()
>
>     def rdd = sc.newAPIHadoopFile(
>       avrofile,
>       classOf[org.apache.avro.mapreduce.AvroKeyInputFormat[AvroTest]],
>       classOf[org.apache.avro.mapred.AvroKey[AvroTest]],
>       classOf[org.apache.hadoop.io.NullWritable],
>       job.getConfiguration)
>     // rdd.foreach( x => println(x._1.datum.getField1) ) // Prints value1,
> value1, value2
>     val numGroups= rdd.groupBy(x => x._1.datum.getField1).count()
>   }
> }
> //
> ---------------------------------------------------------------------------------------------------------
>
> I would expect numGroups==2 in the last step, because record1 and record2
> share the getField1()=="value1", and record3 has getField1() == "value2".
> However, the script fails to execute with the following error (see below).
> Can anyone give me a hint what could be wrong in the above code, or post an
> example of reading from an Avro file and performing some simple
> computations on the retrieved objects? Thank you so much! Robert.
>
> 11650 [pool-109-thread-1] WARN
> org.apache.avro.mapreduce.AvroKeyInputFormat - Reader schema was not set.
> Use AvroJob.setInputKeySchema() if desired.
> 11661 [pool-109-thread-1] INFO
> org.apache.avro.mapreduce.AvroKeyInputFormat - Using a reader schema equal
> to the writer schema.
> 12293 [spark-akka.actor.default-dispatcher-5] INFO
> org.apache.spark.scheduler.local.LocalTaskSetManager - Loss was due to
> java.io.NotSerializableException
> java.io.NotSerializableException: org.apache.avro.mapred.AvroKey
>         at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>         at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>         at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>         at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>         at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>         at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>         at
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>         at
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:27)
>         at
> org.apache.spark.storage.DiskStore$DiskBlockObjectWriter.write(DiskStore.scala:109)
>         at
> org.apache.spark.scheduler.ShuffleMapTask$$anonfun$run$1.apply(ShuffleMapTask.scala:152)
>         at
> org.apache.spark.scheduler.ShuffleMapTask$$anonfun$run$1.apply(ShuffleMapTask.scala:149)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>         at scala.collection.Iterator$$anon$19.foreach(Iterator.scala:399)
>         at
> org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:149)
>         at
> org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:88)
>         at
> org.apache.spark.scheduler.local.LocalScheduler.runTask(LocalScheduler.scala:198)
>         at
> org.apache.spark.scheduler.local.LocalActor$$anonfun$launchTask$1$$anon$1.run(LocalScheduler.scala:68)
>         at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:744)
> 12304 [spark-akka.actor.default-dispatcher-5] INFO
> org.apache.spark.scheduler.local.LocalScheduler - Remove TaskSet 1.0 from
> pool
> 12311 [run-main] INFO org.apache.spark.scheduler.DAGScheduler - Failed to
> run count at SparkTest.scala:41
> [error] (run-main) org.apache.spark.SparkException: Job failed: Task 1.0:0
> failed more than 4 times; aborting job java.io.NotSerializableException:
> org.apache.avro.mapred.AvroKey
> org.apache.spark.SparkException: Job failed: Task 1.0:0 failed more than 4
> times; aborting job java.io.NotSerializableException:
> org.apache.avro.mapred.AvroKey
>         at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>         at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>         at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>         at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>         at
> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:379)
>         at 
> org.apache.spark.scheduler.DAGScheduler.org<http://org.apache.spark.scheduler.dagscheduler.org/>
> $apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>         at
> org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>

Reply via email to