Scaling problem in RandomForest?

2015-03-11 Thread insperatum
Hi, the Random Forest implementation (Spark 1.2.1) is repeatedly crashing when I
increase the depth to 20. I generate random synthetic data (36 workers,
1,000,000 examples per worker, 30 features per example) as follows:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

val data = sc.parallelize(1 to 36, 36).mapPartitionsWithIndex((i, _) => {
  Array.tabulate(1000000) { _ =>
    new LabeledPoint(Math.random(), Vectors.dense(Array.fill(30)(math.random)))
  }.toIterator
}).cache()

...and then train a Random Forest with 50 trees, to depth 20:

import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.configuration.{Algo, Strategy}
import org.apache.spark.mllib.tree.impurity.Variance

val strategy = new Strategy(Algo.Regression, Variance, 20, maxMemoryInMB = 1000)
RandomForest.trainRegressor(data, strategy, 50, "sqrt", 1)

...and run on my EC2 cluster (36 slaves, master has 122GB of memory). After
number crunching for a couple of hours, I get the following error:

[sparkDriver-akka.actor.default-dispatcher-3] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
at java.util.Arrays.copyOf(Arrays.java:2271)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:834)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:778)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:781)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:780)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:780)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:762)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1389)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
15/03/11 15:45:51 INFO scheduler.DAGScheduler: Job 92 failed: collectAsMap at DecisionTree.scala:653, took 46.062487 s
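
The stack trace shows the failure inside ByteArrayOutputStream while the driver serializes a
task, i.e. a single byte[] needs to grow past the roughly 2 GB that one Java array can hold.
As a rough back-of-envelope (assuming near-full binary trees and a nominal ~100 bytes per
serialized node, both of which are guesses rather than anything taken from the Spark source),
depth 20 is about where that limit gets crossed while depth 15 is not:

// Back-of-envelope only: the per-node size is a guess and real trees are rarely full,
// but it shows the scale involved once maxDepth reaches 20.
val nodesPerTree20 = math.pow(2, 20 + 1) - 1      // ~2.1e6 nodes in a full depth-20 tree
val nodesPerTree15 = math.pow(2, 15 + 1) - 1      // ~6.6e4 nodes in a full depth-15 tree
val bytesAt20 = 50 * nodesPerTree20 * 100         // ~1.0e10 bytes for 50 trees at ~100 B/node
val bytesAt15 = 50 * nodesPerTree15 * 100         // ~3.3e8 bytes for 50 trees at ~100 B/node
val singleArrayLimit = Int.MaxValue.toDouble      // one byte[] tops out just under 2^31 bytes
println(bytesAt20 > singleArrayLimit)             // true  - cannot fit in a single array
println(bytesAt15 > singleArrayLimit)             // false - still fits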






Requested array size exceeds VM limit

2015-02-23 Thread insperatum
Hi, I'm using MLlib to train a random forest. It works fine to depth 15, but
if I use depth 20 I get a java.lang.OutOfMemoryError: Requested array size
exceeds VM limit on the driver, from the collectAsMap operation in
DecisionTree.scala, around line 642. It doesn't happen until a good hour into
training. I'm using 50 trees on 36 slaves with maxMemoryInMB=250, but I still
get the error even with 240 GB of driver memory. Has anybody seen this error
in this context before, and can you advise on what might be triggering it?
Best, Luke
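
A note on the error itself: it refers to the length of a single array exceeding what the JVM
allows (just under Int.MaxValue on typical HotSpot JVMs), not to the heap filling up, which
would explain why a 240 GB driver heap makes no difference. A minimal sketch, assuming a
typical HotSpot JVM, that should produce the same message whatever the heap size:

// The requested length exceeds the per-array limit, so this fails the same way
// regardless of how much heap is available.
val tooBig = new Array[Byte](Int.MaxValue)   // java.lang.OutOfMemoryError: Requested array size exceeds VM limit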




Caching RDDs with shared memory - bug or feature?

2014-12-09 Thread insperatum
If all RDD elements within a partition contain pointers to a single shared
object, Spark persists the RDD as expected when it is small. However, if the
RDD has more than 200 elements, Spark reports requiring much more memory than
it actually does. This becomes a problem for large RDDs, as Spark refuses to
persist them even though it could. Is this a bug, or is there a feature that
I'm missing?
Cheers, Luke

val n: Int = ???   // number of elements per partition (varied below)
class Elem(val s: Array[Int])
val rdd = sc.parallelize(Seq(1)).mapPartitions(_ => {
  val sharedArray = Array.ofDim[Int](10000000)   // should require ~40MB (10,000,000 Ints)
  (1 to n).toIterator.map(_ => new Elem(sharedArray))
}).cache()
rdd.count()   // force computation

For n = 100:  MemoryStore: Block rdd_1_0 stored as values in memory (estimated size 38.1 MB, free 898.7 MB)
For n = 200:  MemoryStore: Block rdd_1_0 stored as values in memory (estimated size 38.2 MB, free 898.7 MB)
For n = 201:  MemoryStore: Block rdd_1_0 stored as values in memory (estimated size 76.7 MB, free 860.2 MB)
For n = 5000: MemoryStore: Not enough space to cache rdd_1_0 in memory! (computed 781.3 MB so far)

Note: For medium-sized n (where n > 200 but Spark can still cache), the actual
application memory use still stays where it should - Spark just seems to vastly
overreport how much memory it's using.
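
A quick sanity check on those numbers (assuming the shared array really is 10,000,000 Ints,
which is what the ~40MB comment and the 38 MB estimates suggest):

// Back-of-envelope check against the logged estimates:
val sharedMiB = 10000000L * 4 / (1024.0 * 1024.0)   // ~38.1 MiB for one Array[Int](10000000)
println(sharedMiB)                                   // matches the n <= 200 estimates
println(76.7 / sharedMiB)                            // ~2.0  -> at n = 201 the array is charged roughly twice
println(781.3 / sharedMiB)                           // ~20.5 -> at n = 5000 it is charged ~20x before Spark gives up

In other words, the estimator seems to charge the shared array repeatedly once n exceeds 200,
even though only one copy of it exists on the heap.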






RDD with object shared across elements within a partition. Magic number 200?

2014-11-22 Thread insperatum
Hi all,
I am trying to persist a Spark RDD in which the elements of each partition
all share access to a single, large object. However, this object seems to get
stored in memory several times. Reducing my problem down to the toy case of
just a single partition with only 200 elements:

val nElements = 200
class Elem(val s: Array[Int])
val rdd = sc.parallelize(Seq(1)).mapPartitions(_ => {
  val sharedArray = Array.ofDim[Int](10000000)   // should require ~40MB (10,000,000 Ints)
  (1 to nElements).toIterator.map(i => new Elem(sharedArray))
}).cache()
rdd.count()   // force computation

This consumes the expected amount of memory, as seen in the logs:
storage.MemoryStore: Block rdd_1_0 stored as values in memory (estimated size 38.2 MB, free 5.7 GB)

However, 200 is the maximum number of elements for which this is so. Setting
nElements=201 yields:
storage.MemoryStore: Block rdd_1_0 stored as values in memory (estimated size 76.7 MB, free 5.7 GB)
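
One rough way to check whether the extra 38 MB is real or only in Spark's estimate is to ask
the executor JVM itself how much heap grows while the partition is built. This is only a
sketch (reusing the Elem class and array size from the snippet above, plain JVM counters
rather than any Spark API, and noisy because of GC):

// Compare the executor's own view of heap growth with the MemoryStore estimate above.
// The order of magnitude (closer to ~40 MB than ~77 MB) is the point.
val heapGrowth = sc.parallelize(Seq(1), 1).mapPartitions { _ =>
  val rt = Runtime.getRuntime
  System.gc()                                             // reduce, not eliminate, noise
  val before = rt.totalMemory - rt.freeMemory
  val sharedArray = Array.ofDim[Int](10000000)            // the single ~40 MB array
  val elems = (1 to 201).map(_ => new Elem(sharedArray))  // 201 thin wrappers around it
  val after = rt.totalMemory - rt.freeMemory
  Iterator(((after - before) / (1024.0 * 1024.0), elems.length))
}.first()
println(s"executor heap grew by ~${heapGrowth._1.round} MB for ${heapGrowth._2} elements")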

What causes this? Where does this magic number 200 come from, and how can I
increase it?

Thanks for your help!
- Luke






Re: RDD with object shared across elements within a partition. Magic number 200?

2014-11-22 Thread insperatum
Some more details: Adding a println to the function reveals that it is indeed
called only once. Furthermore, running:

rdd.map(_.s.hashCode).min == rdd.map(_.s.hashCode).max  // returns true

...reveals that all 1000 elements do indeed point to the same object, so the
data structure essentially behaves correctly. The problem comes when nElements
is much larger, so the RDD cannot be persisted:

storage.MemoryStore: Not enough space to cache rdd_1_0 in memory! (computed 6.1 GB so far)

In this case, the comparison of hashCodes fails, because the function is
recomputed.
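
For reference, the comparison works as a test at all because Array.hashCode is the default
identity hash rather than a content hash, so equal values across the two actions imply both
actions saw the very same JVM object:

// Identity, not content: a cloned array has identical contents but (almost certainly)
// a different hash code - just like a recomputed sharedArray would.
val a = Array.ofDim[Int](10)
val b = a.clone()
println(a.hashCode == a.hashCode)   // true: same object
println(a.hashCode == b.hashCode)   // false in practice: different objects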


