Hi Josh,
Those two functions assign a key to each element of the two PCollections so
that they can be joined in the next step. Here is what the functions look like:
public class KeyOnLabels extends DoFn<Labels, Pair<String, Labels>> {
    private String key;

    public KeyOnLabels(String input) {
        this.key = input;
    }

    @Override
    public void process(Labels input, Emitter<Pair<String, Labels>> emitter) {
        if (key.equalsIgnoreCase("class_ID"))
            emitter.emit(Pair.of(input.getClassID(), input));
        else if (key.equalsIgnoreCase("sample_ID"))
            emitter.emit(Pair.of(input.getSampleID(), input));
        else
            emitter.emit(Pair.of(Integer.toString(input.getBinaryIndicator()), input));
    }
}
public class KeyOnFeats extends DoFn<Feats, Pair<String, Feats>> {
    private final static Logger logger = Logger
            .getLogger(KeyOnFeats.class.getName());
    private String key;

    public KeyOnFeats(String input) {
        this.key = input;
    }

    @Override
    public void process(Feats input, Emitter<Pair<String, Feats>> emitter) {
        if (key.equals("sample_ID"))
            emitter.emit(Pair.of(input.getSampleID(), input));
        else
            logger.error("The key should be specified as sample_ID only!");
    }
}
Meanwhile, I checked the two outputs and they looked normal, something like
the following:
{"key": "ID1", "value": Feats@14f5e86}
{"key": "ID1", "value": Labels@489983f3}
That's why I suspect something odd is happening in the join step itself.
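To make sure I understand the hypothesis in the quoted reply below (a String showing up where a Pair is expected), here is a minimal plain-Java sketch of how that kind of ClassCastException can arise. It does not use Crunch at all: ErasureDemo, mislabeled and firstKeyOrError are made-up names for illustration, and Map.Entry stands in for Pair. It only shows that generics are erased at runtime, so a collection declared to hold pairs can really contain Strings and fail only when an element is read as a pair. I am not claiming this is what happens inside the join.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in, not Crunch code: Map.Entry plays the role of Pair.
final class ErasureDemo {

    // Declared to return key/value pairs, but an unchecked cast lets a bare
    // String through. The compiler only warns; nothing fails at this point.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static List<Map.Entry<String, String>> mislabeled() {
        List raw = new ArrayList();
        raw.add("ID1"); // a String, not a pair
        return (List<Map.Entry<String, String>>) raw;
    }

    // Reads the first element as a pair. The runtime cast happens here, far
    // from the place where the wrong value was put into the list.
    static String firstKeyOrError(List<Map.Entry<String, String>> data) {
        try {
            Map.Entry<String, String> entry = data.get(0);
            return entry.getKey();
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        // Mislabeled data: fails only when the element is consumed.
        System.out.println(firstKeyOrError(mislabeled()));

        // Correctly typed data: the key is returned.
        List<Map.Entry<String, String>> ok = new ArrayList<>();
        ok.add(new SimpleEntry<>("ID1", "label"));
        System.out.println(firstKeyOrError(ok));
    }
}
```

If something like this is going on, the declared type of labels_data would not match what is actually in it at runtime, even though the written output looks fine.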
Thanks!
Lucy
On Wed, May 13, 2015 at 12:34 PM, Josh Wills <[email protected]> wrote:
> What do the KeyOnLabels and KeyOnFeats functions return? From the error,
> it looks like the labels_data PTable is actually a PCollection<String>,
> which would happen if KeyOnLabels was somehow returning a String in some
> situations despite claiming to return a Pair<String, Labels>.
>
> On Wed, May 13, 2015 at 12:31 PM, Lucy Chen <[email protected]>
> wrote:
>
>> Meanwhile, I just realized that one of my other join jobs, which also
>> has an Avro type in it, runs fine:
>>
>> JoinStrategy<String, Feats, Feats> strategy
>>     = new DefaultJoinStrategy<String, Feats, Feats>(50);
>>
>> PTable<String, Pair<Feats,Feats>> joined = strategy.join(input_A,
>>     input_B, JoinType.INNER_JOIN);
>>
>>
>> So the Avro type probably is not the issue.
>>
>>
>> Any advice?
>>
>>
>> Lucy
>>
>> On Wed, May 13, 2015 at 12:22 PM, Lucy Chen <[email protected]>
>> wrote:
>>
>>> Hi all,
>>>
>>> I have a join step in my Crunch pipeline, and it looks like the
>>> following:
>>>
>>> //get label data
>>> PType<Labels> LabelsType = Avros.records(Labels.class);
>>> PCollection<Labels> training_labels =
>>>     input.parallelDo(new LabelDataParser(), LabelsType);
>>> PTable<String, Labels> labels_data = training_labels.parallelDo(
>>>     new KeyOnLabels("sample_ID"), tableOf(strings(), LabelsType));
>>>
>>> //get features
>>> PType<Feats> FeatsType = Avros.records(Feats.class);
>>> PCollection<Feats> training_feats =
>>>     Feature.FeatLoader(pipeline, sample_features_inputs);
>>> PTable<String, Feats> feats_data = training_feats.parallelDo(
>>>     new KeyOnFeats("sample_ID"), tableOf(strings(), FeatsType));
>>>
>>> //join labels and features
>>> JoinStrategy<String, Labels, Feats> strategy =
>>>     new DefaultJoinStrategy<String, Labels, Feats>(20);
>>> PTable<String, Pair<Labels, Feats>> joined_training =
>>>     strategy.join(labels_data, feats_data, JoinType.INNER_JOIN);
>>>
>>>
>>> //class Labels
>>> public class Labels implements java.io.Serializable, Cloneable{
>>>     private String class_ID;
>>>     private String sample_ID;
>>>     private int binary_ind;
>>>
>>>     public Labels()
>>>     {
>>>         this(null, null, 0);
>>>     }
>>>
>>>     public Labels(String class_ID, String sample_ID, int ind)
>>>     {
>>>         this.class_ID = class_ID;
>>>         this.sample_ID = sample_ID;
>>>         this.binary_ind = ind;
>>>     }
>>>
>>>     ...
>>> }
>>>
>>>
>>> //class Feats
>>> public class Feats implements java.io.Serializable, Cloneable{
>>>     private String sample_id;
>>>     private String sample_name;
>>>     private Map<String, Float> feat;
>>>
>>>     public Feats()
>>>     {
>>>         this(null, null, null);
>>>     }
>>>
>>>     public Feats(String id, String name, Map<String, Float> feat)
>>>     {
>>>         this.sample_id = id;
>>>         this.sample_name = name;
>>>         this.feat = feat;
>>>     }
>>>
>>>     ...
>>> }
>>>
>>>
>>> The outputs of labels_data and feats_data are both fine; but the
>>> join step throws the following exception:
>>>
>>>
>>> Error: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.crunch.Pair
>>>   at org.apache.crunch.lib.join.DefaultJoinStrategy$1.map(DefaultJoinStrategy.java:87)
>>>   at org.apache.crunch.MapFn.process(MapFn.java:34)
>>>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>>>   at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>>>   at org.apache.crunch.MapFn.process(MapFn.java:34)
>>>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
>>>   at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:109)
>>>   at org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
>>>   at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
>>>   at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
>>>   at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
>>>   at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
>>>   at java.security.AccessController.doPrivileged(Native Method)
>>>   at javax.security.auth.Subject.doAs(Subject.java:415)
>>>   at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
>>>   at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
>>>
>>>
>>> This issue has been bothering me for a while. Has anyone run into a
>>> similar issue? Is there another option that would solve it?
>>>
>>>
>>> Btw, I already successfully ran the following joining job:
>>>
>>>
>>> JoinStrategy<String, Float, Tuple3<String, String, Float>> strategy =
>>> new DefaultJoinStrategy<String, Float, Tuple3<String, String,
>>> Float>>(100);
>>>
>>> PTable<String, Pair<Float,Tuple3<String, String, Float>>> joined =
>>> strategy.join(input_A, input_B, JoinType.INNER_JOIN);
>>>
>>>
>>> So I guess the issue may still be related to the Avro types that I
>>> defined.
>>>
>>>
>>> Thanks for your advice.
>>>
>>>
>>> Lucy
>>>
>>>
>>>
>>>
>>>
>>
>