Also, are you sure GenericData.Record is the correct class? When I use Avro to build my records, they normally end up as a SpecificRecord rather than a GenericRecord.
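
To illustrate the point about record classes, here is a minimal, library-free sketch of why the cast in [3] would fail. It does not use Avro itself: RecordLike, GenericRecordLike and SpecificRecordLike are hypothetical stand-ins for a shared record interface, GenericData.Record, and a generated class such as namespace.OurAvroDataClass. The field layout (host, timestamp) is likewise invented for the example.

```java
import java.util.Arrays;
import java.util.List;

public class RecordCastSketch {

    // Hypothetical stand-in for an interface that both record flavours share.
    interface RecordLike {
        Object get(int field);
    }

    // Hypothetical stand-in for a generic, schema-driven record class.
    static final class GenericRecordLike implements RecordLike {
        private final List<Object> values;

        GenericRecordLike(Object... values) {
            this.values = Arrays.asList(values);
        }

        @Override
        public Object get(int field) {
            return values.get(field);
        }
    }

    // Hypothetical stand-in for a generated class with typed fields.
    static final class SpecificRecordLike implements RecordLike {
        private final String host;
        private final long timestamp;

        SpecificRecordLike(String host, long timestamp) {
            this.host = host;
            this.timestamp = timestamp;
        }

        @Override
        public Object get(int field) {
            switch (field) {
                case 0: return host;
                case 1: return timestamp; // autoboxed to Long
                default: throw new IndexOutOfBoundsException("no field " + field);
            }
        }
    }

    // Written against the shared interface, so either implementation works.
    static long timestampOf(RecordLike record) {
        return (Long) record.get(1);
    }

    // Returns true when downcasting to the generic class fails at runtime.
    static boolean castToGenericFails(RecordLike record) {
        try {
            GenericRecordLike ignored = (GenericRecordLike) record;
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        RecordLike specific = new SpecificRecordLike("host-a", 1426505880000L);
        System.out.println(timestampOf(specific));
        System.out.println(castToGenericFails(specific));
    }
}
```

The two implementations are siblings, so an object of one can never be cast to the other, even though both can be read through the common interface. With the real classes, the analogous fix would presumably be to type the functions on the generated class (or on an interface both implement) rather than on GenericData.Record.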

On Mon, Mar 16, 2015 at 11:38 AM David Ortiz <[email protected]> wrote:

> Mattijs,
>
> Any particular reason you're taking that approach rather than
> something like...
>
>
> keyedStagedLogs = coalescedStagedLogs.parallelDo(
>     "sort-pre",
>     new MapFn<OurAvroType, Pair<Long, OurAvroType>>() {
>
>         private static final long serialVersionUID = 1L;
>         @Override
>         public Pair<Long, OurAvroType> map(OurAvroType input) {
>
>             Long record_ts_key = (Long)input.get(partition_time_sourcename);
>             return Pair.of(record_ts_key, input);
>         }
>     },
>     tf.pairs(tf.longs(), Avros.records(OurAvroType.class))
> );
>
> Thanks,
> Dave
>
>
> On Mon, Mar 16, 2015 at 11:35 AM Mattijs Jonker <[email protected]>
> wrote:
>
>> Hello,
>>
>> I am trying to sort Avro data based on a given field in a Crunch
>> pipeline. Since the field in question (a timestamp) does not come first
>> in the Avro schema (and hence does not dictate primarily the normal sort
>> order), I map the Record to a Pair<Long, Record> first to Sort.sort on
>> the desired field. My below code [2] is loosely inspired by the first
>> DoFn in [1].
>>
>> Unfortunately, I encounter a ClassCastException [3] that I find hard to
>> solve on my own. I do not fully understand the way types are handled at
>> runtime, but my guess is that based on the name and namespace in the
>> schema, the first MapFn results in a namespace.OurAvroDataClass Object
>> (which is a SpecificRecord).
>>
>> I would appreciate it if somebody can hint at how to overcome the
>> exception. An alternative method to achieve this sorting is also welcome.
>>
>> Sincerely,
>>
>> Mattijs
>>
>>
>> [1]
>> http://blog.cloudera.com/blog/2014/05/how-to-process-time-series-data-using-apache-crunch/
>>
>> [2]
>>
>> PTypeFamily tf = coalescedStagedLogs.getTypeFamily();
>> keyedStagedLogs = coalescedStagedLogs.parallelDo(
>>     "sort-pre",
>>     new MapFn<Record, Pair<Long, Record>>() {
>>         private static final long serialVersionUID = 1L;
>>         @Override
>>         public Pair<Long, Record> map(Record input) {
>>             Long record_ts_key = (Long)input.get(partition_time_sourcename);
>>             return Pair.of(record_ts_key, input);
>>         }
>>     },
>>     tf.pairs(tf.longs(), Avros.generics(schema))
>> );
>>
>> sortedKeyedStagedLogs = Sort.sort(keyedStagedLogs,
>>     Sort.Order.ASCENDING); // Sort
>>
>> sortedStagedLogs = sortedKeyedStagedLogs.parallelDo(
>>     "sort-post",
>>     new MapFn<Pair<Long, Record>, Record>() {
>>         private static final long serialVersionUID = 1L;
>>         @Override
>>         public Record map(Pair<Long, Record> input) {
>>             return new Record(input.second(), true);
>>         }
>>     },
>>     Avros.generics(schema)
>> );
>>
>> [3]
>> Caused by: java.lang.ClassCastException: namespace.OurAvroDataClass
>> cannot be cast to org.apache.avro.generic.GenericData$Record at
>> namespace.OurCrunchToolClass$2.map(OurCrunchToolClass.java:ln)
>>
>> ln => return new Record(input.second(), true);
>>
>
