Hello, I am trying to sort Avro data on a given field in a Crunch pipeline. Since the field in question (a timestamp) is not the first field in the Avro schema, and hence does not primarily determine the default sort order, I first map each Record to a Pair<Long, Record> so that Sort.sort orders on the desired field. My code below [2] is loosely inspired by the first DoFn in [1].
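The key, sort, un-key approach described above can be illustrated in plain, in-memory Java. This is only a sketch under stated assumptions, not Crunch code: LogRecord is a hypothetical stand-in for the Avro record (with a field that would dominate a whole-record sort placed first), Map.Entry plays the role of Crunch's Pair, and List.sort stands in for Sort.sort.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

final class SortByTimestampSketch {

    // Hypothetical stand-in for an Avro record whose timestamp is not its first field.
    static final class LogRecord {
        final String host;    // first field: would dominate a whole-record sort
        final long eventTime; // the field we actually want to sort on

        LogRecord(String host, long eventTime) {
            this.host = host;
            this.eventTime = eventTime;
        }

        @Override
        public String toString() {
            return host + "@" + eventTime;
        }
    }

    static List<LogRecord> sortByTimestamp(List<LogRecord> input) {
        // Step 1 (like "sort-pre"): key each record by its timestamp.
        List<Map.Entry<Long, LogRecord>> keyed = new ArrayList<>();
        for (LogRecord r : input) {
            keyed.add(new AbstractMap.SimpleImmutableEntry<Long, LogRecord>(r.eventTime, r));
        }
        // Step 2 (like Sort.sort): order by the key only.
        keyed.sort(Comparator.comparingLong((Map.Entry<Long, LogRecord> e) -> e.getKey()));
        // Step 3 (like "sort-post"): drop the key again.
        List<LogRecord> result = new ArrayList<>();
        for (Map.Entry<Long, LogRecord> e : keyed) {
            result.add(e.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        List<LogRecord> logs = new ArrayList<>();
        logs.add(new LogRecord("a", 30L));
        logs.add(new LogRecord("b", 10L));
        logs.add(new LogRecord("c", 20L));
        System.out.println(sortByTimestamp(logs)); // prints [b@10, c@20, a@30]
    }
}
```

Because only the key is compared in this sketch, the position of the timestamp inside the record no longer matters, which is the same motivation as keying by the timestamp before Sort.sort.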
Unfortunately, I run into a ClassCastException [3] that I cannot resolve on my own. I do not fully understand how types are handled at runtime, but my guess is that, based on the name and namespace in the schema, the first MapFn results in a namespace.OurAvroDataClass object (which is a SpecificRecord) rather than a GenericData.Record.

I would appreciate a hint on how to overcome this exception. An alternative way to achieve this sorting is also welcome.

Sincerely,
Mattijs

[1] http://blog.cloudera.com/blog/2014/05/how-to-process-time-series-data-using-apache-crunch/

[2]
    PTypeFamily tf = coalescedStagedLogs.getTypeFamily();

    // Key each record by its timestamp so that the sort orders on that field
    keyedStagedLogs = coalescedStagedLogs.parallelDo(
        "sort-pre",
        new MapFn<Record, Pair<Long, Record>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Pair<Long, Record> map(Record input) {
                Long record_ts_key = (Long) input.get(partition_time_sourcename);
                return Pair.of(record_ts_key, input);
            }
        },
        tf.pairs(tf.longs(), Avros.generics(schema))
    );

    // Sort
    sortedKeyedStagedLogs = Sort.sort(keyedStagedLogs, Sort.Order.ASCENDING);

    // Drop the key again, returning a (deep) copy of the record
    sortedStagedLogs = sortedKeyedStagedLogs.parallelDo(
        "sort-post",
        new MapFn<Pair<Long, Record>, Record>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Record map(Pair<Long, Record> input) {
                return new Record(input.second(), true);
            }
        },
        Avros.generics(schema)
    );

[3]
    Caused by: java.lang.ClassCastException: namespace.OurAvroDataClass cannot be cast to org.apache.avro.generic.GenericData$Record
        at namespace.OurCrunchToolClass$2.map(OurCrunchToolClass.java:ln)

    where line ln is: return new Record(input.second(), true);
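The guess above about runtime types can be illustrated with a small, self-contained Java sketch. It uses hypothetical stand-in classes, not Avro's: RecordLike plays the role of org.apache.avro.generic.GenericRecord, GenericRecordImpl of GenericData.Record, and SpecificRecordImpl of a generated class such as namespace.OurAvroDataClass. Because Java generics are erased, a value declared as Record in a MapFn signature is only checked where the compiler inserts a cast, which would explain why the stack trace points at the line in map() rather than at the point where the data was read. Casting the same object to the shared interface succeeds.

```java
final class CastDemo {

    // Hypothetical stand-in for org.apache.avro.generic.GenericRecord (the shared interface).
    interface RecordLike {
        Object get(String field);
    }

    // Hypothetical stand-in for GenericData.Record.
    static final class GenericRecordImpl implements RecordLike {
        public Object get(String field) {
            return null;
        }
    }

    // Hypothetical stand-in for a generated class such as namespace.OurAvroDataClass.
    static final class SpecificRecordImpl implements RecordLike {
        private final long ts;

        SpecificRecordImpl(long ts) {
            this.ts = ts;
        }

        public Object get(String field) {
            return "ts".equals(field) ? ts : null;
        }
    }

    // Unchecked cast: T is erased to Object, so nothing is checked inside this method.
    @SuppressWarnings("unchecked")
    static <T> T deserialize(Object raw) {
        return (T) raw;
    }

    public static void main(String[] args) {
        Object raw = new SpecificRecordImpl(42L); // what the reader actually produced

        // Works: both stand-in classes implement the shared interface.
        RecordLike viaInterface = CastDemo.<RecordLike>deserialize(raw);
        System.out.println(viaInterface.get("ts")); // prints 42

        // Fails: the compiler-inserted cast to the concrete class runs here, at the call site.
        try {
            GenericRecordImpl viaConcrete = CastDemo.<GenericRecordImpl>deserialize(raw);
            System.out.println(viaConcrete);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException"); // this branch is taken
        }
    }
}
```

If the generated class does implement GenericRecord (generated classes normally extend SpecificRecordBase; whether that implements GenericRecord is an assumption worth checking for the Avro version in use), code that only reads fields through that interface would not hit this particular cast.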
