David's version should fix the error; the problem is that your namespace.OurAvroDataClass isn't a subclass of GenericData.Record.
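
To illustrate why the cast fails, here is a minimal, self-contained sketch. It does not use the real Avro or Crunch classes: GenericRec, SpecificRec, and FieldAccess are hypothetical stand-ins that only mimic the relationship between GenericData.Record, a generated class such as namespace.OurAvroDataClass, and an accessor they share. The point is that two sibling types cannot be cast to each other, while code typed on what the object actually is (or on a common supertype) works.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in types only: they mimic the shape of the problem, not the real Avro API.
final class RecordCastSketch {

    /** Hypothetical stand-in for an accessor that both record kinds share. */
    interface FieldAccess {
        Object get(String field);
    }

    /** Hypothetical stand-in for GenericData.Record. */
    static final class GenericRec implements FieldAccess {
        private final Map<String, Object> values = new HashMap<>();

        GenericRec put(String field, Object value) {
            values.put(field, value);
            return this;
        }

        @Override
        public Object get(String field) {
            return values.get(field);
        }
    }

    /** Hypothetical stand-in for a generated class such as namespace.OurAvroDataClass. */
    static final class SpecificRec implements FieldAccess {
        private final Long ts;

        SpecificRec(long ts) {
            this.ts = ts;
        }

        @Override
        public Object get(String field) {
            if ("ts".equals(field)) {
                return ts;
            }
            return null;
        }
    }

    /** Returns true if casting the value to GenericRec throws ClassCastException. */
    static boolean castToGenericFails(Object value) {
        try {
            GenericRec ignored = (GenericRec) value; // sibling type: fails at runtime
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    /** Reads the sort key through the shared interface, so no sibling cast is needed. */
    static Long timestampOf(FieldAccess record) {
        return (Long) record.get("ts");
    }

    public static void main(String[] args) {
        // What the runtime actually hands to the function: the specific type.
        Object decoded = new SpecificRec(1426520400000L);
        System.out.println("cast to generic fails: " + castToGenericFails(decoded));
        System.out.println("key via shared accessor: " + timestampOf((FieldAccess) decoded));
    }
}
```

In the actual pipeline the equivalent move is the one in David's version quoted below: type the MapFn on the generated class and build the PType from that class, rather than on Record.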

On Mon, Mar 16, 2015 at 8:40 AM, David Ortiz <[email protected]> wrote:

> Also, are you sure GenericData.Record is the correct class? I know when I
> use avro to build my records they normally end up as a SpecificRecord
> rather than a GenericRecord.
>
> On Mon, Mar 16, 2015 at 11:38 AM David Ortiz <[email protected]> wrote:
>
>> Mattijs,
>>
>> Any particular reason you're taking that approach rather than
>> something like...
>>
>>
>> keyedStagedLogs = coalescedStagedLogs.parallelDo(
>>     "sort-pre",
>>     new MapFn<OurAvroType, Pair<Long, OurAvroType>>() {
>>
>>         private static final long serialVersionUID = 1L;
>>         @Override
>>         public Pair<Long, OurAvroType> map(OurAvroType input) {
>>
>>             Long record_ts_key = (Long)input.get(partition_time_sourcename);
>>             return Pair.of(record_ts_key, input);
>>         }
>>     },
>>     tf.pairs(tf.longs(), Avros.records(OurAvroType.class))
>> );
>>
>> Thanks,
>> Dave
>>
>>
>> On Mon, Mar 16, 2015 at 11:35 AM Mattijs Jonker <[email protected]>
>> wrote:
>>
>>> Hello,
>>>
>>> I am trying to sort Avro data based on a given field in a Crunch
>>> pipeline. Since the field in question (a timestamp) does not come first
>>> in the Avro schema (and hence does not dictate primarily the normal sort
>>> order), I map the Record to a Pair<Long, Record> first to Sort.sort on
>>> the desired field. My below code [2] is loosely inspired by the first
>>> DoFn in [1].
>>>
>>> Unfortunately, I encounter a ClassCastException [3] that I find hard to
>>> solve on my own. I do not fully understand the way types are handled at
>>> runtime, but my guess is that based on the name and namespace in the
>>> schema, the first MapFn results in a namespace.OurAvroDataClass Object
>>> (which is a SpecificRecord).
>>>
>>> I would appreciate it if somebody can hint at how to overcome the
>>> exception. An alternative method to achieve this sorting is also welcome.
>>>
>>> Sincerely,
>>>
>>> Mattijs
>>>
>>>
>>> [1]
>>> http://blog.cloudera.com/blog/2014/05/how-to-process-time-series-data-using-apache-crunch/
>>>
>>> [2]
>>>
>>> PTypeFamily tf = coalescedStagedLogs.getTypeFamily();
>>> keyedStagedLogs = coalescedStagedLogs.parallelDo(
>>>     "sort-pre",
>>>     new MapFn<Record, Pair<Long, Record>>() {
>>>         private static final long serialVersionUID = 1L;
>>>         @Override
>>>         public Pair<Long, Record> map(Record input) {
>>>             Long record_ts_key = (Long)input.get(partition_time_sourcename);
>>>             return Pair.of(record_ts_key, input);
>>>         }
>>>     },
>>>     tf.pairs(tf.longs(), Avros.generics(schema))
>>> );
>>>
>>> sortedKeyedStagedLogs = Sort.sort(keyedStagedLogs,
>>>     Sort.Order.ASCENDING); // Sort
>>>
>>> sortedStagedLogs = sortedKeyedStagedLogs.parallelDo(
>>>     "sort-post",
>>>     new MapFn<Pair<Long, Record>, Record>() {
>>>         private static final long serialVersionUID = 1L;
>>>         @Override
>>>         public Record map(Pair<Long, Record> input) {
>>>             return new Record(input.second(), true);
>>>         }
>>>     },
>>>     Avros.generics(schema)
>>> );
>>>
>>> [3]
>>> Caused by: java.lang.ClassCastException: namespace.OurAvroDataClass
>>> cannot be cast to org.apache.avro.generic.GenericData$Record at
>>> namespace.OurCrunchToolClass$2.map(OurCrunchToolClass.java:ln)
>>>
>>> ln => return new Record(input.second(), true);
>>>
>>

--
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>
