By your pointing out that GenericData.Record is not a parent of SpecificRecord, and by using coalescedStagedLogs.getPType() rather than Avros.specifics(class), I managed to get it to work.
Thank you both! On 16-03-15 18:58, Josh Wills wrote: > Try "GenericRecord", which is the parent interface of both: > > https://avro.apache.org/docs/1.7.6/api/java/org/apache/avro/generic/GenericRecord.html > > On Mon, Mar 16, 2015 at 10:53 AM, Mattijs Jonker <[email protected] > <mailto:[email protected]>> wrote: > > Thank you both David and Josh for responding so swiftly. > > Yes, the generated OurAvroDataClass is a SpecificRecord, but what isn't > shown by my code snippet is that OurCrunchToolClass is not exclusively > meant for OurAvroDataClass. As such, OurAvroDataClass cannot be used in > the MapFns in the proposed way. > > I have tried to work with specifics rather than generics by passing down > the type of OurAvroDataClass (as Class<R extends SpecificRecord>) to > OurCrunchToolClass. This is necessary for use in Avros.specifics(class). > Taking this route I end up with an onfortunate [1]. > > > [1] > org.apache.avro.generic.GenericData$Record cannot be cast to > org.apache.avro.specific.SpecificRecord > > On 16-03-15 17:30, Josh Wills wrote: > > David's version should fix the error; the problem is that your > > namespace.OurAvroDataClass isn't a subclass of GenericData.Record. > > > > On Mon, Mar 16, 2015 at 8:40 AM, David Ortiz <[email protected] > <mailto:[email protected]> > > <mailto:[email protected] <mailto:[email protected]>>> wrote: > > > > Also, are you sure GenericData.Record is the correct class? I know > > when I use avro to build my records they normally end up as a > > SpecificRecord rather than a GenericRecord. > > > > On Mon, Mar 16, 2015 at 11:38 AM David Ortiz <[email protected] > <mailto:[email protected]> > > <mailto:[email protected] <mailto:[email protected]>>> wrote: > > > > Mattijs, > > > > Any particular reason you're taking that approach rather > > than something like... > > > > > > keyedStagedLogs = coalescedStagedLogs.__parallelDo__( > > "sort-pre", > > new MapFn<OurAvroType, Pair<Long, OurAvroType>>() { > > > > private static final long serialVersionUID = 1L; > > @Override > > public Pair<Long, OurAvroType> map(OurAvroTypeinput) { > > > > Long record_ts_key = > > (Long)input.get(partition___time___sourcename); > > return Pair.of(record_ts_key, input); > > } > > }, > > tf.pairs(tf.longs(), Avros.records(OurAvroType.class)) > > ); > > > > Thanks, > > Dave > > > > > > On Mon, Mar 16, 2015 at 11:35 AM Mattijs Jonker > > <[email protected] <mailto:[email protected]> > <mailto:[email protected] <mailto:[email protected]>>> wrote: > > > > Hello, > > > > I am trying to sort Avro data based on a given field > in a Crunch > > pipeline. Since the field in question (a timestamp) > does not > > come first > > in the Avro schema (and hence does not dictate > primarily the > > normal sort > > order), I map the Record to a Pair<Long, Record> first to > > Sort.sort on > > the desired field. My below code [2] is loosely > inspired by > > the first > > DoFn in [1]. > > > > Unfortunately, I encounter a ClassCastException [3] that I > > find hard to > > solve on my own. I do not fully understand the way > types are > > handled at > > runtime, but my guess is that based on the name and > > namespace in the > > schema, the first MapFn results in a > > namespace.OurAvroDataClass Object > > (which is a SpecificRecord). > > > > I would appreciate it if somebody can hint at how to > > overcome the > > exception. An alternative method to achieve this > sorting is > > also welcome. > > > > Sincerely, > > > > Mattijs > > > > > > [1] > > > > http://blog.cloudera.com/blog/____2014/05/how-to-process-time-__se__ries-data-using-apache-__crunch/ > > > > <http://blog.cloudera.com/blog/2014/05/how-to-process-time-series-data-using-apache-crunch/> > > > > [2] > > > > PTypeFamily tf = coalescedStagedLogs.__getTypeFam__ily(); > > keyedStagedLogs = coalescedStagedLogs.__parallelDo__( > > "sort-pre", > > new MapFn<Record, Pair<Long, Record>>() { > > private static final long serialVersionUID = 1L; > > @Override > > public Pair<Long, Record> map(Record input) { > > Long record_ts_key = > > (Long)input.get(partition___time___sourcename); > > return Pair.of(record_ts_key, input); > > } > > }, > > tf.pairs(tf.longs(), Avros.generics(schema)) > > ); > > > > sortedKeyedStagedLogs = Sort.sort(keyedStagedLogs, > > Sort.Order.ASCENDING); // Sort > > > > sortedStagedLogs = sortedKeyedStagedLogs.__parallel__Do( > > "sort-post", > > new MapFn<Pair<Long, Record>, Record>() { > > private static final long serialVersionUID = 1L; > > @Override > > public Record map(Pair<Long, Record> input) { > > return new Record(input.second(), true); > > } > > }, > > Avros.generics(schema) > > ); > > > > [3] > > Caused by: java.lang.ClassCastException: > > namespace.OurAvroDataClass > > cannot be cast to > > org.apache.avro.generic.__Generi__cData$Record at > > > namespace.OurCrunchToolClass$__2__.map(OurCrunchToolClass.java:__l__n) > > > > ln => return new Record(input.second(), true); > > > > > > > > > > -- > > Director of Data Science > > Cloudera <http://www.cloudera.com> > > Twitter: @josh_wills <http://twitter.com/josh_wills> > > > > > -- > Director of Data Science > Cloudera <http://www.cloudera.com> > Twitter: @josh_wills <http://twitter.com/josh_wills>
