Well, I kind of got it... the code below works:
*****************
val rdd = sc.newAPIHadoopFile(path, classOf[AvroKeyInputFormat[GenericRecord]],
    classOf[AvroKey[GenericRecord]], classOf[NullWritable])
  .map(_._1.datum)
rdd
  .map(item => {
    // copy the record before using it -- the input format reuses the underlying object
    val record = GenericData.get().deepCopy(item.getSchema, item)
    println(record.get("myValue"))
  })
  .take(10)
*****************
It seems strange to me that I effectively have to iterate over the RDD twice
-- once to create it, and again to perform my action. It also seems strange
that I can't actually access the data in my RDD until I've copied the records.
I would think this is a *very* common use case for an RDD -- accessing the
data it contains (otherwise, what's the point?).
Is there a way to always enable cloning? There used to be a cloneRecords
parameter on the hadoopFile method, but that seems to have been removed.
Finally, if I add rdd.persist(), it doesn't work -- presumably because the
cached RDD ends up holding many references to the same reused object, as the
docs warn. I guess I would need to do .map(_._1.datum) again before the map
that does the real work.
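Something like this is what I have in mind to make persist() safe (untested
sketch -- it deep-copies each datum with Avro's GenericData.deepCopy instead
of relying on clone(); "myValue" is just my example field):
*****************
// untested sketch: deep-copy each datum so the cached RDD doesn't end up
// holding many references to the same reused object
// (same imports as in my original message below)
val copiedRdd = sc.newAPIHadoopFile(path, classOf[AvroKeyInputFormat[GenericRecord]],
    classOf[AvroKey[GenericRecord]], classOf[NullWritable])
  .map { case (key, _) =>
    val datum = key.datum()
    GenericData.get().deepCopy(datum.getSchema, datum)
  }
copiedRdd.persist()
copiedRdd.map(_.get("myValue").toString).take(10).foreach(println)
*****************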
--
Chris Miller
On Sat, Mar 12, 2016 at 4:15 PM, Chris Miller <[email protected]>
wrote:
> Wow! That sure is buried in the documentation! But yeah, that's more or less
> what I thought.
>
> I tried copying as follows, but that didn't work.
>
> *****************
> val copyRDD = singleFileRDD.map(_.copy())
> *****************
>
> When I iterate over the new copyRDD (foreach or map), I still have the
> same problem of duplicate records. I also tried copying within the block
> where I'm using it, but that didn't work either:
>
> *****************
> rdd
>   .take(10)
>   .map(item => {
>     val copied = item.copy()
>     val record = copied._1.datum()
>
>     println(record.get("myValue"))
>   })
> *****************
>
> What am I doing wrong?
>
> --
> Chris Miller
>
> On Sat, Mar 12, 2016 at 1:48 PM, Peyman Mohajerian <[email protected]>
> wrote:
>
>> Here is the reason for the behavior:
>> '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable
>> object for each record, directly caching the returned RDD or directly
>> passing it to an aggregation or shuffle operation will create many
>> references to the same object. If you plan to directly cache, sort, or
>> aggregate Hadoop writable objects, you should first copy them using a map
>> function.
>>
>>
>> https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/SparkContext.html
>>
>> So it is Hadoop related.
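>>
>> For example, something along these lines (an untested sketch of what the
>> docs mean) copies each Avro datum out of the reused wrapper inside a map,
>> so every record is its own object before you cache, sort, or collect:
>>
>> *****************
>> // untested sketch: deep-copy the datum so each record is independent
>> val copied = rdd.map { case (key, _) =>
>>   val datum = key.datum()
>>   GenericData.get().deepCopy(datum.getSchema, datum)
>> }
>> copied.cache()
>> *****************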
>>
>> On Fri, Mar 11, 2016 at 3:19 PM, Chris Miller <[email protected]>
>> wrote:
>>
>>> I have a bit of a strange situation:
>>>
>>> *****************
>>> import org.apache.avro.generic.{GenericData, GenericRecord}
>>> import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper, AvroKey}
>>> import org.apache.avro.mapreduce.AvroKeyInputFormat
>>> import org.apache.hadoop.io.{NullWritable, WritableUtils}
>>>
>>> val path = "/path/to/data.avro"
>>>
>>> val rdd = sc.newAPIHadoopFile(path,
>>>   classOf[AvroKeyInputFormat[GenericRecord]],
>>>   classOf[AvroKey[GenericRecord]], classOf[NullWritable])
>>> rdd.take(10).foreach( x => println( x._1.datum() ))
>>> *****************
>>>
>>> In this situation, I get the right number of records returned, and if I
>>> look at the contents of the RDD I see the individual records as Tuple2s...
>>> however, if I println each one as shown above, I get the same record
>>> printed every time.
>>>
>>> Apparently this has to do with something in Spark or Avro keeping a
>>> reference to the item it's iterating over, so I need to clone the object
>>> before I use it. However, if I try to clone it (from the spark-shell
>>> console), I get:
>>>
>>> *****************
>>> rdd.take(10).foreach( x => {
>>>   val clonedDatum = x._1.datum().clone()
>>>   println(clonedDatum)
>>> })
>>>
>>> <console>:37: error: method clone in class Object cannot be accessed in
>>> org.apache.avro.generic.GenericRecord
>>> Access to protected method clone not permitted because
>>> prefix type org.apache.avro.generic.GenericRecord does not conform to
>>> class $iwC where the access take place
>>> val clonedDatum = x._1.datum().clone()
>>> *****************
>>>
>>> So, how can I clone the datum?
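>>>
>>> One alternative to clone() that I'm considering (untested) is Avro's deep
>>> copy via GenericData:
>>>
>>> *****************
>>> // untested: copy the datum with GenericData.deepCopy instead of clone()
>>> rdd.take(10).foreach( x => {
>>>   val datum = x._1.datum()
>>>   val copied = GenericData.get().deepCopy(datum.getSchema, datum)
>>>   println(copied)
>>> })
>>> *****************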
>>>
>>> Seems I'm not the only one who ran into this problem:
>>> https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/102. I
>>> can't figure out how to fix it in my case without hacking away like the
>>> person in the linked PR did.
>>>
>>> Suggestions?
>>>
>>> --
>>> Chris Miller
>>>
>>
>>
>