On Sun, Feb 9, 2014 at 7:28 PM, Koduri,Vinay <[email protected]> wrote:
> Crunch,
>
> This is closely related to what Stephen has just posted[1].
>
> In the attached DAG_PipelineWithoutMaterialization.pdf, I am trying to
> avoid the double computation of the "MakingAPTable" function. Even with
> the scale factor < 1, the planner is trying to compute that twice (refer
> to PipelineWithoutMaterializationTest.java). So I am trying to materialize
> the PTable<Writable,Writable> the function produces, thereby avoiding the
> re-run. I am doing
>
> pTable.materialize();
> pipeline.run();
>
> That pTable's values could be null, and I get an exception (attached) when
> a value is null during the materialization process. As discussed in [1],
> it seems Writables.tableOf() also does not support null. When the
> PTable<Writable,Writable> is transformed to a
> PCollection<Pair<Writable,Writable>>, the materialization works fine
> (refer to PipelineThatMaterializesAPCollectionTest.java and its DAG).
>
> Questions:
> 1. Is there a better way to avoid double computation of the function
> without materialization?
>

Not right now, no. The better way would be to fuse both of the GBK
operations into a single MapReduce job, but that is a planner optimization
that requires someone to sit down and think really hard about it for a
couple of days in a place that has absolutely no Internet access or
anything else even remotely interesting.

> 2. Does Crunch convert PTable to a PCollection when emitting
> intermediate outputs that are used by subsequent phases in a pipeline
> execution?
>

That is a good question; I double-checked the code and verified that no, it
does not do that conversion.

>
> [1] http://mail-archives.apache.org/mod_mbox/crunch-user/201402.mbox/browser
>
> *Stack Trace when materializing a PTable<Writable, Writable>:*
> org.apache.crunch.CrunchRuntimeException: java.lang.NullPointerException
>     at org.apache.crunch.impl.mr.emit.MultipleOutputEmitter.emit(MultipleOutputEmitter.java:45)
>     at org.apache.crunch.MapFn.process(MapFn.java:34)
>     at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:99)
>     at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>     at com.cerner.pophealth.refrecord.load.CrunchSimpleTest$1.process(CrunchSimpleTest.java:60)
>     at com.cerner.pophealth.refrecord.load.CrunchSimpleTest$1.process(CrunchSimpleTest.java:1)
>     at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:99)
>     at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>     at org.apache.crunch.MapFn.process(MapFn.java:34)
>     at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:99)
>     at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:110)
>     at org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
>     at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
>     at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:673)
>     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:331)
>     at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:266)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
>     at java.lang.Thread.run(Thread.java:695)
> Caused by: java.lang.NullPointerException
>     at org.apache.hadoop.io.SequenceFile$Writer.append(SequenceFile.java:1268)
>     at org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat$1.write(SequenceFileOutputFormat.java:74)
>     at org.apache.crunch.io.CrunchOutputs.write(CrunchOutputs.java:133)
>     at org.apache.crunch.impl.mr.emit.MultipleOutputEmitter.emit(MultipleOutputEmitter.java:41)
>     ... 21 more
>
> Thanks
>

--
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>
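
A minimal sketch of the failure mode in the stack trace above, in plain Java with no Crunch or Hadoop dependency. It assumes the NullPointerException comes from a writer that inspects the class of each value before appending it, which is consistent with the "Caused by" frame but is not confirmed in this thread. StrictWriter, NULL_MARKER, and encode are hypothetical names made up for illustration; they are not part of the Crunch or Hadoop APIs, and the placeholder approach is one possible workaround, not something the thread proposes.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java model of a typed key/value writer that fails on null values.
 * Nothing here is Crunch or Hadoop code; every name is hypothetical.
 */
public class NullValueSketch {

    /** Hypothetical stand-in for a writer that checks the value's class. */
    static final class StrictWriter {
        private final Class<?> valueClass;
        private final List<String> records = new ArrayList<>();

        StrictWriter(Class<?> valueClass) {
            this.valueClass = valueClass;
        }

        void append(String key, Object value) {
            // value.getClass() throws NullPointerException when value is null.
            if (value.getClass() != valueClass) {
                throw new IllegalArgumentException("wrong value class: " + value.getClass());
            }
            records.add(key + "=" + value);
        }

        List<String> records() {
            return records;
        }
    }

    /**
     * Hypothetical placeholder written in place of a null value. An empty
     * string collides with real empty values, so a real pipeline would need
     * a marker that cannot occur in its data.
     */
    static final String NULL_MARKER = "";

    /** Replaces null with the placeholder so the writer never sees null. */
    static String encode(String value) {
        return value == null ? NULL_MARKER : value;
    }

    /** Returns true if appending a raw null value throws NullPointerException. */
    static boolean failsOnNull() {
        try {
            new StrictWriter(String.class).append("k", null);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("raw null fails: " + failsOnNull());

        StrictWriter writer = new StrictWriter(String.class);
        writer.append("k1", encode("v1"));
        writer.append("k2", encode(null)); // null replaced before the write
        System.out.println(writer.records());
    }
}
```

In a real pipeline the equivalent step would be a map over the table's values before materialization, with a matching decode step on the reading side.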
