I think it's a bug, or at least, a configuration issue. When you construct the MRPipeline, are you explicitly passing it a Configuration object?

On Thu, Dec 4, 2014 at 6:24 PM Danny Morgan <[email protected]> wrote:
> Hi Josh,
>
> Sorry I mixed up pipelines there is no s3 write in this case.
>
> So you are correct the intermediate Avro file that's the output of the
> SecondarySort is labeled "/tmp" I don't manually create this local file,
> the crunch planner seems to insert that materialization phase in. If you
> refer back to my original email the error I get is:
>
> "org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path
> does not exist: hdfs:///tmp/crunch-1279941375/p1"
>
> So the dot plan has the file labeled as "/tmp/crunch-*" however when the
> job runs it's expecting to find an "hdfs:///tmp/crunch-*". Is this a
> labeling issue with the plan output or might this be the bug?
>
> -Danny
>
> ------------------------------
> From: [email protected]
> Date: Thu, 4 Dec 2014 15:44:37 -0800
> Subject: Re: Multiple Reduces in a Single Crunch Job
> To: [email protected]
>
> Hey Danny,
>
> Inlined.
>
> On Thu, Dec 4, 2014 at 3:20 PM, Danny Morgan <[email protected]> wrote:
>
> > Hi Josh,
> >
> > Thanks for taking the time to look into this.
> >
> > I do get a PCollection<Object, String> and split it. I write the Avro
> > objects as parquet to HDFS and I get the String collection and write it out
> > to s3n://. I have noticed that the s3n:// targets copy their files to the
> > local filesystem's /tmp and then copy the file up to s3. This process
> > happens serially and is super slow, I'm not sure if it's a crunch issue or
> > a general HDFS one.
>
> I'm not following; I'm referring to the second_phase.pdf plan file, which
> has a bunch of Avro inputs that are being merged together and secondary
> sorted (some sort of sessionization, I assume) followed by a
> GBK/combineValues and then the write to Parquet. Where does the
> PCollection<Object, String> fit in? And is the S3 write part of the same
> Pipeline instance? I'm wondering if the multiple FileSystems are confusing
> the planner w/respect to where it should create the temp file.
>
> > Let me know if I can help debug further, as I mentioned calling
> > pipeline.cache() and pipeline.run() between the reduces did solve my
> > problem although I guess it is a hack.
> >
> > BTW Spotify's crunch-lib looks great, any integration plans?
>
> I also really like it and would like to incorporate basically all of it;
> will start a thread on dev@ about it and see if David is up for it.
>
> > -Danny
>
> ------------------------------
> From: [email protected]
> Date: Thu, 4 Dec 2014 14:21:55 -0800
> Subject: Re: Multiple Reduces in a Single Crunch Job
> To: [email protected]
>
> Danny,
>
> Spent a couple of hours today banging on this by hacking on some
> integration tests but couldn't replicate it. However, I just took a closer
> look at the plan you posted, and I noticed that all of the files you are
> writing out are prefixed w/ "hdfs:/" except for the /tmp/crunch-* file that
> Crunch is creating; is it possible that Crunch is creating the temp file
> locally on your client machine for some reason? I can't think of why that
> would happen off the top of my head, but if that is the problem, I'll at
> least be able to figure out where to look.
>
> Josh
>
> On Tue, Nov 25, 2014 at 6:30 PM, Danny Morgan <[email protected]> wrote:
>
> No problem, Happy Thanksgiving!
>
> Gobble Gobble...
>
> ------------------------------
> From: [email protected]
> Date: Tue, 25 Nov 2014 18:23:14 -0800
> Subject: Re: Multiple Reduces in a Single Crunch Job
> To: [email protected]
>
> Very useful-- thank you. Will dig into it and report back, although I'm
> heading out for the holiday so it likely won't be until early next week.
>
> J
>
> On Tue, Nov 25, 2014 at 6:16 PM, Danny Morgan <[email protected]> wrote:
>
> Having a single pipeline in the application didn't fix it. Sticking a
> pipeline.run() in the middle didn't matter either, the plan appears
> such that the planner is completely ignoring the second run() I added.
>
> However what DOES WORK is if I do:
>
>     collection = secondarySort()
>     pipeline.cache(collection)
>     pipeline.run()
>     newcollection = collection.groupByKey()
>
> If I try adding the cache() without calling run() in between it doesn't
> work. Hope that's enough info for you to fix the possible planner bug.
>
> Thanks for the help Josh!
>
> ------------------------------
> From: [email protected]
> To: [email protected]
> Subject: RE: Multiple Reduces in a Single Crunch Job
> Date: Wed, 26 Nov 2014 01:58:11 +0000
>
> I tried doing a Sample() instead of identity function, but that got fused
> into the reduce as well and didn't work.
>
> First thing I tried was sticking a pipeline.run() in between there and I
> was surprised but it didn't work either, same error. I'll rerun that config
> now and try to get the dot files for the plan.
>
> Not sure if this is affecting it but in the same crunch application I have
> a completely independent pipeline that runs before this one executes. I'll
> turn that off as well and see if it's causing the issue.
>
> ------------------------------
> From: [email protected]
> Date: Tue, 25 Nov 2014 17:43:52 -0800
> Subject: Re: Multiple Reduces in a Single Crunch Job
> To: [email protected]
>
> Drat, I was hoping it was something simple. You could manually fix it by
> injecting a pipeline.run() call between the secondarySort and the
> groupByKey(), but of course, we'd like to handle this situation correctly
> by default.
>
> J
>
> On Tue, Nov 25, 2014 at 5:39 PM, Danny Morgan <[email protected]> wrote:
>
> I did a parallelDo with the IdentityFn of the output of the secondarySort
> and the IdentityFn was just fused into the reduce phase of the
> secondarySort and I got the same error message.
>
> I think you want me to somehow force a map phase in between the two
> reduces?
>
> -Danny
>
> ------------------------------
> From: [email protected]
> Date: Tue, 25 Nov 2014 17:23:29 -0800
> Subject: Re: Multiple Reduces in a Single Crunch Job
> To: [email protected]
>
> Oh, dumb question-- if you put like a dummy function between the
> secondarySort and the groupByKey, like an IdentityFn or something, do
> things work again? That would help w/diagnosing the problem.
>
> On Tue, Nov 25, 2014 at 5:15 PM, Josh Wills <[email protected]> wrote:
>
> So if you're getting it quickly, it might be b/c the job isn't recognizing
> the dependency between the two separate phases of the job for some reason
> (e.g., it's not realizing that one job has to be run before the other one.)
> That's an odd situation, but we have had bugs like that in the past; let me
> see if I can re-create the situation in an integration test. Which version
> of Crunch?
>
> J
>
> On Tue, Nov 25, 2014 at 4:40 PM, Danny Morgan <[email protected]> wrote:
>
> No that's definitely not it. I get this issue if I write to a single
> output as well.
>
> If I remove the groupByKey().combineValues() line and just write out the
> output from the SecondarySort it works. Seems to only complain about the
> temp path not existing when I have multiple reduce phases in the pipeline.
> Also the error seems to happen immediately during the setup or planning
> phase, I assume this because the yarn jobs get created but they don't do
> anything, and instead of FAILED the error message is "Application killed by
> user."
>
> -Danny
>
> ------------------------------
> From: [email protected]
> Date: Tue, 25 Nov 2014 16:30:58 -0800
> Subject: Re: Multiple Reduces in a Single Crunch Job
> To: [email protected]
>
> Ack, sorry-- it's this: https://issues.apache.org/jira/browse/CRUNCH-481
>
> On Tue, Nov 25, 2014 at 4:24 PM, Danny Morgan <[email protected]> wrote:
>
> Hello Again Josh,
>
> The link to the Jira issue you sent out seems to be cut off, could you
> please resend it?
>
> I deleted the line where I write the collection to a text file, and
> retried it but it didn't work either. Also tried writing the collection out
> as Avro instead of Parquet, but got the same error.
>
> Here's the rest of the stack trace:
>
> org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs:///tmp/crunch-2008950085/p1
>     at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:285)
>     at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:217)
>     at org.apache.crunch.impl.mr.run.CrunchInputFormat.getSplits(CrunchInputFormat.java:65)
>     at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:491)
>     at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:508)
>     at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:392)
>     at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1268)
>     at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1265)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:415)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1528)
>     at org.apache.hadoop.mapreduce.Job.submit(Job.java:1265)
>     at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob.submit(CrunchControlledJob.java:340)
>     at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl.startReadyJobs(CrunchJobControl.java:277)
>     at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl.pollJobStatusAndStartNewOnes(CrunchJobControl.java:316)
>     at org.apache.crunch.impl.mr.exec.MRExecutor.monitorLoop(MRExecutor.java:113)
>     at org.apache.crunch.impl.mr.exec.MRExecutor.access$000(MRExecutor.java:55)
>     at org.apache.crunch.impl.mr.exec.MRExecutor$1.run(MRExecutor.java:84)
>     at java.lang.Thread.run(Thread.java:744)
>
> Thanks Josh!
>
> ------------------------------
> From: [email protected]
> Date: Tue, 25 Nov 2014 16:10:33 -0800
> Subject: Re: Multiple Reduces in a Single Crunch Job
> To: [email protected]
>
> Hey Danny,
>
> I'm wondering if this is caused by
> https://issues.apache.org/jira/browse/CRUNCH-481-- I think we use
> different output committers for text files vs. parquet files, so at least
> one of the outputs won't be written properly-- does that make sense?
>
> Josh
>
> On Tue, Nov 25, 2014 at 4:07 PM, Danny Morgan <[email protected]> wrote:
>
> Hi Crunchers,
>
> I've attached a pdf of what my plan looks like. I've run into this problem
> before where I have multiple reduce steps chained together in a single
> pipeline and always get the same error.
>
> In the case of the attached pdf the error is
> "org.apache.hadoop.mapreduce.lib.input.InvalidInputException:
> Input path does not exist: hdfs:///tmp/crunch-1279941375/p1"
>
> That's the temp directory the crunch planner set up for the first reduce
> phase.
>
> Can I run multiple chained reduces within the same pipeline? Do I have to
> manually write out the output from the first reduce?
>
> Here's what the code looks like:
>
>     // Simple mapper
>     PTable<String, Pair<Long, Log>> first = Danny.filterForDanny(logs);
>     // Secondary sort happens here
>     PTable<Danny, Long> second = Danny.extractDannys(first);
>     // Regular group by
>     PTable<Danny, Long> third =
>         second.groupByKey().combineValues(Aggregators.SUM_LONGS());
>     // simple function that populates some fields in the Danny object
>     // with the aggregate results
>     PCollection<Pair<Danny, String>> done = Danny.finalize(third);
>     Pair<PCollection<Danny>, PCollection<String>> splits =
>         Channels.split(done);
>     splits.second().write(To.textFile(mypath), WriteMode.OVERWRITE);
>     Target pq_danny = new AvroParquetFileTarget(pqPath);
>     splits.first().write(pq_danny, WriteMode.OVERWRITE);
>
> Thanks!
>
> -Danny
>
> --
> Director of Data Science
> Cloudera <http://www.cloudera.com>
> Twitter: @josh_wills <http://twitter.com/josh_wills>
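
The mismatch discussed in the thread (the plan shows "/tmp/crunch-*" while the job looks for "hdfs:///tmp/crunch-*") is consistent with a scheme-less temp path being resolved against whichever default filesystem the active Configuration names. The sketch below is only a stdlib Java illustration of that idea, not Crunch or Hadoop code: the class TempPathDemo and the method qualify are hypothetical names, and the real resolution logic in Hadoop may differ in detail.

```java
import java.net.URI;

// Hypothetical illustration only; not part of Crunch or Hadoop.
class TempPathDemo {

    /**
     * Returns the path unchanged if it already names a scheme; otherwise
     * prefixes it with the given default filesystem URI.
     * Assumes a scheme-less path is absolute (starts with "/").
     */
    public static String qualify(String defaultFs, String path) {
        if (URI.create(path).getScheme() != null) {
            return path; // already says which filesystem it lives on
        }
        // Drop one trailing slash so "hdfs:///" + "/tmp" does not gain an extra "/".
        String base = defaultFs.endsWith("/")
                ? defaultFs.substring(0, defaultFs.length() - 1)
                : defaultFs;
        return base + path;
    }

    public static void main(String[] args) {
        String tmp = "/tmp/crunch-1279941375/p1"; // temp path from the error message
        // Same scheme-less path, two different default filesystems.
        System.out.println(qualify("file:///", tmp));
        System.out.println(qualify("hdfs:///", tmp));
    }
}
```

If the client-side Configuration pointed at file:/// while the cluster used hdfs:///, the same temp path would name two different locations, which is one possible reason to pass an explicit Configuration when constructing the MRPipeline.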
