Re: flatMap output on disk / flatMap memory overhead
Hi Octavian,

Just out of curiosity, did you try persisting your RDD in a serialized format, MEMORY_AND_DISK_SER or MEMORY_ONLY_SER? I.e. changing your rdd.persist(MEMORY_AND_DISK) to rdd.persist(MEMORY_ONLY_SER).

Regards

On Wed, Jun 10, 2015 at 7:27 AM, Imran Rashid iras...@cloudera.com wrote:

I agree with Richard. It looks like the issue here is shuffling, and shuffle data is always written to disk, so the issue is definitely not that all the output of flatMap has to be stored in memory. If at all possible, I'd first suggest upgrading to a newer version of Spark -- even in 1.2 there were big improvements to shuffle, with sort-based shuffle as the default.

On Tue, Jun 2, 2015 at 1:09 PM, Richard Marscher rmarsc...@localytics.com wrote:

Are you sure it's memory related? What is the disk utilization and IO performance on the workers? The error you posted looks to be related to shuffle trying to obtain block data from another worker node and failing to do so in a reasonable amount of time. It may still be memory related, but I'm not sure that other resources are ruled out yet.

On Tue, Jun 2, 2015 at 5:10 AM, octavian.ganea octavian.ga...@inf.ethz.ch wrote:

I tried using reduceByKey, without success. I also tried this: rdd.persist(MEMORY_AND_DISK).flatMap(...).reduceByKey. However, I got the same error as before, namely the error described here:
http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-td23098.html

My task is to count the frequencies of pairs of words that occur in a set of documents at least 5 times. I know that this final output is sparse and should comfortably fit in memory. However, the intermediate pairs that are spilled by flatMap might need to be stored on disk, and I don't understand why the persist option does not work and my job fails.

My code:

    rdd.persist(StorageLevel.MEMORY_AND_DISK)
      .flatMap(x => outputPairsOfWords(x)) // outputs pairs of type ((word1, word2), 1)
      .reduceByKey((a, b) => (a + b).toShort)
      .filter({ case ((x, y), count) => count >= 5 })

My cluster has 8 nodes, each with 129 GB of RAM and 16 cores per node. One node I keep for the master, 7 nodes for the workers.

My conf:

    conf.set("spark.cores.max", "128")
    conf.set("spark.akka.frameSize", "1024")
    conf.set("spark.executor.memory", "115g")
    conf.set("spark.shuffle.file.buffer.kb", "1000")

My spark-env.sh:

    ulimit -n 20
    SPARK_JAVA_OPTS="-Xss1g -Xmx129g -d64 -XX:-UseGCOverheadLimit -XX:-UseCompressedOops"
    SPARK_DRIVER_MEMORY=129G

Spark version: 1.1.1

Thank you a lot for your help!
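For reference, a minimal sketch of the serialized-persist suggestion applied to the same pair-counting pipeline. This is an illustration only, not code from the thread: `rdd` stands for the poster's input RDD of documents and `outputPairsOfWords` for their own tokenizer emitting ((word1, word2), 1) pairs; the counts here are plain Ints rather than the Short trick in the original.

    import org.apache.spark.SparkContext._           // pair-RDD implicits, needed on older Spark versions
    import org.apache.spark.storage.StorageLevel

    val pairCounts = rdd
      .persist(StorageLevel.MEMORY_ONLY_SER)         // serialized in-memory cache; MEMORY_AND_DISK_SER also spills to disk
      .flatMap(doc => outputPairsOfWords(doc))       // ((word1, word2), 1) pairs
      .reduceByKey(_ + _)                            // map-side combine before the shuffle
      .filter { case (_, count) => count >= 5 }      // keep only pairs seen at least 5 times

Serialized storage levels trade CPU (serialization/deserialization) for a much smaller memory footprint per cached partition, which can reduce GC pressure and spilling during the shuffle.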
Re: SaveAsTextFile brings down data nodes with IO Exceptions
Hey,

Did you find any solution for this issue? We are seeing similar logs in our data node logs. Appreciate any help.

2015-05-15 10:51:43,615 ERROR org.apache.hadoop.hdfs.server.datanode.DataNode: NttUpgradeDN1:50010:DataXceiver error processing WRITE_BLOCK operation src: /192.168.112.190:46253 dst: /192.168.151.104:50010
java.net.SocketTimeoutException: 6 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/192.168.151.104:50010 remote=/192.168.112.190:46253]
        at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
        at java.io.BufferedInputStream.fill(Unknown Source)
        at java.io.BufferedInputStream.read1(Unknown Source)
        at java.io.BufferedInputStream.read(Unknown Source)
        at java.io.DataInputStream.read(Unknown Source)
        at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:192)
        at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:213)
        at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:134)
        at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:109)
        at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receivePacket(BlockReceiver.java:446)
        at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receiveBlock(BlockReceiver.java:702)
        at org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:742)
        at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:124)
        at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:71)
        at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:232)
        at java.lang.Thread.run(Unknown Source)

Thanks
Puneet

On Wed, Dec 3, 2014 at 2:50 AM, Ganelin, Ilya ilya.gane...@capitalone.com wrote:

Hi all, as the last stage of execution, I am writing out a dataset to disk. Before I do this, I force the DAG to resolve so this is the only job left in the pipeline. The dataset in question is not especially large (a few gigabytes). During this step, however, HDFS will inevitably crash. I lose connection to data nodes and get stuck in a loop of death: the failure causes a job restart, which eventually causes the overall job to fail. On the data node logs I see the errors below. Does anyone have any ideas as to what is going on here? Thanks!
java.io.IOException: Premature EOF from inputStream
        at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:194)
        at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:213)
        at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:134)
        at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:109)
        at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receivePacket(BlockReceiver.java:455)
        at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receiveBlock(BlockReceiver.java:741)
        at org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:718)
        at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:126)
        at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:72)
        at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:225)
        at java.lang.Thread.run(Thread.java:745)

innovationdatanode03.cof.ds.capitalone.com:1004:DataXceiver error processing WRITE_BLOCK operation src: /10.37.248.60:44676 dst: /10.37.248.59:1004
java.net.SocketTimeoutException: 65000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/10.37.248.59:43692 remote=/10.37.248.63:1004]
        at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:118)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2101)
        at org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:660)
        at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:126)
        at
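For context, a minimal sketch of the pattern described above (materialize the lineage first so the final write is the only remaining job). This is an assumption for illustration, not Ilya's actual code; `buildPipeline` is a hypothetical upstream computation and the output path is illustrative.

    import org.apache.spark.storage.StorageLevel

    val result = buildPipeline(sc)                   // hypothetical upstream pipeline returning an RDD[String]
      .persist(StorageLevel.MEMORY_AND_DISK)
    result.count()                                   // forces the DAG to resolve before the write
    result.saveAsTextFile("hdfs:///tmp/output")      // illustrative output path; the write is now the only remaining job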
Re: SaveAsTextFile brings down data nodes with IO Exceptions
I am seeing this on Hadoop version 2.4.0. Thanks for your suggestions, I will try those and let you know if they help!

On Sat, May 16, 2015 at 1:57 AM, Steve Loughran ste...@hortonworks.com wrote:

What version of Hadoop are you seeing this on?

On 15 May 2015, at 20:03, Puneet Kapoor puneet.cse.i...@gmail.com wrote:

Hey,

Did you find any solution for this issue? We are seeing similar logs in our data node logs. Appreciate any help.

2015-05-15 10:51:43,615 ERROR org.apache.hadoop.hdfs.server.datanode.DataNode: NttUpgradeDN1:50010:DataXceiver error processing WRITE_BLOCK operation src: /192.168.112.190:46253 dst: /192.168.151.104:50010
java.net.SocketTimeoutException: 6 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/192.168.151.104:50010 remote=/192.168.112.190:46253]
        at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
        at java.io.BufferedInputStream.fill(Unknown Source)
        at java.io.BufferedInputStream.read1(Unknown Source)
        at java.io.BufferedInputStream.read(Unknown Source)
        at java.io.DataInputStream.read(Unknown Source)
        at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:192)
        at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:213)
        at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:134)
        at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:109)
        at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receivePacket(BlockReceiver.java:446)
        at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receiveBlock(BlockReceiver.java:702)
        at org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:742)
        at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:124)
        at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:71)
        at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:232)
        at java.lang.Thread.run(Unknown Source)

That's being logged @ ERROR level in the DN. It doesn't mean the DN has crashed, only that it timed out waiting for data: something has gone wrong elsewhere.

https://issues.apache.org/jira/browse/HDFS-693

There are a couple of properties you can set to extend the timeouts:

    <property>
      <name>dfs.socket.timeout</name>
      <value>2</value>
    </property>
    <property>
      <name>dfs.datanode.socket.write.timeout</name>
      <value>2</value>
    </property>

You can also increase the number of data node transceiver threads to handle data IO across the network:

    <property>
      <name>dfs.datanode.max.xcievers</name>
      <value>4096</value>
    </property>

Yes, that property has that explicit spelling; it's easy to get wrong.
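As an aside, not part of the reply above: the same property names can also be raised on the client side from within a Spark job via the job's Hadoop configuration. A small sketch, with values that are purely illustrative assumptions:

    // Raise the HDFS client socket timeouts used by this Spark application (values illustrative).
    sc.hadoopConfiguration.set("dfs.socket.timeout", "120000")                  // read timeout, ms
    sc.hadoopConfiguration.set("dfs.datanode.socket.write.timeout", "120000")   // write timeout, ms

The server-side settings (including dfs.datanode.max.xcievers) still belong in hdfs-site.xml on the data nodes, as suggested above.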