Re: Repeating Records w/ Spark + Avro?
Well, I kind of got it... this works below:

*
val rdd = sc.newAPIHadoopFile(path,
    classOf[AvroKeyInputFormat[GenericRecord]],
    classOf[AvroKey[GenericRecord]],
    classOf[NullWritable])
  .map(_._1.datum)

rdd
  .map(i => {
    val item = i.copy()
    val record = i._1.datum()

    println(record.get("myValue"))
  })
  .take(10)
*

It seems strange to me that I effectively have to iterate over the RDD twice: once to create the RDD, and again to perform my action. It also seems strange that I can't actually access the data in my RDD until I've copied the records. I would think this is a *very* common use case of an RDD -- accessing the data it contains (otherwise, what's the point?).

Is there a way to always enable cloning? There used to be a cloneRecords parameter on the hadoopFile method, but that seems to have been removed.

Finally, if I add rdd.persist(), then it doesn't work. I guess I would need to do .map(_._1.datum) again before the map that does the real work.

--
Chris Miller
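For reference, a rough (untested) sketch of folding the copy and the action into a single pass, so persist() caches independent records rather than many references to the reused datum. It uses Avro's GenericData.deepCopy in place of clone(); that choice, and the variable names, are assumptions rather than something confirmed in this thread (path and "myValue" come from the messages above):

*
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable

// Deep-copy each datum as it is read, so the resulting RDD holds
// independent records and can be persisted or gathered safely.
val records = sc.newAPIHadoopFile(path,
    classOf[AvroKeyInputFormat[GenericRecord]],
    classOf[AvroKey[GenericRecord]],
    classOf[NullWritable])
  .map { case (key, _) =>
    val datum = key.datum()
    GenericData.get().deepCopy(datum.getSchema, datum)
  }

records.persist()
records.take(10).foreach(r => println(r.get("myValue")))
*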
Re: Repeating Records w/ Spark + Avro?
Wow! That sure is buried in the documentation! But yeah, that's what I thought, more or less.

I tried copying as follows, but that didn't work:

*
val copyRDD = singleFileRDD.map(_.copy())
*

When I iterate over the new copyRDD (foreach or map), I still have the same problem of duplicate records. I also tried copying within the block where I'm using it, but that didn't work either:

*
rdd
  .take(10)
  .collect()
  .map(i => {
    val item = i.copy()
    val record = i._1.datum()

    println(record.get("myValue"))
  })
*

What am I doing wrong?

--
Chris Miller
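A possible reason the copy above has no effect: Tuple2#copy() duplicates the pair but not the Avro record it wraps, and by the time take(10) has gathered the rows, every slot may already reference the same reused datum. A minimal sketch (an assumption, not something suggested in the thread) of copying the datum itself on the executors, before take():

*
import org.apache.avro.generic.GenericData

// Copy the datum itself (not the surrounding tuple) before take()
// gathers the rows, otherwise every slot can point at the same object.
rdd
  .map { case (key, _) =>
    val datum = key.datum()
    GenericData.get().deepCopy(datum.getSchema, datum)
  }
  .take(10)
  .foreach(record => println(record.get("myValue")))
*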
Re: Repeating Records w/ Spark + Avro?
Here is the reason for the behavior:

"Note: Because Hadoop's RecordReader class re-uses the same Writable object for each record, directly caching the returned RDD or directly passing it to an aggregation or shuffle operation will create many references to the same object. If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first copy them using a map function."

https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/SparkContext.html

So it is Hadoop related.
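To illustrate the note above (it is not specific to Avro), the same re-use applies to any Hadoop Writable; a minimal sketch, assuming a plain text file at a made-up path:

*
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Convert the mutable Writables into immutable values (Long, String)
// before caching, so the cache does not end up holding references to
// the single re-used LongWritable/Text instances.
val lines = sc.newAPIHadoopFile("/path/to/data.txt",
    classOf[TextInputFormat],
    classOf[LongWritable],
    classOf[Text])
  .map { case (offset, line) => (offset.get(), line.toString) }
  .cache()
*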
Repeating Records w/ Spark + Avro?
I have a bit of a strange situation:

*
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper, AvroKey}
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.{NullWritable, WritableUtils}

val path = "/path/to/data.avro"

val rdd = sc.newAPIHadoopFile(path,
  classOf[AvroKeyInputFormat[GenericRecord]],
  classOf[AvroKey[GenericRecord]],
  classOf[NullWritable])
rdd.take(10).foreach( x => println( x._1.datum() ))
*

In this situation, I get the right number of records returned, and if I look at the contents of rdd I see the individual records as Tuple2s... however, if I println on each one as shown above, I get the same result every time.

Apparently this has to do with something in Spark or Avro keeping a reference to the item it's iterating over, so I need to clone the object before I use it. However, if I try to clone it (from the spark-shell console), I get:

*
rdd.take(10).foreach( x => {
  val clonedDatum = x._1.datum().clone()
  println(clonedDatum.datum())
})

:37: error: method clone in class Object cannot be accessed in
org.apache.avro.generic.GenericRecord
  Access to protected method clone not permitted because
  prefix type org.apache.avro.generic.GenericRecord does not conform to
  class $iwC where the access take place
       val clonedDatum = x._1.datum().clone()
*

So, how can I clone the datum?

Seems I'm not the only one who ran into this problem: https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/102. I can't figure out how to fix it in my case without hacking away like the person in the linked PR did.

Suggestions?

--
Chris Miller
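For reference, one public alternative to Object#clone here is Avro's deep copy; a minimal sketch (the copyDatum helper is hypothetical, not from the thread):

*
import org.apache.avro.generic.{GenericData, GenericRecord}

// GenericRecord does not expose a public clone(), but GenericData can
// deep-copy a datum given its schema.
def copyDatum(datum: GenericRecord): GenericRecord =
  GenericData.get().deepCopy(datum.getSchema, datum)
*

Note that for the repeated-record symptom, the copy has to happen before the records are gathered or cached (e.g. inside a map on the RDD), as the replies above discuss.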