Scaling problem in RandomForest?
Hi,

The Random Forest implementation (1.2.1) is repeatably crashing when I increase the depth to 20. I generate random synthetic data (36 workers, 1,000,000 examples per worker, 30 features per example) as follows:

    val data = sc.parallelize(1 to 36, 36).mapPartitionsWithIndex((i, _) => {
      Array.tabulate(100){ _ => new LabeledPoint(Math.random(), Vectors.dense(Array.fill(30)(math.random))) }.toIterator
    }).cache()

...and then train a Random Forest with 50 trees, to depth 20:

    val strategy = new Strategy(Regression, Variance, 20, maxMemoryInMB = 1000)
    RandomForest.trainRegressor(data, strategy, 50, "sqrt", 1)

...and run on my EC2 cluster (36 slaves; the master has 122GB of memory). After number crunching for a couple of hours, I get the following error:

    [sparkDriver-akka.actor.default-dispatcher-3] shutting down ActorSystem [sparkDriver]
    java.lang.OutOfMemoryError: Requested array size exceeds VM limit
        at java.util.Arrays.copyOf(Arrays.java:2271)
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
        at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
        at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
        at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:834)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:778)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:781)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:780)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:780)
        at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:762)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1389)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    15/03/11 15:45:51 INFO scheduler.DAGScheduler: Job 92 failed: collectAsMap at DecisionTree.scala:653, took 46.062487 s

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Scaling-problem-in-RandomForest-tp22002.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
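For what it's worth, a back-of-envelope sketch (my own arithmetic, not taken from the Spark source) of why depth 20 is qualitatively different from shallower trees: the maximum node count per tree, and hence the size of the per-tree state the driver must serialize, grows exponentially with depth.

```scala
// Back-of-envelope node counts: a complete binary tree of depth d has
// 2^(d+1) - 1 nodes, so model/bookkeeping size explodes with depth.
object TreeSize {
  def maxNodes(depth: Int): Long = (1L << (depth + 1)) - 1

  def main(args: Array[String]): Unit = {
    val perTree15 = maxNodes(15)      // 65,535 nodes per tree
    val perTree20 = maxNodes(20)      // 2,097,151 nodes per tree
    val forest20  = 50L * perTree20   // ~105 million nodes for 50 trees
    println(s"depth 15: $perTree15 nodes/tree")
    println(s"depth 20: $perTree20 nodes/tree, $forest20 nodes across 50 trees")
  }
}
```

So going from depth 15 to depth 20 multiplies the worst-case node count by 32, which is consistent with the failure appearing only at the deeper setting.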
Requested array size exceeds VM limit
Hi,

I'm using MLlib to train a random forest. It works fine to depth 15, but if I use depth 20 I get a *java.lang.OutOfMemoryError: Requested array size exceeds VM limit* on the driver, from the collectAsMap operation in DecisionTree.scala, around line 642. It doesn't happen until a good hour into training. I'm using 50 trees on 36 slaves with maxMemoryInMB=250, but I still get the error even with 240GB of driver memory. Has anybody seen this error in this context before, and can you advise on what might be triggering it?

Best,
Luke

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Requested-array-size-exceeds-VM-limit-tp21774.html
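One detail worth noting about this particular OutOfMemoryError (my own reasoning, not from the thread): the failure is inside ByteArrayOutputStream, whose buffer is a single Array[Byte], and a JVM array can hold at most Int.MaxValue elements regardless of heap size. A minimal sketch of the arithmetic:

```scala
// Sketch: the JVM caps any single array at Int.MaxValue elements (~2 GiB for
// an Array[Byte]), independent of heap size. java.io.ByteArrayOutputStream
// backs serialization with one such array, so a single oversized serialized
// object graph throws "Requested array size exceeds VM limit" on any heap.
object ArrayLimit {
  val MaxArrayBytes: Long = Int.MaxValue.toLong // hard per-array cap, ~2 GiB

  // Could a payload of this many bytes fit in one Array[Byte]?
  def fitsInOneArray(bytes: Long): Boolean = bytes <= MaxArrayBytes

  def main(args: Array[String]): Unit = {
    val driverHeap = 240L * 1024 * 1024 * 1024  // a 240 GiB driver heap
    println(fitsInOneArray(driverHeap))          // false: heap size doesn't help
  }
}
```

If this is what's happening, it would explain why raising driver memory to 240G makes no difference: the limit being hit is per-array, not per-heap.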
Caching RDDs with shared memory - bug or feature?
If all RDD elements within a partition contain pointers to a single shared object, Spark persists as expected when the RDD is small. However, if the RDD has more than *200 elements*, Spark reports requiring much more memory than it actually does. This becomes a problem for large RDDs, as Spark refuses to persist even though it can. Is this a bug, or is there a feature that I'm missing?

Cheers,
Luke

    val n = ???
    class Elem(val s: Array[Int])

    val rdd = sc.parallelize(Seq(1)).mapPartitions( _ => {
      val sharedArray = Array.ofDim[Int](10000000) // should require ~40MB
      (1 to n).toIterator.map(_ => new Elem(sharedArray))
    }).cache()
    rdd.count() // force computation

For n = 100: MemoryStore: Block rdd_1_0 stored as values in memory (estimated size *38.1 MB*, free 898.7 MB)
For n = 200: MemoryStore: Block rdd_1_0 stored as values in memory (estimated size *38.2 MB*, free 898.7 MB)
For n = 201: MemoryStore: Block rdd_1_0 stored as values in memory (estimated size *76.7 MB*, free 860.2 MB)
For n = 5000: MemoryStore: *Not enough space to cache rdd_1_0 in memory!* (computed 781.3 MB so far)

Note: for medium-sized n (where n > 200 but Spark can still cache), the application's actual memory use still stays where it should - Spark just seems to vastly over-report how much memory it's using.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Caching-RDDs-with-shared-memory-bug-or-feature-tp20596.html
RDD with object shared across elements within a partition. Magic number 200?
Hi all,

I am trying to persist a Spark RDD in which the elements of each partition all share access to a single, large object. However, this object seems to get stored in memory several times. Reducing my problem down to the toy case of a single partition with only 200 elements:

    val nElements = 200
    class Elem(val s: Array[Int])

    val rdd = sc.parallelize(Seq(1)).mapPartitions( _ => {
      val sharedArray = Array.ofDim[Int](10000000) // should require ~40MB
      (1 to nElements).toIterator.map(i => new Elem(sharedArray))
    }).cache()

    rdd.count() // force computation

This consumes the expected amount of memory, as seen in the logs:

    storage.MemoryStore: Block rdd_1_0 stored as values in memory (estimated size 38.2 MB, free 5.7 GB)

However, 200 is the maximum number of elements for which this holds. Setting nElements = 201 yields:

    storage.MemoryStore: Block rdd_1_0 stored as values in memory (estimated size 76.7 MB, free 5.7 GB)

What causes this? Where does this magic number 200 come from, and how can I increase it?

Thanks for your help!
- Luke

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RDD-with-object-shared-across-elements-within-a-partition-Magic-number-200-tp19559.html
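For reference, the sharing pattern itself behaves as intended on a plain JVM. Here is a minimal, Spark-free sketch (the names are mine) showing that every Elem holds a reference to the same array, so the true footprint is one array plus n small wrapper objects:

```scala
// Plain-Scala sketch of the sharing pattern (no Spark involved): n wrapper
// objects all point at one shared array, so the real memory use is ~one array.
class Elem(val s: Array[Int])

object SharedRef {
  // Build n elements around one shared array and check they all reference it.
  def allShared(n: Int): Boolean = {
    val sharedArray = Array.ofDim[Int](1000) // small stand-in for the 40 MB array
    val elems = (1 to n).map(_ => new Elem(sharedArray))
    elems.forall(_.s eq sharedArray) // reference equality, not content equality
  }

  def main(args: Array[String]): Unit = {
    println(allShared(201)) // true: sharing holds past the "magic" 200 as well
  }
}
```

This suggests the doubling at 201 elements is purely in Spark's size estimate, not in the objects actually allocated.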
Re: RDD with object shared across elements within a partition. Magic number 200?
Some more details:

Adding a println to the function reveals that it is indeed called only once. Furthermore, running:

    rdd.map(_.s.hashCode).min == rdd.map(_.s.hashCode).max // returns true

...reveals that all 1000 elements do indeed point to the same object, so the data structure essentially behaves correctly. The problem comes when nElements is much larger, so the RDD cannot be persisted:

    storage.MemoryStore: Not enough space to cache rdd_1_0 in memory! (computed 6.1 GB so far)

In this case, the comparison of hashCodes fails, because the function is recomputed.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RDD-with-object-shared-across-elements-within-a-partition-Magic-number-200-tp19559p19578.html