Hi Jason,

How many unique keys are you going to be generating from this program, roughly?
By default, the max load of a DN is about 4k threads; if you try to push beyond that value, the NN will no longer select the DN, as it considers it already overloaded. In fully distributed mode you may not see this issue, since there are several DNs and TTs to spread the write load across. If you'll be creating files for a very large number of keys, try a smaller input sample first and see if that works (so that fewer files are created and you do not hit the xceiver/load limits; a config sketch for raising that ceiling follows at the end of this message).

On Mon, Sep 17, 2012 at 7:20 PM, Jason Yang <[email protected]> wrote:
> Hi, all
>
> I have written a simple MR program which partitions a file into multiple
> files based on the clustering result of the points in this file. Here is
> my code:
> ---
> private int run() throws IOException
> {
>     String scheme = getConf().get(CommonUtility.ATTR_SCHEME);
>     String ecgDir = getConf().get(CommonUtility.ATTR_ECG_DATA_DIR);
>     String outputDir = getConf().get(CommonUtility.ATTR_OUTPUT_DIR);
>
>     // create JobConf
>     JobConf jobConf = new JobConf(getConf(), this.getClass());
>
>     // set paths for input and output
>     Path inPath = new Path(scheme + ecgDir);
>     Path outPath = new Path(scheme + outputDir +
>             CommonUtility.OUTPUT_LOCAL_CLUSTERING);
>     FileInputFormat.addInputPath(jobConf, inPath);
>     FileOutputFormat.setOutputPath(jobConf, outPath);
>
>     // clear the output dir if it already exists
>     CommonUtility.deleteHDFSFile(outPath.toString());
>
>     // set formats for input and output
>     jobConf.setInputFormat(WholeFileInputFormat.class);
>     jobConf.setOutputFormat(LocalClusterMSFOutputFormat.class);
>
>     // set classes of output key and value
>     jobConf.setOutputKeyClass(Text.class);
>     jobConf.setOutputValueClass(RRIntervalWritable.class);
>
>     // set mapper and reducer
>     jobConf.setMapperClass(LocalClusteringMapper.class);
>     jobConf.setReducerClass(IdentityReducer.class);
>
>     // run the job
>     JobClient.runJob(jobConf);
>     return 0;
> }
>
> ...
>
> public class LocalClusteringMapper extends MapReduceBase implements
>         Mapper<NullWritable, BytesWritable, Text, RRIntervalWritable>
> {
>     @Override
>     public void map(NullWritable key, BytesWritable value,
>             OutputCollector<Text, RRIntervalWritable> output,
>             Reporter reporter) throws IOException
>     {
>         // read and cluster
>         ...
>
>         // emit each RR interval, keyed by its cluster assignment
>         Iterator<RRIntervalWritable> it = rrArray.iterator();
>         while (it.hasNext())
>         {
>             RRIntervalWritable rr = it.next();
>             Text outputKey = new Text(rr.clusterResult);
>             output.collect(outputKey, rr);
>         }
>     }
>
> ...
>
> public class LocalClusterMSFOutputFormat extends
>         MultipleSequenceFileOutputFormat<Text, RRIntervalWritable>
> {
>     // route each record to an output file named after its cluster
>     protected String generateFileNameForKeyValue(Text key,
>             RRIntervalWritable value, String name)
>     {
>         return value.clusterResult.toString();
>     }
> }
> ---
>
> But this program always gets an IOException when run on a
> pseudo-distributed cluster; the log is attached at the end of this post.
>
> There are two weird things:
> 1. If I use SequenceFileOutputFormat instead of
> MultipleSequenceFileOutputFormat, this program works fine (at least
> there is no error in the log).
> 2. The file which always causes the error is EcgData002509_LCF_3.
>
>> 12/09/17 21:10:35 INFO mapred.MapTask: Starting flush of map output
>> 12/09/17 21:10:35 INFO mapred.MapTask: Finished spill 0
>> 12/09/17 21:10:35 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000019_0 is done. And is in the process of commiting
>> 12/09/17 21:10:35 INFO mapred.LocalJobRunner:
>> 12/09/17 21:10:35 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000019_0' done.
>> 12/09/17 21:10:35 INFO mapred.LocalJobRunner:
>> 12/09/17 21:10:35 INFO mapred.Merger: Merging 20 sorted segments
>> 12/09/17 21:10:35 INFO mapred.Merger: Merging 2 intermediate segments out of a total of 20
>> 12/09/17 21:10:35 INFO mapred.Merger: Merging 10 intermediate segments out of a total of 19
>> 12/09/17 21:10:35 INFO mapred.Merger: Down to the last merge-pass, with 10 segments left of total size: 18913891 bytes
>> 12/09/17 21:10:35 INFO mapred.LocalJobRunner:
>> 12/09/17 21:10:39 WARN hdfs.DFSClient: DataStreamer Exception: org.apache.hadoop.ipc.RemoteException: java.io.IOException: File /work/lab/output/localClustering/_temporary/_attempt_local_0001_r_000000_0/EcgData002509_LCF_3 could only be replicated to 0 nodes, instead of 1
>> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:1271)
>> at org.apache.hadoop.hdfs.server.namenode.NameNode.addBlock(NameNode.java:422)
>> at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>> at java.lang.reflect.Method.invoke(Method.java:597)
>> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:508)
>> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:959)
>> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:955)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:396)
>> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:953)
>> at org.apache.hadoop.ipc.Client.call(Client.java:740)
>> at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:220)
>> at $Proxy0.addBlock(Unknown Source)
>> at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>> at java.lang.reflect.Method.invoke(Method.java:597)
>> at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
>> at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
>> at $Proxy0.addBlock(Unknown Source)
>> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.locateFollowingBlock(DFSClient.java:2937)
>> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.nextBlockOutputStream(DFSClient.java:2819)
>> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.access$2000(DFSClient.java:2102)
>> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream$DataStreamer.run(DFSClient.java:2288)
>> 12/09/17 21:10:39 WARN hdfs.DFSClient: Error Recovery for block null bad datanode[0] nodes == null
>> 12/09/17 21:10:39 WARN hdfs.DFSClient: Could not get block locations. Source file "/work/lab/output/localClustering/_temporary/_attempt_local_0001_r_000000_0/EcgData002509_LCF_3" - Aborting...
>> 12/09/17 21:10:41 INFO mapred.LocalJobRunner: reduce > reduce
>> 12/09/17 21:10:42 INFO mapred.JobClient: map 100% reduce 89%
>> 12/09/17 21:10:42 WARN mapred.LocalJobRunner: job_local_0001
>> org.apache.hadoop.ipc.RemoteException: java.io.IOException: File /work/lab/output/localClustering/_temporary/_attempt_local_0001_r_000000_0/EcgData002509_LCF_3 could only be replicated to 0 nodes, instead of 1
>> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:1271)
>> at org.apache.hadoop.hdfs.server.namenode.NameNode.addBlock(NameNode.java:422)
>> at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>> at java.lang.reflect.Method.invoke(Method.java:597)
>> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:508)
>> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:959)
>> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:955)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:396)
>> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:953)
>> at org.apache.hadoop.ipc.Client.call(Client.java:740)
>> at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:220)
>> at $Proxy0.addBlock(Unknown Source)
>> at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>> at java.lang.reflect.Method.invoke(Method.java:597)
>> at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
>> at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
>> at $Proxy0.addBlock(Unknown Source)
>> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.locateFollowingBlock(DFSClient.java:2937)
>> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.nextBlockOutputStream(DFSClient.java:2819)
>> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.access$2000(DFSClient.java:2102)
>> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream$DataStreamer.run(DFSClient.java:2288)
>> 12/09/17 21:10:43 INFO mapred.JobClient: Job complete: job_local_0001
>> 12/09/17 21:10:43 INFO mapred.JobClient: Counters: 15
>> 12/09/17 21:10:43 INFO mapred.JobClient:   FileSystemCounters
>> 12/09/17 21:10:43 INFO mapred.JobClient:     FILE_BYTES_READ=23297226
>> 12/09/17 21:10:43 INFO mapred.JobClient:     HDFS_BYTES_READ=546711709
>> 12/09/17 21:10:43 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=232075142
>> 12/09/17 21:10:43 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=13530993
>> 12/09/17 21:10:43 INFO mapred.JobClient:   Map-Reduce Framework
>> 12/09/17 21:10:43 INFO mapred.JobClient:     Reduce input groups=56
>> 12/09/17 21:10:43 INFO mapred.JobClient:     Combine output records=0
>> 12/09/17 21:10:43 INFO mapred.JobClient:     Map input records=20
>> 12/09/17 21:10:43 INFO mapred.JobClient:     Reduce shuffle bytes=0
>> 12/09/17 21:10:43 INFO mapred.JobClient:     Reduce output records=38837
>> 12/09/17 21:10:43 INFO mapred.JobClient:     Spilled Records=102562
>> 12/09/17 21:10:43 INFO mapred.JobClient:     Map output bytes=18691072
>> 12/09/17 21:10:43 INFO mapred.JobClient:     Map input bytes=28649088
>> 12/09/17 21:10:43 INFO mapred.JobClient:     Combine input records=0
>> 12/09/17 21:10:43 INFO mapred.JobClient:     Map output records=55700
>> 12/09/17 21:10:43 INFO mapred.JobClient:     Reduce input records=38838
>> 12/09/17 21:10:44 INFO mapred.LocalJobRunner: reduce > reduce
>> Exception in thread "main" java.io.IOException: Job failed!
>> at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1252)
>> at org.yanglin.mr.lab.ecg.PESCJob.runLocalClustering(PESCJob.java:111)
>> at org.yanglin.mr.lab.ecg.PESCJob.run(PESCJob.java:57)
>> at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
>> at org.yanglin.mr.lab.ecg.PESCJob.main(PESCJob.java:117)
>> 12/09/17 21:10:48 ERROR hdfs.DFSClient: Exception closing file /work/lab/output/localClustering/_temporary/_attempt_local_0001_r_000000_0/EcgData002509_LCF_3 : org.apache.hadoop.ipc.RemoteException: java.io.IOException: File /work/lab/output/localClustering/_temporary/_attempt_local_0001_r_000000_0/EcgData002509_LCF_3 could only be replicated to 0 nodes, instead of 1
>> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:1271)
>> at org.apache.hadoop.hdfs.server.namenode.NameNode.addBlock(NameNode.java:422)
>> at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>> at java.lang.reflect.Method.invoke(Method.java:597)
>> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:508)
>> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:959)
>> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:955)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:396)
>> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:953)
>> org.apache.hadoop.ipc.RemoteException: java.io.IOException: File /work/lab/output/localClustering/_temporary/_attempt_local_0001_r_000000_0/EcgData002509_LCF_3 could only be replicated to 0 nodes, instead of 1
>> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:1271)
>> at org.apache.hadoop.hdfs.server.namenode.NameNode.addBlock(NameNode.java:422)
>> at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>> at java.lang.reflect.Method.invoke(Method.java:597)
>> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:508)
>> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:959)
>> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:955)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:396)
>> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:953)
>> at org.apache.hadoop.ipc.Client.call(Client.java:740)
>> at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:220)
>> at $Proxy0.addBlock(Unknown Source)
>> at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>> at java.lang.reflect.Method.invoke(Method.java:597)
>> at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
>> at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
>> at $Proxy0.addBlock(Unknown Source)
>> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.locateFollowingBlock(DFSClient.java:2937)
>> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.nextBlockOutputStream(DFSClient.java:2819)
>> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.access$2000(DFSClient.java:2102)
>> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream$DataStreamer.run(DFSClient.java:2288)
>
>
> --
> YANG, Lin

--
Harsh J
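
For anyone hitting the same xceiver/load ceiling Harsh describes: on the Hadoop 1.x line this thread dates from, the per-DataNode transfer-thread cap is governed by the dfs.datanode.max.xcievers property in hdfs-site.xml (the misspelling is historical; later releases rename it dfs.datanode.max.transfer.threads). A minimal sketch for raising it on a pseudo-distributed setup follows; the 8192 value is only an illustrative assumption, and the DataNode must be restarted for the change to take effect:

---
<!-- hdfs-site.xml: raise the DataNode transfer-thread cap.
     8192 is an assumed example value, not a recommendation;
     restart the DataNode after changing this. -->
<property>
  <name>dfs.datanode.max.xcievers</name>
  <value>8192</value>
</property>
---

Raising the cap only buys headroom, though; as suggested above, reducing the number of files held open at once (fewer distinct keys per run, or a smaller input sample) is the more robust fix.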
