Don't call done() in between the sections where you use the PCollection, and it should work.
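A minimal sketch of what I mean, under some assumptions: it uses MemPipeline so it runs locally (you need crunch-core and Hadoop on the classpath), and the class names ReuseSketch, StartsWithA and Tag are made up for illustration, not taken from your code. The point is only that the same PCollection feeds two branches and done() is called once, after everything has been consumed. In this sketch the filtered collections are passed along directly instead of being written to a path and read back.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.crunch.FilterFn;
import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mem.MemPipeline;
import org.apache.crunch.types.writable.Writables;

public class ReuseSketch {

  // Hypothetical filter: keeps (or drops) records starting with "a".
  static class StartsWithA extends FilterFn<String> {
    private final boolean keep;
    StartsWithA(boolean keep) { this.keep = keep; }
    @Override
    public boolean accept(String input) {
      return input.startsWith("a") == keep;
    }
  }

  // Hypothetical stand-in for the processFinal() step: tags each record.
  static class Tag extends MapFn<String, String> {
    private final String prefix;
    Tag(String prefix) { this.prefix = prefix; }
    @Override
    public String map(String input) {
      return prefix + ":" + input;
    }
  }

  static List<String> run() {
    Pipeline pipeline = MemPipeline.getInstance();

    // One input collection, reused by both branches below.
    PCollection<String> data =
        MemPipeline.typedCollectionOf(Writables.strings(), "a1", "b2", "a3");

    PCollection<String> first = data.filter(new StartsWithA(true));
    PCollection<String> second = data.filter(new StartsWithA(false));

    PCollection<String> ret1 = first.parallelDo(new Tag("A"), Writables.strings());
    PCollection<String> ret2 = second.parallelDo(new Tag("B"), Writables.strings());

    // Consume the combined result before finishing the pipeline.
    List<String> out = new ArrayList<>();
    for (String s : ret1.union(ret2).materialize()) {
      out.add(s);
    }

    // done() is called exactly once, at the very end.
    pipeline.done();
    return out;
  }

  public static void main(String[] args) {
    System.out.println(run());
  }
}
```

With a real MRPipeline the structure would be the same; if you do need the intermediate files on disk, write them with an explicit target and still keep the single done() at the end.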

On Wed, Oct 21, 2015 at 2:57 PM Rushi <[email protected]> wrote:

> In Crunch, is it possible to reuse a PCollection multiple times for
> different purposes in the same pipeline run? My pseudocode looks something
> like the following, but I get an error File does not exist: /tmp/crunch-..
> when I run it. If I comment out the second processing path
> (processFinal(paths.second()) line in process() method) I do not get the
> error and the pipeline executes successfully.
>
> // 1. Entry point
> public int run() {
>     process();
>
>     getPipeline().done();
> } // end of run()
>
>
> // 2.
> private void process() {
>
>     Pair<Path, Path> paths = processIntermediate();
>
>     PTable<String, String> ret1 = processFinal(paths.first());
>     PTable<String, String> ret2 = processFinal(paths.second());
>
>     return ret1.union(ret2);
>
> } // end of process()
>
>
> // 3.
> private Pair<Path, Path> processIntermediate() {
>
>     PTable<String, Integer> data = ...; // read data from wherever
>
>     // filter data from the input
>     Path path1 = filter(data, fs, true); // filter() will write a
> PCollection to an AvroFileSourceTarget and return its path, which will be
> used later to read the collection back and do further processing.
>     Path path2 = filter(data, fs, false);
>
>     // getPipeline().run();
>
>     return Pair.of(path1, path2);
>
> } // end of processIntermediate
>
>
> // 4.
> private PTable<String, String> processFinal(Path path) {
>
>     PCollection<String> table = getPipeline().read(new
> AvroFileSource<>(path), records(Strings));
>
>     return table.parallelDo(...);
>
> } // end of processFinal
>
>
> I imagine I could probably use Oozie workflow actions to simplify the
> processing but if this is just a matter of syntax/rearranging the code, I
> would like to know it.
>
> Thanks in advance!
>
