BTW thanks Josh! That worked! Here is an example of how easy it is to do aggregations in Crunch :)

https://github.com/jayunit100/bigpetstore/commit/03a59fc88680d8926aba4c8d00760436c8cafb69
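For anyone skimming the thread, a minimal sketch of the kind of per-store aggregation that commit performs; the CSV field indexes and the use of Aggregators.SUM_DOUBLES() are assumptions of mine, not necessarily what the commit actually does:

    // Sketch: total spend per store code from the raw CSV lines.
    // Uses org.apache.crunch.{MapFn, Pair, PCollection, PTable},
    // org.apache.crunch.fn.Aggregators, and org.apache.crunch.types.avro.Avros.
    PCollection<String> lines = ...; // the raw transaction lines
    PTable<String, Double> spendPerStore = lines.parallelDo(
        new MapFn<String, Pair<String, Double>>() {
          @Override
          public Pair<String, Double> map(String line) {
            String[] f = line.split(",");
            // assumed layout: f[1] = store code, f[5] = price
            return Pair.of(f[1], Double.parseDouble(f[5]));
          }
        }, Avros.tableOf(Avros.strings(), Avros.doubles()))
        .groupByKey()
        .combineValues(Aggregators.SUM_DOUBLES());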
PS: Are you sure Pig/Hive is really better for this kind of stuff? I like the IDE-friendly, statically validated, strongly typed, functional API a lot more than the Russian roulette I always seem to play with my Pig/Hive code :)

On Sat, Jan 4, 2014 at 7:49 PM, Jay Vyas <[email protected]> wrote:

> Thanks Josh, that was very helpful! I like the Avro-mapper intermediate
> solution; I'll try it out.
>
> Also: would you be interested in contributing a new "section" of the
> BigPetStore workflow, a module which really shows where Crunch's
> differentiating factors are valuable?
>
> The idea is that BigPetStore should show the differences between different
> ecosystem components so that people can pick for themselves which tool is
> best for which job. So I think it would be cool to have a phase in the
> BigPetStore workflow which used some nested, strongly typed data and
> processed it with Crunch versus Pig, to demonstrate (in code) the comments
> you've made.
>
> Right now I only have Pig and Hive, but I want to add in Cascading and
> (obviously) Crunch as well.
>
> On Jan 4, 2014, at 4:57 PM, Josh Wills <[email protected]> wrote:
>
> Hey Jay,
>
> Crunch isn't big into tuples; it's mostly used to process some sort of
> structured, complex record data like Avro, protocol buffers, or Thrift. I
> certainly don't speak for everyone in the community, but I think that
> using one of these rich, evolvable formats is the best way to work with
> data on Hadoop. For the problem you gave, where the data is in CSV text,
> there are a couple of options.
>
> One option would be to use the TupleN type to represent a record and the
> Extractor API in crunch-contrib to parse the lines of strings into typed
> tokens, so you would do something like this to your PCollection<String>:
>
> PCollection<String> rawData = ...;
> TokenizerFactory tokenize = TokenizerFactory.builder().delim(",").build();
> // "bigpetshop" is a name to use for the counters used in parsing
> PCollection<TupleN> tuples = Parse.parse("bigpetshop",
>     rawData,
>     xtupleN(tokenize,
>         xstring(),   // big pet store
>         xstring(),   // store code
>         xint(),      // line item
>         xstring(),   // first name
>         xstring(),   // last name
>         xstring(),   // timestamp
>         xdouble(),   // price
>         xstring())); // item description
>
> You could also create a POJO to represent a LineItem (which is what I
> assume this is) and then use Avro reflection-based serialization to
> serialize it with Crunch:
>
> public static class LineItem {
>   String appName;
>   String storeCode;
>   int lineId;
>   String firstName;
>   String lastName;
>   String timestamp;
>   double price;
>   String description;
>
>   public LineItem() {
>     // Avro reflection needs a zero-arg constructor
>   }
>
>   // other constructors, parsers, etc.
> }
>
> and then you would have something like this:
>
> PCollection<LineItem> lineItems = rawData.parallelDo(
>     new MapFn<String, LineItem>() {
>       @Override
>       public LineItem map(String input) {
>         // parse line to LineItem object
>       }
>     }, Avros.reflects(LineItem.class));
>
> I'm not quite sure what you're doing in the grouping clause you have here:
>
> groupBy(0).count();
>
> ...I assume you want to count the distinct values of the first field in
> your tuple, which you would do like this for line items:
>
> PTable<String, Long> counts = lineItems.parallelDo(
>     new MapFn<LineItem, String>() {
>       public String map(LineItem lineItem) { return lineItem.appName; }
>     }, Avros.strings()).count();
>
> and similarly for TupleN, although you would call get(0) on TupleN and
> have to cast the returned Object to a String because TupleN methods don't
> have type information.
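As a concrete illustration of that last point, the TupleN version of the same count might look like this; a sketch reusing the `tuples` collection from the Parse.parse example above:

    PTable<String, Long> tupleCounts = tuples.parallelDo(
        new MapFn<TupleN, String>() {
          @Override
          public String map(TupleN tuple) {
            // TupleN.get(int) returns Object, so the cast to String is explicit
            return (String) tuple.get(0);
          }
        }, Avros.strings()).count();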
>
> I hope that helps. In general, I don't really recommend Crunch for this
> sort of data processing; Hive, Pig, and Cascading are fine alternatives.
> But I think Crunch is superior to any of them if you were trying to, say,
> create an Order record that aggregated the results of multiple LineItems:
>
> Order {
>   List<LineItem> lineItems;
>   // global order attributes
> }
>
> or a Customer type that aggregated multiple Orders for a single customer:
>
> Customer {
>   List<Order> orders;
>   // other customer fields
> }
>
> ...especially if this was the sort of processing task you had to do
> regularly because lots of other downstream processing tasks required these
> standard aggregations to exist so that they could do their own
> calculations. I would also recommend Crunch if you were building
> BigPetStore on top of HBase using custom schemas that you needed to
> periodically MapReduce over in order to calculate statistics, clean up
> stale data, or fix any consistency issues.
>
> Best,
> Josh
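To make the Order example concrete, here is a minimal sketch of that aggregation in Crunch. It assumes a hypothetical orderId field on LineItem, a LineItem copy constructor, and a zero-arg Order constructor, none of which appear in the thread:

    // Sketch: group LineItems by an assumed orderId field and roll them up into Orders.
    PCollection<Order> orders = lineItems
        .by(new MapFn<LineItem, String>() {
          @Override
          public String map(LineItem item) { return item.orderId; } // assumed field
        }, Avros.strings())
        .groupByKey()
        .parallelDo(new MapFn<Pair<String, Iterable<LineItem>>, Order>() {
          @Override
          public Order map(Pair<String, Iterable<LineItem>> group) {
            Order order = new Order(); // assumed zero-arg constructor
            order.lineItems = new java.util.ArrayList<LineItem>();
            for (LineItem item : group.second()) {
              // copy each item: Crunch may reuse the record instance while iterating
              order.lineItems.add(new LineItem(item)); // assumed copy constructor
            }
            return order;
          }
        }, Avros.reflects(Order.class));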
>
> On Sat, Jan 4, 2014 at 12:34 PM, Jay Vyas <[email protected]> wrote:
>
>> Hi Crunch!
>>
>> I want to process a list in Crunch, something like this:
>>
>> PCollection<String> lines = MemPipeline.collectionOf(
>>     "BigPetStore,storeCode_AK,1 lindsay,franco,Sat Jan 10 00:11:10 EST 1970,10.5,dog-food",
>>     "BigPetStore,storeCode_AZ,1 tom,giles,Sun Dec 28 23:08:45 EST 1969,10.5,dog-food",
>>     "BigPetStore,storeCode_CA,1 brandon,ewing,Mon Dec 08 20:23:57 EST 1969,16.5,organic-dog-food",
>>     "BigPetStore,storeCode_CA,2 angie,coleman,Thu Dec 11 07:00:31 EST 1969,10.5,dog-food",
>>     "BigPetStore,storeCode_CA,3 angie,coleman,Tue Jan 20 06:24:23 EST 1970,7.5,cat-food",
>>     "BigPetStore,storeCode_CO,1 sharon,trevino,Mon Jan 12 07:52:10 EST 1970,30.1,antelope snacks",
>>     "BigPetStore,storeCode_CT,1 kevin,fitzpatrick,Wed Dec 10 05:24:13 EST 1969,10.5,dog-food",
>>     "BigPetStore,storeCode_NY,1 dale,holden,Mon Jan 12 23:02:13 EST 1970,19.75,fish-food",
>>     "BigPetStore,storeCode_NY,2 dale,holden,Tue Dec 30 12:29:52 EST 1969,10.5,dog-food",
>>     "BigPetStore,storeCode_OK,1 donnie,tucker,Sun Jan 18 04:50:26 EST 1970,7.5,cat-food");
>>
>> PCollection coll = lines.parallelDo(
>>     "split lines into words",
>>     new DoFn<String, String>() {
>>       @Override
>>       public void process(String line, Emitter emitter) {
>>         // not sure this regex will work, but you get the idea..
>>         // split by tabs and commas
>>         emitter.emit(Arrays.asList(line.split("\t,")));
>>       }
>>     },
>>     Writables.lists()
>> ).groupBy(0).count();
>>
>> What is the correct abstraction in Crunch to convert raw text into
>> tuples, and access them by an index, which you then use to group and
>> count on?
>>
>> Thanks!
>>
>> ** FYI ** This is for the BigPetStore project; I'd like to show Crunch
>> examples in it if I can get them working, as the API is a nice example
>> of a lower-level MapReduce paradigm which is more Java-friendly.
>>
>> See https://issues.apache.org/jira/browse/BIGTOP-1089 and
>> https://github.com/jayunit100/bigpetstore for details.
>>
>
>
> --
> Director of Data Science
> Cloudera <http://www.cloudera.com>
> Twitter: @josh_wills <http://twitter.com/josh_wills>
>

--
Jay Vyas
http://jayunit100.blogspot.com
