Hey Lucy,

The write() method on PCollection doesn't take a PType argument; it picks up the type directly from the PCollection's own PType. Dropping the LabelsType argument will work:

training_labels.write(At.avroFile(...), WriteMode.OVERWRITE);
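In case it helps to see why there is no three-argument overload, here is a toy sketch of the idea. None of these classes are Crunch classes: ElementType, RecordingTarget, TypedCollection and the local WriteMode enum are hypothetical stand-ins for PType, Target, PCollection and Target.WriteMode. The point is only that a collection which was built with its element type already knows it, so write() needs just the target and the write mode. As far as I can tell, the GenericData.Record in the compiler message is simply the declared return type of the one-argument At.avroFile(...) call, not a sign that training_labels lost its type.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: these are NOT Crunch classes. They mimic the shape of the
// API to show why write() does not need a separate type argument.
final class WriteSketch {

    // Stand-in for Target.WriteMode (hypothetical, reduced to two values).
    enum WriteMode { DEFAULT, OVERWRITE }

    // Stand-in for a PType: it only remembers the record class.
    static final class ElementType<T> {
        final Class<T> recordClass;
        ElementType(Class<T> recordClass) { this.recordClass = recordClass; }
    }

    // Stand-in for a Target: it records what it was asked to write.
    static final class RecordingTarget {
        final String path;
        final List<String> log = new ArrayList<>();
        RecordingTarget(String path) { this.path = path; }
    }

    // Stand-in for a PCollection: the element type is fixed when the
    // collection is created, the way parallelDo(fn, ptype) fixes it.
    static final class TypedCollection<T> {
        private final ElementType<T> type;
        TypedCollection(ElementType<T> type) { this.type = type; }

        // Two arguments only: the type comes from the collection itself.
        TypedCollection<T> write(RecordingTarget target, WriteMode mode) {
            target.log.add(mode + " " + type.recordClass.getSimpleName()
                    + " -> " + target.path);
            return this;
        }
    }

    // Empty placeholder for the Labels record class from the thread.
    static final class Labels { }

    public static void main(String[] args) {
        ElementType<Labels> labelsType = new ElementType<>(Labels.class);
        TypedCollection<Labels> trainingLabels = new TypedCollection<>(labelsType);
        RecordingTarget target = new RecordingTarget("out/training_labels_avro");

        // No type argument here; the collection already carries it.
        trainingLabels.write(target, WriteMode.OVERWRITE);
        System.out.println(target.log.get(0));
        // prints "OVERWRITE Labels -> out/training_labels_avro"
    }
}
```

In the real pipeline the equivalent step is the two-argument call above, with the PType having been supplied earlier in input.parallelDo(new LabelDataParser(), LabelsType).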
J

On Fri, May 15, 2015 at 12:05 PM, Lucy Chen <[email protected]> wrote:

> Hi Josh,
>
> Now looks like the issue probably happened with the Class Labels.
> I tried to store the training_labels as an avro output (txt outputs are
> OK),
>
> training_labels.write(At.avroFile(output_path+"/training_labels_avro"),
>     LabelsType, WriteMode.OVERWRITE);
>
> but get the following err:
>
> The method write(Target, Target.WriteMode) in the type PCollection<Labels>
> is not applicable for the arguments (SourceTarget<GenericData.Record>,
> PType<Labels>, Target.WriteMode)
>
> Will get the similar issue when storing the labels_data
> which is the first input of join. It seemed that Crunch did not recognize
> the type of training_labels. But why here it treated it a
> GenericData.Record?
>
> Any suggestions?
>
> Thanks!
>
> Lucy
>
> On Wed, May 13, 2015 at 12:22 PM, Lucy Chen <[email protected]> wrote:
>
>> Hi all,
>>
>> I had a join step in my crunch pipeline, and it looks like the
>> following:
>>
>> //get label data
>> PType<Labels> LabelsType = Avros.records(Labels.class);
>> PCollection<Labels> training_labels = input.parallelDo(new LabelDataParser(), LabelsType);
>> PTable<String, Labels> labels_data = training_labels.parallelDo(
>>     new KeyOnLabels("sample_ID"), tableOf(strings(), LabelsType));
>>
>> //get features
>> PType<Feats> FeatsType = Avros.records(Feats.class);
>> PCollection<Feats> training_feats = Feature.FeatLoader(pipeline, sample_features_inputs);
>> PTable<String, Feats> feats_data = training_feats.parallelDo(
>>     new KeyOnFeats("sample_ID"), tableOf(strings(), FeatsType));
>>
>> //join labels and features
>> JoinStrategy<String, Labels, Feats> strategy =
>>     new DefaultJoinStrategy<String, Labels, Feats>(20);
>> PTable<String, Pair<Labels, Feats>> joined_training =
>>     strategy.join(labels_data, feats_data, JoinType.INNER_JOIN);
>>
>> //class Labels
>> public class Labels implements java.io.Serializable, Cloneable{
>>     private String class_ID;
>>     private String sample_ID;
>>     private int binary_ind;
>>
>>     public Labels()
>>     {
>>         this(null, null, 0);
>>     }
>>
>>     public Labels(String class_ID, String sample_ID, int ind)
>>     {
>>         this.class_ID = class_ID;
>>         this.sample_ID = sample_ID;
>>         this.binary_ind = ind;
>>     }
>>     ...
>> }
>>
>> //class Feats
>> public class Feats implements java.io.Serializable, Cloneable{
>>     private String sample_id;
>>     private String sample_name;
>>     private Map<String, Float> feat;
>>
>>     public Feats()
>>     {
>>         this(null, null, null);
>>     }
>>
>>     public Feats(String id, String name, Map<String, Float> feat)
>>     {
>>         this.sample_id = id;
>>         this.sample_name = name;
>>         this.feat = feat;
>>     }
>>     ...
>> }
>>
>> The outputs of labels_data and feats_data are both fine; but the join
>> step throws the following exception:
>>
>> Error: java.lang.ClassCastException: java.lang.String cannot be cast to
>> org.apache.crunch.Pair
>>   at org.apache.crunch.lib.join.DefaultJoinStrategy$1.map(DefaultJoinStrategy.java:87)
>>   at org.apache.crunch.MapFn.process(MapFn.java:34)
>>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>>   at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>>   at org.apache.crunch.MapFn.process(MapFn.java:34)
>>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:109)
>>   at org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
>>   at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
>>   at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
>>   at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
>>   at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
>>   at java.security.AccessController.doPrivileged(Native Method)
>>   at javax.security.auth.Subject.doAs(Subject.java:415)
>>   at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
>>   at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
>>
>> This issue already bothered me for a while; Did any one get a
>> similar issue here? Is there another option that will solve it?
>>
>> Btw, I already successfully ran the following joining job:
>>
>> JoinStrategy<String, Float, Tuple3<String, String, Float>> strategy =
>>     new DefaultJoinStrategy<String, Float, Tuple3<String, String, Float>>(100);
>> PTable<String, Pair<Float,Tuple3<String, String, Float>>> joined =
>>     strategy.join(input_A, input_B, JoinType.INNER_JOIN);
>>
>> So I guess the issue may be still related to the Avro types that I
>> defined.
>>
>> Thanks for your advice.
>>
>> Lucy

--
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>
