So I suspect the problem is enabling debug mode without a log4j.properties file on the classpath-- we should probably have a threshold that causes an exception to be thrown (even in debug mode) to kill the job completely after a certain number of caught exceptions in the debug handler/logger.
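Roughly what I have in mind -- this is just a sketch, and the class and method names are hypothetical rather than anything that exists in Crunch today:

// Hypothetical sketch only: a debug-mode handler that logs the first N caught
// exceptions and then rethrows, so a DoFn that fails on every record can't
// silently swallow all of its input. Names are illustrative, not Crunch API.
public class ThresholdingDebugHandler {

  private final int maxCaughtExceptions;
  private int caughtSoFar = 0;

  public ThresholdingDebugHandler(int maxCaughtExceptions) {
    this.maxCaughtExceptions = maxCaughtExceptions;
  }

  // Log the exception in debug mode; fail the task once the threshold is hit.
  public void handle(String context, RuntimeException e) {
    caughtSoFar++;
    // Stand-in for the log4j call the real handler would make.
    System.err.println("Caught exception #" + caughtSoFar + " in " + context + ": " + e);
    if (caughtSoFar >= maxCaughtExceptions) {
      throw new RuntimeException(
          "Too many exceptions (" + caughtSoFar + ") caught in debug mode; failing the job", e);
    }
  }
}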
On Fri, Aug 22, 2014 at 5:58 PM, Danny Morgan <[email protected]> wrote:

> Awesome, you nailed it!
>
> In Avro:
>
> Visit.id = int;
> Log.parameter2 = string;
>
> I had this line in my DoFn:
>
> visit.setId(Integer.decode((String) log.getParam2()));
>
> Changing it to this fixed everything:
>
> visit.setId(Integer.decode(log.getParam2().toString()));
>
> I had turned debugging on for the pipeline, I didn't have a log4j.properties in the classpath, and everything was just spitting out to the console; I never saw an error or exception. Checking the MR History Server logs didn't turn anything up either.
>
> Any idea if I should have been seeing an exception? Is there some magic crunch mode I can turn on to debug what was happening?
>
> Thanks Josh!
>
> Danny
>
> ------------------------------
> From: [email protected]
> Date: Fri, 22 Aug 2014 15:37:39 -0700
> Subject: Re: Multiple Writes in a single pipeline.
> To: [email protected]
>
> Perversely fascinating. Does it matter if you do a mix of literals and values from Log? Does any value from Log cause it to not be written out?
>
> I can't help but wonder if we're swallowing an exception somewhere-- there's nothing in the logs, right?
>
> J
>
> On Fri, Aug 22, 2014 at 12:08 PM, Danny Morgan <[email protected]> wrote:
>
> Sorry, deleted a line by accident, but there should be a
>
> //v.setDatehour(l2.getDatehour());
>
> in the "This doesn't work" block of code.
>
> ------------------------------
> From: [email protected]
> To: [email protected]
> Subject: RE: Multiple Writes in a single pipeline.
> Date: Fri, 22 Aug 2014 19:03:37 +0000
>
> Had to do a lot of bisecting to finally get to where the issue seems to be. It's going to take some time to get a separate chunk of code that I can share out, but let me try to talk through the problem; maybe it's obvious.
>
> I have raw text data that gets parsed into Log avro classes, which then go through another DoFn to get converted into Visit avro classes, and I want to write out both collections of Log and Visit records. Basically the issue seems to happen in the DoFn that takes in Log and creates Visit. If I try to set any of the fields in Visit using fields from Log then I end up writing out no files; if I set the fields in Visit with literals then I have output.
>
> Here's the code. I thought this might be a detached-values issue and tried that as well, but it didn't seem to help. Originally the code just used "l" directly in process() and I didn't bother with "l2".
>
> public static PTable<Visit, Pair<Long, Long>> parseVisit(PCollection<Log> logs) {
>   return logs.parallelDo("second-parse", new DoFn<Log, Pair<Visit, Pair<Long, Long>>>() {
>     private PType<Log> ptype;
>
>     @Override
>     public void initialize() {
>       this.ptype = Avros.specifics(Log.class);
>       this.ptype.initialize(getConfiguration());
>     }
>
>     @Override
>     public void process(Log l, Emitter<Pair<Visit, Pair<Long, Long>>> emitter) {
>       Log l2 = ptype.getDetachedValue(l);
>
>       increment("test", "visitcount");
>
>       // This works
>       Visit v = new Visit();
>       v.setDate(1234);
>       v.setDatehour("123");
>       v.setUser("123");
>       emitter.emit(Pair.of(v, Pair.of(1L, 1L)));
>
>       // This doesn't work
>       //Visit v = new Visit();
>       //v.setDate(l2.getDate());
>       //v.setDatehour(l2.getDatehour());
>       //v.setUser(l2.getUser());
>       //emitter.emit(Pair.of(v, Pair.of(1L, 1L)));
>     }
>   }, Avros.tableOf(Avros.specifics(Visit.class),
>       Avros.pairs(Avros.longs(), Avros.longs())));
> }
>
> Thanks!
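The underlying gotcha, for anyone who finds this thread later: unless the schema asks for java.lang.String (the avro.java.string property), Avro specific records hand string fields back as org.apache.avro.util.Utf8, which is a CharSequence but not a String. So the direct cast fails at runtime while toString() is always safe. A minimal standalone sketch of the difference, using a bare Utf8 value rather than the actual Log class:

import org.apache.avro.util.Utf8;

public class Utf8CastExample {
  public static void main(String[] args) {
    // What an Avro specific record typically returns for a string field:
    CharSequence param = new Utf8("123");

    // Safe: toString() on any CharSequence yields a java.lang.String.
    System.out.println("Decoded id: " + Integer.decode(param.toString()));

    // Unsafe: the value is a Utf8, not a String, so the cast blows up at runtime.
    try {
      Integer.decode((String) param);
    } catch (ClassCastException e) {
      System.out.println("Cast failed as expected: " + e);
    }
  }
}

If that ClassCastException is getting swallowed per record, no emit() ever happens, which would explain the empty /visits output.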
> ------------------------------
> From: [email protected]
> Date: Fri, 22 Aug 2014 08:57:13 -0700
> Subject: Re: Multiple Writes in a single pipeline.
> To: [email protected]
>
> Many thanks for taking the time, would like to get this resolved before the next release.
>
> J
>
> On Fri, Aug 22, 2014 at 8:55 AM, Danny Morgan <[email protected]> wrote:
>
> Hi Josh,
>
> The Log and Visits classes are all in the same jar, the classloader fix is in place but I still get the issue without setting the class loader.
>
> I'll put together the smallest reproduction I can and send out the code.
>
> Danny
>
> ------------------------------
> From: [email protected]
> Date: Fri, 22 Aug 2014 08:42:06 -0700
> Subject: Re: Multiple Writes in a single pipeline.
> To: [email protected]
>
> Hey Danny,
>
> I wrote the test that I inlined below and it ran successfully for me against master and the 0.10 branch, so there must be something more subtle going on here-- are the Log and Visit classes created in different jars? I'm assuming the classloader fix is in play here and I'm wondering if there is something weird there.
>
> import org.apache.crunch.MapFn;
> import org.apache.crunch.PCollection;
> import org.apache.crunch.Pipeline;
> import org.apache.crunch.impl.mr.MRPipeline;
> import org.apache.crunch.io.From;
> import org.apache.crunch.io.To;
> import org.apache.crunch.test.Employee;
> import org.apache.crunch.test.Person;
> import org.apache.crunch.test.TemporaryPath;
> import org.apache.crunch.test.TemporaryPaths;
> import org.apache.crunch.types.avro.Avros;
> import org.apache.hadoop.fs.FileStatus;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
> import org.junit.Rule;
> import org.junit.Test;
>
> public class MultiAvroOutputIT {
>
>   @Rule
>   public transient TemporaryPath tmpDir = TemporaryPaths.create();
>
>   @Test
>   public void testMultiAvroWrite() throws Exception {
>     Path person = tmpDir.getPath("person");
>     Path employee = tmpDir.getPath("employee");
>     Pipeline p = new MRPipeline(MultiAvroOutputIT.class, tmpDir.getDefaultConfiguration());
>     PCollection<String> shakes = p.read(From.textFile(tmpDir.copyResourcePath("shakes.txt")));
>
>     shakes.parallelDo(new PersonFn(), Avros.specifics(Person.class))
>         .write(To.avroFile(person));
>     shakes.parallelDo(new EmployeeFn(), Avros.specifics(Employee.class))
>         .write(To.avroFile(employee));
>     p.run();
>
>     FileSystem fs = FileSystem.get(tmpDir.getDefaultConfiguration());
>     System.out.println("Person");
>     for (FileStatus fstat : fs.listStatus(person)) {
>       System.out.println(fstat.getPath() + ": " + fstat.getLen());
>     }
>     System.out.println("Employee");
>     for (FileStatus fstat : fs.listStatus(employee)) {
>       System.out.println(fstat.getPath() + ": " + fstat.getLen());
>     }
>
>     p.done();
>   }
>
>   static class PersonFn extends MapFn<String, Person> {
>     @Override
>     public Person map(String input) {
>       return new Person();
>     }
>   }
>
>   static class EmployeeFn extends MapFn<String, Employee> {
>     @Override
>     public Employee map(String input) {
>       return new Employee();
>     }
>   }
> }
>
> On Fri, Aug 22, 2014 at 8:12 AM, Josh Wills <[email protected]> wrote:
>
> That is super-interesting; let me try to replicate it in a test.
> > J > > > On Fri, Aug 22, 2014 at 7:26 AM, Danny Morgan <[email protected]> > wrote: > > This issue looks similar to > https://issues.apache.org/jira/browse/CRUNCH-67 > > It turns out even if I get rid of the reduce phase and do just this: > > > PTable<String, String> lines = this.read(mySource); > PCollection<Log> parsed = lines.parallelDo("initial- > parsing", new myParser(), Avros.specifics(Log.class)); > > PTable<Visit, Pair<Long, Long>> visits = parsed.parallelDo("visits-parsing", > new VisitsExtractor(), > Avros.tableOf(Avros.specifics(Visit.class), > Avros.pairs(Avros.longs(), Avros.longs()))); > > visits.write(To.avroFile(outputPath+"/visits"), WriteMode.OVERWRITE); > parsed.write(To.avroFile(outputPath+"/raw"), WriteMode.OVERWRITE); > this.done(); > > The plan shows I should be writing to two different targets in a single > map phase however only "/raw" as data written out to it and "/visits" just > contains a _SUCCESS file and no data. > > Might this be an issue writing out to two different Avro types in the same > phase? > > Thanks Again, > > Danny > > > ------------------------------ > From: [email protected] > To: [email protected] > Subject: RE: Multiple Writes in a single pipeline. > Date: Fri, 22 Aug 2014 02:02:20 +0000 > > > Hi Josh, > > > ------------------------------ > From: [email protected] > Date: Thu, 21 Aug 2014 17:40:25 -0700 > Subject: Re: Multiple Writes in a single pipeline. > To: [email protected] > > The two different executions you have are doing different things, however. > In the first one, Crunch is running a single MapReduce job where the /raw > directory is written as a mapper side-output, and the /visits directory is > being written out on the reduce side (or at least, should be-- is there any > evidence of a failure in the job in the logs? Are bytes being written out > from the reducer?) > > No evidence of any failures in the logs, the single mapper and reducers > both succeed. 
> The mapper definitely writes to HDFS; the reducer does not. Here are the relevant counters from the reducer:
>
> FILE: Number of bytes read: 6
> FILE: Number of bytes written: 91811
> FILE: Number of large read operations: 0
> FILE: Number of read operations: 0
> FILE: Number of write operations: 0
> HDFS: Number of bytes read: 6205
> HDFS: Number of bytes written: 0
> HDFS: Number of large read operations: 0
> HDFS: Number of read operations: 4
> HDFS: Number of write operations: 2
>
> I couldn't find anything related on the crunch jira.
>
> For this problem, I think it would be more efficient to write the parsed -> /raw output first, call run(), then do the agg -> /visits output followed by done(), which would mean that you would only need to parse the raw input once, instead of twice.
>
> Would the first option be more efficient if it worked?
>
> A helpful trick for seeing how the Crunch planner is mapping your logic into MapReduce jobs is to look at the plan dot file via one of the following mechanisms:
>
> 1) Instead of calling Pipeline.run(), call Pipeline.runAsync() and then call the getPlanDotFile() method on the returned PipelineExecution object. You can print the dot file to a file and use a dot file viewer to look at how the DoFns are broken up into MR jobs and map/reduce phases.
> 2) Call MRPipeline.plan() directly, which returns an MRExecutor object that also implements PipelineExecution. (The difference being that calling MRPipeline.plan will not start the jobs running, whereas calling runAsync will.)
>
> I ran the two different versions through dot and you're right, they are two completely different executions, pretty cool!
>
> Thanks!
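For anyone who wants to do the same thing, a bare-bones sketch of option 2 above -- build the plan without launching anything and dump the dot source to disk. It assumes getPlanDotFile() hands back the dot text as a String, per the description above; the output path is just an example:

import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

import org.apache.crunch.PipelineExecution;
import org.apache.crunch.impl.mr.MRPipeline;

public class PlanDotFileDump {
  // Plan the pipeline without running it and write the planner's dot file out.
  public static void dumpPlan(MRPipeline pipeline, File out) throws Exception {
    PipelineExecution exec = pipeline.plan();
    Files.write(out.toPath(), exec.getPlanDotFile().getBytes(StandardCharsets.UTF_8));
    // View the result with e.g.: dot -Tpng plan.dot -o plan.png
  }
}

The same getPlanDotFile() call works on the PipelineExecution you get back from runAsync(), if you'd rather capture the plan from a run that's actually executing.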
--
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>
