Hey Danny,
I wrote the test that I inlined below and it ran successfully for me
against master and the 0.10 branch, so there must be something more subtle
going on here-- are the Log and Visit classes created in different jars?
I'm assuming the classloader fix is in play here and I'm wondering if there
is something weird there.
import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.From;
import org.apache.crunch.io.To;
import org.apache.crunch.test.Employee;
import org.apache.crunch.test.Person;
import org.apache.crunch.test.TemporaryPath;
import org.apache.crunch.test.TemporaryPaths;
import org.apache.crunch.types.avro.Avros;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Rule;
import org.junit.Test;
public class MultiAvroOutputIT {
@Rule
public transient TemporaryPath tmpDir = TemporaryPaths.create();
@Test
public void testMultiAvroWrite() throws Exception {
Path person = tmpDir.getPath("person");
Path employee = tmpDir.getPath("employee");
Pipeline p = new MRPipeline(MultiAvroOutputIT.class,
tmpDir.getDefaultConfiguration());
PCollection<String> shakes =
p.read(From.textFile(tmpDir.copyResourcePath("shakes.txt")));
shakes.parallelDo(new PersonFn(), Avros.specifics(Person.class))
.write(To.avroFile(person));
shakes.parallelDo(new EmployeeFn(), Avros.specifics(Employee.class))
.write(To.avroFile(employee));
p.run();
FileSystem fs = FileSystem.get(tmpDir.getDefaultConfiguration());
System.out.println("Person");
for (FileStatus fstat : fs.listStatus(person)) {
System.out.println(fstat.getPath() + ": " + fstat.getLen());
}
System.out.println("Employee");
for (FileStatus fstat : fs.listStatus(employee)) {
System.out.println(fstat.getPath() + ": " + fstat.getLen());
}
p.done();
}
static class PersonFn extends MapFn<String, Person> {
@Override
public Person map(String input) {
return new Person();
}
}
static class EmployeeFn extends MapFn<String, Employee> {
@Override
public Employee map(String input) {
return new Employee();
}
}
}
On Fri, Aug 22, 2014 at 8:12 AM, Josh Wills <[email protected]> wrote:
> That is super-interesting; let me try to replicate it in a test.
>
> J
>
>
> On Fri, Aug 22, 2014 at 7:26 AM, Danny Morgan <[email protected]>
> wrote:
>
>> This issue looks similar to
>> https://issues.apache.org/jira/browse/CRUNCH-67
>>
>> It turns out even if I get rid of the reduce phase and do just this:
>>
>>
>> PTable<String, String> lines = this.read(mySource);
>> PCollection<Log> parsed = lines.parallelDo("initial-
>> parsing", new myParser(), Avros.specifics(Log.class));
>>
>> PTable<Visit, Pair<Long, Long>> visits = parsed.parallelDo("visits-parsing",
>> new VisitsExtractor(),
>> Avros.tableOf(Avros.specifics(Visit.class),
>> Avros.pairs(Avros.longs(), Avros.longs())));
>>
>> visits.write(To.avroFile(outputPath+"/visits"), WriteMode.OVERWRITE);
>> parsed.write(To.avroFile(outputPath+"/raw"), WriteMode.OVERWRITE);
>> this.done();
>>
>> The plan shows I should be writing to two different targets in a single
>> map phase however only "/raw" as data written out to it and "/visits" just
>> contains a _SUCCESS file and no data.
>>
>> Might this be an issue writing out to two different Avro types in the
>> same phase?
>>
>> Thanks Again,
>>
>> Danny
>>
>>
>> ------------------------------
>> From: [email protected]
>> To: [email protected]
>> Subject: RE: Multiple Writes in a single pipeline.
>> Date: Fri, 22 Aug 2014 02:02:20 +0000
>>
>>
>> Hi Josh,
>>
>>
>> ------------------------------
>> From: [email protected]
>> Date: Thu, 21 Aug 2014 17:40:25 -0700
>> Subject: Re: Multiple Writes in a single pipeline.
>> To: [email protected]
>>
>> The two different executions you have are doing different things,
>> however. In the first one, Crunch is running a single MapReduce job where
>> the /raw directory is written as a mapper side-output, and the /visits
>> directory is being written out on the reduce side (or at least, should be--
>> is there any evidence of a failure in the job in the logs? Are bytes being
>> written out from the reducer?)
>>
>> No evidence of any failures in the logs, the single mapper and reducers
>> both succeed. The mapper definitely writes to HDFS the reducer does not,
>> here are the relevant counters from the reducer:
>>
>> FILE: Number of bytes read
>> <http://ec2-54-166-194-165.compute-1.amazonaws.com:19888/jobhistory/singletaskcounter/task_1408490264848_0012_r_000000/org.apache.hadoop.mapreduce.FileSystemCounter/FILE_BYTES_READ>
>> 6 FILE: Number of bytes written
>> <http://ec2-54-166-194-165.compute-1.amazonaws.com:19888/jobhistory/singletaskcounter/task_1408490264848_0012_r_000000/org.apache.hadoop.mapreduce.FileSystemCounter/FILE_BYTES_WRITTEN>
>> 91811 FILE: Number of large read operations
>> <http://ec2-54-166-194-165.compute-1.amazonaws.com:19888/jobhistory/singletaskcounter/task_1408490264848_0012_r_000000/org.apache.hadoop.mapreduce.FileSystemCounter/FILE_LARGE_READ_OPS>
>> 0FILE: Number of read operations
>> <http://ec2-54-166-194-165.compute-1.amazonaws.com:19888/jobhistory/singletaskcounter/task_1408490264848_0012_r_000000/org.apache.hadoop.mapreduce.FileSystemCounter/FILE_READ_OPS>
>> 0 FILE: Number of write operations
>> <http://ec2-54-166-194-165.compute-1.amazonaws.com:19888/jobhistory/singletaskcounter/task_1408490264848_0012_r_000000/org.apache.hadoop.mapreduce.FileSystemCounter/FILE_WRITE_OPS>
>> 0 HDFS: Number of bytes read
>> <http://ec2-54-166-194-165.compute-1.amazonaws.com:19888/jobhistory/singletaskcounter/task_1408490264848_0012_r_000000/org.apache.hadoop.mapreduce.FileSystemCounter/HDFS_BYTES_READ>
>> 6205 HDFS: Number of bytes written
>> <http://ec2-54-166-194-165.compute-1.amazonaws.com:19888/jobhistory/singletaskcounter/task_1408490264848_0012_r_000000/org.apache.hadoop.mapreduce.FileSystemCounter/HDFS_BYTES_WRITTEN>
>> 0 HDFS: Number of large read operations
>> <http://ec2-54-166-194-165.compute-1.amazonaws.com:19888/jobhistory/singletaskcounter/task_1408490264848_0012_r_000000/org.apache.hadoop.mapreduce.FileSystemCounter/HDFS_LARGE_READ_OPS>
>> 0 HDFS: Number of read operations
>> <http://ec2-54-166-194-165.compute-1.amazonaws.com:19888/jobhistory/singletaskcounter/task_1408490264848_0012_r_000000/org.apache.hadoop.mapreduce.FileSystemCounter/HDFS_READ_OPS>
>> 4HDFS: Number of write operations
>> <http://ec2-54-166-194-165.compute-1.amazonaws.com:19888/jobhistory/singletaskcounter/task_1408490264848_0012_r_000000/org.apache.hadoop.mapreduce.FileSystemCounter/HDFS_WRITE_OPS>
>> 2
>> I couldn't find anything related on the crunch jira.
>>
>> For this problem, I think it would be more efficient to write the parsed
>> -> /raw output first, call run(), then do the agg -> /visits output
>> followed by done(), which would mean that you would only need to parse the
>> raw input once, instead of twice.
>>
>> Would the first option be more efficient if it worked?
>>
>> A helpful trick for seeing how the Crunch planner is mapping your logic
>> into MapReduce jobs is to look at the plan dot file via one of the
>> following mechanisms:
>>
>> 1) Instead of calling Pipeline.run(), call Pipeline.runAsync() and then
>> call the getPlanDotFile() method on the returned PipelineExecution object.
>> You can print the dot file to a file and use a dot file viewer to look at
>> how the DoFns are broken up into MR jobs and map/reduce phases.
>> 2) Call MRPipeline.plan() directly, which returns a MRExecutor object
>> that also implements PipelineExecution. (The difference being that calling
>> MRPipeline.plan will not start the jobs running, whereas calling runAsync
>> will.)
>>
>> I ran the two different version through dot and you're right they are two
>> complete different executions, pretty cool!
>>
>> Thanks!
>>
>>
>
>
> --
> Director of Data Science
> Cloudera <http://www.cloudera.com>
> Twitter: @josh_wills <http://twitter.com/josh_wills>
>
--
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>