Re: flatMap output on disk / flatMap memory overhead

2015-08-01 Thread Puneet Kapoor
Hi Octavian,

Just out of curiosity, did you try persisting your RDD in a serialized format,
MEMORY_AND_DISK_SER or MEMORY_ONLY_SER?
i.e. changing your:
rdd.persist(MEMORY_AND_DISK)
to
rdd.persist(MEMORY_ONLY_SER)
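
For example, assuming the same rdd as in your snippet, roughly:

import org.apache.spark.storage.StorageLevel

// Serialized storage keeps one byte buffer per partition instead of many
// deserialized Java objects, which usually shrinks the heap footprint.
rdd.persist(StorageLevel.MEMORY_ONLY_SER)
// or, if blocks that do not fit in memory should spill to disk:
// rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)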

Regards

On Wed, Jun 10, 2015 at 7:27 AM, Imran Rashid iras...@cloudera.com wrote:

 I agree with Richard.  It looks like the issue here is shuffling, and
 shuffle data is always written to disk, so the issue is definitely not that
 all the output of flatMap has to be stored in memory.

 If at all possible, I'd first suggest upgrading to a new version of spark
 -- even in 1.2, there were big improvements to shuffle with sort based
 shuffle as the default.
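
 If upgrading is not possible right away, the sort-based shuffle can also be
 turned on explicitly in 1.1 with something like this (just a sketch, assuming
 you build your own SparkConf):

 import org.apache.spark.SparkConf

 // hash-based shuffle is the default in 1.1; this opts in to the sort-based
 // shuffle that became the default in 1.2
 val conf = new SparkConf().set("spark.shuffle.manager", "sort")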

 On Tue, Jun 2, 2015 at 1:09 PM, Richard Marscher rmarsc...@localytics.com
  wrote:

 Are you sure it's memory related? What is the disk utilization and IO
 performance on the workers? The error you posted looks to be related to
 shuffle trying to obtain block data from another worker node and failing to
 do so in a reasonable amount of time. It may still be memory related, but I'm
 not sure that other resources are ruled out yet.

 On Tue, Jun 2, 2015 at 5:10 AM, octavian.ganea 
 octavian.ga...@inf.ethz.ch wrote:

 I tried using reduceByKey, without success.

 I also tried this: rdd.persist(MEMORY_AND_DISK).flatMap(...).reduceByKey.
 However, I got the same error as before, namely the error described here:

 http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-td23098.html

 My task is to count the frequencies of pairs of words that occur at least 5
 times in a set of documents. I know that this final output is sparse and
 should comfortably fit in memory. However, the intermediate pairs produced by
 flatMap might need to be spilled to disk, but I don't understand why the
 persist option does not work and my job fails.

 My code:

 rdd.persist(StorageLevel.MEMORY_AND_DISK)
  .flatMap(x => outputPairsOfWords(x)) // outputs pairs of type ((word1, word2), 1)
  .reduceByKey((a, b) => (a + b).toShort)
  .filter({ case ((x, y), count) => count >= 5 })


 My cluster has 8 nodes, each with 129 GB of RAM and 16 cores. I keep one node
 for the master and 7 nodes for the workers.

 my conf:

 conf.set("spark.cores.max", "128")
 conf.set("spark.akka.frameSize", "1024")
 conf.set("spark.executor.memory", "115g")
 conf.set("spark.shuffle.file.buffer.kb", "1000")

 my spark-env.sh:
  ulimit -n 20
  SPARK_JAVA_OPTS="-Xss1g -Xmx129g -d64 -XX:-UseGCOverheadLimit -XX:-UseCompressedOops"
  SPARK_DRIVER_MEMORY=129G

 spark version: 1.1.1

 Thank you a lot for your help!









Re: SaveAsTextFile brings down data nodes with IO Exceptions

2015-05-15 Thread Puneet Kapoor
Hey,

Did you find any solution for this issue? We are seeing similar logs in our
DataNode logs. Appreciate any help.


2015-05-15 10:51:43,615 ERROR
org.apache.hadoop.hdfs.server.datanode.DataNode:
NttUpgradeDN1:50010:DataXceiver error processing WRITE_BLOCK operation
 src: /192.168.112.190:46253 dst: /192.168.151.104:50010
java.net.SocketTimeoutException: 6 millis timeout while waiting for
channel to be ready for read. ch :
java.nio.channels.SocketChannel[connected local=/192.168.151.104:50010
remote=/192.168.112.190:46253]
at
org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
at
org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
at
org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
at java.io.BufferedInputStream.fill(Unknown Source)
at java.io.BufferedInputStream.read1(Unknown Source)
at java.io.BufferedInputStream.read(Unknown Source)
at java.io.DataInputStream.read(Unknown Source)
at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:192)
at
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:213)
at
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:134)
at
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:109)
at
org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receivePacket(BlockReceiver.java:446)
at
org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receiveBlock(BlockReceiver.java:702)
at
org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:742)
at
org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:124)
at
org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:71)
at
org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:232)
at java.lang.Thread.run(Unknown Source)

Thanks
Puneet

On Wed, Dec 3, 2014 at 2:50 AM, Ganelin, Ilya ilya.gane...@capitalone.com
wrote:

 Hi all, as the last stage of execution, I am writing out a dataset to disk.
 Before I do this, I force the DAG to resolve so that this is the only job left
 in the pipeline. The dataset in question is not especially large (a few
 gigabytes). During this step, however, HDFS will inevitably crash. I lose the
 connection to data nodes and get stuck in a loop of death: failures cause job
 restarts, eventually causing the overall job to fail. On the data node logs I
 see the errors below. Does anyone have any ideas as to what is going on here?
 Thanks!
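
 A rough sketch of that sequencing (dataset and outputPath are placeholders,
 and persist + count is just one way to force the upstream stages to run):

 import org.apache.spark.storage.StorageLevel

 dataset.persist(StorageLevel.MEMORY_AND_DISK)
 dataset.count()                      // materializes the upstream DAG
 dataset.saveAsTextFile(outputPath)   // the write is now the only job left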


 java.io.IOException: Premature EOF from inputStream
   at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:194)
   at 
 org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:213)
   at 
 org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:134)
   at 
 org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:109)
   at 
 org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receivePacket(BlockReceiver.java:455)
   at 
 org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receiveBlock(BlockReceiver.java:741)
   at 
 org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:718)
   at 
 org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:126)
   at 
 org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:72)
   at 
 org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:225)
   at java.lang.Thread.run(Thread.java:745)




 innovationdatanode03.cof.ds.capitalone.com:1004:DataXceiver error processing 
 WRITE_BLOCK operation  src: /10.37.248.60:44676 dst: /10.37.248.59:1004
 java.net.SocketTimeoutException: 65000 millis timeout while waiting for 
 channel to be ready for read. ch : java.nio.channels.SocketChannel[connected 
 local=/10.37.248.59:43692 remote=/10.37.248.63:1004]
   at 
 org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
   at 
 org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
   at 
 org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
   at 
 org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:118)
   at java.io.FilterInputStream.read(FilterInputStream.java:83)
   at java.io.FilterInputStream.read(FilterInputStream.java:83)
   at 
 org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2101)
   at 
 org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:660)
   at 
 org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:126)
   at 
 

Re: SaveAsTextFile brings down data nodes with IO Exceptions

2015-05-15 Thread Puneet Kapoor
I am seeing this on Hadoop 2.4.0.

Thanks for your suggestions, I will try those and let you know if they help!

On Sat, May 16, 2015 at 1:57 AM, Steve Loughran ste...@hortonworks.com
wrote:

  What version of Hadoop are you seeing this on?


  On 15 May 2015, at 20:03, Puneet Kapoor puneet.cse.i...@gmail.com
 wrote:

  Hey,

  Did you find any solution for this issue? We are seeing similar logs in
 our DataNode logs. Appreciate any help.





  2015-05-15 10:51:43,615 ERROR
 org.apache.hadoop.hdfs.server.datanode.DataNode:
 NttUpgradeDN1:50010:DataXceiver error processing WRITE_BLOCK operation
  src: /192.168.112.190:46253 dst: /192.168.151.104:50010
 java.net.SocketTimeoutException: 6 millis timeout while waiting for
 channel to be ready for read. ch :
 java.nio.channels.SocketChannel[connected local=/192.168.151.104:50010
 remote=/192.168.112.190:46253]
 at
 org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
 at
 org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
 at
 org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
 at java.io.BufferedInputStream.fill(Unknown Source)
 at java.io.BufferedInputStream.read1(Unknown Source)
 at java.io.BufferedInputStream.read(Unknown Source)
 at java.io.DataInputStream.read(Unknown Source)
 at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:192)
 at
 org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:213)
 at
 org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:134)
 at
 org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:109)
 at
 org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receivePacket(BlockReceiver.java:446)
 at
 org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receiveBlock(BlockReceiver.java:702)
 at
 org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:742)
 at
 org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:124)
 at
 org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:71)
 at
 org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:232)
 at java.lang.Thread.run(Unknown Source)


  That's being logged @ error level in DN. It doesn't mean the DN has
 crashed, only that it timed out waiting for data: something has gone wrong
 elsewhere.

  https://issues.apache.org/jira/browse/HDFS-693


 There are a couple of properties you can set to extend the timeouts:

   <property>
     <name>dfs.socket.timeout</name>
     <value>2</value>
   </property>

   <property>
     <name>dfs.datanode.socket.write.timeout</name>
     <value>2</value>
   </property>



 You can also increase the number of data node transceiver threads to handle
 data IO across the network:


 <property>
   <name>dfs.datanode.max.xcievers</name>
   <value>4096</value>
 </property>

 Yes, that property really does have that spelling ("xcievers"); it's easy to get wrong.