On Thu, Aug 21, 2014 at 1:46 PM, Danny Morgan <[email protected]> wrote:
> Thanks Josh, if it is any consolation Crunch has saved me countless hours
> and CPU cycles over how we used to do things, so I'm glad to be benefiting
> from your ill-advised decisions :-)

That is always nice to hear -- thanks!

> Danny
>
> ------------------------------
> From: [email protected]
> Date: Thu, 21 Aug 2014 13:33:40 -0700
> Subject: Re: Trouble with Avro records
> To: [email protected]
>
> On Thu, Aug 21, 2014 at 1:28 PM, Danny Morgan <[email protected]> wrote:
>
> Awesome -- adding:
>
>     AvroMode.setSpecificClassLoader(Visit.class.getClassLoader());
>
> did the trick, thanks Josh!
>
> Do you mind clarifying when to use Avros.records() versus
> Avros.specifics() or Avros.reflects()?
>
> Oops, yes -- sorry about that. The records() method is defined on both the
> WritableTypeFamily and the AvroTypeFamily as a method that is supposed to
> handle "arbitrary" data types that aren't supported by the built-in POJO
> and Crunch Tuple methods. Fortunately/unfortunately, what counts as an
> arbitrary data type depends on the backing implementation: for Writables,
> records() only really supports classes that implement Writable, and for
> Avros, records() checks whether the class implements SpecificRecord (at
> which point it handles it using Avros.specifics) and, if it does not,
> passes it on to Avros.reflects.
>
> I think that, like many things in Crunch, records() reflects an early and
> somewhat ill-advised decision I made when I was first creating the API
> that I would likely do away with if I had it to do over again. :)
>
> J
>
> Thanks again!
>
> ------------------------------
> From: [email protected]
> Date: Thu, 21 Aug 2014 13:20:20 -0700
> Subject: Re: Trouble with Avro records
> To: [email protected]
>
> Okay. I suspect this is the problem:
>
> https://issues.apache.org/jira/browse/CRUNCH-442
>
> Gabriel fixed this for the upcoming 0.11 release, but there are a couple
> of workarounds in the comments.
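[Editor's note: the records() dispatch Josh describes can be sketched in plain Java. This is a self-contained illustration, not Crunch's actual source: `SpecificRecordStandIn` is a local stand-in for Avro's `org.apache.avro.specific.SpecificRecord` so the example compiles without Avro on the classpath, and the class names are invented for the example.]

```java
// Sketch of the decision Avros.records() makes, per Josh's description:
// classes implementing SpecificRecord are handled as "specifics";
// everything else falls through to reflection-based "reflects".
public class RecordsDispatchSketch {

    /** Local stand-in for org.apache.avro.specific.SpecificRecord. */
    interface SpecificRecordStandIn {}

    /** Plays the role of an Avro-generated class like Visit. */
    static class GeneratedVisit implements SpecificRecordStandIn {}

    /** A plain POJO with no Avro interfaces. */
    static class PlainPojo {}

    static String dispatch(Class<?> clazz) {
        if (SpecificRecordStandIn.class.isAssignableFrom(clazz)) {
            return "specifics"; // handled like Avros.specifics(clazz)
        }
        return "reflects";      // handled like Avros.reflects(clazz)
    }

    public static void main(String[] args) {
        System.out.println(dispatch(GeneratedVisit.class)); // specifics
        System.out.println(dispatch(PlainPojo.class));      // reflects
    }
}
```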
> One is to put the Avro jar into your job jar file, instead of relying on
> the one that is in the Hadoop lib. The other is to configure
> AvroMode.setSpecificClassLoader with the class loader for your Visit
> class before kicking off the job.
>
> Josh
>
> On Thu, Aug 21, 2014 at 1:10 PM, Danny Morgan <[email protected]> wrote:
>
> Hi Josh,
>
> crunch-0.10.0-hadoop2
>
> Thanks.
>
> ------------------------------
> From: [email protected]
> Date: Thu, 21 Aug 2014 13:04:29 -0700
> Subject: Re: Trouble with Avro records
> To: [email protected]
>
> That feels like an AvroMode-related exception; which version of Crunch
> are you using?
>
> J
>
> On Thu, Aug 21, 2014 at 12:45 PM, Danny Morgan <[email protected]> wrote:
>
> Hi Guys,
>
> I love Crunch, but I've been having some trouble recently using Avro
> records. I think someone needs to write a Crunch book.
>
> I'm trying to aggregate hourly visits to a page by each user. I do one
> pass to parse the records, and then I try to "group by" unique user and
> hour and count the number of times they visited, as well as their first
> visit time in the hour. Here's the Avro schema:
>
>     {"namespace": "com.test",
>      "type": "record",
>      "name": "Visit",
>      "fields": [
>          {"name": "dateid", "type": "int"},   // Year/month/day/hour in PST as an integer, e.g. 2014081103
>          {"name": "userid", "type": "string"},
>          {"name": "vcount", "type": ["long", "null"]},
>          {"name": "firsttimestamp", "type": ["long", "null"]}  // Unix timestamp of first visit
>      ]
>     }
>
> Here I do the parsing; at first vcount and firsttimestamp aren't set:
>
>     PTable<Visit, Pair<Long, Long>> visits =
>         parsed.parallelDo("visits-parsing", new VisitsExtractor(),
>             Avros.tableOf(Avros.specifics(Visit.class),
>                           Avros.pairs(Avros.longs(), Avros.longs())));
>
> The relevant line from VisitsExtractor:
>
>     emitter.emit(Pair.of(visit, Pair.of(1L, log.timestamp())));
>
> Everything up to this point works fine; now I want to count up the unique
> visitors and the minimum timestamp.
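[Editor's note: a self-contained illustration of the dateid convention in the schema above, i.e. packing year/month/day/hour into one int like 2014081103. The helper name, the example timestamp, and the choice of the America/Los_Angeles zone are assumptions for illustration; they are not from the original thread.]

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;

// Illustrative helper: fold a Unix timestamp into the schema's integer
// dateid format (yyyyMMddHH in Pacific time), e.g. 2014081103.
public class DateIdSketch {

    static int toDateId(long unixSeconds) {
        ZonedDateTime t = Instant.ofEpochSecond(unixSeconds)
                                 .atZone(ZoneId.of("America/Los_Angeles"));
        // Pack yyyyMMddHH into a single int; 2014081103 fits in 32 bits.
        return t.getYear() * 1_000_000
                + t.getMonthValue() * 10_000
                + t.getDayOfMonth() * 100
                + t.getHour();
    }

    public static void main(String[] args) {
        // 1407751200 is 2014-08-11 10:00 UTC, i.e. 03:00 PDT.
        System.out.println(toDateId(1407751200L)); // prints 2014081103
    }
}
```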
>     PTable<Visit, Pair<Long, Long>> agg =
>         visits.groupByKey().combineValues(
>             Aggregators.pairAggregator(Aggregators.SUM_LONGS(),
>                                        Aggregators.MIN_LONGS()));
>
> The above seems to work fine too. Now I want to create new Visit records
> and fill in the count and minimum-timestamp fields:
>
>     PCollection<Visit> count_visits = agg.parallelDo("visits-count",
>         new DoFn<Pair<Visit, Pair<Long, Long>>, Visit>() {
>             @Override
>             public void process(Pair<Visit, Pair<Long, Long>> p,
>                                 Emitter<Visit> emitter) {
>                 Visit v = Visit.newBuilder(p.first()).build();
>                 v.setVcount(p.second().first());
>                 v.setFirsttimestamp(p.second().second());
>                 emitter.emit(v);
>             }
>         }, Avros.specifics(Visit.class));
>
>     count_visits.write(To.textFile(outputPath), WriteMode.OVERWRITE);
>
> Here's the error:
>
>     2014-08-21 15:09:26,245 ERROR run.CrunchReducer (CrunchReducer.java:reduce(54)) - Reducer exception
>     java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.test.Visit
>         at com.test.Logs$1.process(Logs.java:49)
>         at com.test.Logs$1.process(Logs.java:1)
>         at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>         at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>
> So I'm pretty sure the problem is this line:
>
>     Visit v = Visit.newBuilder(p.first()).build();
>
> Specifically, p.first() should be a Visit, but I guess it isn't. I assume
> the output of the groupBy operation in the reducers is serializing the
> key but not using the correct Avro type to do it?
>
> Also, I don't think I understand when I should be using Avros.records()
> versus Avros.specifics() when I have a generated Avro class.
>
> Thanks!
>
> Danny
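[Editor's note: a plain-Java sketch, with no Crunch dependency, of what the `pairAggregator(SUM_LONGS(), MIN_LONGS())` combine step above computes for one grouped key: the per-visit counts are summed and the timestamps are min'd, yielding (total visits, first visit time). The class and method names are invented for the example.]

```java
import java.util.List;

// For each (user, hour) key, Crunch feeds all emitted (1L, timestamp)
// pairs to the combiner; the result is (sum of counts, min of timestamps).
public class PairAggregateSketch {

    static long[] combine(List<long[]> values) {
        long sum = 0L;
        long min = Long.MAX_VALUE;
        for (long[] v : values) {
            sum += v[0];               // SUM_LONGS on the count component
            min = Math.min(min, v[1]); // MIN_LONGS on the timestamp component
        }
        return new long[] { sum, min };
    }

    public static void main(String[] args) {
        // Three visits by one key, each contributing (1, timestamp).
        List<long[]> values = List.of(
                new long[] { 1L, 1408600000L },
                new long[] { 1L, 1408590000L },
                new long[] { 1L, 1408620000L });
        long[] agg = combine(values);
        System.out.println(agg[0] + " " + agg[1]); // prints 3 1408590000
    }
}
```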
