Hello,
I am able to run a count of a PCollection read from a bunch of Avro files
just fine, but when I try to execute an MR job on the same PCollection, I
get an NPE.
The following runs fine:
PCollection<Event> events = pipeline.read(
    From.avroFile("/raw/*.avro", Avros.specifics(Event.class)));
PipelineResult result = pipeline.done();
System.out.println("Event count: " + events.getSize());
And I get the event count.
But the following doesn't (it uses methods from the POJOs generated from
the Avro schema):
PCollection<Event> events = pipeline.read(
    From.avroFile("/raw/*.avro", Avros.specifics(Event.class)));

// Create a PTable keyed on (client id, event type), with a long
// value for counting.
PTable<Pair<String, String>, Long> eventsByClient = events.parallelDo(
    new MapFn<Event, Pair<Pair<String, String>, Long>>() {
      @Override
      public Pair<Pair<String, String>, Long> map(Event event) {
        String eventType =
            event.getBody().getTypeSpecificBody().getBody().getClass().getName();
        eventType = eventType.substring(eventType.lastIndexOf('.') + 1);
        return Pair.of(Pair.of(event.getHeader().getClientId(), eventType), 1L);
      }
    },
    Avros.tableOf(Avros.pairs(Avros.strings(), Avros.strings()), Avros.longs()));
PTable<Pair<String, String>, Long> eventCountsByClient =
    eventsByClient.groupByKey().combineValues(Aggregators.SUM_LONGS());
pipeline.writeTextFile(eventCountsByClient, "/user/samikr/output");
PipelineResult result = pipeline.done();
I am getting the following exception:
1 job failure(s) occurred:
Collect Data Info: Avro(/raw/... ID=1 (1/1)(1):
java.lang.NullPointerException
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012)
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:482)
    at org.apache.hadoop.util.Shell.run(Shell.java:455)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:702)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:791)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:774)
    at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:646)
    at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:434)
    at org.apache.hadoop.fs.FilterFileSystem.mkdirs(FilterFileSystem.java:281)
    at org.apache.hadoop.mapreduce.JobSubmissionFiles.getStagingDir(JobSubmissionFiles.java:125)
    at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:348)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1285)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1282)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1614)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:1282)
    at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob.submit(CrunchControlledJob.java:329)
    at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl.startReadyJobs(CrunchJobControl.java:204)
    at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl.pollJobStatusAndStartNewOnes(CrunchJobControl.java:238)
    at org.apache.crunch.impl.mr.exec.MRExecutor.monitorLoop(MRExecutor.java:112)
    at org.apache.crunch.impl.mr.exec.MRExecutor.access$000(MRExecutor.java:55)
    at org.apache.crunch.impl.mr.exec.MRExecutor$1.run(MRExecutor.java:83)
    at java.lang.Thread.run(Thread.java:745)
Not sure what is causing the NPE, though. From the stack trace it looks
like some permission issue: the NPE is thrown while job submission shells
out to set permissions on the local staging directory. I have checked
"hadoop.tmp.dir" and it seems to have write permission etc., and I have
also noticed that a folder named "samik.r1802905367" gets created for the
job within that directory. I have tried passing one specific Avro file to
pipeline.read rather than *.avro, but that results in the same exception.
I am using Hadoop 2.5.0, Avro 1.7.7, and Crunch 0.10.0-hadoop2 on the
client side, and CDH5 (2.3.0) on the server side.
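In case it helps to reproduce, the pipeline is created roughly like this
(a sketch only: the driver class name is a placeholder, and the explicit
hadoop.tmp.dir override is just something I could try to rule out the
staging-dir permission angle, not my exact code):

    import org.apache.crunch.Pipeline;
    import org.apache.crunch.impl.mr.MRPipeline;
    import org.apache.hadoop.conf.Configuration;

    Configuration conf = new Configuration();
    // Illustrative: point hadoop.tmp.dir at a directory known to be
    // writable by the submitting user, to rule out staging-dir
    // permission problems during job submission.
    conf.set("hadoop.tmp.dir",
        "/tmp/hadoop-" + System.getProperty("user.name"));
    // EventCountDriver is a hypothetical driver class name.
    Pipeline pipeline = new MRPipeline(EventCountDriver.class, conf);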
Any pointers?
Regards.