Hey Danny,

I'm wondering if this is caused by https://issues.apache.org/jira/browse/CRUNCH-481 -- I think we use different output committers for text files vs. parquet files, so at least one of the outputs won't be written properly -- does that make sense?
Josh

On Tue, Nov 25, 2014 at 4:07 PM, Danny Morgan <[email protected]> wrote:

> Hi Crunchers,
>
> I've attached a pdf of what my plan looks like. I've run into this problem
> before where I have multiple reduce steps chained together in a single
> pipeline, and I always get the same error.
>
> In the case of the attached pdf, the error is
> "org.apache.hadoop.mapreduce.lib.input.InvalidInputException:
> Input path does not exist: hdfs:///tmp/crunch-1279941375/p1"
>
> That's the temp directory the Crunch planner set up for the first reduce
> phase.
>
> Can I run multiple chained reduces within the same pipeline? Do I have to
> manually write out the output from the first reduce?
>
> Here's what the code looks like:
>
> // Simple mapper
> PTable<String, Pair<Long, Log>> first = Danny.filterForDanny(logs);
> // Secondary sort happens here
> PTable<Danny, Long> second = Danny.extractDannys(first);
> // Regular group-by
> PTable<Danny, Long> third =
>     second.groupByKey().combineValues(Aggregators.SUM_LONGS());
> // Simple function that populates some fields in the Danny object
> // with the aggregate results
> PCollection<Pair<Danny, String>> done = Danny.finalize(third);
> Pair<PCollection<Danny>, PCollection<String>> splits =
>     Channels.split(done);
> splits.second().write(To.textFile(mypath), WriteMode.OVERWRITE);
> Target pq_danny = new AvroParquetFileTarget(pqPath);
> splits.first().write(pq_danny, WriteMode.OVERWRITE);
>
> Thanks!
>
> -Danny

-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>
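
If CRUNCH-481 is in fact the culprit, one possible workaround is to keep the two targets out of the same MapReduce job, so each job uses only one output committer. Below is a minimal, untested sketch under that assumption; it reuses the `done`, `mypath`, and `pqPath` names from Danny's snippet and assumes a `pipeline` handle is in scope:

    // Sketch of a possible workaround (assumes CRUNCH-481 is the cause):
    // run the text-file write as its own job before planning the parquet
    // write, so the two targets never share a single job's committer.
    Pair<PCollection<Danny>, PCollection<String>> splits = Channels.split(done);

    // First job: text output only.
    splits.second().write(To.textFile(mypath), Target.WriteMode.OVERWRITE);
    pipeline.run();

    // Second job: parquet output, with its own committer.
    Target pq_danny = new AvroParquetFileTarget(pqPath);
    splits.first().write(pq_danny, Target.WriteMode.OVERWRITE);
    pipeline.done();

This is a sketch of the "manually break up the pipeline" approach Danny asks about, not a confirmed fix; whether a single run can mix the two committers safely depends on how CRUNCH-481 gets resolved.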
