We have some testing helper code in the project (the dependency is named crunch-test-0.11.0-hadoop2) that we use for running integration tests in the main project. The snippet I sent was a modification of this one:
https://github.com/apache/crunch/blob/master/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java

On Wed, May 13, 2015 at 11:02 PM, Lucy Chen <[email protected]> wrote:

> Hi Josh,
>
> I am using crunch-core-0.11.0-hadoop2.jar; I did not find a class
> AvroReflectIT under org.apache.crunch.io.avro. Is it in a new version?
> Meanwhile how should I set the variable tmpDir in my code? What do you
> mean by "done inside of AvroReflectIT"? You mean copy paste the above
> code within the class AvroReflectIT and run it?
>
> Thanks!
>
> Lucy
>
> On Wed, May 13, 2015 at 6:09 PM, Josh Wills <[email protected]> wrote:
>
>> Hey Lucy,
>>
>> I tried to see if I could replicate the error by writing the following
>> integration test (done inside of AvroReflectIT)-- does something like this
>> fail when you run it? And you're using Crunch 0.11-hadoop2, or something
>> earlier?
>>
>> @Test
>> public void testJoinReflectedData() throws Exception {
>>   Pipeline pipeline = new MRPipeline(AvroReflectIT.class,
>>       tmpDir.getDefaultConfiguration());
>>   AvroType<StringWrapper> atype = Avros.records(StringWrapper.class);
>>   PCollection<StringWrapper> p1 = pipeline.create(ImmutableList.of(
>>       new StringWrapper("josh"), new StringWrapper("wills"),
>>       new StringWrapper("foo")), atype);
>>   PCollection<StringWrapper> p2 = pipeline.create(ImmutableList.of(
>>       new StringWrapper("mike"), new StringWrapper("wills"),
>>       new StringWrapper("foo")), atype);
>>   PTable<String, StringWrapper> pt1 = p1.parallelDo(
>>       new MapFn<StringWrapper, Pair<String, StringWrapper>>() {
>>         @Override
>>         public Pair<String, StringWrapper> map(StringWrapper input) {
>>           return Pair.of(input.getValue(), input);
>>         }
>>       }, Avros.tableOf(Avros.strings(), atype));
>>   PTable<String, StringWrapper> pt2 = p2.parallelDo(
>>       new MapFn<StringWrapper, Pair<String, StringWrapper>>() {
>>         @Override
>>         public Pair<String, StringWrapper> map(StringWrapper input) {
>>           return Pair.of(input.getValue(), input);
>>         }
>>       },
>>       Avros.tableOf(Avros.strings(), atype));
>>   JoinStrategy<String, StringWrapper, StringWrapper> joinStrategy =
>>       new DefaultJoinStrategy<String, StringWrapper, StringWrapper>();
>>   PTable<String, Pair<StringWrapper, StringWrapper>> res =
>>       joinStrategy.join(pt1, pt2, JoinType.INNER_JOIN);
>>   for (Pair<String, Pair<StringWrapper, StringWrapper>> p :
>>       res.materialize()) {
>>     System.out.println(p);
>>   }
>> }
>>
>>
>> On Wed, May 13, 2015 at 3:10 PM, Lucy Chen <[email protected]>
>> wrote:
>>
>>> Hi,
>>>
>>> I tried to dump the members from the Labels class to a TupleN, and dump
>>> only sample_id and sample_name from the Feats class to a Pair, and then
>>> do the join:
>>>
>>> JoinStrategy<String, TupleN, Pair<String, String>> strategy
>>>     = new DefaultJoinStrategy<String, TupleN, Pair<String, String>>(20);
>>>
>>> PTable<String, Pair<TupleN, Pair<String, String>>> joined_training =
>>>     strategy.join(labels_data, feats_data, JoinType.INNER_JOIN);
>>>
>>> The data dump step is good; I already verified the outputs.
>>>
>>> And I still get the same exception from the join. It looks like it is
>>> definitely not an issue with the Avro type that I defined. Now I am just
>>> stuck here...
>>>
>>> Error: java.lang.ClassCastException: java.lang.String cannot be cast to
>>> org.apache.crunch.Pair
>>>   at org.apache.crunch.lib.join.DefaultJoinStrategy$2.map(DefaultJoinStrategy.java:94)
>>>   at org.apache.crunch.MapFn.process(MapFn.java:34)
>>>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>>>   at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>>>   at org.apache.crunch.MapFn.process(MapFn.java:34)
>>>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>>>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:109)
>>>   at org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
>>>   at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
>>>   at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
>>>   at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
>>>   at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
>>>   at java.security.AccessController.doPrivileged(Native Method)
>>>   at javax.security.auth.Subject.doAs(Subject.java:415)
>>>   at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
>>>   at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
>>>
>>> On Wed, May 13, 2015 at 2:05 PM, Lucy Chen <[email protected]>
>>> wrote:
>>>
>>>> Hi Josh,
>>>>
>>>> Those two functions bring the keys for each PCollection so
>>>> that they can join in the next step.
>>>> Here is what the functions look like:
>>>>
>>>> public class KeyOnLabels extends DoFn<Labels, Pair<String, Labels>>{
>>>>   private String key;
>>>>
>>>>   public KeyOnLabels(String input)
>>>>   {
>>>>     this.key = input;
>>>>   }
>>>>
>>>>   @Override
>>>>   public void process(Labels input, Emitter<Pair<String, Labels>> emitter)
>>>>   {
>>>>     if(key.equalsIgnoreCase("class_ID"))
>>>>       emitter.emit(Pair.of(input.getClassID(), input));
>>>>     else if(key.equalsIgnoreCase("sample_ID"))
>>>>       emitter.emit(Pair.of(input.getSampleID(), input));
>>>>     else
>>>>       emitter.emit(Pair.of(Integer.toString(input.getBinaryIndicator()),
>>>>           input));
>>>>   }
>>>> }
>>>>
>>>> public class KeyOnFeats extends DoFn<Feats, Pair<String, Feats>>{
>>>>   private final static Logger logger = Logger
>>>>       .getLogger(KeyOnFeats.class.getName());
>>>>
>>>>   private String key;
>>>>
>>>>   public KeyOnFeats(String input)
>>>>   {
>>>>     this.key = input;
>>>>   }
>>>>
>>>>   @Override
>>>>   public void process(Feats input, Emitter<Pair<String, Feats>> emitter)
>>>>   {
>>>>     if(key.equals("sample_ID"))
>>>>       emitter.emit(Pair.of(input.getSampleID(), input));
>>>>     else
>>>>       logger.error("The key should be specified as sample_ID only!");
>>>>   }
>>>> }
>>>>
>>>> Meanwhile I checked the two outputs and they looked normal.
>>>> Something like the following:
>>>>
>>>> {"key": "ID1", "value": Feats@14f5e86}
>>>>
>>>> {"key": "ID1", "value": Labels@489983f3}
>>>>
>>>> That's why I feel something weird happened in the join step.
>>>>
>>>> Thanks!
>>>>
>>>> Lucy
>>>>
>>>> On Wed, May 13, 2015 at 12:34 PM, Josh Wills <[email protected]>
>>>> wrote:
>>>>
>>>>> What do the KeyOnLabels and KeyOnFeats functions return?
>>>>> From the error, it looks like the labels_data PTable is actually a
>>>>> PCollection<String>, which would happen if KeyOnLabels was somehow
>>>>> returning a String in some situations despite claiming to return a
>>>>> Pair<String, Labels>.
>>>>>
>>>>> On Wed, May 13, 2015 at 12:31 PM, Lucy Chen <[email protected]> wrote:
>>>>>
>>>>>> Meanwhile, I just realized that one of my join jobs also looks OK,
>>>>>> and it also has the Avro type in it:
>>>>>>
>>>>>> JoinStrategy<String, Feats, Feats> strategy
>>>>>>     = new DefaultJoinStrategy<String, Feats, Feats>(50);
>>>>>>
>>>>>> PTable<String, Pair<Feats,Feats>> joined = strategy.join(input_A,
>>>>>>     input_B, JoinType.INNER_JOIN);
>>>>>>
>>>>>> So the Avro type probably is not the issue.
>>>>>>
>>>>>> Any advice?
>>>>>>
>>>>>> Lucy
>>>>>>
>>>>>> On Wed, May 13, 2015 at 12:22 PM, Lucy Chen <[email protected]> wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I have a join step in my Crunch pipeline, and it looks like
>>>>>>> the following:
>>>>>>>
>>>>>>> //get label data
>>>>>>> PType<Labels> LabelsType = Avros.records(Labels.class);
>>>>>>> PCollection<Labels> training_labels = input.parallelDo(new
>>>>>>>     LabelDataParser(), LabelsType);
>>>>>>> PTable<String, Labels> labels_data = training_labels.
>>>>>>>     parallelDo(new KeyOnLabels("sample_ID"), tableOf(strings(),
>>>>>>>         LabelsType));
>>>>>>>
>>>>>>> //get features
>>>>>>> PType<Feats> FeatsType = Avros.records(Feats.class);
>>>>>>> PCollection<Feats> training_feats = Feature.FeatLoader(pipeline,
>>>>>>>     sample_features_inputs);
>>>>>>> PTable<String, Feats> feats_data = training_feats.parallelDo(new
>>>>>>>     KeyOnFeats("sample_ID"), tableOf(strings(), FeatsType));
>>>>>>>
>>>>>>> //join labels and features
>>>>>>> JoinStrategy<String, Labels, Feats> strategy = new
>>>>>>>     DefaultJoinStrategy<String, Labels, Feats>(20);
>>>>>>> PTable<String, Pair<Labels, Feats>> joined_training =
>>>>>>>     strategy.join(labels_data, feats_data, JoinType.INNER_JOIN);
>>>>>>>
>>>>>>> //class Labels
>>>>>>> public class Labels implements java.io.Serializable, Cloneable{
>>>>>>>   private String class_ID;
>>>>>>>   private String sample_ID;
>>>>>>>   private int binary_ind;
>>>>>>>
>>>>>>>   public Labels()
>>>>>>>   {
>>>>>>>     this(null, null, 0);
>>>>>>>   }
>>>>>>>
>>>>>>>   public Labels(String class_ID, String sample_ID, int ind)
>>>>>>>   {
>>>>>>>     this.class_ID = class_ID;
>>>>>>>     this.sample_ID = sample_ID;
>>>>>>>     this.binary_ind = ind;
>>>>>>>   }
>>>>>>>
>>>>>>>   ...
>>>>>>> }
>>>>>>>
>>>>>>> //class Feats
>>>>>>> public class Feats implements java.io.Serializable, Cloneable{
>>>>>>>   private String sample_id;
>>>>>>>   private String sample_name;
>>>>>>>   private Map<String, Float> feat;
>>>>>>>
>>>>>>>   public Feats()
>>>>>>>   {
>>>>>>>     this(null, null, null);
>>>>>>>   }
>>>>>>>
>>>>>>>   public Feats(String id, String name, Map<String, Float> feat)
>>>>>>>   {
>>>>>>>     this.sample_id = id;
>>>>>>>     this.sample_name = name;
>>>>>>>     this.feat = feat;
>>>>>>>   }
>>>>>>>
>>>>>>>   ...
>>>>>>> }
>>>>>>>
>>>>>>> The outputs of labels_data and feats_data are both fine, but the
>>>>>>> join step throws the following exception:
>>>>>>>
>>>>>>> Error: java.lang.ClassCastException: java.lang.String cannot be cast
>>>>>>> to org.apache.crunch.Pair
>>>>>>>   at org.apache.crunch.lib.join.DefaultJoinStrategy$1.map(DefaultJoinStrategy.java:87)
>>>>>>>   at org.apache.crunch.MapFn.process(MapFn.java:34)
>>>>>>>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>>>>>>>   at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>>>>>>>   at org.apache.crunch.MapFn.process(MapFn.java:34)
>>>>>>>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>>>>>>>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:109)
>>>>>>>   at org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
>>>>>>>   at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
>>>>>>>   at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
>>>>>>>   at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
>>>>>>>   at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
>>>>>>>   at java.security.AccessController.doPrivileged(Native Method)
>>>>>>>   at javax.security.auth.Subject.doAs(Subject.java:415)
>>>>>>>   at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
>>>>>>>   at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
>>>>>>>
>>>>>>> This issue has bothered me for a while. Has anyone run into
>>>>>>> a similar issue? Is there another option that will solve it?
>>>>>>>
>>>>>>> Btw, I already successfully ran the following join job:
>>>>>>>
>>>>>>> JoinStrategy<String, Float, Tuple3<String, String, Float>> strategy
>>>>>>>     = new DefaultJoinStrategy<String, Float, Tuple3<String, String,
>>>>>>>         Float>>(100);
>>>>>>>
>>>>>>> PTable<String, Pair<Float,Tuple3<String, String, Float>>> joined =
>>>>>>>     strategy.join(input_A, input_B, JoinType.INNER_JOIN);
>>>>>>>
>>>>>>> So I guess the issue may still be related to the Avro types that
>>>>>>> I defined.
>>>>>>>
>>>>>>> Thanks for your advice.
>>>>>>>
>>>>>>> Lucy
>>
>> --
>> Director of Data Science
>> Cloudera <http://www.cloudera.com>
>> Twitter: @josh_wills <http://twitter.com/josh_wills>
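
For what it's worth, the diagnosis quoted earlier in the thread (a String arriving where the declared type says Pair) can be reproduced without Crunch at all. The sketch below is plain Java and only illustrates the mechanism: generic type parameters are erased at runtime, so a collection declared to hold Pair values can carry a String until some downstream code casts it, and that is where the ClassCastException surfaces. The Pair class and the ErasureDemo, mislabeled, and castFails names are hypothetical stand-ins, not Crunch classes, and this is not a claim about what is actually going wrong in the pipeline discussed here.

```java
import java.util.ArrayList;
import java.util.List;

public class ErasureDemo {

    // Hypothetical stand-in for org.apache.crunch.Pair; not the real class.
    static final class Pair<A, B> {
        final A first;
        final B second;

        Pair(A first, B second) {
            this.first = first;
            this.second = second;
        }
    }

    // Claims to return Pair<String, String> values, but an unchecked cast
    // lets a bare String slip in. The compiler only emits a warning here.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static List<Pair<String, String>> mislabeled() {
        List raw = new ArrayList();
        raw.add("ID1"); // a String, not a Pair
        return (List<Pair<String, String>>) raw;
    }

    // Returns true if reading the element as a Pair throws
    // ClassCastException, i.e. the failure shows up at the consumer,
    // not at the place where the wrong value was produced.
    public static boolean castFails() {
        List<Pair<String, String>> values = mislabeled();
        try {
            Pair<String, String> p = values.get(0); // runtime cast happens here
            return p == null;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("cast failed downstream: " + castFails());
    }
}
```

The point of the sketch is that the stack trace names the consumer of the value (here castFails, in the thread DefaultJoinStrategy), so the place to look is whatever produced the table feeding the join, not the join itself.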
