Hi Josh, crunch-0.10.0-hadoop2
Thanks.

From: [email protected]
Date: Thu, 21 Aug 2014 13:04:29 -0700
Subject: Re: Trouble with Avro records
To: [email protected]

That feels like an AvroMode-related exception; which version of Crunch are you using?

J

On Thu, Aug 21, 2014 at 12:45 PM, Danny Morgan <[email protected]> wrote:

Hi Guys,

Love crunch but having some trouble recently using Avro records. I think someone needs to write a Crunch book.

I'm trying to aggregate hourly visits to a page by each user. I do one pass to parse the records and then I try to "group by" unique users and hour and count the number of times they visited as well as their first visit time in the hour.

Here's the Avro schema

    {"namespace": "com.test",
     "type": "record",
     "name": "Visit",
     "fields": [
         {"name": "dateid", "type": "int"}, // Year Month Day Hour in PST as an integer e.g. 2014081103
         {"name": "userid", "type": "string"},
         {"name": "vcount", "type": ["long", "null"]},
         {"name": "firsttimestamp", "type": ["long", "null"]} // Unixtime stamp of first visit
     ]
    }

Here I do the parsing, at first vcount and firstvisit aren't set.

    PTable<Visit, Pair<Long, Long>> visits = parsed.parallelDo("visits-parsing", new VisitsExtractor(),
        Avros.tableOf(Avros.specifics(Visit.class), Avros.pairs(Avros.longs(), Avros.longs())));

The relevant line from VisitsExtractor:

    emitter.emit(Pair.of(visit, Pair.of(1L, log.timestamp())));

Everything up to this point works fine, now I want to count up the unique visitors and the minimum timestamp.

    PTable<Visit, Pair<Long, Long>> agg = visits.groupByKey().combineValues(
        Aggregators.pairAggregator(Aggregators.SUM_LONGS(), Aggregators.MIN_LONGS()));

The above seems to work fine too, now I want to create new Visit classes and fill in the count and minimum timestamp fields.

    PCollection<Visit> count_visits = agg.parallelDo("visits-count",
        new DoFn<Pair<Visit, Pair<Long, Long>>, Visit>() {
          @Override
          public void process(Pair<Visit, Pair<Long, Long>> p, Emitter<Visit> emitter) {
            Visit v = Visit.newBuilder(p.first()).build();
            v.setVcount(p.second().first());
            v.setFirsttimestamp(p.second().second());
            emitter.emit(v);
          }
        }, Avros.specifics(Visit.class));
    }

    count_visits.write(To.textFile(outputPath), WriteMode.OVERWRITE);

Here's the error:

    2014-08-21 15:09:26,245 ERROR run.CrunchReducer (CrunchReducer.java:reduce(54)) - Reducer exception
    java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.test.Visit
        at com.test.Logs$1.process(Logs.java:49)
        at com.test.Logs$1.process(Logs.java:1)
        at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
        at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)

So I'm pretty sure the problem is this line:

    Visit v = Visit.newBuilder(p.first()).build();

specifically p.first() should be a Visit type but I guess it isn't. I assume the output of the groupBy operation in the reducers is serializing the key but not using the correct Avro type to do it?

Also I don't think I understand when I should be using Avros.records() versus Avros.specifics() when I have a generated avro file.

Thanks!

Danny
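
For reference, here is a minimal sketch of what the aggregation is meant to compute, written with plain Java collections rather than Crunch so it can be run without a cluster. It is not a fix for the ClassCastException. The class name HourlyVisitSketch, the Event class, the "dateid|userid" string key, and the sample timestamps are all made up for illustration; only the (1, timestamp) starting value and the sum/min combination come from the pipeline described above.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class HourlyVisitSketch {

    /** Hypothetical stand-in for one parsed log line (not the generated Avro Visit class). */
    public static final class Event {
        final int dateid;      // hour bucket, e.g. 2014081103
        final String userid;
        final long timestamp;  // Unix time of the visit

        public Event(int dateid, String userid, long timestamp) {
            this.dateid = dateid;
            this.userid = userid;
            this.timestamp = timestamp;
        }
    }

    /**
     * Groups events by (dateid, userid) and returns, per key,
     * a two-element array: [visit count, earliest timestamp].
     */
    public static Map<String, long[]> aggregate(List<Event> events) {
        Map<String, long[]> out = new LinkedHashMap<>();
        for (Event e : events) {
            // Assumed key encoding; the real pipeline keys on the Visit record itself.
            String key = e.dateid + "|" + e.userid;
            // Each event starts as (1, timestamp), mirroring Pair.of(1L, log.timestamp()).
            // Merging sums the counts and keeps the smaller timestamp.
            out.merge(key, new long[] {1L, e.timestamp},
                (a, b) -> new long[] {a[0] + b[0], Math.min(a[1], b[1])});
        }
        return out;
    }

    public static void main(String[] args) {
        // Made-up sample data: two users, two hour buckets.
        List<Event> events = Arrays.asList(
            new Event(2014081103, "alice", 500L),
            new Event(2014081103, "alice", 200L),
            new Event(2014081103, "bob", 300L),
            new Event(2014081104, "alice", 900L));
        for (Map.Entry<String, long[]> en : aggregate(events).entrySet()) {
            System.out.println(en.getKey()
                + " vcount=" + en.getValue()[0]
                + " firsttimestamp=" + en.getValue()[1]);
        }
    }
}
```

Each output entry corresponds to one Visit record with vcount and firsttimestamp filled in, which is what the final parallelDo above is trying to produce.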
