Managing Dataset API Partitions - Spark 2.0
Hi everyone,

I'd like to test some algorithms with the Dataset API offered by Spark 2.0.0. So I was wondering: *what is the best way to manage Dataset partitions?* E.g. in the data-reading phase, what I used to do is the following:

    // RDD
    // if I want to set a custom minimum number of partitions
    val data = sc.textFile(inputPath, numPartitions)

    // if I want to reshape my RDD with a new number of partitions at some point
    val reshaped = data.repartition(newNumPartitions)

    // Dataset API
    // now with the Dataset API I'm calling the repartition method directly on the Dataset
    spark.read.text(inputPath).repartition(newNumberOfPartitions)

So I'd be glad to know whether there are *any new valuable options for custom-partitioning a Dataset, either in the reading phase or at some later point*.

Thank you so much.

Andrea

--
*Andrea Spina*
N.Tessera: *74598*
MAT: *89369*
*Ingegneria Informatica* *[LM]* (D.M. 270)
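For reference, a minimal sketch of the partition-management hooks the 2.0.0 Dataset API exposes. inputPath and newNumberOfPartitions are the placeholders from the message above; the single "value" column is what spark.read.text yields:

    import org.apache.spark.sql.functions.col

    // spark is a SparkSession; spark.read.text yields a DataFrame with one "value" column
    val ds = spark.read.text(inputPath)

    // round-robin shuffle into a fixed number of partitions
    val byNumber = ds.repartition(newNumberOfPartitions)

    // hash-partition by one or more columns (useful ahead of joins or aggregations)
    val byColumn = ds.repartition(newNumberOfPartitions, col("value"))

    // shrink the partition count without a full shuffle
    val narrowed = byNumber.coalesce(newNumberOfPartitions / 2)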
java.lang.OutOfMemoryError Spark MLlib ALS matrix factorization
(TID 9360, cloud-15, partition 375, PROCESS_LOCAL, 2268 bytes)
16/09/01 00:55:20 INFO TaskSetManager: Finished task 91.0 in stage 12.0 (TID 9093) in 1356978 ms on cloud-15 (120/720)
16/09/01 00:56:07 INFO TaskSetManager: Starting task 384.0 in stage 12.0 (TID 9361, cloud-24, partition 384, PROCESS_LOCAL, 2268 bytes)
16/09/01 00:56:07 WARN TaskSetManager: Lost task 18.0 in stage 12.0 (TID 9027, cloud-24): FetchFailed(BlockManagerId(2, cloud-22, 40528), shuffleId=22, mapId=671, reduceId=18, message=
org.apache.spark.shuffle.FetchFailedException: java.io.FileNotFoundException: /data/1/peel/spark/tmp/spark-6225354a-22f0-45dd-aff0-76051ad609ed/executor-d5fbc621-341c-4fc9-bedc-c292dc7f038a/blockmgr-c8b40f38-99a9-4060-823d-50b502bd9f91/25/shuffle_22_671_0.index (No such file or directory)
    at java.io.FileInputStream.open0(Native Method)
    at java.io.FileInputStream.open(FileInputStream.java:195)
    at java.io.FileInputStream.<init>(FileInputStream.java:138)
    at org.apache.spark.shuffle.IndexShuffleBlockResolver.getBlockData(IndexShuffleBlockResolver.scala:191)
    at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:298)
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:58)
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:58)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)

I'm running with *spark-1.6.2*. I really can't figure out the reason behind that. My code simply calls the library as follows:

    val als = new ALS()
      .setIntermediateRDDStorageLevel(storageLevel)
      .setBlocks(numTasks)
      .setLambda(0.1)
      .setRank(50)
      .setIterations(10)
      .setSeed(42)

    val model = als.run(ratings)
    model.save(sc, outputPath)
    sc.stop()

where

- *ratings*, the input RDD (parallelized with *numTasks* partitions), contains (uid, iid, rate) rows: about 8e6 users, 1e6 items and about 5.6e9 ratings (700 per user on average)
- *numTasks*: currently 240 * 3 (= numOfCores * 3)
- *storageLevel*: MEMORY_AND_DISK

I tried several variations:

- lowering the number of blocks: 1. numTasks, 2. 240 (numOfCores), 3. letting the MLlib implementation set it
- changing the storage level to MEMORY_ONLY

I would have tried varying spark.shuffle.memoryFraction as well, but I read it has been deprecated since Spark 1.6.

I'm running on a 15-node cluster (16 CPUs and 32 GB of memory per node) with the following relevant properties:

    spark.executor.memory = 28672m
    spark.driver.memory = 28672m
    spark.deamon.memory = 28672m
    spark.driver.maxResultSize = 0
    spark.network.timeout = 3000s

Any help will be appreciated. Thank you.

--
*Andrea Spina*
N.Tessera: *74598*
MAT: *89369*
*Ingegneria Informatica* *[LM]* (D.M. 270)
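A commonly suggested mitigation for long iterative MLlib jobs that die with FetchFailedException or OOM is to keep the RDD lineage short via checkpointing. A hedged sketch against the 1.6.x mllib ALS builder, mirroring the snippet above (the checkpoint directory is a hypothetical path; sc, ratings, numTasks and storageLevel are as in the message):

    import org.apache.spark.mllib.recommendation.ALS

    sc.setCheckpointDir("hdfs:///tmp/als-checkpoints")  // hypothetical path

    val als = new ALS()
      .setIntermediateRDDStorageLevel(storageLevel)
      .setBlocks(numTasks)
      .setLambda(0.1)
      .setRank(50)
      .setIterations(10)
      .setCheckpointInterval(2)  // truncate lineage every 2 iterations
      .setSeed(42)

    val model = als.run(ratings)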
Re: Issue with Spark on 25 nodes cluster
Hi,

I solved it by increasing the Akka timeout. All the best,

2016-06-28 15:04 GMT+02:00 ANDREA SPINA <74...@studenti.unimore.it>:

> Hello everyone,
>
> I am running some experiments with Spark 1.4.0 on a ~80GiB dataset located
> on hdfs-2.7.1. The environment is a 25-node cluster, 16 cores per node. I
> set the following params:
>
> spark.master = "spark://"${runtime.hostname}":7077"
>
> # 28 GiB of memory
> spark.executor.memory = "28672m"
> spark.worker.memory = "28672m"
> spark.driver.memory = "2048m"
>
> spark.driver.maxResultSize = "0"
>
> I run some scaling experiments varying the number of machines. I can
> successfully run experiments with the whole set of nodes (25) and also
> with 20 nodes. Experiments on environments of 5 and 10 nodes relentlessly
> fail. During the run, Spark executors begin to collect failing jobs from
> different stages and end with the following trace:
>
> 16/06/28 03:11:09 INFO DAGScheduler: Job 14 failed: reduce at
> sGradientDescent.scala:229, took 1778.508309 s
> Exception in thread "main" org.apache.spark.SparkException: Job aborted
> due to stage failure: Task 212 in stage 14.0 failed 4 times, most recent
> failure: Lost task 212.3 in stage 14.0 (TID 12278, 130.149.21.19):
> java.io.IOException: Connection from /130.149.21.16:35997 closed
> at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:104)
> at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:91)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:183)
> at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:169)
> at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:183)
> at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:169)
> at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:183)
> at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:169)
> at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:183)
> at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:169)
> at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:738)
> at io.netty.channel.AbstractChannel$AbstractUnsafe$6.run(AbstractChannel.java:606)
> at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:380)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
> at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
> at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
> at scala.Option.foreach(Option.scala:236)
> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>
> Here
> <https://dl.dropboxusercontent.com/u/78598929/spark-hadoop-org.apache.spark.deploy.master.Master-1-cloud-11.log>
> is the full Master log. As well, each Worker receives signal SIGTERM: 15.
> I can't figure out a solution.
>
> Thank you, Regards,
> Andrea
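For future readers, the relevant knobs in Spark 1.4 were spark.akka.timeout (default 100s) and the broader spark.network.timeout (default 120s). A sketch in the same .conf style as the original post below; the values are illustrative, not the exact ones used:

    # illustrative timeout bumps, not the exact values used
    spark.akka.timeout = "600s"
    spark.network.timeout = "600s"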
Issue with Spark on 25 nodes cluster
Hello everyone,

I am running some experiments with Spark 1.4.0 on a ~80GiB dataset located on hdfs-2.7.1. The environment is a 25-node cluster, 16 cores per node. I set the following params:

    spark.master = "spark://"${runtime.hostname}":7077"

    # 28 GiB of memory
    spark.executor.memory = "28672m"
    spark.worker.memory = "28672m"
    spark.driver.memory = "2048m"

    spark.driver.maxResultSize = "0"

I run some scaling experiments varying the number of machines. I can successfully run experiments with the whole set of nodes (25) and also with 20 nodes. Experiments on environments of 5 and 10 nodes relentlessly fail. During the run, Spark executors begin to collect failing jobs from different stages and end with the following trace:

16/06/28 03:11:09 INFO DAGScheduler: Job 14 failed: reduce at sGradientDescent.scala:229, took 1778.508309 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 212 in stage 14.0 failed 4 times, most recent failure: Lost task 212.3 in stage 14.0 (TID 12278, 130.149.21.19): java.io.IOException: Connection from /130.149.21.16:35997 closed
    at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:104)
    at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:91)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:183)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:169)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:183)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:169)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:183)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:169)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:183)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:169)
    at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:738)
    at io.netty.channel.AbstractChannel$AbstractUnsafe$6.run(AbstractChannel.java:606)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:380)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Here <https://dl.dropboxusercontent.com/u/78598929/spark-hadoop-org.apache.spark.deploy.master.Master-1-cloud-11.log> is the full Master log. As well, each Worker receives signal SIGTERM: 15. I can't figure out a solution.

Thank you, Regards,

Andrea

--
*Andrea Spina*
N.Tessera: *74598*
MAT: *89369*
*Ingegneria Informatica* *[LM]* (D.M. 270)