Wow! That sure is buried in the documentation! But yeah, that's what I thought more or less.
I tried copying as follows, but that didn't work:

*****************
val copyRDD = singleFileRDD.map(_.copy())
*****************

When I iterate over the new copyRDD (foreach or map), I still have the
same problem of duplicate records. I also tried copying within the block
where I'm using it, but that didn't work either:

*****************
rdd
  .take(10)
  .map(item => {
    val copied = item.copy()
    val record = copied._1.datum()
    println(record.get("myValue"))
  })
*****************

What am I doing wrong?

--
Chris Miller

On Sat, Mar 12, 2016 at 1:48 PM, Peyman Mohajerian <mohaj...@gmail.com>
wrote:

> Here is the reason for the behavior:
>
> '''Note:''' Because Hadoop's RecordReader class re-uses the same
> Writable object for each record, directly caching the returned RDD or
> directly passing it to an aggregation or shuffle operation will create
> many references to the same object. If you plan to directly cache,
> sort, or aggregate Hadoop writable objects, you should first copy them
> using a map function.
>
> https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/SparkContext.html
>
> So it is Hadoop-related.
>
> On Fri, Mar 11, 2016 at 3:19 PM, Chris Miller <cmiller11...@gmail.com>
> wrote:
>
>> I have a bit of a strange situation:
>>
>> *****************
>> import org.apache.avro.generic.{GenericData, GenericRecord}
>> import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper, AvroKey}
>> import org.apache.avro.mapreduce.AvroKeyInputFormat
>> import org.apache.hadoop.io.{NullWritable, WritableUtils}
>>
>> val path = "/path/to/data.avro"
>>
>> val rdd = sc.newAPIHadoopFile(path,
>>   classOf[AvroKeyInputFormat[GenericRecord]],
>>   classOf[AvroKey[GenericRecord]], classOf[NullWritable])
>> rdd.take(10).foreach( x => println( x._1.datum() ))
>> *****************
>>
>> In this situation, I get the right number of records returned, and if
>> I look at the contents of the RDD I see the individual records as
>> Tuple2s... however, if I println on each one as shown above, I get
>> the same result every time.
>>
>> Apparently this has to do with something in Spark or Avro keeping a
>> reference to the item it's iterating over, so I need to clone the
>> object before I use it. However, if I try to clone it (from the
>> spark-shell console), I get:
>>
>> *****************
>> rdd.take(10).foreach( x => {
>>   val clonedDatum = x._1.datum().clone()
>>   println(clonedDatum)
>> })
>>
>> <console>:37: error: method clone in class Object cannot be accessed
>> in org.apache.avro.generic.GenericRecord
>>  Access to protected method clone not permitted because
>> prefix type org.apache.avro.generic.GenericRecord does not conform to
>> class $iwC where the access take place
>>        val clonedDatum = x._1.datum().clone()
>> *****************
>>
>> So, how can I clone the datum?
>>
>> It seems I'm not the only one who has run into this problem:
>> https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/102. I
>> can't figure out how to fix it in my case without hacking away like
>> the person in the linked PR did.
>>
>> Suggestions?
>>
>> --
>> Chris Miller
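P.S. Thinking about it more: maybe the problem is that the tuple .copy()
above is shallow, so both tuples would still point at the same mutable
Avro record, which would explain why the duplicates persist. Here is a
minimal, untested sketch of the "copy them using a map function" advice
from the docs, assuming GenericData.deepCopy exists in the Avro version
on my classpath and that "myValue" is a field in the schema:

*****************
import org.apache.avro.generic.{GenericData, GenericRecord}

// Deep-copy each datum inside map() *before* caching, sorting, or
// collecting, so every element owns its own record instead of sharing
// the Writable that the RecordReader re-uses.
val copiedRDD = rdd.map { case (key, _) =>
  val datum = key.datum()
  GenericData.get().deepCopy(datum.getSchema, datum)
}

// Each printed record should now be distinct. (Collecting
// GenericRecords to the driver may require Kryo serialization.)
copiedRDD.take(10).foreach(record => println(record.get("myValue")))
*****************

If the datum is concretely a GenericData.Record, I suppose its copy
constructor, new GenericData.Record(rec, true), might work as well, but
I haven't tried it. Does that look right?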