Hi Guys,
Love Crunch, but I've been having some trouble recently using Avro records. I think
someone needs to write a Crunch book.
I'm trying to aggregate hourly visits to a page by each user. I do one pass to
parse the records, and then I try to "group by" unique user and hour and count
the number of times each user visited, as well as their first visit time in the hour.
Here's the Avro schema:
{"namespace": "com.test",
 "type": "record",
 "name": "Visit",
 "fields": [
     {"name": "dateid", "type": "int"}, // Year Month Day Hour in PST as an integer, e.g. 2014081103
     {"name": "userid", "type": "string"},
     {"name": "vcount", "type": ["long", "null"]},
     {"name": "firsttimestamp", "type": ["long", "null"]} // Unix timestamp of first visit
 ]
}
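For reference, this is how I'd pack a Unix timestamp into the dateid format described in the schema comment. It's a minimal plain-Java sketch, not code from my job: the class and method names are made up, and it assumes "PST" really means US Pacific time (America/Los_Angeles, which switches to PDT in summer) rather than a fixed UTC-8 offset.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;

public class DateIdExample {
    // Assumption: "PST" means US Pacific time including daylight saving.
    // Swap in ZoneOffset.ofHours(-8) if a fixed UTC-8 offset is intended.
    private static final ZoneId PACIFIC = ZoneId.of("America/Los_Angeles");

    // Packs year, month, day and hour into an int such as 2014081103.
    // Fits in an int until the year 2147.
    static int dateId(long unixSeconds) {
        ZonedDateTime t = Instant.ofEpochSecond(unixSeconds).atZone(PACIFIC);
        return t.getYear() * 1_000_000
                + t.getMonthValue() * 10_000
                + t.getDayOfMonth() * 100
                + t.getHour();
    }

    public static void main(String[] args) {
        // 2014-08-11 10:00 UTC is 03:00 PDT.
        System.out.println(dateId(1407751200L)); // prints 2014081103
    }
}
```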
Here I do the parsing; at this point vcount and firsttimestamp aren't set yet.
PTable<Visit, Pair<Long, Long>> visits = parsed.parallelDo("visits-parsing",
new VisitsExtractor(),
Avros.tableOf(Avros.specifics(Visit.class),
Avros.pairs(Avros.longs(), Avros.longs())));
The relevant line from VisitsExtractor:
emitter.emit(Pair.of(visit, Pair.of(1L, log.timestamp())));
Everything up to this point works fine. Now I want to count the visits for each
unique visitor and find the minimum timestamp.
PTable<Visit, Pair<Long, Long>> agg = visits.groupByKey()
    .combineValues(Aggregators.pairAggregator(Aggregators.SUM_LONGS(), Aggregators.MIN_LONGS()));
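To sanity-check what that aggregation should produce, here's a plain-Java model of groupByKey followed by combineValues with the pair aggregator: per key, the counts are summed (like SUM_LONGS) and the timestamps reduced to the minimum (like MIN_LONGS). It doesn't use Crunch at all. The Emitted class and the "dateid|userid" string keys are hypothetical stand-ins for the Visit key, assuming vcount and firsttimestamp are still null at this stage so only dateid and userid distinguish keys.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class VisitAggregationModel {
    // Hypothetical stand-in for one (key, (count, timestamp)) pair emitted by
    // the extractor; the key combines dateid and userid.
    static final class Emitted {
        final String key;
        final long count;
        final long timestamp;

        Emitted(String key, long count, long timestamp) {
            this.key = key;
            this.count = count;
            this.timestamp = timestamp;
        }
    }

    // Groups by key, then combines values pairwise: counts are summed and
    // the earliest timestamp is kept. Result: key -> {count, firstTimestamp}.
    static Map<String, long[]> aggregate(List<Emitted> emitted) {
        Map<String, long[]> out = new HashMap<>();
        for (Emitted e : emitted) {
            out.merge(e.key, new long[] {e.count, e.timestamp},
                (a, b) -> new long[] {a[0] + b[0], Math.min(a[1], b[1])});
        }
        return out;
    }

    public static void main(String[] args) {
        List<Emitted> emitted = List.of(
            new Emitted("2014081103|alice", 1L, 1407751500L),
            new Emitted("2014081103|alice", 1L, 1407751260L),
            new Emitted("2014081103|bob", 1L, 1407752000L));
        long[] alice = aggregate(emitted).get("2014081103|alice");
        System.out.println("alice: count=" + alice[0] + ", first=" + alice[1]);
    }
}
```

Running main prints `alice: count=2, first=1407751260`, which is what I'd expect each Visit in agg to carry.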
The above seems to work fine too. Now I want to create new Visit records and
fill in the count and minimum timestamp fields.
PCollection<Visit> count_visits = agg.parallelDo("visits-count",
    new DoFn<Pair<Visit, Pair<Long, Long>>, Visit>() {
      @Override
      public void process(Pair<Visit, Pair<Long, Long>> p, Emitter<Visit> emitter) {
        // Copy the grouped key, then fill in the aggregated count and first timestamp.
        Visit v = Visit.newBuilder(p.first()).build();
        v.setVcount(p.second().first());
        v.setFirsttimestamp(p.second().second());
        emitter.emit(v);
      }
    }, Avros.specifics(Visit.class));
count_visits.write(To.textFile(outputPath), WriteMode.OVERWRITE);
Here's the error:
2014-08-21 15:09:26,245 ERROR run.CrunchReducer (CrunchReducer.java:reduce(54)) - Reducer exception
java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.test.Visit
    at com.test.Logs$1.process(Logs.java:49)
    at com.test.Logs$1.process(Logs.java:1)
    at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
    at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
So I'm pretty sure the problem is this line:
Visit v = Visit.newBuilder(p.first()).build();
Specifically, p.first() should be a Visit, but apparently it isn't. I assume the
groupByKey output in the reducers serializes the key but doesn't use the correct
Avro type to do it?
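One thing I may try to rule out: as far as I understand Avro's SpecificData, when a specific reader builds a record it looks up a class named namespace + "." + name (here com.test.Visit) through a class loader, and if that lookup fails it quietly falls back to a GenericData.Record. That would produce exactly this exception if com.test.Visit isn't visible to the class loader Avro uses in the reducer. The sketch below is a simplified plain-Java model of that lookup, not Avro's actual code; resolveRecordClass is a hypothetical helper.

```java
public class SpecificFallbackModel {
    // Hypothetical, simplified model of a specific reader choosing a class
    // for a record schema: try to load namespace.name through the given
    // class loader, and fall back to a generic record if it can't be found.
    static String resolveRecordClass(String namespace, String name, ClassLoader loader) {
        String fullName = namespace + "." + name;
        try {
            return Class.forName(fullName, false, loader).getName();
        } catch (ClassNotFoundException e) {
            // In Avro, this is where a GenericData.Record would be built instead.
            return "org.apache.avro.generic.GenericData$Record";
        }
    }

    public static void main(String[] args) {
        ClassLoader loader = SpecificFallbackModel.class.getClassLoader();
        // A class the loader can see resolves normally.
        System.out.println(resolveRecordClass("java.lang", "String", loader));
        // com.test.Visit is not on this demo's classpath, so the model falls back.
        System.out.println(resolveRecordClass("com.test", "Visit", loader));
    }
}
```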
Also, I don't think I understand when I should use Avros.records() versus
Avros.specifics() when I have a generated Avro class.
Thanks!
Danny