Hi Rushi,

What's happening inside your filter() method? What is the boolean flag for? Is it calling pipeline.run()?
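(To make the question concrete, here is a hypothetical guess at what a filter() along those lines might look like. The predicate, the `/tmp/filtered-` path, and the use of data.keys() are all assumptions on my part, not taken from your code.)

```java
// Hypothetical sketch of a filter() method like the one described (not the
// real code). It plans a write of the filtered records to an Avro file and
// returns the path. Note that nothing actually executes until
// pipeline.run() or pipeline.done() is called -- the write is only planned.
private Path filter(PTable<String, Integer> data, FileSystem fs, final boolean flag) {
    PCollection<String> filtered = data.keys().filter(new FilterFn<String>() {
        @Override
        public boolean accept(String key) {
            // Placeholder predicate: 'flag' picks which side of it to keep.
            return key.startsWith("a") == flag;
        }
    });

    Path out = new Path("/tmp/filtered-" + flag); // placeholder location
    filtered.write(To.avroFile(out.toString()));  // planned, not yet executed
    return out;
}
```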
It seems that unless you call pipeline.run() or pipeline.done(), Crunch won't actually perform any work and write the tables out to disk before the calls to processFinal(), which try to read them back from disk.

On Fri, Oct 23, 2015 at 9:41 AM, Rushi <[email protected]> wrote:

> Does anyone have any idea why this might be happening? Is it possible that
> after 'done' is called, one of the paths completes processing first, the
> staging data gets cleared, and that causes the exception to be thrown for
> the other path?
>
> Thanks.
>
> On Wed, Oct 21, 2015 at 3:01 PM, Rushi <[email protected]> wrote:
>
>> Thanks for replying.
>>
>> Actually, I'm not calling done() in between the sections, only at the
>> end. The getPipeline().run() call in processIntermediate() is commented
>> out (I was trying to see if that would help, but it didn't, so I
>> commented it out).
>>
>> On Wed, Oct 21, 2015 at 2:05 PM, David Ortiz <[email protected]> wrote:
>>
>>> Don't call done() in between the sections where you use the PCollection
>>> and it should work.
>>>
>>> On Wed, Oct 21, 2015 at 2:57 PM, Rushi <[email protected]> wrote:
>>>
>>>> In Crunch, is it possible to reuse a PCollection multiple times for
>>>> different purposes in the same pipeline run? My pseudocode looks
>>>> something like the following, but when I run it I get the error "File
>>>> does not exist: /tmp/crunch-..". If I comment out the second
>>>> processing path (the processFinal(paths.second()) line in process()),
>>>> I do not get the error and the pipeline executes successfully.
>>>>
>>>> // 1. Entry point
>>>> public int run() {
>>>>     process();
>>>>     getPipeline().done();
>>>> } // end of run()
>>>>
>>>> // 2.
>>>> private PTable<String, String> process() {
>>>>     Pair<Path, Path> paths = processIntermediate();
>>>>
>>>>     PTable<String, String> ret1 = processFinal(paths.first());
>>>>     PTable<String, String> ret2 = processFinal(paths.second());
>>>>
>>>>     return ret1.union(ret2);
>>>> } // end of process()
>>>>
>>>> // 3.
>>>> private Pair<Path, Path> processIntermediate() {
>>>>     PTable<String, Integer> data = ...; // read data from wherever
>>>>
>>>>     // Filter data from the input. filter() writes a PCollection to an
>>>>     // AvroFileSourceTarget and returns its path, which is used later
>>>>     // to read the collection back and do further processing.
>>>>     Path path1 = filter(data, fs, true);
>>>>     Path path2 = filter(data, fs, false);
>>>>
>>>>     // getPipeline().run();
>>>>
>>>>     return Pair.of(path1, path2);
>>>> } // end of processIntermediate()
>>>>
>>>> // 4.
>>>> private PTable<String, String> processFinal(Path path) {
>>>>     PCollection<String> table = getPipeline().read(
>>>>         new AvroFileSource<>(path), records(Strings));
>>>>
>>>>     return table.parallelDo(...);
>>>> } // end of processFinal()
>>>>
>>>> I imagine I could use Oozie workflow actions to simplify the
>>>> processing, but if this is just a matter of syntax or rearranging the
>>>> code, I would like to know.
>>>>
>>>> Thanks in advance!
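To make that suggestion concrete, here is a sketch of section 3 of the pseudocode with the commented-out run() restored; readInput() and the fs field are hypothetical placeholders, not from the original code:

```java
// Sketch (not the original code): processIntermediate() with the
// commented-out getPipeline().run() call restored. run() forces Crunch to
// execute the writes planned inside filter(), so the Avro files behind
// path1 and path2 actually exist on disk before processFinal() reads them.
private Pair<Path, Path> processIntermediate() {
    PTable<String, Integer> data = readInput(); // however the input is loaded

    Path path1 = filter(data, fs, true);
    Path path2 = filter(data, fs, false);

    // Block until the planned writes have completed.
    PipelineResult result = getPipeline().run();
    if (!result.succeeded()) {
        throw new CrunchRuntimeException("intermediate stage failed");
    }

    return Pair.of(path1, path2);
}
```

Alternatively, if the round-trip through Avro files isn't needed for some other reason, you could skip the write/read inside filter() entirely and pass the two filtered PCollections straight into processFinal(): Crunch can consume the same PCollection from multiple downstream operations in a single plan, and PCollection.cache() can hint that it should be materialized once.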
