Mattijs,
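Any particular reason you're taking that approach rather than working
with your generated class directly? If, as you suspect, the records are
already coming back as your generated class, keying on that type avoids
the cast to GenericData$Record that fails in [3]. Something like...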
keyedStagedLogs = coalescedStagedLogs.parallelDo(
    "sort-pre",
    new MapFn<OurAvroType, Pair<Long, OurAvroType>>() {
      private static final long serialVersionUID = 1L;

      @Override
      public Pair<Long, OurAvroType> map(OurAvroType input) {
        // Key each record by its timestamp field so Sort.sort orders on it.
        Long record_ts_key = (Long) input.get(partition_time_sourcename);
        return Pair.of(record_ts_key, input);
      }
    },
    // Value PType built from the generated class instead of a generic schema.
    tf.pairs(tf.longs(), Avros.records(OurAvroType.class))
);
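For comparison, the key/sort/unwrap pattern in [2] ("sort-pre", Sort.sort, "sort-post") can be sketched in plain Java on an in-memory list. This is a JDK-only analogy, not Crunch code: LogRecord and sortByKey are hypothetical names, and the sketch orders by the key alone (List.sort is stable, so ties keep their input order), which is not necessarily how Sort.sort breaks ties on a Pair.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.ToLongFunction;

public class KeyedSortDemo {
    // Hypothetical log record whose timestamp is not its first field.
    static final class LogRecord {
        final String message;
        final long timestamp;

        LogRecord(String message, long timestamp) {
            this.message = message;
            this.timestamp = timestamp;
        }
    }

    static <R> List<R> sortByKey(List<R> records, ToLongFunction<R> keyFn) {
        // Analogous to "sort-pre": pair each record with its Long key.
        List<Map.Entry<Long, R>> keyed = new ArrayList<>();
        for (R r : records) {
            keyed.add(new SimpleEntry<Long, R>(keyFn.applyAsLong(r), r));
        }
        // Analogous to Sort.sort: order the pairs by key (stable for ties).
        keyed.sort(Map.Entry.<Long, R>comparingByKey());
        // Analogous to "sort-post": drop the key again.
        List<R> sorted = new ArrayList<>();
        for (Map.Entry<Long, R> e : keyed) {
            sorted.add(e.getValue());
        }
        return sorted;
    }

    public static void main(String[] args) {
        List<LogRecord> logs = Arrays.asList(
            new LogRecord("c", 30L),
            new LogRecord("a", 10L),
            new LogRecord("b", 20L));
        for (LogRecord r : sortByKey(logs, rec -> rec.timestamp)) {
            System.out.println(r.timestamp + " " + r.message);
        }
    }
}
```

Running main prints the records in timestamp order (10, 20, 30). In a pipeline the sort runs distributed, but the shape of the three steps is the same.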
Thanks,
Dave
On Mon, Mar 16, 2015 at 11:35 AM Mattijs Jonker wrote:
> Hello,
>
> I am trying to sort Avro data on a given field in a Crunch
> pipeline. Since the field in question (a timestamp) is not the first
> field in the Avro schema (and hence is not the primary key of the
> default sort order), I first map each Record to a Pair<Long, Record> so
> that Sort.sort orders on the desired field. My code below [2] is loosely
> inspired by the first DoFn in [1].
>
> Unfortunately, I run into a ClassCastException [3] that I find hard to
> solve on my own. I do not fully understand how types are handled at
> runtime, but my guess is that, based on the name and namespace in the
> schema, the first MapFn produces a namespace.OurAvroDataClass object
> (which is a SpecificRecord) rather than a GenericData.Record.
>
> I would appreciate any hints on how to get around this exception. An
> alternative way to achieve this sorting would also be welcome.
>
> Sincerely,
>
> Mattijs
>
>
> [1]
> http://blog.cloudera.com/blog/2014/05/how-to-process-time-series-data-using-apache-crunch/
>
> [2]
>
> PTypeFamily tf = coalescedStagedLogs.getTypeFamily();
> keyedStagedLogs = coalescedStagedLogs.parallelDo(
>     "sort-pre",
>     new MapFn<Record, Pair<Long, Record>>() {
>       private static final long serialVersionUID = 1L;
>
>       @Override
>       public Pair<Long, Record> map(Record input) {
>         Long record_ts_key = (Long) input.get(partition_time_sourcename);
>         return Pair.of(record_ts_key, input);
>       }
>     },
>     tf.pairs(tf.longs(), Avros.generics(schema))
> );
>
> sortedKeyedStagedLogs = Sort.sort(keyedStagedLogs,
>     Sort.Order.ASCENDING); // Sort
>
> sortedStagedLogs = sortedKeyedStagedLogs.parallelDo(
>     "sort-post",
>     new MapFn<Pair<Long, Record>, Record>() {
>       private static final long serialVersionUID = 1L;
>
>       @Override
>       public Record map(Pair<Long, Record> input) {
>         return new Record(input.second(), true);
>       }
>     },
>     Avros.generics(schema)
> );
>
> [3]
> Caused by: java.lang.ClassCastException: namespace.OurAvroDataClass cannot be cast to org.apache.avro.generic.GenericData$Record
>     at namespace.OurCrunchToolClass$2.map(OurCrunchToolClass.java:ln)
>
> ln => return new Record(input.second(), true);
>
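The exception in [3] is an ordinary Java cast failure: a generated specific class and GenericData.Record both implement Avro's IndexedRecord interface, but neither extends the other, so casting one to the other fails at runtime. Below is a JDK-only analogy of that failure mode; IndexedRecordLike, GenericRecordLike and SpecificRecordLike are hypothetical stand-ins, not Avro's real classes.

```java
public class CastDemo {
    // Hypothetical stand-in for Avro's IndexedRecord interface.
    interface IndexedRecordLike {
        Object get(int i);
    }

    // Hypothetical stand-in for GenericData.Record.
    static class GenericRecordLike implements IndexedRecordLike {
        @Override
        public Object get(int i) {
            return null;
        }
    }

    // Hypothetical stand-in for a generated class such as OurAvroDataClass.
    static class SpecificRecordLike implements IndexedRecordLike {
        @Override
        public Object get(int i) {
            return null;
        }
    }

    // Mirrors a MapFn that assumes every input is the generic class.
    static boolean castsToGeneric(IndexedRecordLike r) {
        try {
            GenericRecordLike g = (GenericRecordLike) r;
            return g != null;
        } catch (ClassCastException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(castsToGeneric(new GenericRecordLike()));
        System.out.println(castsToGeneric(new SpecificRecordLike()));
    }
}
```

The first call succeeds and the second hits the ClassCastException branch, which is why declaring the PType with the class Avro actually hands back, rather than a generic schema, sidesteps the problem.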