Hello again Josh,
The link to the Jira issue you sent out seems to be cut off; could you please
resend it?
I deleted the line where I write the collection to a text file and retried, but
it didn't work either. I also tried writing the collection out as Avro instead
of Parquet, but got the same error.
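For reference, the Avro attempt was just the Parquet target swapped out --
roughly this, using the same variables as in my code below:

// Rough reconstruction of my Avro attempt: only the target changes,
// everything upstream of the write stays the same.
splits.first().write(To.avroFile(pqPath), WriteMode.OVERWRITE);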
Here's the rest of the stack trace:
org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs:///tmp/crunch-2008950085/p1
    at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:285)
    at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:217)
    at org.apache.crunch.impl.mr.run.CrunchInputFormat.getSplits(CrunchInputFormat.java:65)
    at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:491)
    at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:508)
    at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:392)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1268)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1265)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1528)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:1265)
    at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob.submit(CrunchControlledJob.java:340)
    at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl.startReadyJobs(CrunchJobControl.java:277)
    at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl.pollJobStatusAndStartNewOnes(CrunchJobControl.java:316)
    at org.apache.crunch.impl.mr.exec.MRExecutor.monitorLoop(MRExecutor.java:113)
    at org.apache.crunch.impl.mr.exec.MRExecutor.access$000(MRExecutor.java:55)
    at org.apache.crunch.impl.mr.exec.MRExecutor$1.run(MRExecutor.java:84)
    at java.lang.Thread.run(Thread.java:744)
Thanks Josh!
From: [email protected]
Date: Tue, 25 Nov 2014 16:10:33 -0800
Subject: Re: Multiple Reduces in a Single Crunch Job
To: [email protected]
Hey Danny,
I'm wondering if this is caused by
https://issues.apache.org/jira/browse/CRUNCH-481 -- I think we use different
output committers for text files vs. Parquet files, so at least one of the
outputs won't be written properly. Does that make sense?
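If that's what's going on, one possible workaround (an untested sketch -- it
assumes your Pipeline instance is in scope as "pipeline") would be to force
the text-file write into its own job before the Parquet write:

// Untested sketch: commit the text output in its own job so the two
// writes never share a single job's output committer.
splits.second().write(To.textFile(mypath), WriteMode.OVERWRITE);
pipeline.run(); // forces the text output to be written and committed first

splits.first().write(new AvroParquetFileTarget(pqPath), WriteMode.OVERWRITE);
pipeline.done();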
Josh
On Tue, Nov 25, 2014 at 4:07 PM, Danny Morgan <[email protected]> wrote:
Hi Crunchers,
I've attached a PDF of what my plan looks like. I've run into this problem
before: whenever I have multiple reduce steps chained together in a single
pipeline, I get the same error.
In the case of the attached PDF the error is
"org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does
not exist: hdfs:///tmp/crunch-1279941375/p1"
That's the temp directory the Crunch planner set up for the first reduce phase.
Can I run multiple chained reduces within the same pipeline? Do I have to
manually write out the output from the first reduce? (I've sketched what I
mean below, after the code.)
Here's what the code looks like:
// Simple mapper
PTable<String, Pair<Long, Log>> first = Danny.filterForDanny(logs);

// Secondary sort happens here
PTable<Danny, Long> second = Danny.extractDannys(first);

// Regular group by
PTable<Danny, Long> third = second.groupByKey().combineValues(Aggregators.SUM_LONGS());

// Simple function that populates some fields in the Danny object
// with the aggregate results
PCollection<Pair<Danny, String>> done = Danny.finalize(third);

Pair<PCollection<Danny>, PCollection<String>> splits = Channels.split(done);
splits.second().write(To.textFile(mypath), WriteMode.OVERWRITE);

Target pq_danny = new AvroParquetFileTarget(pqPath);
splits.first().write(pq_danny, WriteMode.OVERWRITE);
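And here's roughly what I meant by manually writing out the first reduce --
checkpoint the intermediate table, run the pipeline, then read it back. The
checkpoint path and the "pipeline" variable are placeholders, and this assumes
Danny is an Avro specific record:

// Fallback I'd rather avoid: checkpoint the intermediate PTable, run the
// pipeline, then read the records back in for the second reduce.
second.write(To.avroFile("/tmp/danny-checkpoint"), WriteMode.OVERWRITE);
pipeline.run();
PCollection<Pair<Danny, Long>> reread = pipeline.read(
    From.avroFile(new Path("/tmp/danny-checkpoint"),
        Avros.pairs(Avros.specifics(Danny.class), Avros.longs())));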
Thanks!
-Danny
--
Director of Data Science
Cloudera
Twitter: @josh_wills