Harsh and Jay, here are all the limits on my system:
yanglin@ubuntu:~$ ulimit -a
core file size          (blocks, -c) 0
data seg size           (kbytes, -d) unlimited
scheduling priority             (-e) 0
file size               (blocks, -f) unlimited
pending signals                 (-i) 32059
max locked memory       (kbytes, -l) 64
max memory size         (kbytes, -m) unlimited
open files                      (-n) 1024
pipe size            (512 bytes, -p) 8
POSIX message queues     (bytes, -q) 819200
real-time priority              (-r) 0
stack size              (kbytes, -s) 8192
cpu time               (seconds, -t) unlimited
max user processes              (-u) 32059
virtual memory          (kbytes, -v) unlimited
file locks                      (-x) unlimited

---

Is there any way to check which limit is actually being hit while the MapReduce job
runs? BTW, my mapper is a bit naive: it reads HDFS files inside the map() function,
but I do call FSDataInputStream.close() at the end of map() to close each file. Could
that be related to this problem?

2012/9/18 Harsh J <[email protected]>

> Jason,
>
> Perhaps then go with Jay's lead here: ulimits (nproc and nofile mostly).
> Can you check whether they are high enough for opening several
> blocks+sockets, for the user that runs the DataNode and for the user
> that runs the TaskTracker (if running in insecure mode)?
>
> On Tue, Sep 18, 2012 at 9:14 AM, Jason Yang <[email protected]> wrote:
> > Hey, Harsh
> >
> > Thanks for your reply.
> >
> > There are 20 data files as input, and each of them is clustered into 4
> > groups. Since I use "DataFileName-groupNum" as the output key, there are
> > 80 unique keys in total.
> >
> > Following your suggestion, I ran two tests:
> >
> > 1) Smaller input: I picked 5 files at random as input, and it always
> > works fine.
> > 2) Fully-distributed cluster: with all 20 data files as input it always
> > works fine on the fully-distributed cluster, while it always fails on
> > the pseudo-distributed one.
> >
> > So it does seem to be related to the xceiver/load limits you mentioned,
> > and I have raised the xceiver value in hdfs-site.xml:
> >
> > <property>
> >     <name>dfs.datanode.max.xcievers</name>
> >     <value>4096</value>
> > </property>
> >
> > But I still get the same error when running with the 20 data files on
> > the pseudo-distributed cluster.
> >
> > How can I fix this problem?
> >
> > 2012/9/18 Harsh J <[email protected]>
> >>
> >> Hi Jason,
> >>
> >> How many unique keys are you going to be generating from this program,
> >> roughly?
> >>
> >> By default, the max load of a DN is about 4k threads, and if you try to
> >> push beyond that value the NN will no longer select that DN, as it
> >> considers it already overloaded. In fully distributed mode you may not
> >> see this issue, since there are several DNs and TTs to spread the write
> >> load across.
> >>
> >> If you will be creating files for a whole lot of keys, try a smaller
> >> input sample and see whether that works instead (so that there are
> >> fewer files and you do not hit the xceiver/load limits).
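On the stream-closing side question above: a close() at the end of map() only runs if
nothing throws before it, so a map attempt that dies mid-read can leave file
descriptors, client sockets and DataNode xceiver threads tied up, which is what the
nofile/nproc and xceiver limits being discussed are counting. A minimal sketch of the
read-with-finally pattern follows; HdfsSideReader, readAll() and the path argument are
illustrative names rather than code from the thread, and the Configuration can simply
be the JobConf the mapper receives in configure().

---
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

// Illustrative helper, not code from the thread: read a whole side file from
// HDFS inside map(), making sure the stream is closed even if something
// throws halfway through the read.
public class HdfsSideReader
{
    public static byte[] readAll(Configuration conf, String hdfsPath) throws IOException
    {
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path(hdfsPath);
        FSDataInputStream in = null;
        try
        {
            in = fs.open(path);
            byte[] buf = new byte[(int) fs.getFileStatus(path).getLen()];
            in.readFully(0, buf);    // positional read of the whole file
            return buf;
        }
        finally
        {
            IOUtils.closeStream(in); // null-safe; runs even if the read throws
        }
    }
}
---

IOUtils.closeStream() is null-safe and ignores any close-time IOException, so an
exception thrown by the read itself is not masked by the cleanup.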
> >>
> >> On Mon, Sep 17, 2012 at 7:20 PM, Jason Yang <[email protected]> wrote:
> >> > Hi, all
> >> >
> >> > I have written a simple MR program which partitions a file into
> >> > multiple files based on the clustering result of the points in that
> >> > file. Here is my code:
> >> > ---
> >> > private int run() throws IOException
> >> > {
> >> >     String scheme = getConf().get(CommonUtility.ATTR_SCHEME);
> >> >     String ecgDir = getConf().get(CommonUtility.ATTR_ECG_DATA_DIR);
> >> >     String outputDir = getConf().get(CommonUtility.ATTR_OUTPUT_DIR);
> >> >
> >> >     // create the JobConf
> >> >     JobConf jobConf = new JobConf(getConf(), this.getClass());
> >> >
> >> >     // set the paths for input and output
> >> >     Path inPath = new Path(scheme + ecgDir);
> >> >     Path outPath = new Path(scheme + outputDir
> >> >             + CommonUtility.OUTPUT_LOCAL_CLUSTERING);
> >> >     FileInputFormat.addInputPath(jobConf, inPath);
> >> >     FileOutputFormat.setOutputPath(jobConf, outPath);
> >> >
> >> >     // clear the output if it already exists
> >> >     CommonUtility.deleteHDFSFile(outPath.toString());
> >> >
> >> >     // set the input and output formats
> >> >     jobConf.setInputFormat(WholeFileInputFormat.class);
> >> >     jobConf.setOutputFormat(LocalClusterMSFOutputFormat.class);
> >> >
> >> >     // set the classes of the output key and value
> >> >     jobConf.setOutputKeyClass(Text.class);
> >> >     jobConf.setOutputValueClass(RRIntervalWritable.class);
> >> >
> >> >     // set the mapper and reducer
> >> >     jobConf.setMapperClass(LocalClusteringMapper.class);
> >> >     jobConf.setReducerClass(IdentityReducer.class);
> >> >
> >> >     // run the job
> >> >     JobClient.runJob(jobConf);
> >> >     return 0;
> >> > }
> >> >
> >> > ...
> >> >
> >> > public class LocalClusteringMapper extends MapReduceBase implements
> >> >         Mapper<NullWritable, BytesWritable, Text, RRIntervalWritable>
> >> > {
> >> >     @Override
> >> >     public void map(NullWritable key, BytesWritable value,
> >> >             OutputCollector<Text, RRIntervalWritable> output,
> >> >             Reporter reporter) throws IOException
> >> >     {
> >> >         // read and cluster
> >> >         ...
> >> >
> >> >         // output
> >> >         Iterator<RRIntervalWritable> it = rrArray.iterator();
> >> >         while (it.hasNext())
> >> >         {
> >> >             RRIntervalWritable rr = it.next();
> >> >
> >> >             Text outputKey = new Text(rr.clusterResult);
> >> >
> >> >             output.collect(outputKey, rr);
> >> >         }
> >> >     }
> >> >
> >> > ...
> >> >
> >> > public class LocalClusterMSFOutputFormat extends
> >> >         MultipleSequenceFileOutputFormat<Text, RRIntervalWritable>
> >> > {
> >> >     protected String generateFileNameForKeyValue(Text key,
> >> >             RRIntervalWritable value, String name)
> >> >     {
> >> >         return value.clusterResult.toString();
> >> >     }
> >> > }
> >> > ---
> >> >
> >> > But this program always gets an IOException when running on a
> >> > pseudo-distributed cluster; the log is attached at the end of this
> >> > post.
> >> >
> >> > There is something weird:
> >> > 1. If I use SequenceFileOutputFormat instead of
> >> > MultipleSequenceFileOutputFormat, the program works fine (at least
> >> > there is no error in the log).
> >> > 2. The file which always causes the error is EcgData002509_LCF_3.
> >> >
> >> >
> >> >>
> >> >> 12/09/17 21:10:35 INFO mapred.MapTask: Starting flush of map output
> >> >> 12/09/17 21:10:35 INFO mapred.MapTask: Finished spill 0
> >> >> 12/09/17 21:10:35 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000019_0 is done.
> >> >> And is in the process of commiting
> >> >> 12/09/17 21:10:35 INFO mapred.LocalJobRunner:
> >> >> 12/09/17 21:10:35 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000019_0' done.
> >> >> 12/09/17 21:10:35 INFO mapred.LocalJobRunner:
> >> >> 12/09/17 21:10:35 INFO mapred.Merger: Merging 20 sorted segments
> >> >> 12/09/17 21:10:35 INFO mapred.Merger: Merging 2 intermediate segments out of a total of 20
> >> >> 12/09/17 21:10:35 INFO mapred.Merger: Merging 10 intermediate segments out of a total of 19
> >> >> 12/09/17 21:10:35 INFO mapred.Merger: Down to the last merge-pass, with 10 segments left of total size: 18913891 bytes
> >> >> 12/09/17 21:10:35 INFO mapred.LocalJobRunner:
> >> >> 12/09/17 21:10:39 WARN hdfs.DFSClient: DataStreamer Exception: org.apache.hadoop.ipc.RemoteException: java.io.IOException: File /work/lab/output/localClustering/_temporary/_attempt_local_0001_r_000000_0/EcgData002509_LCF_3 could only be replicated to 0 nodes, instead of 1
> >> >> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:1271)
> >> >> at org.apache.hadoop.hdfs.server.namenode.NameNode.addBlock(NameNode.java:422)
> >> >> at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
> >> >> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> >> >> at java.lang.reflect.Method.invoke(Method.java:597)
> >> >> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:508)
> >> >> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:959)
> >> >> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:955)
> >> >> at java.security.AccessController.doPrivileged(Native Method)
> >> >> at javax.security.auth.Subject.doAs(Subject.java:396)
> >> >> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:953)
> >> >> at org.apache.hadoop.ipc.Client.call(Client.java:740)
> >> >> at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:220)
> >> >> at $Proxy0.addBlock(Unknown Source)
> >> >> at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
> >> >> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> >> >> at java.lang.reflect.Method.invoke(Method.java:597)
> >> >> at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
> >> >> at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
> >> >> at $Proxy0.addBlock(Unknown Source)
> >> >> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.locateFollowingBlock(DFSClient.java:2937)
> >> >> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.nextBlockOutputStream(DFSClient.java:2819)
> >> >> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.access$2000(DFSClient.java:2102)
> >> >> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream$DataStreamer.run(DFSClient.java:2288)
> >> >> 12/09/17 21:10:39 WARN hdfs.DFSClient: Error Recovery for block null bad datanode[0] nodes == null
> >> >> 12/09/17 21:10:39 WARN hdfs.DFSClient: Could not get block locations. Source file "/work/lab/output/localClustering/_temporary/_attempt_local_0001_r_000000_0/EcgData002509_LCF_3" - Aborting...
> >> >> 12/09/17 21:10:41 INFO mapred.LocalJobRunner: reduce > reduce
> >> >> 12/09/17 21:10:42 INFO mapred.JobClient:  map 100% reduce 89%
> >> >> 12/09/17 21:10:42 WARN mapred.LocalJobRunner: job_local_0001
> >> >> org.apache.hadoop.ipc.RemoteException: java.io.IOException: File /work/lab/output/localClustering/_temporary/_attempt_local_0001_r_000000_0/EcgData002509_LCF_3 could only be replicated to 0 nodes, instead of 1
> >> >> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:1271)
> >> >> at org.apache.hadoop.hdfs.server.namenode.NameNode.addBlock(NameNode.java:422)
> >> >> at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
> >> >> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> >> >> at java.lang.reflect.Method.invoke(Method.java:597)
> >> >> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:508)
> >> >> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:959)
> >> >> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:955)
> >> >> at java.security.AccessController.doPrivileged(Native Method)
> >> >> at javax.security.auth.Subject.doAs(Subject.java:396)
> >> >> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:953)
> >> >> at org.apache.hadoop.ipc.Client.call(Client.java:740)
> >> >> at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:220)
> >> >> at $Proxy0.addBlock(Unknown Source)
> >> >> at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
> >> >> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> >> >> at java.lang.reflect.Method.invoke(Method.java:597)
> >> >> at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
> >> >> at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
> >> >> at $Proxy0.addBlock(Unknown Source)
> >> >> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.locateFollowingBlock(DFSClient.java:2937)
> >> >> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.nextBlockOutputStream(DFSClient.java:2819)
> >> >> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.access$2000(DFSClient.java:2102)
> >> >> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream$DataStreamer.run(DFSClient.java:2288)
> >> >> 12/09/17 21:10:43 INFO mapred.JobClient: Job complete: job_local_0001
> >> >> 12/09/17 21:10:43 INFO mapred.JobClient: Counters: 15
> >> >> 12/09/17 21:10:43 INFO mapred.JobClient:   FileSystemCounters
> >> >> 12/09/17 21:10:43 INFO mapred.JobClient:     FILE_BYTES_READ=23297226
> >> >> 12/09/17 21:10:43 INFO mapred.JobClient:     HDFS_BYTES_READ=546711709
> >> >> 12/09/17 21:10:43 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=232075142
> >> >> 12/09/17 21:10:43 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=13530993
> >> >> 12/09/17 21:10:43 INFO mapred.JobClient:   Map-Reduce Framework
> >> >> 12/09/17 21:10:43 INFO mapred.JobClient:     Reduce input groups=56
> >> >> 12/09/17 21:10:43 INFO mapred.JobClient:     Combine output records=0
> >> >> 12/09/17 21:10:43 INFO mapred.JobClient:     Map input records=20
> >> >> 12/09/17 21:10:43 INFO mapred.JobClient:     Reduce shuffle bytes=0
> >> >> 12/09/17 21:10:43 INFO mapred.JobClient:     Reduce output records=38837
> >> >> 12/09/17 21:10:43 INFO mapred.JobClient:     Spilled Records=102562
> >> >> 12/09/17 21:10:43 INFO mapred.JobClient:     Map output bytes=18691072
> >> >> 12/09/17 21:10:43 INFO mapred.JobClient:     Map input bytes=28649088
> >> >> 12/09/17 21:10:43 INFO mapred.JobClient:     Combine input records=0
> >> >> 12/09/17 21:10:43 INFO mapred.JobClient:     Map output records=55700
> >> >> 12/09/17 21:10:43 INFO mapred.JobClient:     Reduce input records=38838
> >> >> 12/09/17 21:10:44 INFO mapred.LocalJobRunner: reduce > reduce
> >> >> Exception in thread "main" java.io.IOException: Job failed!
> >> >> at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1252)
> >> >> at org.yanglin.mr.lab.ecg.PESCJob.runLocalClustering(PESCJob.java:111)
> >> >> at org.yanglin.mr.lab.ecg.PESCJob.run(PESCJob.java:57)
> >> >> at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
> >> >> at org.yanglin.mr.lab.ecg.PESCJob.main(PESCJob.java:117)
> >> >> 12/09/17 21:10:48 ERROR hdfs.DFSClient: Exception closing file /work/lab/output/localClustering/_temporary/_attempt_local_0001_r_000000_0/EcgData002509_LCF_3 : org.apache.hadoop.ipc.RemoteException: java.io.IOException: File /work/lab/output/localClustering/_temporary/_attempt_local_0001_r_000000_0/EcgData002509_LCF_3 could only be replicated to 0 nodes, instead of 1
> >> >> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:1271)
> >> >> at org.apache.hadoop.hdfs.server.namenode.NameNode.addBlock(NameNode.java:422)
> >> >> at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
> >> >> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> >> >> at java.lang.reflect.Method.invoke(Method.java:597)
> >> >> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:508)
> >> >> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:959)
> >> >> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:955)
> >> >> at java.security.AccessController.doPrivileged(Native Method)
> >> >> at javax.security.auth.Subject.doAs(Subject.java:396)
> >> >> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:953)
> >> >> org.apache.hadoop.ipc.RemoteException: java.io.IOException: File /work/lab/output/localClustering/_temporary/_attempt_local_0001_r_000000_0/EcgData002509_LCF_3 could only be replicated to 0 nodes, instead of 1
> >> >> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:1271)
> >> >> at org.apache.hadoop.hdfs.server.namenode.NameNode.addBlock(NameNode.java:422)
> >> >> at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
> >> >> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> >> >> at java.lang.reflect.Method.invoke(Method.java:597)
> >> >> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:508)
> >> >> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:959)
> >> >> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:955)
> >> >> at java.security.AccessController.doPrivileged(Native Method)
> >> >> at javax.security.auth.Subject.doAs(Subject.java:396)
> >> >> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:953)
> >> >> at org.apache.hadoop.ipc.Client.call(Client.java:740)
> >> >> at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:220)
> >> >> at $Proxy0.addBlock(Unknown Source)
> >> >> at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
> >> >> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> >> >> at java.lang.reflect.Method.invoke(Method.java:597)
> >> >> at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
> >> >> at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
> >> >> at $Proxy0.addBlock(Unknown Source)
> >> >> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.locateFollowingBlock(DFSClient.java:2937)
> >> >> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.nextBlockOutputStream(DFSClient.java:2819)
> >> >> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.access$2000(DFSClient.java:2102)
> >> >> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream$DataStreamer.run(DFSClient.java:2288)
> >> >
> >> >
> >> > --
> >> > YANG, Lin
> >>
> >>
> >> --
> >> Harsh J
> >
> >
> > --
> > YANG, Lin
>
>
> --
> Harsh J



--
YANG, Lin
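One more detail behind Harsh's "fewer files" suggestion: the MultipleOutputFormat
family (including the MultipleSequenceFileOutputFormat used above) keeps one
RecordWriter open per distinct generated file name until the task finishes, so
roughly 80 unique keys mean roughly 80 concurrent HDFS write pipelines landing on the
single DataNode of a pseudo-distributed cluster, on top of the map-side reads. Below
is a hedged sketch of one way to cap that number by hashing cluster names into a
fixed set of output files; this is an illustration rather than the fix agreed in the
thread, BucketedMSFOutputFormat and NUM_BUCKETS are made-up names, and
RRIntervalWritable with its clusterResult field comes from the code above.

---
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.MultipleSequenceFileOutputFormat;

// Illustrative alternative (not from the thread): bucket the per-cluster
// names into a small, fixed set of output files so each reduce task keeps
// far fewer SequenceFile writers -- and hence fewer concurrent DataNode
// write pipelines -- open at the same time.
public class BucketedMSFOutputFormat
        extends MultipleSequenceFileOutputFormat<Text, RRIntervalWritable>
{
    private static final int NUM_BUCKETS = 8; // made-up value for illustration

    @Override
    protected String generateFileNameForKeyValue(Text key,
            RRIntervalWritable value, String name)
    {
        // Same key material as the original generateFileNameForKeyValue, but
        // hashed down to NUM_BUCKETS file names instead of one file per
        // "DataFileName-groupNum" key.
        int bucket = (value.clusterResult.toString().hashCode() & Integer.MAX_VALUE)
                % NUM_BUCKETS;
        return "cluster-bucket-" + bucket;
    }
}
---

Since the reduce output key is still the full "DataFileName-groupNum" name, records
for different clusters remain distinguishable inside each bucketed file; only the
number of simultaneously open writers (and hence concurrent DataNode streams) goes
down. The knobs already discussed in the thread, dfs.datanode.max.xcievers and the
nofile/nproc limits of the DataNode and TaskTracker users, still apply.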
