No problem, Happy Thanksgiving!
Gobble Gobble...

From: [email protected]
Date: Tue, 25 Nov 2014 18:23:14 -0800
Subject: Re: Multiple Reduces in a Single Crunch Job
To: [email protected]

Very useful-- thank you. Will dig into it and report back, although I'm heading 
out for the holiday so it likely won't be until early next week.
J
On Tue, Nov 25, 2014 at 6:16 PM, Danny Morgan <[email protected]> wrote:






Having a single pipeline in the application didn't fix it. Sticking a 
pipeline.run() in the middle also didn't matter either, the plan appears such 
that the planner is completely ignoring the second the run() I added.
However what DOES WORK is if I do:
collection = 
secondarySort()pipeline.cache(collection)pipeline.run()newcollection = 
collection.groupByKey()
If I try adding the cache() without calling run() in between it doesn't work. 
Hope that's enough info for you to fix the possible planner bug.
Thanks for the help Josh!
From: [email protected]
To: [email protected]
Subject: RE: Multiple Reduces in a Single Crunch Job
Date: Wed, 26 Nov 2014 01:58:11 +0000




I tried doing a Sample() instead of identity function, but that got fused into 
the reduce as well and didn't work.
First thing I tried was sticking a pipeline.run() in between there and I was 
surprised but it didn't work either, same error. I'll rerun that config now and 
try to get the dot files for the plan.
Not sure if this is affecting it but in the same crunch application I have a 
completely independent pipeline the runs before this one executes. I'll turn 
that off as well and see if it's causing the issue.

From: [email protected]
Date: Tue, 25 Nov 2014 17:43:52 -0800
Subject: Re: Multiple Reduces in a Single Crunch Job
To: [email protected]

Drat, I was hoping it was something simple. You could manually fix it by 
injecting a pipeline.run() call between the secondarySort and the groupByKey(), 
but of course, we'd like to handle this situation correctly by default.
J
On Tue, Nov 25, 2014 at 5:39 PM, Danny Morgan <[email protected]> wrote:



I did a parallelDo with the IdentityFn of the output of the secondarySort and 
the IdentityFn was just fused into the reduce phase of the secondarySort and I 
got the same error message.
I think you want me to somehow force a map phase in between the two reduces?
-Danny

From: [email protected]
Date: Tue, 25 Nov 2014 17:23:29 -0800
Subject: Re: Multiple Reduces in a Single Crunch Job
To: [email protected]

Oh, dumb question-- if you put like a dummy function between the secondarySort 
and the groupByKey, like an IdentityFn or something, do things work again? That 
would help w/diagnosing the problem.
On Tue, Nov 25, 2014 at 5:15 PM, Josh Wills <[email protected]> wrote:
So if you're getting it quickly, it might be b/c the job isn't recognizing the 
dependency between the two separate phases of the job for some reason (e.g., 
it's not realizing that one job has to be run before the other one.) That's an 
odd situation, but we have had bugs like that in the past; let me see if I can 
re-create the situation in an integration test. Which version of Crunch?
J
On Tue, Nov 25, 2014 at 4:40 PM, Danny Morgan <[email protected]> wrote:



No that's definitely not it. I get this issue if I write to a single output as 
well.
If I remove the groupByKey().combineValues() line and just write out the output 
from the SecondarySort it works. Seems to only complain about the temp path not 
existing when I have multiple reduce phases in the pipeline. Also the error 
seems to happen immediately during the setup or planning phase, I assume this 
because the yarn jobs get created but they don't do anything, and instead of 
FAILED the error message is "Application killed by user."
-Danny

From: [email protected]
Date: Tue, 25 Nov 2014 16:30:58 -0800
Subject: Re: Multiple Reduces in a Single Crunch Job
To: [email protected]

Ack, sorry-- it's this: https://issues.apache.org/jira/browse/CRUNCH-481
On Tue, Nov 25, 2014 at 4:24 PM, Danny Morgan <[email protected]> wrote:



Hello Again Josh,
The link to the Jira issue you sent out seems to be cut off, could you please 
resend it?
I deleted the line where I write the collection to a text file, and retried it 
but it didn't work either. Also tried writing the collection out as Avro 
instead of Parquet, but got the same error.
Here's the rest of the stracktrace:
org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does 
not exist: hdfs:///tmp/crunch-2008950085/p1        at 
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:285)
        at 
