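It means pretty much what it says. You ran out of disk space on an
executor (not the driver), because the directory Spark uses for its
local scratch files is full (that one directory, not necessarily every
volume on the machine). The trace shows a cached block being serialized
to disk by DiskStore when the write failed. Set spark.local.dir to a
more appropriate location on a larger volume.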
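
As a minimal sketch of that setting (not taken from the cluster in question): spark.local.dir takes a comma-separated list of directories, so one approach is to check free space on a few candidate paths and pass along the ones that qualify. The helper name pick_local_dirs, the candidate paths, and the 500 GB threshold are all hypothetical. The cluster manager may also override the property (for example through SPARK_LOCAL_DIRS, or LOCAL_DIRS on YARN), so treat this as illustrative only.

```python
import os
import shutil


def pick_local_dirs(candidates, min_free_bytes):
    """Return a comma-separated list of existing directories that have at
    least min_free_bytes free, in the form spark.local.dir expects."""
    usable = []
    for path in candidates:
        if not os.path.isdir(path):
            continue  # skip paths that do not exist on this machine
        if shutil.disk_usage(path).free >= min_free_bytes:
            usable.append(path)
    return ",".join(usable)


if __name__ == "__main__":
    # Hypothetical candidate scratch directories; substitute real volumes.
    candidates = ["D:/spark-tmp", "E:/spark-tmp"]
    local_dirs = pick_local_dirs(candidates, min_free_bytes=500 * 1024**3)
    print("spark.local.dir =", local_dirs or "(no candidate had enough space)")
    # With PySpark available, the value would be passed along the lines of:
    #   SparkConf().set("spark.local.dir", local_dirs)
```

Note that the check only looks at the machine it runs on; the space that matters here is on each executor node.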

On Sat, Mar 14, 2015 at 2:10 AM, Peng Xia <sparkpeng...@gmail.com> wrote:
> Hi
>
>
> I was running a logistic regression algorithm on an 8-node Spark cluster;
> each node has 8 cores and 56 GB RAM (each node is running a Windows system),
> and the Spark installation drive has 1.9 TB capacity. The dataset I was
> training on has around 40 million records with around 6600 features. But
> I always get this error during the training process:
>
> Py4JJavaError: An error occurred while calling o70.trainLogisticRegressionModelWithLBFGS.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 2709 in stage 3.0 failed 4 times, most recent failure: Lost task 2709.3 in stage 3.0 (TID 2766, workernode0.rbaHdInsightCluster5.b6.internal.cloudapp.net): java.io.IOException: There is not enough space on the disk
>         at java.io.FileOutputStream.writeBytes(Native Method)
>         at java.io.FileOutputStream.write(FileOutputStream.java:345)
>         at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
>         at org.xerial.snappy.SnappyOutputStream.dumpOutput(SnappyOutputStream.java:300)
>         at org.xerial.snappy.SnappyOutputStream.rawWrite(SnappyOutputStream.java:247)
>         at org.xerial.snappy.SnappyOutputStream.write(SnappyOutputStream.java:107)
>         at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
>         at java.io.ObjectOutputStream$BlockDataOutputStream.writeByte(ObjectOutputStream.java:1914)
>         at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1575)
>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
>         at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
>         at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110)
>         at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1177)
>         at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:78)
>         at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:787)
>         at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)
>         at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:145)
>         at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:243)
>         at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:278)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>         at org.apache.spark.scheduler.Task.run(Task.scala:56)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>         at scala.Option.foreach(Option.scala:236)
>         at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
>         at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> The code is below:
>
> from pyspark import SparkConf, SparkContext
> from pyspark.mllib.classification import LogisticRegressionWithSGD
> from pyspark.mllib.regression import LabeledPoint
> from sklearn.feature_extraction import FeatureHasher
>
> sf = (SparkConf()
>       .setAppName("test")
>       .set("spark.executor.memory", "45g")
>       .set("spark.cores.max", 62))
> sc = SparkContext(conf=sf)
> training_file = sc.textFile("train_small.txt")
>
> def hash_feature(line):
>     # The first tab-separated field is the label; every other field
>     # becomes a "<column index>_<value>" categorical feature.
>     values = [0, dict()]
>     for index, x in enumerate(line.strip("\n").split('\t')):
>         if index == 0:
>             values[0] = float(x)
>         else:
>             values[1][str(index)+"_"+x] = 1
>     return values
>
> n_feature = 2**14
> hasher = FeatureHasher(n_features=n_feature)
>
> def hash_line(line):
>     # Parse each line once, then hash its feature dict into a sparse row.
>     label, features = hash_feature(line)
>     return [label, hasher.transform([features])]
>
> training_file_hashed = training_file.map(hash_line)
>
> def build_label_points(line):
>     # Expand the sparse row into a dense list of n_feature floats.
>     values = [0.0] * n_feature
>     for index, value in zip(line[1].indices, line[1].data):
>         values[index] = value
>     return LabeledPoint(line[0], values)
>
> parsed_training_data = training_file_hashed.map(build_label_points)
> model = LogisticRegressionWithSGD.train(parsed_training_data)
>
> Can anyone share any experience on this?
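
For a sense of scale, here is a rough back-of-envelope estimate of how much space the training data could need. It assumes all 40 million records go through the code above and that each dense vector costs about 8 bytes per feature, ignoring serialization overhead and Snappy compression. Those are assumptions, not measurements, and the function name is hypothetical.

```python
def dense_dataset_bytes(n_records, n_features, bytes_per_value=8):
    """Approximate raw size of n_records dense vectors of doubles."""
    return n_records * n_features * bytes_per_value


if __name__ == "__main__":
    # Figures from the message above: ~40 million records, n_feature = 2**14.
    total = dense_dataset_bytes(40_000_000, 2**14)
    print("%.2f TB" % (total / 1e12))
```

Under those assumptions the dense data is on the order of 5 TB before compression, which gives an idea of how much local disk the cached blocks might ask for when they do not fit in memory.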
