Hi, I recently started using Crunch and ran into an issue that really confused
me. When I run the following code, result.succeeded() is 1; however, if I add
one line, "labelData.write(At.textFile("/labelData"), WriteMode.OVERWRITE);",
before running the pipeline, result.succeeded() is 0.
I wonder why this happens and what could be causing the failure. Am I doing
something wrong?
Thanks for your help.
Lu
import java.io.IOException;
import java.io.Serializable;

import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.avro.Avros;
import org.apache.hadoop.conf.Configuration;

public class LabelDataCollector implements Serializable {

    // Convert the raw text inputs to FeatObject objects.
    public PCollection<FeatObject> getFeatSamples(Pipeline pipeline, String labelDataPath) {
        PCollection<String> rawInputs = pipeline.readTextFile(labelDataPath);
        PCollection<String> validLines = rawInputs.filter(new FeatValueNullFilter());
        PType<FeatObject> featObjectType = Avros.reflects(FeatObject.class);
        PCollection<FeatObject> featSamples =
                validLines.parallelDo(new GenerateFeatSample(), featObjectType);
        return featSamples;
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        Configuration conf = new Configuration();
        // Use the local file system and the local job runner.
        conf.set("fs.default.name", "file:///");
        conf.set("mapred.job.tracker", "local");
        Pipeline pipeline = new MRPipeline(LabelDataCollector.class, "LabelDataCollector", conf);
        LabelDataCollector ldc = new LabelDataCollector();
        PCollection<FeatObject> labelData = ldc.getFeatSamples(pipeline, "/feat_inputs");
        // Execute the pipeline as a MapReduce job.
        PipelineResult result = pipeline.done();
        // Prints 0 if the pipeline succeeded, 1 otherwise.
        System.out.println(result.succeeded() ? 0 : 1);
    }
}
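In Crunch, calls such as readTextFile, filter, and parallelDo only add steps to an execution plan; done() (or run()) launches jobs for the outputs that have been registered, for example through write(). The JDK-only toy model below is a sketch of that deferred-execution idea under simplifying assumptions: LazyPipeline and its methods are hypothetical names, everything runs in memory, and it does not reproduce Crunch's planner or its PipelineResult semantics.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical toy model of a lazily evaluated pipeline. It illustrates
// deferred execution only; it is not Crunch's planner or API.
final class LazyPipeline {
    private final List<String> input;
    private final List<UnaryOperator<Stream<String>>> steps = new ArrayList<>();
    private final List<List<String>> targets = new ArrayList<>();

    LazyPipeline(List<String> input) {
        this.input = input;
    }

    // Analogous to filter(): records a step; no records are read yet.
    LazyPipeline filter(Predicate<String> keep) {
        steps.add(s -> s.filter(keep));
        return this;
    }

    // Analogous to parallelDo(): records a step; no records are read yet.
    LazyPipeline map(UnaryOperator<String> fn) {
        steps.add(s -> s.map(fn));
        return this;
    }

    // Analogous to write(): registers an output that done() must produce.
    LazyPipeline write(List<String> target) {
        targets.add(target);
        return this;
    }

    // Analogous to done(): executes the recorded steps only when at least one
    // output was registered. Returns the number of jobs run (0 or 1 here).
    int done() {
        if (targets.isEmpty()) {
            return 0; // no outputs requested, so the steps never execute
        }
        Stream<String> stream = input.stream();
        for (UnaryOperator<Stream<String>> step : steps) {
            stream = step.apply(stream);
        }
        List<String> result = stream.collect(Collectors.toList());
        for (List<String> target : targets) {
            target.addAll(result);
        }
        return 1;
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger(); // counts conversion calls
        List<String> lines = List.of("a", "", "b");

        LazyPipeline noOutput = new LazyPipeline(lines)
                .filter(s -> !s.isEmpty())
                .map(s -> { calls.incrementAndGet(); return s.toUpperCase(); });
        // prints "jobs run: 0, conversions: 0"
        System.out.println("jobs run: " + noOutput.done() + ", conversions: " + calls.get());

        List<String> sink = new ArrayList<>();
        LazyPipeline withOutput = new LazyPipeline(lines)
                .filter(s -> !s.isEmpty())
                .map(s -> { calls.incrementAndGet(); return s.toUpperCase(); })
                .write(sink);
        // prints "jobs run: 1, conversions: 2, output: [A, B]"
        System.out.println("jobs run: " + withOutput.done() + ", conversions: " + calls.get()
                + ", output: " + sink);
    }
}
```

In this sketch the conversion step is never invoked until an output is registered, which is worth keeping in mind when comparing the two runs described above.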
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.log4j.Logger;

public class GenerateFeatSample extends DoFn<String, FeatObject> {
    private static final long serialVersionUID = 1L;
    // Log under this class's own name rather than FeatObject's.
    private static final Logger logger = Logger.getLogger(GenerateFeatSample.class.getName());

    @Override
    public void process(String input, Emitter<FeatObject> emitter) {
        // Skip lines that cannot be converted.
        if (input == null || input.isEmpty()) {
            logger.error("Input is null or empty");
            return;
        }
        logger.info("Converting the input string to a feature object");
        emitter.emit(new FeatObject(input));
    }
}
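To check the per-record logic of GenerateFeatSample without starting Hadoop, the same rule (skip null or empty lines, convert everything else) can be run over an in-memory list. The JDK-only sketch below uses a hypothetical helper, convertAll, and takes the conversion as a Function so that it does not depend on FeatObject's constructor, whose body is elided in the post.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical helper mirroring GenerateFeatSample.process():
// null or empty lines are skipped, every other line is converted and kept.
final class FeatSampleCheck {
    private FeatSampleCheck() {}

    static <T> List<T> convertAll(List<String> lines, Function<String, T> convert) {
        List<T> out = new ArrayList<>();
        for (String line : lines) {
            if (line == null || line.isEmpty()) {
                continue; // same guard as in process()
            }
            out.add(convert.apply(line)); // stands in for emitter.emit(...)
        }
        return out;
    }

    public static void main(String[] args) {
        // Arrays.asList allows null elements, unlike List.of.
        List<String> lines = Arrays.asList("x", null, "", "yz");
        System.out.println(convertAll(lines, String::length)); // prints [1, 2]
    }
}
```

With the real class on the classpath, the converter would be `FeatObject::new`, since `FeatObject(String)` matches `Function<String, FeatObject>`.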
import java.util.Map;

public class FeatObject implements java.io.Serializable, Cloneable {
    private static final long serialVersionUID = 1L;
    private String labelID;
    private String sampleID;
    private int pos_neg_ind;
    private Map<String, Double> feat_val_pair;
    private int num_of_feat;

    public FeatObject(String input) {
        ...
    }
}
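FeatObject is serialized with Avros.reflects(FeatObject.class), which relies on Avro's reflection support. When Avro reflection reads records back, it instantiates the class through a no-argument constructor (any visibility), and as posted FeatObject declares only FeatObject(String). The JDK-only sketch below checks whether a class declares such a constructor; WithoutNoArg and WithNoArg are hypothetical stand-ins, not the real FeatObject.

```java
// Hypothetical stand-in: declaring only a String constructor means the
// compiler does not generate a default no-argument constructor.
class WithoutNoArg {
    WithoutNoArg(String input) {}
}

// Hypothetical stand-in that also declares a private no-argument constructor.
class WithNoArg {
    private WithNoArg() {}
    WithNoArg(String input) {}
}

final class NoArgCheck {
    private NoArgCheck() {}

    // True if the class declares a constructor with no parameters.
    // getDeclaredConstructor() also finds non-public constructors.
    static boolean hasNoArgConstructor(Class<?> type) {
        try {
            type.getDeclaredConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(hasNoArgConstructor(WithoutNoArg.class)); // prints false
        System.out.println(hasNoArgConstructor(WithNoArg.class));    // prints true
    }
}
```

If the check returns false for FeatObject, adding a no-argument constructor (it can be private) is the usual remedy for Avro reflect types.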