I seem to be misunderstanding write() in pipelines somehow.
class MyClass extends CrunchTool {
    PTable<String, String> lines = this.read(mySource);
    PCollection<Log> parsed = lines.parallelDo("initial-parsing", new myParser(),
        Avros.specifics(Log.class));
    PTable<Visit, Pair<Long, Long>> visits = parsed.parallelDo("visits-parsing",
        new VisitsExtractor(),
        Avros.tableOf(Avros.specifics(Visit.class),
            Avros.pairs(Avros.longs(), Avros.longs())));
    PTable<Visit, Pair<Long, Long>> agg =
        visits.groupByKey().combineValues(Aggregators.pairAggregator(Aggregators.SUM_LONGS(),
            Aggregators.MIN_LONGS()));
    agg.write(To.avroFile(outputPath+"/visits"), WriteMode.OVERWRITE);
    parsed.write(To.avroFile(outputPath+"/raw"), WriteMode.OVERWRITE);
    this.run();
    this.done();
}
This runs fine; however, when it finishes, the "/raw" directory contains:
_SUCCESS
part-m-00000.avro
but the "/visits" directory is missing its Avro files and contains only:
_SUCCESS
Now if I change the above code to this:
    agg.write(To.avroFile(outputPath+"/visits"), WriteMode.OVERWRITE);
    this.run();
    parsed.write(To.avroFile(outputPath+"/raw"), WriteMode.OVERWRITE);
    this.done();
The only change is where I call this.run(). With that change, both directories
contain their respective Avro files and _SUCCESS files.
What's going on? Can I have multiple writes in a single pipeline? Would I ever
call run() on a pipeline more than once? And why does it matter where I call
run(), especially since I write to "/visits" first?
BTW, my Crunch version is crunch-0.10.0-hadoop2.
Thanks!