Awesome, adding:

AvroMode.setSpecificClassLoader(Visit.class.getClassLoader());

Did the trick, thanks Josh!

Do you mind clarifying when to use Avros.records() versus Avros.specifics() or Avros.reflects()?
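
(For anyone finding this thread later, here's my rough understanding so far; please correct me if it's wrong. VisitPojo is just a made-up plain class for contrast.)

import org.apache.crunch.types.PType;
import org.apache.crunch.types.avro.Avros;

// Avros.specifics(): for classes generated by the Avro compiler (SpecificRecord
// subclasses like Visit); the schema is compiled into the class.
PType<Visit> specificType = Avros.specifics(Visit.class);

// Avros.reflects(): for plain Java classes with no generated code; Avro derives
// a schema from the fields via reflection.
PType<VisitPojo> reflectType = Avros.reflects(VisitPojo.class);

// Avros.records(): as far as I can tell, a convenience factory that uses
// specifics() when the class implements SpecificRecord and reflects() otherwise.
PType<Visit> recordType = Avros.records(Visit.class);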

Thanks Again!


From: [email protected]
Date: Thu, 21 Aug 2014 13:20:20 -0700
Subject: Re: Trouble with Avro records
To: [email protected]

Okay. I suspect this is the problem:
https://issues.apache.org/jira/browse/CRUNCH-442

Gabriel fixed this for the upcoming 0.11 release, but there are a couple of 
workarounds in the comments. One is to put the Avro jar into your job jar, instead 
of relying on the one in the Hadoop lib directory. The other is to configure 
AvroMode.setSpecificClassLoader with the class loader for your Visit class before 
kicking off the job.
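
As a rough sketch of the second workaround (the driver class and Tool-style setup are placeholders, adapt to your own pipeline):

import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.types.avro.AvroMode;

// Point Avro's specific-record resolution at the class loader that actually
// contains the generated Visit class, before any job is planned or launched.
AvroMode.setSpecificClassLoader(Visit.class.getClassLoader());

Pipeline pipeline = new MRPipeline(Logs.class, getConf());
// ... build the PCollections/PTables as before, then run the pipeline.
pipeline.done();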



Josh

On Thu, Aug 21, 2014 at 1:10 PM, Danny Morgan <[email protected]> wrote:

Hi Josh,

crunch-0.10.0-hadoop2

Thanks.

From: [email protected]
Date: Thu, 21 Aug 2014 13:04:29 -0700
Subject: Re: Trouble with Avro records
To: [email protected]

That feels like an AvroMode-related exception; which version of Crunch are you 
using?


J

On Thu, Aug 21, 2014 at 12:45 PM, Danny Morgan <[email protected]> wrote:

Hi Guys,

Love Crunch, but I've been having some trouble recently using Avro records. I think 
someone needs to write a Crunch book.

I'm trying to aggregate hourly visits to a page by each user. I do one pass to 
parse the records and then I try to "group by" unique users and hour and count 
the number of times they visited as well as their first visit time in the hour. 
Here's the Avro schema:

{"namespace": "com.test",
 "type": "record",
 "name": "Visit",
 "fields": [
     {"name": "dateid", "type": "int"},                    // Year Month Day Hour in PST as an integer, e.g. 2014081103
     {"name": "userid", "type": "string"},
     {"name": "vcount", "type": ["long", "null"]},
     {"name": "firsttimestamp", "type": ["long", "null"]}  // Unix timestamp of first visit
 ]
}

Here I do the parsing; at first vcount and firsttimestamp aren't set.

PTable<Visit, Pair<Long, Long>> visits = parsed.parallelDo("visits-parsing", new VisitsExtractor(),
    Avros.tableOf(Avros.specifics(Visit.class), Avros.pairs(Avros.longs(), Avros.longs())));

The relevant line from VisitsExtractor:
emitter.emit(Pair.of(visit, Pair.of(1L, log.timestamp())));
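
For completeness, here's a simplified sketch of the rest of VisitsExtractor (I've renamed the input to a generic LogEntry type; the real parsing is beside the point):

public class VisitsExtractor extends DoFn<LogEntry, Pair<Visit, Pair<Long, Long>>> {
  @Override
  public void process(LogEntry log, Emitter<Pair<Visit, Pair<Long, Long>>> emitter) {
    Visit visit = Visit.newBuilder()
        .setDateid(log.dateid())        // YYYYMMDDHH bucket, e.g. 2014081103
        .setUserid(log.userid())
        .setVcount(null)                // filled in after the aggregation step
        .setFirsttimestamp(null)
        .build();
    // 1L is the per-record visit count; the timestamp feeds the MIN_LONGS side.
    emitter.emit(Pair.of(visit, Pair.of(1L, log.timestamp())));
  }
}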

Everything up to this point works fine; now I want to sum up the visit counts and 
take the minimum timestamp per unique visitor and hour.

        PTable<Visit, Pair<Long, Long>> agg = visits.groupByKey()
            .combineValues(Aggregators.pairAggregator(Aggregators.SUM_LONGS(), Aggregators.MIN_LONGS()));

The above seems to work fine too; now I want to create new Visit records and fill 
in the count and minimum-timestamp fields.

        PCollection<Visit> count_visits = agg.parallelDo("visits-count",
            new DoFn<Pair<Visit, Pair<Long, Long>>, Visit>() {
              @Override
              public void process(Pair<Visit, Pair<Long, Long>> p, Emitter<Visit> emitter) {
                Visit v = Visit.newBuilder(p.first()).build();
                v.setVcount(p.second().first());
                v.setFirsttimestamp(p.second().second());
                emitter.emit(v);
              }
            }, Avros.specifics(Visit.class));

     count_visits.write(To.textFile(outputPath), WriteMode.OVERWRITE);
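
Side note, unrelated to the error below: the same copy-and-fill could also be done on the builder before build(), assuming the usual Avro-generated builder API:

    Visit v = Visit.newBuilder(p.first())
        .setVcount(p.second().first())
        .setFirsttimestamp(p.second().second())
        .build();
    emitter.emit(v);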

Here's the error:

2014-08-21 15:09:26,245 ERROR run.CrunchReducer (CrunchReducer.java:reduce(54)) - Reducer exception
java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.test.Visit
        at com.test.Logs$1.process(Logs.java:49)
        at com.test.Logs$1.process(Logs.java:1)
        at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
        at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)

So I'm pretty sure the problem is this line:

Visit v = Visit.newBuilder(p.first()).build();

Specifically, p.first() should be a Visit, but apparently it isn't. I assume the 
groupBy round-trips the key through Avro serialization in the reducers, but it 
isn't being deserialized back into the correct specific type?
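
A quick way to see what's actually arriving in the reducer (just a debugging sketch to drop into the DoFn):

    // Log the runtime type and class loader of the key before it gets cast to Visit.
    Object key = ((Pair<?, ?>) p).first();
    System.err.println("key class = " + key.getClass().getName()
        + ", loader = " + key.getClass().getClassLoader());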

Also, I don't think I understand when I should be using Avros.records() versus 
Avros.specifics() when I have a generated Avro class.

Thanks!

Danny