In Crunch, is it possible to reuse a PCollection multiple times for
different purposes in the same pipeline run? My pseudocode looks something
like the following, but when I run it I get the error "File does not exist:
/tmp/crunch-..". If I comment out the second processing path (the
processFinal(paths.second()) line in the process() method), I do not get the
error and the pipeline executes successfully.
// 1. Entry point
public int run() {
  process();
  getPipeline().done();
  return 0;
} // end of run()
// 2.
private PTable<String, String> process() {
  Pair<Path, Path> paths = processIntermediate();
  PTable<String, String> ret1 = processFinal(paths.first());
  PTable<String, String> ret2 = processFinal(paths.second());
  return ret1.union(ret2);
} // end of process()
// 3.
private Pair<Path, Path> processIntermediate() {
  PTable<String, Integer> data = ...; // read data from wherever
  // Filter data from the input. filter() will write a PCollection to an
  // AvroFileSourceTarget and return its path, which will be used later to
  // read the collection back and do further processing.
  Path path1 = filter(data, fs, true);
  Path path2 = filter(data, fs, false);
  // getPipeline().run();
  return Pair.of(path1, path2);
} // end of processIntermediate()
// 4.
private PTable<String, String> processFinal(Path path) {
  PCollection<String> table = getPipeline().read(
      new AvroFileSource<>(path, Avros.strings()));
  return table.parallelDo(...);
} // end of processFinal()
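To check my understanding of the failure, I put together a toy model in plain Java. No Crunch is involved, and ToyPipeline, write, readBack, filter and attempt are all made-up names. The model assumes the problem is deferred execution: writes are only queued until run() is called, so reading a path back before that fails with a missing-file error, while calling run() first (which is what the commented-out getPipeline().run() line would do) lets both reads succeed. This is only my guess at the mechanism, not how Crunch's planner actually works, and it does not explain why a single processing path succeeds.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical toy model of deferred execution; this is NOT Crunch's API or planner.
class ToyPipeline {
    // Simulated file system: path -> records stored at that path.
    private final Map<String, List<String>> files = new HashMap<>();
    // Writes are queued here and only reach the simulated file system in run().
    private final List<Runnable> pendingWrites = new ArrayList<>();

    // Queues a write of `records` to `path` and returns the path, like filter() in the question.
    String write(List<String> records, String path) {
        List<String> copy = new ArrayList<>(records);
        pendingWrites.add(() -> files.put(path, copy));
        return path;
    }

    // Reads a path back; fails if no write to that path has executed yet.
    List<String> readBack(String path) {
        List<String> records = files.get(path);
        if (records == null) {
            throw new IllegalStateException("File does not exist: " + path);
        }
        return records;
    }

    // Executes every queued write, standing in for getPipeline().run().
    void run() {
        for (Runnable write : pendingWrites) {
            write.run();
        }
        pendingWrites.clear();
    }
}

class ToyPipelineDemo {
    // Hypothetical stand-in for the question's filter(data, fs, flag):
    // keeps short strings when keepShort is true, long ones otherwise.
    static List<String> filter(List<String> data, boolean keepShort) {
        return data.stream()
                .filter(s -> (s.length() <= 2) == keepShort)
                .collect(Collectors.toList());
    }

    // Writes two filtered outputs, optionally runs the pipeline, then reads both back.
    static String attempt(boolean runBeforeReading) {
        ToyPipeline pipeline = new ToyPipeline();
        List<String> data = List.of("a", "bb", "ccc", "dddd");
        String path1 = pipeline.write(filter(data, true), "/tmp/toy/path1");
        String path2 = pipeline.write(filter(data, false), "/tmp/toy/path2");
        if (runBeforeReading) {
            pipeline.run();
        }
        try {
            return pipeline.readBack(path1) + " " + pipeline.readBack(path2);
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(attempt(false)); // File does not exist: /tmp/toy/path1
        System.out.println(attempt(true));  // [a, bb] [ccc, dddd]
    }
}
```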
I imagine I could use Oozie workflow actions to simplify the processing,
but if this is just a matter of syntax or rearranging the code, I would like
to know.
Thanks in advance!