I seem to be misunderstanding write() in pipelines somehow.

MyClass extends CrunchTool {

 PTable<String, String> lines = this.read(mySource);
 PCollection<Log> parsed = lines.parallelDo("initial-parsing", new myParser(), 
Avros.specifics(Log.class));

 PTable<Visit, Pair<Long, Long>> visits = parsed.parallelDo("visits-parsing", 
new VisitsExtractor(),
          Avros.tableOf(Avros.specifics(Visit.class), 
Avros.pairs(Avros.longs(), Avros.longs())));
PTable<Visit, Pair<Long, Long>> agg = 
visits.groupByKey().combineValues(Aggregators.pairAggregator(Aggregators.SUM_LONGS(),
 Aggregators.MIN_LONGS()));

agg.write(To.avroFile(outputPath+"/visits"), WriteMode.OVERWRITE);
parsed.write(To.avroFile(outputPath+"/raw"), WriteMode.OVERWRITE);
this.run();
this.done();

So this runs fine however at the end of it all the "/raw" contains:
_SUCCESS
part-m-00000.avro

but the "/visits" directory is missing its avro files:
_SUCCESS

Now if I change the above code to this:
agg.write(To.avroFile(outputPath+"/visits"), WriteMode.OVERWRITE);
this.run();
parsed.write(To.avroFile(outputPath+"/raw"), WriteMode.OVERWRITE);
this.done();

The change being when I call this.run() both directories now contain their 
respective avro files and _SUCCESS files.

What's going on? Can I have multiple writes in a single pipeline? Would I ever 
call run() on a pipeline more than once? How does the order of when I call 
run() matter especially since I try to write to "/visits" first?

BTW My crunch version is crunch-0.10.0-hadoop2

Thanks!
                                          

Reply via email to