Re: Repeating Records w/ Spark + Avro?

2016-03-12 Thread Chris Miller
Well, I kind of got it... the code below works:

*
val rdd = sc.newAPIHadoopFile(path, classOf[AvroKeyInputFormat[GenericRecord]],
classOf[AvroKey[GenericRecord]], classOf[NullWritable]).map(_._1.datum)

rdd
  .map(record => {
    // record is already the GenericRecord extracted by the .map above
    println(record.get("myValue"))
  })
  .take(10)
*

Seems strange to me that I effectively have to iterate over the RDD twice --
once to create the RDD, and again to perform my action. It also seems strange
that I can't actually access the data in my RDD until I've copied the records.
I would think this is a *very* common use case for an RDD -- accessing the
data it contains (otherwise, what's the point?).

Is there a way to always enable cloning? There used to be a cloneRecords
parameter on the hadoopFile method, but that seems to have been removed.

Finally, if I add rdd.persist(), then it doesn't work. I guess I would need
to do .map(_._1.datum) again before the map that does the real work.
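
(For what it's worth, here is an untested sketch of the copy-in-a-map approach
the docs describe, applied to this case: deep-copy each datum with Avro's
GenericData while it is being read, so that persist() caches independent
records instead of many references to the reader's reused object. The path and
the "myValue" field are placeholders.)

*
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable

val path = "/path/to/data.avro"

// Deep-copy every datum as it is read; after this, caching, shuffling, and
// collecting all see independent records.
val rdd = sc.newAPIHadoopFile(path,
  classOf[AvroKeyInputFormat[GenericRecord]],
  classOf[AvroKey[GenericRecord]], classOf[NullWritable])
  .map { case (key, _) =>
    val datum = key.datum()
    GenericData.get().deepCopy(datum.getSchema, datum)
  }

rdd.persist()

rdd.map(record => println(record.get("myValue"))).take(10)
*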


--
Chris Miller

On Sat, Mar 12, 2016 at 4:15 PM, Chris Miller 
wrote:

> Wow! That sure is buried in the documentation! But yeah, that's what I
> thought more or less.
>
> I tried copying as follows, but that didn't work.
>
> *
> val copyRDD = singleFileRDD.map(_.copy())
> *
>
> When I iterate over the new copyRDD (foreach or map), I still have the
> same problem of duplicate records. I also tried copying within the block
> where I'm using it, but that didn't work either:
>
> *
> rdd
>   .take(10)
>   .map(item => {
>     val copied = item.copy()
>     val record = copied._1.datum()
>
> println(record.get("myValue"))
>   })
> *
>
> What am I doing wrong?
>
> --
> Chris Miller
>
> On Sat, Mar 12, 2016 at 1:48 PM, Peyman Mohajerian 
> wrote:
>
>> Here is the reason for the behavior:
>> '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable
>> object for each record, directly caching the returned RDD or directly
>> passing it to an aggregation or shuffle operation will create many
>> references to the same object. If you plan to directly cache, sort, or
>> aggregate Hadoop writable objects, you should first copy them using a map
>>  function.
>>
>>
>> https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/SparkContext.html
>>
>> So it is Hadoop related.
>>
>> On Fri, Mar 11, 2016 at 3:19 PM, Chris Miller 
>> wrote:
>>
>>> I have a bit of a strange situation:
>>>
>>> *
>>> import org.apache.avro.generic.{GenericData, GenericRecord}
>>> import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper, AvroKey}
>>> import org.apache.avro.mapreduce.AvroKeyInputFormat
>>> import org.apache.hadoop.io.{NullWritable, WritableUtils}
>>>
>>> val path = "/path/to/data.avro"
>>>
>>> val rdd = sc.newAPIHadoopFile(path,
>>> classOf[AvroKeyInputFormat[GenericRecord]],
>>> classOf[AvroKey[GenericRecord]], classOf[NullWritable])
>>> rdd.take(10).foreach( x => println( x._1.datum() ))
>>> *
>>>
>>> In this situation, I get the right number of records returned, and if I
>>> look at the contents of rdd I see the individual records as tuple2's...
>>> however, if I println on each one as shown above, I get the same result
>>> every time.
>>>
>>> Apparently this has to do with something in Spark or Avro keeping a
>>> reference to the item it's iterating over, so I need to clone the object
>>> before I use it. However, if I try to clone it (from the spark-shell
>>> console), I get:
>>>
>>> *
>>> rdd.take(10).foreach( x => {
>>>   val clonedDatum = x._1.datum().clone()
>>>   println(clonedDatum)
>>> })
>>>
>>> :37: error: method clone in class Object cannot be accessed in
>>> org.apache.avro.generic.GenericRecord
>>>  Access to protected method clone not permitted because
>>>  prefix type org.apache.avro.generic.GenericRecord does not conform to
>>>  class $iwC where the access take place
>>> val clonedDatum = x._1.datum().clone()
>>> *
>>>
>>> So, how can I clone the datum?
>>>
>>> Seems I'm not the only one who ran into this problem:
>>> https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/102. I
>>> can't figure out how to fix it in my case without hacking away like the
>>> person in the linked PR did.
>>>
>>> Suggestions?
>>>
>>> --
>>> Chris Miller
>>>
>>
>>
>


Re: Repeating Records w/ Spark + Avro?

2016-03-12 Thread Chris Miller
Wow! That sure is buried in the documentation! But yeah, that's what I
thought more or less.

I tried copying as follows, but that didn't work.

*
val copyRDD = singleFileRDD.map(_.copy())
*

When I iterate over the new copyRDD (foreach or map), I still have the same
problem of duplicate records. I also tried copying within the block where
I'm using it, but that didn't work either:

*
rdd
  .take(10)
  .map(item => {
    val copied = item.copy()
    val record = copied._1.datum()

println(record.get("myValue"))
  })
*

What am I doing wrong?

--
Chris Miller

On Sat, Mar 12, 2016 at 1:48 PM, Peyman Mohajerian 
wrote:

> Here is the reason for the behavior:
> '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable
> object for each record, directly caching the returned RDD or directly
> passing it to an aggregation or shuffle operation will create many
> references to the same object. If you plan to directly cache, sort, or
> aggregate Hadoop writable objects, you should first copy them using a map
>  function.
>
>
> https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/SparkContext.html
>
> So it is Hadoop related.
>
> On Fri, Mar 11, 2016 at 3:19 PM, Chris Miller 
> wrote:
>
>> I have a bit of a strange situation:
>>
>> *
>> import org.apache.avro.generic.{GenericData, GenericRecord}
>> import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper, AvroKey}
>> import org.apache.avro.mapreduce.AvroKeyInputFormat
>> import org.apache.hadoop.io.{NullWritable, WritableUtils}
>>
>> val path = "/path/to/data.avro"
>>
>> val rdd = sc.newAPIHadoopFile(path,
>> classOf[AvroKeyInputFormat[GenericRecord]],
>> classOf[AvroKey[GenericRecord]], classOf[NullWritable])
>> rdd.take(10).foreach( x => println( x._1.datum() ))
>> *
>>
>> In this situation, I get the right number of records returned, and if I
>> look at the contents of rdd I see the individual records as tuple2's...
>> however, if I println on each one as shown above, I get the same result
>> every time.
>>
>> Apparently this has to do with something in Spark or Avro keeping a
>> reference to the item it's iterating over, so I need to clone the object
>> before I use it. However, if I try to clone it (from the spark-shell
>> console), I get:
>>
>> *
>> rdd.take(10).foreach( x => {
>>   val clonedDatum = x._1.datum().clone()
>>   println(clonedDatum)
>> })
>>
>> :37: error: method clone in class Object cannot be accessed in
>> org.apache.avro.generic.GenericRecord
>>  Access to protected method clone not permitted because
>>  prefix type org.apache.avro.generic.GenericRecord does not conform to
>>  class $iwC where the access take place
>> val clonedDatum = x._1.datum().clone()
>> *
>>
>> So, how can I clone the datum?
>>
>> Seems I'm not the only one who ran into this problem:
>> https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/102. I
>> can't figure out how to fix it in my case without hacking away like the
>> person in the linked PR did.
>>
>> Suggestions?
>>
>> --
>> Chris Miller
>>
>
>


Re: Repeating Records w/ Spark + Avro?

2016-03-11 Thread Peyman Mohajerian
Here is the reason for the behavior:
'''Note:''' Because Hadoop's RecordReader class re-uses the same Writable
object for each record, directly caching the returned RDD or directly
passing it to an aggregation or shuffle operation will create many
references to the same object. If you plan to directly cache, sort, or
aggregate Hadoop writable objects, you should first copy them using a map
 function.

https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/SparkContext.html

So it is Hadoop related.
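
In code, that "copy them using a map function" advice might look roughly like
this for the Avro case (a sketch only; rdd is the newAPIHadoopFile RDD from
the quoted snippet below, and "myValue" is a placeholder field):

*
import org.apache.avro.generic.GenericData

// Make an independent copy of each datum before caching, shuffling, or
// collecting it; once copied, the reader's reuse of its object no longer
// matters.
val copied = rdd.map { case (key, _) =>
  val datum = key.datum()
  GenericData.get().deepCopy(datum.getSchema, datum)
}

copied.cache()
copied.take(10).foreach(r => println(r.get("myValue")))
*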

On Fri, Mar 11, 2016 at 3:19 PM, Chris Miller 
wrote:

> I have a bit of a strange situation:
>
> *
> import org.apache.avro.generic.{GenericData, GenericRecord}
> import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper, AvroKey}
> import org.apache.avro.mapreduce.AvroKeyInputFormat
> import org.apache.hadoop.io.{NullWritable, WritableUtils}
>
> val path = "/path/to/data.avro"
>
> val rdd = sc.newAPIHadoopFile(path,
> classOf[AvroKeyInputFormat[GenericRecord]],
> classOf[AvroKey[GenericRecord]], classOf[NullWritable])
> rdd.take(10).foreach( x => println( x._1.datum() ))
> *
>
> In this situation, I get the right number of records returned, and if I
> look at the contents of rdd I see the individual records as tuple2's...
> however, if I println on each one as shown above, I get the same result
> every time.
>
> Apparently this has to do with something in Spark or Avro keeping a
> reference to the item it's iterating over, so I need to clone the object
> before I use it. However, if I try to clone it (from the spark-shell
> console), I get:
>
> *
> rdd.take(10).foreach( x => {
>   val clonedDatum = x._1.datum().clone()
>   println(clonedDatum)
> })
>
> :37: error: method clone in class Object cannot be accessed in
> org.apache.avro.generic.GenericRecord
>  Access to protected method clone not permitted because
>  prefix type org.apache.avro.generic.GenericRecord does not conform to
>  class $iwC where the access take place
> val clonedDatum = x._1.datum().clone()
> *
>
> So, how can I clone the datum?
>
> Seems I'm not the only one who ran into this problem:
> https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/102. I
> can't figure out how to fix it in my case without hacking away like the
> person in the linked PR did.
>
> Suggestions?
>
> --
> Chris Miller
>


Repeating Records w/ Spark + Avro?

2016-03-11 Thread Chris Miller
I have a bit of a strange situation:

*
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper, AvroKey}
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.{NullWritable, WritableUtils}

val path = "/path/to/data.avro"

val rdd = sc.newAPIHadoopFile(path,
classOf[AvroKeyInputFormat[GenericRecord]],
classOf[AvroKey[GenericRecord]], classOf[NullWritable])
rdd.take(10).foreach( x => println( x._1.datum() ))
*

In this situation, I get the right number of records returned, and if I
look at the contents of rdd I see the individual records as tuple2's...
however, if I println on each one as shown above, I get the same result
every time.

Apparently this has to do with something in Spark or Avro keeping a
reference to the item it's iterating over, so I need to clone the object
before I use it. However, if I try to clone it (from the spark-shell
console), I get:

*
rdd.take(10).foreach( x => {
  val clonedDatum = x._1.datum().clone()
  println(clonedDatum)
})

:37: error: method clone in class Object cannot be accessed in
org.apache.avro.generic.GenericRecord
 Access to protected method clone not permitted because
 prefix type org.apache.avro.generic.GenericRecord does not conform to
 class $iwC where the access take place
val clonedDatum = x._1.datum().clone()
*

So, how can I clone the datum?
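
(One possibility, sketched but not verified against this setup: copy the
record with Avro's own APIs instead of Object.clone.)

*
import org.apache.avro.generic.{GenericData, GenericRecord}

// record stands for the x._1.datum() from the snippet above.
def copyDatum(record: GenericRecord): GenericRecord =
  GenericData.get().deepCopy(record.getSchema, record)

// If the datum is concretely a GenericData.Record, its copy constructor can
// also make a deep copy:
def copyDatumAlt(record: GenericData.Record): GenericData.Record =
  new GenericData.Record(record, true)
*

Copying after take(10) may already be too late, though: if the reader reused
its object while the ten elements were being collected, they can all point at
the same record, so the copy is probably better done inside a map before the
take().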

Seems I'm not the only one who ran into this problem:
https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/102. I can't
figure out how to fix it in my case without hacking away like the person in
the linked PR did.

Suggestions?

--
Chris Miller