That is super-interesting; let me try to replicate it in a test. J
On Fri, Aug 22, 2014 at 7:26 AM, Danny Morgan <[email protected]> wrote:

> This issue looks similar to
> https://issues.apache.org/jira/browse/CRUNCH-67
>
> It turns out even if I get rid of the reduce phase and do just this:
>
>     PTable<String, String> lines = this.read(mySource);
>     PCollection<Log> parsed = lines.parallelDo("initial-parsing",
>         new myParser(), Avros.specifics(Log.class));
>
>     PTable<Visit, Pair<Long, Long>> visits = parsed.parallelDo("visits-parsing",
>         new VisitsExtractor(),
>         Avros.tableOf(Avros.specifics(Visit.class),
>             Avros.pairs(Avros.longs(), Avros.longs())));
>
>     visits.write(To.avroFile(outputPath + "/visits"), WriteMode.OVERWRITE);
>     parsed.write(To.avroFile(outputPath + "/raw"), WriteMode.OVERWRITE);
>     this.done();
>
> The plan shows I should be writing to two different targets in a single
> map phase. However, only "/raw" has data written out to it; "/visits"
> just contains a _SUCCESS file and no data.
>
> Might this be an issue with writing out to two different Avro types in
> the same phase?
>
> Thanks Again,
>
> Danny
>
> ------------------------------
> From: [email protected]
> To: [email protected]
> Subject: RE: Multiple Writes in a single pipeline.
> Date: Fri, 22 Aug 2014 02:02:20 +0000
>
> Hi Josh,
>
> ------------------------------
> From: [email protected]
> Date: Thu, 21 Aug 2014 17:40:25 -0700
> Subject: Re: Multiple Writes in a single pipeline.
> To: [email protected]
>
> The two different executions you have are doing different things, however.
> In the first one, Crunch is running a single MapReduce job where the /raw
> directory is written as a mapper side-output, and the /visits directory is
> being written out on the reduce side (or at least, it should be; is there
> any evidence of a failure in the job in the logs? Are bytes being written
> out from the reducer?)
>
> No evidence of any failures in the logs; the single mapper and reducer
> both succeed.
> The mapper definitely writes to HDFS; the reducer does not. Here are the
> relevant counters from the reducer (task_1408490264848_0012_r_000000 in the
> job history UI):
>
>     FILE: Number of bytes read                6
>     FILE: Number of bytes written             91811
>     FILE: Number of large read operations     0
>     FILE: Number of read operations           0
>     FILE: Number of write operations          0
>     HDFS: Number of bytes read                6205
>     HDFS: Number of bytes written             0
>     HDFS: Number of large read operations     0
>     HDFS: Number of read operations           4
>     HDFS: Number of write operations          2
>
> I couldn't find anything related on the Crunch JIRA.
>
> For this problem, I think it would be more efficient to write the parsed
> -> /raw output first, call run(), then do the agg -> /visits output
> followed by done(), which would mean that you would only need to parse the
> raw input once, instead of twice.
>
> Would the first option be more efficient if it worked?
>
> A helpful trick for seeing how the Crunch planner is mapping your logic
> into MapReduce jobs is to look at the plan dot file via one of the
> following mechanisms:
>
> 1) Instead of calling Pipeline.run(), call Pipeline.runAsync() and then
> call the getPlanDotFile() method on the returned PipelineExecution object.
> You can print the dot file to a file and use a dot file viewer to look at
> how the DoFns are broken up into MR jobs and map/reduce phases.
>
> 2) Call MRPipeline.plan() directly, which returns an MRExecutor object that
> also implements PipelineExecution. (The difference being that calling
> MRPipeline.plan will not start the jobs running, whereas calling runAsync
> will.)
>
> I ran the two different versions through dot and you're right, they are two
> completely different executions, pretty cool!
>
> Thanks!
> --
> Director of Data Science
> Cloudera <http://www.cloudera.com>
> Twitter: @josh_wills <http://twitter.com/josh_wills>
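
For reference, here is a minimal sketch of the two-stage approach described above (write the parsed output, call run(), then derive and write the visits output before done()). It reuses the Log, Visit, myParser, VisitsExtractor, and mySource names from Danny's snippet, which are assumed to be defined elsewhere; the wrapping class and main method are illustrative only.

    import org.apache.crunch.PCollection;
    import org.apache.crunch.PTable;
    import org.apache.crunch.Pair;
    import org.apache.crunch.Target.WriteMode;
    import org.apache.crunch.impl.mr.MRPipeline;
    import org.apache.crunch.io.To;
    import org.apache.crunch.types.avro.Avros;

    public class TwoStageVisits {
      public static void main(String[] args) {
        MRPipeline pipeline = new MRPipeline(TwoStageVisits.class);
        String outputPath = args[0];

        // Same parsing step as in the original snippet; mySource, Log, and
        // myParser are taken from Danny's code and not defined here.
        PTable<String, String> lines = pipeline.read(mySource);
        PCollection<Log> parsed = lines.parallelDo("initial-parsing",
            new myParser(), Avros.specifics(Log.class));

        // Stage 1: write the parsed records and run this job immediately.
        parsed.write(To.avroFile(outputPath + "/raw"), WriteMode.OVERWRITE);
        pipeline.run();

        // Stage 2: derive the visits from the already-written parsed data and
        // write them out; done() runs the remaining job and cleans up.
        PTable<Visit, Pair<Long, Long>> visits = parsed.parallelDo("visits-parsing",
            new VisitsExtractor(),
            Avros.tableOf(Avros.specifics(Visit.class),
                Avros.pairs(Avros.longs(), Avros.longs())));
        visits.write(To.avroFile(outputPath + "/visits"), WriteMode.OVERWRITE);

        pipeline.done();
      }
    }

With this split, the raw input is parsed only once: the second stage picks up from the output that the first run() materialized.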
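
And a minimal sketch of the plan-dot-file trick, assuming an already-constructed MRPipeline; the output file path here is illustrative.

    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Paths;

    import org.apache.crunch.PipelineExecution;
    import org.apache.crunch.impl.mr.MRPipeline;

    public class PlanDump {
      // Option 1: start the jobs asynchronously and read the plan from the
      // returned PipelineExecution handle.
      public static void dumpAndRun(MRPipeline pipeline) throws Exception {
        PipelineExecution execution = pipeline.runAsync();
        Files.write(Paths.get("/tmp/crunch-plan.dot"),
            execution.getPlanDotFile().getBytes(StandardCharsets.UTF_8));
        execution.waitUntilDone();
      }

      // Option 2: plan only, without starting the jobs. MRPipeline.plan()
      // returns an MRExecutor, which also implements PipelineExecution.
      public static void dumpOnly(MRPipeline pipeline) throws Exception {
        PipelineExecution planned = pipeline.plan();
        Files.write(Paths.get("/tmp/crunch-plan.dot"),
            planned.getPlanDotFile().getBytes(StandardCharsets.UTF_8));
      }
    }

The resulting file can be rendered with Graphviz, e.g. dot -Tpng /tmp/crunch-plan.dot -o plan.png, to see how the DoFns are split across jobs and map/reduce phases.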
