I have not tried running any MR or Pig jobs from here so far. Do you think
that would be helpful?
Regards,
-Samik
On 12/09/2014 4:31 PM, Samik Raychaudhuri wrote:
Hi Gabriel,
Thanks for the response.
I am running with JDK 8u20 x64 (most recent), on Win 7 x64. I run the
program directly from IntelliJ (the editor I use for this stuff). Another
interesting part is that HDFS is accessed through an SSH double tunnel,
but that shouldn't cause a problem, since I can run other hadoop commands
from a Cygwin command prompt just fine.
Here is the configuration part of the java program:
// Create a configuration object to load specific configurations.
Configuration conf = new Configuration();
conf.set("hadoop.socks.server", "localhost:8020");
conf.set("hadoop.rpc.socket.factory.class.default",
"org.apache.hadoop.net.SocksSocketFactory");
conf.set("fs.default.name", "hdfs://namenode01.xxx.net:8020");
// Use a local folder as the temporary folder.
conf.set("hadoop.tmp.dir", "/tmp");
Let me know if anything looks suspicious.
Also, thanks for clarifying that the first snippet returns the total size
(in bytes?), rather than the event count I thought I was getting.
Regards.
-Samik
On 12/09/2014 4:20 PM, Gabriel Reid wrote:
Hi Samik,
Thanks for your thorough description. Which operating system and JDK
are you running on the client?
By my read of the Hadoop & JDK code (although the line numbers don't
match up exactly), the issue is occurring when a shell command is
about to be run to set the permissions on a newly-created local
staging directory. It looks like one of the parameters to the shell
command (probably the path) is null, although I don't yet see how that
could happen.
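To illustrate that failure mode, here is a minimal sketch using only the JDK. The class name NullArgDemo and the chmod arguments are made up for illustration and are not the command line Hadoop actually builds. It shows that ProcessBuilder.start() throws a NullPointerException as soon as any element of the command list is null, before any process is launched:

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class NullArgDemo {

    /**
     * Returns true if ProcessBuilder.start() rejects the command with a
     * NullPointerException. The JDK checks for null elements before it
     * tries to launch anything.
     */
    public static boolean startFailsWithNpe(List<String> command) {
        try {
            Process p = new ProcessBuilder(command).start();
            p.destroy(); // the command was valid and started; clean up
            return false;
        } catch (NullPointerException e) {
            return true;
        } catch (IOException e) {
            // Program not found or not executable: a different failure mode.
            return false;
        }
    }

    public static void main(String[] args) {
        // Hypothetical command line whose path argument is missing (null).
        List<String> command = Arrays.asList("chmod", "0700", null);
        System.out.println("NPE on start: " + startFailsWithNpe(command));
    }
}
```

If this matches what is happening in the trace, the next step would be to find out which element of the command built in Shell is null on that client.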
The reason your first job is running properly (where you just call
getSize() on the returned PCollection) is that it's not actually doing
anything via MapReduce -- instead, it's just calculating the total
file size of all input files (PCollection#getSize() returns the byte
size, not the count of elements in the PCollection).
Are you able to run a "standard" mapreduce job or pig job or something
similar from that client?
- Gabriel
On Fri, Sep 12, 2014 at 11:27 AM, Samik Raychaudhuri <[email protected]>
wrote:
Hello,
I am able to run a count of a PCollection from a bunch of avro files just
fine, but when I try to execute an MR job on the PCollection, I am getting
an NPE.
The following runs fine:
PCollection<Event> events =
    pipeline.read(From.avroFile("/raw/*.avro", Avros.specifics(Event.class)));
PipelineResult result = pipeline.done();
System.out.println("Event count: " + events.getSize());
And I get the events count.
But the following doesn't (methods from a bunch of POJOs generated from the
Avro schema are used here):
PCollection<Event> events =
    pipeline.read(From.avroFile("/raw/*.avro", Avros.specifics(Event.class)));
// Now create a PTable based on client and event type. Also have a
// long for counting purpose.
PTable<Pair<String, String>, Long> eventsByClient =
    events.parallelDo(
        new MapFn<Event, Pair<Pair<String, String>, Long>>()
        {
            @Override
            public Pair<Pair<String, String>, Long> map(Event event)
            {
                String eventType =
                    event.getBody().getTypeSpecificBody().getBody().getClass().getName();
                eventType =
                    eventType.substring(eventType.lastIndexOf('.') + 1);
                return Pair.of(Pair.of(event.getHeader().getClientId(),
                    eventType), 1L);
            }
        }, Avros.tableOf(Avros.pairs(Avros.strings(), Avros.strings()),
            Avros.longs())
    );
PTable<Pair<String, String>, Long> eventCountsByClient =
    eventsByClient.groupByKey().combineValues(Aggregators.SUM_LONGS());
pipeline.writeTextFile(eventCountsByClient, "/user/samikr/output");
PipelineResult result = pipeline.done();
I am getting the following exception:
1 job failure(s) occurred:
Collect Data Info: Avro(/raw/... ID=1 (1/1)(1):
java.lang.NullPointerException
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012)
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:482)
    at org.apache.hadoop.util.Shell.run(Shell.java:455)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:702)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:791)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:774)
    at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:646)
    at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:434)
    at org.apache.hadoop.fs.FilterFileSystem.mkdirs(FilterFileSystem.java:281)
    at org.apache.hadoop.mapreduce.JobSubmissionFiles.getStagingDir(JobSubmissionFiles.java:125)
    at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:348)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1285)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1282)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1614)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:1282)
    at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob.submit(CrunchControlledJob.java:329)
    at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl.startReadyJobs(CrunchJobControl.java:204)
    at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl.pollJobStatusAndStartNewOnes(CrunchJobControl.java:238)
    at org.apache.crunch.impl.mr.exec.MRExecutor.monitorLoop(MRExecutor.java:112)
    at org.apache.crunch.impl.mr.exec.MRExecutor.access$000(MRExecutor.java:55)
    at org.apache.crunch.impl.mr.exec.MRExecutor$1.run(MRExecutor.java:83)
    at java.lang.Thread.run(Thread.java:745)
I am not sure what is causing the NPE though. From the stack trace, it looks
like some kind of permission issue. I have checked the "hadoop.tmp.dir"
folder and it seems to have write permission etc., and I have also noticed
that a folder named "samik.r1802905367" gets created for the job within that
directory. I have tried giving one specific avro file in pipeline.read rather
than *.avro, but that results in the same exception. I am using Hadoop 2.5.0,
Avro 1.7.7 and Crunch 0.10.0-hadoop2 on the client side and CDH5 (2.3.0) on
the server side.
Any pointers?
Regards.