Hi Eric,

IMHO you do have a solution: increase the xceiver count in hdfs-site.xml, though this may come with a performance hit:

    <property>
      <name>dfs.datanode.max.xcievers</name>
      <value>4096</value>
    </property>

To get a better understanding of how xceivers work, go through this link: http://blog.cloudera.com/blog/2012/03/hbase-hadoop-xceivers/

Also make sure you do not have a leak in your code where there are open file descriptors.
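If it helps, the pattern to double-check is that every stream opened against HDFS is closed on every code path; an unclosed stream keeps holding a DataXceiver thread on a datanode. A minimal sketch of the safe shape (the class name and path below are made up for illustration, not taken from your code):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class SafeHdfsWrite {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // try-with-resources closes the stream even if the write throws,
        // so the datanode's xceiver thread and the client's file
        // descriptor are always released.
        try (FSDataOutputStream out = fs.create(new Path("/tmp/example.txt"))) { // hypothetical path
          out.writeBytes("hello\n");
        }
      }
    }

The same applies to FSDataInputStream on the read side; on Java 6 the equivalent is a finally block.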
Thanks
Divye Sheth

On Mon, Apr 28, 2014 at 9:09 PM, Wright, Eric <ewri...@soleocommunications.com> wrote:

> Currently, we are working on setting up a production Hadoop cluster. We have a smaller cluster of four nodes we are working on for development (nn, snn, and two datanodes). From a high level, we flume our data into HDFS, and then, because Flume may resend data on error (due to the way it handles its reliability guarantees), it is possible to get duplicate records. In our case, our batch size is 100, so we occasionally see 100 records duplicated. In order to remedy this, we de-duplicate the data using a MapReduce job.
>
> For this job, the mapper has the following generic parameters: <LongWritable, Text, Text, Text>
>
> LongWritable – byte offset from beginning of file
> Text – Line of text currently being processed
> Text – Line of text currently being processed
> Text – Output file indication
>
> And the reducer has the following generic parameters: <Text, Text, NullWritable, Text>
>
> Text – Line of text currently being processed
> Text – Output file indication
> NullWritable – Empty key
> Text – Line of text currently being processed
>
> We read the standard LongWritable, Text input key/value types provided by a TextInputFormat coming into the mapper. We emit from the mapper the value as the key (i.e. the full text string passed in as the value to the mapper). This essentially means that the reducer will receive Text, Text as input. The reducer key is the string of text, and the value is a destination file. The reducer uses the value it received from the mapper to determine an output file, as it uses MultipleOutputs.
>
> So, our map phase reads the strings and pushes the lines out as the keys, so that the sort and partition phases send any duplicate lines to a single reducer, and then the reducer writes no more than one key (line) to its output file.
>
> Again, this does work at smaller scales, but when we increase our payload, we start getting numerous IO errors. It fails when we start processing tens of GBs of data. I have tried numerous different things and believe that either our configuration is an issue or we are doing something fundamentally wrong.
>
> Any advice or things to check would be greatly appreciated. I’d be happy to provide more information if it would help to diagnose the problem.
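For concreteness, a minimal sketch of a de-duplication job with the shape described above might look like this. The class names and the deriveOutputFile() helper are hypothetical stand-ins; only the key/value types and the MultipleOutputs routing follow the description:

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

    // Mapper<LongWritable, Text, Text, Text>: emit the whole line as the
    // key so the shuffle groups duplicate lines onto a single reducer.
    public class DedupMapper extends Mapper<LongWritable, Text, Text, Text> {
      private final Text outFile = new Text();

      @Override
      protected void map(LongWritable offset, Text line, Context ctx)
          throws IOException, InterruptedException {
        // deriveOutputFile() is a hypothetical stand-in for however the
        // destination file is actually chosen (e.g. from the input path).
        outFile.set(deriveOutputFile(line));
        ctx.write(line, outFile);
      }

      private String deriveOutputFile(Text line) {
        return "deduped"; // placeholder destination
      }
    }

    // Reducer<Text, Text, NullWritable, Text>: write each distinct line
    // once, routed via MultipleOutputs to the file named by the value.
    class DedupReducer extends Reducer<Text, Text, NullWritable, Text> {
      private MultipleOutputs<NullWritable, Text> out;

      @Override
      protected void setup(Context ctx) {
        out = new MultipleOutputs<NullWritable, Text>(ctx);
      }

      @Override
      protected void reduce(Text line, Iterable<Text> dests, Context ctx)
          throws IOException, InterruptedException {
        // Every value for a given line names the same destination,
        // so the first one is enough.
        String dest = dests.iterator().next().toString();
        out.write(NullWritable.get(), line, dest);
      }

      @Override
      protected void cleanup(Context ctx)
          throws IOException, InterruptedException {
        out.close(); // flush and close every open output file
      }
    }

Worth noting for the errors below: each distinct destination a reducer writes through MultipleOutputs keeps its own HDFS output stream (and write pipeline across datanodes) open until cleanup, so a reducer fanning out to many files can consume many xceiver threads at once on a small cluster.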
> *Currently collected information:*
>
> [analytics@bi-data-12 ~]$ hadoop version
> Hadoop 2.2.0.2.0.6.0-102
> Subversion g...@github.com:hortonworks/hadoop.git -r 02ad68f19849a0a986dac36b705491f6791f9179
> Compiled by jenkins on 2014-01-21T00:56Z
> Compiled with protoc 2.5.0
> From source with checksum 66f6c486e27479105979740178fbf0
> This command was run using /usr/lib/hadoop/hadoop-common-2.2.0.2.0.6.0-102.jar
> [analytics@bi-data-12 ~]$
>
> From the datanode service logs:
>
> 2014-04-25 13:23:00,349 ERROR datanode.DataNode (DataXceiver.java:writeBlock(540)) - DataNode{data=FSDataset{dirpath='[/disk1/hdfs/data/current, /disk2/hdfs/data/current, /disk3/hdfs/data/current, /disk4/hdfs/data/current]'}, localName='bi-data-12.soleocommunications.com:50010', storageID='DS-225705953-10.1.21.12-50010-1394477125650', xmitsInProgress=1}:Exception transfering block BP-1078665828-10.1.21.30-1394476780595:blk_1073845355_123420 to mirror 10.1.21.10:50010: java.io.EOFException: Premature EOF: no length prefix available
> 2014-04-25 13:23:00,349 WARN datanode.DataNode (DataXceiverServer.java:run(155)) - bi-data-12.soleocommunications.com:50010:DataXceiverServer:
> java.io.IOException: Xceiver count 4097 exceeds the limit of concurrent xcievers: 4096
>     at org.apache.hadoop.hdfs.server.datanode.DataXceiverServer.run(DataXceiverServer.java:137)
>     at java.lang.Thread.run(Thread.java:722)
> 2014-04-25 13:23:00,349 INFO datanode.DataNode (DataXceiver.java:writeBlock(600)) - opWriteBlock BP-1078665828-10.1.21.30-1394476780595:blk_1073845355_123420 received exception java.io.EOFException: Premature EOF: no length prefix available
> 2014-04-25 13:23:00,350 ERROR datanode.DataNode (DataXceiver.java:run(225)) - bi-data-12.soleocommunications.com:50010:DataXceiver error processing WRITE_BLOCK operation src: /10.1.21.12:37307 dest: /10.1.21.12:50010
> java.io.EOFException: Premature EOF: no length prefix available
>     at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:1492)
>     at org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:511)
>     at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:115)
>     at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:68)
>     at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:221)
>     at java.lang.Thread.run(Thread.java:722)
> 2014-04-25 13:23:00,352 INFO datanode.DataNode (DataXceiver.java:writeBlock(432)) - Receiving BP-1078665828-10.1.21.30-1394476780595:blk_1073845357_123422 src: /10.1.21.12:37311 dest: /10.1.21.12:50010
>
> From the map job logs we see exceptions like:
>
> 2014-04-25 13:04:16,621 INFO [Thread-147] org.apache.hadoop.hdfs.DFSClient: Exception in createBlockOutputStream
> java.io.IOException: Connection reset by peer
>     at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>     at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>     at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:218)
>     at sun.nio.ch.IOUtil.read(IOUtil.java:191)
>     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:359)
>     at org.apache.hadoop.net.SocketInputStream$Reader.performIO(SocketInputStream.java:57)
>     at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)
>     at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
>     at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
>     at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:118)
>     at java.io.FilterInputStream.read(FilterInputStream.java:83)
>     at java.io.FilterInputStream.read(FilterInputStream.java:83)
>     at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:1490)
>     at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1160)
>     at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1088)
>     at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:514)
> …
> 2014-04-25 13:04:53,116 INFO [Thread-590] org.apache.hadoop.hdfs.DFSClient: Exception in createBlockOutputStream
> org.apache.hadoop.net.ConnectTimeoutException: 60000 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending local=/10.1.21.10:43973 remote=/10.1.21.10:50010]
>     at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:532)
>     at org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java:1308)
>     at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1133)
>     at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1088)
>     at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:514)
> 2014-04-25 13:04:53,116 INFO [Thread-590] org.apache.hadoop.hdfs.DFSClient: Abandoning BP-1078665828-10.1.21.30-1394476780595:blk_1073840493_118558
> 2014-04-25 13:04:53,117 INFO [Thread-590] org.apache.hadoop.hdfs.DFSClient: Excluding datanode 10.1.21.10:50010
> 2014-04-25 13:06:33,056 WARN [ResponseProcessor for block BP-1078665828-10.1.21.30-1394476780595:blk_1073840947_119012] org.apache.hadoop.hdfs.DFSClient: DFSOutputStream ResponseProcessor exception for block BP-1078665828-10.1.21.30-1394476780595:blk_1073840947_119012
> java.net.SocketTimeoutException: 65000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/10.1.21.10:49537 remote=/10.1.21.12:50010]
>     at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
>     at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
>     at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
>     at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:118)
>     at java.io.FilterInputStream.read(FilterInputStream.java:83)
>     at java.io.FilterInputStream.read(FilterInputStream.java:83)
>     at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:1490)
>     at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:116)
>     at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:721)
> 2014-04-25 13:07:18,519 ERROR [main] org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException as:mapred (auth:SIMPLE) cause:java.io.IOException: All datanodes 10.1.21.12:50010 are bad. Aborting...
> 2014-04-25 13:07:19,559 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : java.io.IOException: All datanodes 10.1.21.12:50010 are bad. Aborting...
>     at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1008)
>     at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:823)
>     at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:475)
> …
> 2014-04-25 13:32:40,197 ERROR [main] org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException as:mapred (auth:SIMPLE) cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): failed to create file /data/working/cdr_deduped/verizon/bethlehem/dme/FlumeData_2014-04-25.1398433347544-r-00000 for DFSClient_attempt_1398102369867_0184_r_000000_1_576685315_1 on client 10.1.21.12 because current leaseholder is trying to recreate file.
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2303)
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2114)
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2043)
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1996)
>     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:491)
>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:301)
>     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:59570)
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2053)
>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:415)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2047)
> …
> 2014-04-25 13:32:40,198 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): failed to create file /data/working/cdr_deduped/verizon/bethlehem/dme/FlumeData_2014-04-25.1398433347544-r-00000 for DFSClient_attempt_1398102369867_0184_r_000000_1_576685315_1 on client 10.1.21.12 because current leaseholder is trying to recreate file.
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2303)
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2114)
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2043)
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1996)
>     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:491)
>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:301)
>     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:59570)
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)