Could you post more of a snippet showing the generics? I've got a lot of
library code running off of the MyMapFn<S extends SpecificRecordBase, T>
approach. Haven't tried doing it on the overarching tool, but I would
imagine the approach would be similar. Using the snippet I posted
earlier...
Class<R> clazz;

public OurCrunchPipeline(Class<R> clazz) {
    this.clazz = clazz;
}

keyedStagedLogs = coalescedStagedLogs.parallelDo(
    "sort-pre",
    new MapFn<R, Pair<Long, R>>() {
        …
        private static final long serialVersionUID = 1L;

        @Override
        public Pair<Long, R> map(R input) {
            …
            Long record_ts_key = (Long) input.get(partition_time_sourcename);
            return Pair.of(record_ts_key, input);
        }
    },
    tf.pairs(tf.longs(), Avros.records(clazz))
);
Something like that perhaps?
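
If it helps to see the shape in isolation, here is a minimal plain-Java sketch of the same idea: carry a Class<R> token, key each record by a Long, sort on the key, then drop the key again. It uses in-memory lists as a stand-in for a PCollection and is not Crunch code; KeyedSortSketch, sortBy and keyFn are hypothetical names made up for this illustration.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper, generic over the record type R (not part of Crunch).
class KeyedSortSketch<R> {
    // Runtime class token, passed in because R itself is erased at runtime.
    private final Class<R> clazz;

    KeyedSortSketch(Class<R> clazz) {
        this.clazz = clazz;
    }

    // Key each record, sort ascending by key, then strip the key again.
    List<R> sortBy(List<?> input, Function<R, Long> keyFn) {
        List<Map.Entry<Long, R>> keyed = new ArrayList<>();
        for (Object o : input) {
            // Throws ClassCastException right here if o is not an instance of R.
            R record = clazz.cast(o);
            keyed.add(new SimpleEntry<>(keyFn.apply(record), record));
        }
        keyed.sort(Map.Entry.comparingByKey());

        List<R> sorted = new ArrayList<>();
        for (Map.Entry<Long, R> entry : keyed) {
            sorted.add(entry.getValue());
        }
        return sorted;
    }
}
```

With this, new KeyedSortSketch<>(String.class).sortBy(records, s -> (long) s.length()) orders strings by length. A record of any other class fails at the cast, so a type mismatch shows up where the records are keyed rather than in a later step.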
Thanks,
Dave
On Mon, Mar 16, 2015 at 1:57 PM Mattijs Jonker <[email protected]> wrote:
> Thank you both David and Josh for responding so swiftly.
>
> Yes, the generated OurAvroDataClass is a SpecificRecord, but what isn't
> shown by my code snippet is that OurCrunchToolClass is not exclusively
> meant for OurAvroDataClass. As such, OurAvroDataClass cannot be used in
> the MapFns in the proposed way.
>
> I have tried to work with specifics rather than generics by passing down
> the type of OurAvroDataClass (as Class<R extends SpecificRecord>) to
> OurCrunchToolClass. This is necessary for use in Avros.specifics(class).
> Taking this route I end up with an unfortunate exception [1].
>
>
> [1]
> org.apache.avro.generic.GenericData$Record cannot be cast to
> org.apache.avro.specific.SpecificRecord
>
> On 16-03-15 17:30, Josh Wills wrote:
> > David's version should fix the error; the problem is that your
> > namespace.OurAvroDataClass isn't a subclass of GenericData.Record.
> >
> > On Mon, Mar 16, 2015 at 8:40 AM, David Ortiz <[email protected]> wrote:
> >
> > Also, are you sure GenericData.Record is the correct class? I know
> > when I use avro to build my records they normally end up as a
> > SpecificRecord rather than a GenericRecord.
> >
> > On Mon, Mar 16, 2015 at 11:38 AM David Ortiz <[email protected]> wrote:
> >
> > Mattijs,
> >
> > Any particular reason you're taking that approach rather
> > than something like...
> >
> >
> > keyedStagedLogs = coalescedStagedLogs.parallelDo(
> >     "sort-pre",
> >     new MapFn<OurAvroType, Pair<Long, OurAvroType>>() {
> >
> >         private static final long serialVersionUID = 1L;
> >
> >         @Override
> >         public Pair<Long, OurAvroType> map(OurAvroType input) {
> >             Long record_ts_key =
> >                 (Long) input.get(partition_time_sourcename);
> >             return Pair.of(record_ts_key, input);
> >         }
> >     },
> >     tf.pairs(tf.longs(), Avros.records(OurAvroType.class))
> > );
> >
> > Thanks,
> > Dave
> >
> >
> > On Mon, Mar 16, 2015 at 11:35 AM Mattijs Jonker <[email protected]> wrote:
> >
> > Hello,
> >
> > I am trying to sort Avro data based on a given field in a Crunch
> > pipeline. Since the field in question (a timestamp) does not come
> > first in the Avro schema (and hence does not primarily dictate the
> > normal sort order), I first map the Record to a Pair<Long, Record>
> > so that I can Sort.sort on the desired field. My code below [2] is
> > loosely inspired by the first DoFn in [1].
> >
> > Unfortunately, I encounter a ClassCastException [3] that I find
> > hard to solve on my own. I do not fully understand the way types
> > are handled at runtime, but my guess is that, based on the name and
> > namespace in the schema, the first MapFn results in a
> > namespace.OurAvroDataClass object (which is a SpecificRecord).
> >
> > I would appreciate it if somebody could hint at how to overcome the
> > exception. An alternative method to achieve this sorting is also
> > welcome.
> >
> > Sincerely,
> >
> > Mattijs
> >
> >
> > [1]
> > http://blog.cloudera.com/blog/2014/05/how-to-process-time-series-data-using-apache-crunch/
> >
> > [2]
> >
> > PTypeFamily tf = coalescedStagedLogs.getTypeFamily();
> > keyedStagedLogs = coalescedStagedLogs.parallelDo(
> >     "sort-pre",
> >     new MapFn<Record, Pair<Long, Record>>() {
> >         private static final long serialVersionUID = 1L;
> >
> >         @Override
> >         public Pair<Long, Record> map(Record input) {
> >             Long record_ts_key =
> >                 (Long) input.get(partition_time_sourcename);
> >             return Pair.of(record_ts_key, input);
> >         }
> >     },
> >     tf.pairs(tf.longs(), Avros.generics(schema))
> > );
> >
> > sortedKeyedStagedLogs = Sort.sort(keyedStagedLogs,
> >     Sort.Order.ASCENDING); // Sort
> >
> > sortedStagedLogs = sortedKeyedStagedLogs.parallelDo(
> >     "sort-post",
> >     new MapFn<Pair<Long, Record>, Record>() {
> >         private static final long serialVersionUID = 1L;
> >
> >         @Override
> >         public Record map(Pair<Long, Record> input) {
> >             return new Record(input.second(), true);
> >         }
> >     },
> >     Avros.generics(schema)
> > );
> >
> > [3]
> > Caused by: java.lang.ClassCastException: namespace.OurAvroDataClass
> > cannot be cast to org.apache.avro.generic.GenericData$Record at
> > namespace.OurCrunchToolClass$2.map(OurCrunchToolClass.java:ln)
> >
> > ln => return new Record(input.second(), true);
> >
> >
> >
> >
> > --
> > Director of Data Science
> > Cloudera <http://www.cloudera.com>
> > Twitter: @josh_wills <http://twitter.com/josh_wills>
>