Let me see how much work it would be to get a Parquet patch in; they're usually pretty good about these things. There's no way that a bump to Avro 1.7.7 fixes this, right?
J

On Thu, Sep 18, 2014 at 10:44 AM, Danny Morgan <[email protected]> wrote:

> Hi Guys,
>
> Wondering if this is fixable in a future release of the parquet jars?
>
> Thanks!
>
> ------------------------------
> From: [email protected]
> To: [email protected]; [email protected]
> Subject: RE: Trouble with Avro records
> Date: Fri, 29 Aug 2014 22:29:23 +0000
>
> Thanks Josh, it does seem to work if I treat the records as generics
> instead of a specific class.
>
> ------------------------------
> From: [email protected]
> Date: Fri, 29 Aug 2014 14:49:59 -0700
> Subject: Re: Trouble with Avro records
> To: [email protected]; [email protected]
>
> +tom
>
> Ack, okay. Dug into this a bit more, and I think the source of the issue
> is in Parquet: the code that looks up the specific class type inside
> Parquet's AvroRecordMaterializer calls SpecificData.get() instead of the
> (new SpecificData(classLoader)) implementation that we use inside
> AvroMode. I'm not sure we have a way of getting around this without
> copying and rewriting a lot of the Parquet code, so I added Tom explicitly
> to see if he has any ideas.
>
> J
>
> On Fri, Aug 29, 2014 at 2:23 PM, Danny Morgan <[email protected]> wrote:
>
> Didn't seem to work.
>
> I tried
>
>     output.inputConf("crunch.avro.mode", "SPECIFIC");
>     output.outputConf("crunch.avro.mode", "SPECIFIC");
>
> as well, to no avail.
>
> ------------------------------
> From: [email protected]
> Date: Fri, 29 Aug 2014 13:52:35 -0700
> Subject: Re: Trouble with Avro records
> To: [email protected]
>
> Hey Danny,
>
> So I suspect the problem is that the AvroMode info isn't getting
> propagated to the ParquetFileSourceTarget. Verifying that isn't as simple
> as it should be, but I'd like you to try something like this:
>
>     SourceTarget<Log> output = new AvroParquetFileSourceTarget<Log>(
>         new Path(path), Avros.specifics(Log.class));
>     output.conf("crunch.avro.mode", "SPECIFIC");
>
> ...and let me know if that fixes the problem. If so, I can file a JIRA to
> fix it properly.
>
> J
>
> On Fri, Aug 29, 2014 at 11:15 AM, Josh Wills <[email protected]> wrote:
>
> Hey Danny,
>
> I'll take a look at it later today; kind of a crazy AM for me.
>
> J
>
> On Fri, Aug 29, 2014 at 9:11 AM, Danny Morgan <[email protected]> wrote:
>
> Okay, looks like the AvroMode.setSpecificClassLoader() fix works for Avro
> files but doesn't work for Parquet Avro files.
>
>     // This works great
>     Pipeline p1 = new MRPipeline(MyLogs.class, getConf());
>     PCollection<Log> logs = ...;
>     SourceTarget<Log> output = At.avroFile(path, Avros.specifics(Log.class));
>     p1.write(logs, output, WriteMode.OVERWRITE);
>     p1.done();
>
>     Pipeline p2 = new MRPipeline(MyLogs.class, getConf());
>     AvroMode.setSpecificClassLoader(Log.class.getClassLoader());
>     PCollection<Log> logs2 = p2.read(output);
>     p2.done();
>
>     // This fails with java.lang.ClassCastException:
>     // org.apache.avro.generic.GenericData$Record cannot be cast to com.test.Log
>     Pipeline p1 = new MRPipeline(MyLogs.class, getConf());
>     PCollection<Log> logs = ...;
>     SourceTarget<Log> output = new AvroParquetFileSourceTarget<Log>(
>         new Path(path), Avros.specifics(Log.class));
>     p1.write(logs, output, WriteMode.OVERWRITE);
>     p1.done();
>
>     Pipeline p2 = new MRPipeline(MyLogs.class, getConf());
>     AvroMode.setSpecificClassLoader(Log.class.getClassLoader());
>     PCollection<Log> logs2 = p2.read(output);
>     p2.done();
>
> Any idea?
>
> Thanks!
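For anyone hitting the same thing, here is a rough, untested sketch of the "treat the records as generics" workaround Danny mentions above: read the Parquet file with Avros.generics() so nothing in the read path has to resolve the specific class, then convert to the generated Log class yourself. The MapFn conversion via SpecificData.deepCopy() is an illustrative assumption on my part, not code from this thread; MyLogs, Log, path, and getConf() are taken from Danny's snippets.

    import org.apache.avro.generic.GenericData;
    import org.apache.avro.specific.SpecificData;
    import org.apache.crunch.MapFn;
    import org.apache.crunch.PCollection;
    import org.apache.crunch.Pipeline;
    import org.apache.crunch.impl.mr.MRPipeline;
    import org.apache.crunch.io.parquet.AvroParquetFileSourceTarget;
    import org.apache.crunch.types.avro.Avros;
    import org.apache.hadoop.fs.Path;

    Pipeline p2 = new MRPipeline(MyLogs.class, getConf());

    // Read the Parquet file as generic records so nothing in the read path
    // needs to resolve com.test.Log through Parquet's default classloader.
    PCollection<GenericData.Record> generic = p2.read(
        new AvroParquetFileSourceTarget<GenericData.Record>(
            new Path(path), Avros.generics(Log.SCHEMA$)));

    // Illustrative conversion step (not from the thread): deep-copy each
    // generic record into the generated Log class using a SpecificData
    // that is handed the classloader that actually contains Log.
    PCollection<Log> logs2 = generic.parallelDo(
        new MapFn<GenericData.Record, Log>() {
          @Override
          public Log map(GenericData.Record record) {
            SpecificData specificData = new SpecificData(Log.class.getClassLoader());
            Object copy = specificData.deepCopy(Log.SCHEMA$, (Object) record);
            return (Log) copy;
          }
        }, Avros.specifics(Log.class));

    // ...then write or materialize logs2 as usual before calling p2.done().

If the downstream code doesn't actually need the specific class, working directly with the GenericData.Record fields is simpler still.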
> ------------------------------
> From: [email protected]
> Date: Thu, 21 Aug 2014 13:54:56 -0700
> Subject: Re: Trouble with Avro records
> To: [email protected]
>
> On Thu, Aug 21, 2014 at 1:46 PM, Danny Morgan <[email protected]> wrote:
>
> Thanks Josh, if it is any consolation Crunch has saved me countless hours
> and CPU cycles over how we used to do things, so I'm glad to be benefiting
> from your ill-advised decisions :-)
>
> That is always nice to hear -- thanks!
>
> Danny
>
> ------------------------------
> From: [email protected]
> Date: Thu, 21 Aug 2014 13:33:40 -0700
> Subject: Re: Trouble with Avro records
> To: [email protected]
>
> On Thu, Aug 21, 2014 at 1:28 PM, Danny Morgan <[email protected]> wrote:
>
> Awesome, adding:
>
>     AvroMode.setSpecificClassLoader(Visit.class.getClassLoader());
>
> did the trick, thanks Josh!
>
> Do you mind clarifying when to use Avros.records() versus
> Avros.specifics() or Avros.reflects()?
>
> Oops, yes -- sorry about that. The records() method is defined on both the
> WritableTypeFamily and the AvroTypeFamily as a method that is supposed to
> handle "arbitrary" data types that aren't supported by the built-in POJO
> and Crunch Tuple methods. Fortunately/unfortunately, what counts as an
> arbitrary data type depends on the backing implementation: for Writables,
> records() only really supports classes that implement Writable, while for
> Avros, records() checks whether the class implements SpecificRecord (in
> which case it handles it via Avros.specifics) and, if it does not, passes
> it on to Avros.reflects.
>
> I think that, like many things in Crunch, records() reflects an early and
> somewhat ill-advised decision I made when I was first creating the API,
> one that I would likely do away with if I had it to do over again. :)
>
> J
>
> Thanks again!
>
> ------------------------------
> From: [email protected]
> Date: Thu, 21 Aug 2014 13:20:20 -0700
> Subject: Re: Trouble with Avro records
> To: [email protected]
>
> Okay. I suspect this is the problem:
>
> https://issues.apache.org/jira/browse/CRUNCH-442
>
> Gabriel fixed this for the upcoming 0.11 release, but there are a couple
> of workarounds in the comments. One is to put the Avro jar into your job
> jar file, instead of relying on the one that is in the Hadoop lib. The
> other is to configure AvroMode.setSpecificClassLoader with the class
> loader for your Visit class before kicking off the job.
>
> Josh
>
> On Thu, Aug 21, 2014 at 1:10 PM, Danny Morgan <[email protected]> wrote:
>
> Hi Josh,
>
> crunch-0.10.0-hadoop2
>
> Thanks.
>
> ------------------------------
> From: [email protected]
> Date: Thu, 21 Aug 2014 13:04:29 -0700
> Subject: Re: Trouble with Avro records
> To: [email protected]
>
> That feels like an AvroMode-related exception; which version of Crunch
> are you using?
>
> J
>
> On Thu, Aug 21, 2014 at 12:45 PM, Danny Morgan <[email protected]> wrote:
>
> Hi Guys,
>
> Love Crunch, but I've been having some trouble recently using Avro
> records. I think someone needs to write a Crunch book.
>
> I'm trying to aggregate hourly visits to a page by each user. I do one
> pass to parse the records, and then I try to "group by" unique user and
> hour and count the number of times they visited as well as their first
> visit time in the hour. Here's the Avro schema:
>     {"namespace": "com.test",
>      "type": "record",
>      "name": "Visit",
>      "fields": [
>          {"name": "dateid", "type": "int"},                     // Year Month Day Hour in PST as an integer, e.g. 2014081103
>          {"name": "userid", "type": "string"},
>          {"name": "vcount", "type": ["long", "null"]},
>          {"name": "firsttimestamp", "type": ["long", "null"]}   // Unix timestamp of the first visit
>      ]
>     }
>
> Here I do the parsing; at first vcount and firsttimestamp aren't set.
>
>     PTable<Visit, Pair<Long, Long>> visits =
>         parsed.parallelDo("visits-parsing", new VisitsExtractor(),
>             Avros.tableOf(Avros.specifics(Visit.class),
>                           Avros.pairs(Avros.longs(), Avros.longs())));
>
> The relevant line from VisitsExtractor:
>
>     emitter.emit(Pair.of(visit, Pair.of(1L, log.timestamp())));
>
> Everything up to this point works fine. Now I want to count up the unique
> visitors and the minimum timestamp.
>
>     PTable<Visit, Pair<Long, Long>> agg =
>         visits.groupByKey().combineValues(
>             Aggregators.pairAggregator(Aggregators.SUM_LONGS(),
>                                        Aggregators.MIN_LONGS()));
>
> The above seems to work fine too. Now I want to create new Visit objects
> and fill in the count and minimum-timestamp fields.
>
>     PCollection<Visit> count_visits = agg.parallelDo("visits-count",
>         new DoFn<Pair<Visit, Pair<Long, Long>>, Visit>() {
>           @Override
>           public void process(Pair<Visit, Pair<Long, Long>> p,
>                               Emitter<Visit> emitter) {
>             Visit v = Visit.newBuilder(p.first()).build();
>             v.setVcount(p.second().first());
>             v.setFirsttimestamp(p.second().second());
>             emitter.emit(v);
>           }
>         }, Avros.specifics(Visit.class));
>
>     count_visits.write(To.textFile(outputPath), WriteMode.OVERWRITE);
>
> Here's the error:
>
>     2014-08-21 15:09:26,245 ERROR run.CrunchReducer (CrunchReducer.java:reduce(54)) - Reducer exception
>     java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.test.Visit
>         at com.test.Logs$1.process(Logs.java:49)
>         at com.test.Logs$1.process(Logs.java:1)
>         at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>         at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>
> So I'm pretty sure the problem is this line:
>
>     Visit v = Visit.newBuilder(p.first()).build();
>
> Specifically, p.first() should be a Visit, but apparently it isn't. I
> assume the output of the groupBy operation in the reducers is serializing
> the key but not using the correct Avro type to do it?
>
> Also, I don't think I understand when I should be using Avros.records()
> versus Avros.specifics() when I have a generated Avro class.
>
> Thanks!
>
> Danny

--
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>
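To tie the answers above back to the original pipeline: on crunch-0.10.0-hadoop2, the non-Parquet version works once the CRUNCH-442 workaround is applied before the job is kicked off. Below is a minimal sketch of where that one line goes, plus the PType choices Josh describes. SomePojo is a made-up class for illustration; Visit, Logs (the driver class from the stack trace), parsed, and getConf() come from the thread. As discussed above, this fixes the plain Avro case but not the AvroParquetFileSourceTarget case.

    import org.apache.crunch.Pipeline;
    import org.apache.crunch.impl.mr.MRPipeline;
    import org.apache.crunch.types.PType;
    import org.apache.crunch.types.avro.AvroMode;
    import org.apache.crunch.types.avro.Avros;

    // CRUNCH-442 workaround: point AvroMode at the classloader that holds the
    // generated Visit class before the job is planned, so the reduce side
    // deserializes the grouped keys as Visit rather than GenericData$Record.
    AvroMode.setSpecificClassLoader(Visit.class.getClassLoader());

    Pipeline pipeline = new MRPipeline(Logs.class, getConf());
    // ...then the same visits-parsing, groupByKey().combineValues(...) and
    // visits-count steps as in the original code; p.first() now arrives in
    // the reducer as a Visit.

    // Choosing a PType for Avro data, per the explanation above:
    PType<Visit> specific = Avros.specifics(Visit.class);      // generated SpecificRecord classes
    PType<SomePojo> reflected = Avros.reflects(SomePojo.class); // plain POJOs, via Avro reflection
    PType<Visit> viaRecords = Avros.records(Visit.class);      // SpecificRecord -> specifics(), otherwise -> reflects()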
