Hi Crunchers,
I've attached a PDF of what my plan looks like. I've run into this problem
before: whenever I have multiple reduce steps chained together in a single
pipeline, I get the same error.
In the case of the attached plan, the error is:
"org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does
not exist: hdfs:///tmp/crunch-1279941375/p1"
That's the temp directory the crunch planner set up for the first reduce phase.
Can I run multiple chained reduces within the same pipeline? Do I have to
manually write out the output from the first reduce?
Here's what the code looks like:

// Simple mapper
PTable<String, Pair<Long, Log>> first = Danny.filterForDanny(logs);

// Secondary sort happens here
PTable<Danny, Long> second = Danny.extractDannys(first);

// Regular group by
PTable<Danny, Long> third = second.groupByKey().combineValues(Aggregators.SUM_LONGS());

// Simple function that populates some fields in the Danny object
// with the aggregate results
PCollection<Pair<Danny, String>> done = Danny.finalize(third);

Pair<PCollection<Danny>, PCollection<String>> splits = Channels.split(done);

splits.second().write(To.textFile(mypath), WriteMode.OVERWRITE);

Target pq_danny = new AvroParquetFileTarget(pqPath);
splits.first().write(pq_danny, WriteMode.OVERWRITE);
Thanks!
-Danny
second_phase.pdf