org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:217)
        at 
org.apache.crunch.impl.mr.run.CrunchInputFormat.getSplits(CrunchInputFormat.java:65)
        at 
org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:491)  
      at 
org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:508)     
   at 
org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:392)
        at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1268)        at 
org.apache.hadoop.mapreduce.Job$10.run(Job.java:1265)        at 
java.security.AccessController.doPrivileged(Native Method)        at 
javax.security.auth.Subject.doAs(Subject.java:415)        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1528)
        at org.apache.hadoop.mapreduce.Job.submit(Job.java:1265)        at 
org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob.submit(CrunchControlledJob.java:340)
        at 
org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl.startReadyJobs(CrunchJobControl.java:277)
        at 
org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl.pollJobStatusAndStartNewOnes(CrunchJobControl.java:316)
        at 
org.apache.crunch.impl.mr.exec.MRExecutor.monitorLoop(MRExecutor.java:113)      
  at org.apache.crunch.impl.mr.exec.MRExecutor.access$000(MRExecutor.java:55)   
     at org.apache.crunch.impl.mr.exec.MRExecutor$1.run(MRExecutor.java:84)     
   at java.lang.Thread.run(Thread.java:744)
Thanks Josh!
From: [email protected]
Date: Tue, 25 Nov 2014 16:10:33 -0800
Subject: Re: Multiple Reduces in a Single Crunch Job
To: [email protected]

Hey Danny,
I'm wondering if this is caused by 
https://issues.apache.org/jira/browse/CRUNCH-481-- I think we use different 
output committers for text files vs. parquet files, so at least one of the 
outputs won't be written properly-- does that make sense?
Josh
On Tue, Nov 25, 2014 at 4:07 PM, Danny Morgan <[email protected]> wrote:



Hi Crunchers,
I've attached a pdf of what my plan looks like. I've run into this problem 
before where I have multiple reduce steps chained together in a single pipeline 
and always get the same error.
In the case of the attached pdf the error is 
"org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does 
not exist: hdfs:///tmp/crunch-1279941375/p1"
That's the temp directory the crunch planner set up for the first reduce phase.
Can I run multiple chained reduces within the same pipeline? Do I have to 
manually write out the output from the first reduce?
Here's what the code looks like:
      // Simple mapper      PTable<String, Pair<Long, Log>> first = 
Danny.filterForDanny(logs);      // Secondary sort happens here      
PTable<Danny, Long> second = Danny.extractDannys(first);      // Regular group 
by      PTable<Danny, Long> third = 
second.groupByKey().combineValues(Aggregators.SUM_LONGS());      // simple 
function that populates some fields in the Danny object with the aggregate 
results      PCollection<Pair<Danny, String>> done = Danny.finalize(third);     
 Pair<PCollection<Danny>, PCollection<String>> splits = Channels.split(done);   
   splits.second().write(To.textFile(mypath, WriteMode.OVERWRITE);      Target 
pq_danny = new AvroParquetFileTarget(pqPath));      
splits.first().write(pq_danny, WriteMode.OVERWRITE)
Thanks!
-Danny                                    


-- 
Director of Data ScienceClouderaTwitter: @josh_wills
                                          


-- 
Director of Data ScienceClouderaTwitter: @josh_wills
                                          


-- 
Director of Data ScienceClouderaTwitter: @josh_wills


                                          


-- 
Director of Data ScienceClouderaTwitter: @josh_wills
                                          
                                          


-- 
Director of Data ScienceClouderaTwitter: @josh_wills
                                          

Reply via email to