Thank you both, David and Josh, for responding so swiftly. Yes, the generated OurAvroDataClass is a SpecificRecord, but what my code snippet does not show is that OurCrunchToolClass is not meant exclusively for OurAvroDataClass. As such, OurAvroDataClass cannot be hard-coded in the MapFns in the proposed way.
I have tried to work with specifics rather than generics by passing the type of OurAvroDataClass (as Class<R extends SpecificRecord>) down to OurCrunchToolClass. This is necessary for use in Avros.specifics(class). Taking this route, I end up with an unfortunate ClassCastException [1].

[1] org.apache.avro.generic.GenericData$Record cannot be cast to org.apache.avro.specific.SpecificRecord

On 16-03-15 17:30, Josh Wills wrote:
> David's version should fix the error; the problem is that your
> namespace.OurAvroDataClass isn't a subclass of GenericData.Record.
>
> On Mon, Mar 16, 2015 at 8:40 AM, David Ortiz <[email protected]> wrote:
>
> Also, are you sure GenericData.Record is the correct class? I know
> when I use avro to build my records they normally end up as a
> SpecificRecord rather than a GenericRecord.
>
> On Mon, Mar 16, 2015 at 11:38 AM David Ortiz <[email protected]> wrote:
>
> Mattijs,
>
> Any particular reason you're taking that approach rather
> than something like...
>
> keyedStagedLogs = coalescedStagedLogs.parallelDo(
>     "sort-pre",
>     new MapFn<OurAvroType, Pair<Long, OurAvroType>>() {
>
>         private static final long serialVersionUID = 1L;
>         @Override
>         public Pair<Long, OurAvroType> map(OurAvroType input) {
>
>             Long record_ts_key =
>                 (Long)input.get(partition_time_sourcename);
>             return Pair.of(record_ts_key, input);
>         }
>     },
>     tf.pairs(tf.longs(), Avros.records(OurAvroType.class))
> );
>
> Thanks,
> Dave
>
> On Mon, Mar 16, 2015 at 11:35 AM Mattijs Jonker <[email protected]> wrote:
>
> Hello,
>
> I am trying to sort Avro data based on a given field in a Crunch
> pipeline. Since the field in question (a timestamp) does not come first
> in the Avro schema (and hence does not dictate primarily the normal sort
> order), I map the Record to a Pair<Long, Record> first to Sort.sort on
> the desired field. My below code [2] is loosely inspired by the first
> DoFn in [1].
>
> Unfortunately, I encounter a ClassCastException [3] that I find hard to
> solve on my own. I do not fully understand the way types are handled at
> runtime, but my guess is that based on the name and namespace in the
> schema, the first MapFn results in a namespace.OurAvroDataClass Object
> (which is a SpecificRecord).
>
> I would appreciate it if somebody can hint at how to overcome the
> exception. An alternative method to achieve this sorting is also welcome.
>
> Sincerely,
>
> Mattijs
>
> [1]
> http://blog.cloudera.com/blog/2014/05/how-to-process-time-series-data-using-apache-crunch/
>
> [2]
> PTypeFamily tf = coalescedStagedLogs.getTypeFamily();
> keyedStagedLogs = coalescedStagedLogs.parallelDo(
>     "sort-pre",
>     new MapFn<Record, Pair<Long, Record>>() {
>         private static final long serialVersionUID = 1L;
>         @Override
>         public Pair<Long, Record> map(Record input) {
>             Long record_ts_key =
>                 (Long)input.get(partition_time_sourcename);
>             return Pair.of(record_ts_key, input);
>         }
>     },
>     tf.pairs(tf.longs(), Avros.generics(schema))
> );
>
> sortedKeyedStagedLogs = Sort.sort(keyedStagedLogs,
>     Sort.Order.ASCENDING); // Sort
>
> sortedStagedLogs = sortedKeyedStagedLogs.parallelDo(
>     "sort-post",
>     new MapFn<Pair<Long, Record>, Record>() {
>         private static final long serialVersionUID = 1L;
>         @Override
>         public Record map(Pair<Long, Record> input) {
>             return new Record(input.second(), true);
>         }
>     },
>     Avros.generics(schema)
> );
>
> [3]
> Caused by: java.lang.ClassCastException: namespace.OurAvroDataClass
> cannot be cast to org.apache.avro.generic.GenericData$Record at
> namespace.OurCrunchToolClass$2.map(OurCrunchToolClass.java:ln)
>
> ln => return new Record(input.second(), true);
>
> --
> Director of Data Science
> Cloudera <http://www.cloudera.com>
> Twitter: @josh_wills <http://twitter.com/josh_wills>
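
For reference, the key / sort / strip-key pattern discussed in this thread can be written against a type parameter, so that it does not depend on any single record class. The following is only a minimal plain-Java sketch of that idea, not Crunch or Avro code: KeyedSortSketch, LogEntry and sortByKey are hypothetical names made up for illustration, and a java.util stream stands in for the parallelDo / Sort.sort / parallelDo chain.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

class KeyedSortSketch {

    // Hypothetical stand-in for a generated record class; not an Avro type.
    static final class LogEntry {
        final String source;
        final long timestamp;

        LogEntry(String source, long timestamp) {
            this.source = source;
            this.timestamp = timestamp;
        }
    }

    // Pairs each record with its Long key, sorts on the key in ascending
    // order, then drops the key again. The record type R stays generic, so
    // the helper never casts a record to a concrete class.
    static <R> List<R> sortByKey(List<R> records, Function<? super R, Long> keyOf) {
        return records.stream()
                .map(r -> Map.entry(keyOf.apply(r), r)) // analogous to "sort-pre"
                .sorted(Map.Entry.comparingByKey())      // analogous to the ascending sort
                .map(Map.Entry::getValue)                // analogous to "sort-post"
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<LogEntry> logs = List.of(
                new LogEntry("b", 30L),
                new LogEntry("a", 10L),
                new LogEntry("c", 20L));

        // The caller supplies the key extractor, so the helper itself knows
        // nothing about LogEntry.
        for (LogEntry e : sortByKey(logs, entry -> entry.timestamp)) {
            System.out.println(e.source + " " + e.timestamp);
        }
    }
}
```

The point of the sketch is that the last step returns the second element of the pair as-is instead of wrapping it in a new object of a fixed class. Whether the same shape carries over to a Crunch pipeline depends on the PType that is passed in, which this sketch does not model.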
