Any ideas, anyone? This seems like a pretty basic requirement, and I'm sure a minor config change might get it to work. I'd appreciate any pointers, as I've been blocked on this since last night.
btw, the Spark version is 0.8.0.

On Jan 7, 2014, at 11:31 PM, Prashant Sharma <[email protected]> wrote:

> spark version ?
>
>
> On Wed, Jan 8, 2014 at 12:43 PM, Vipul Pandey <[email protected]> wrote:
> Hi,
>
> Disclaimer: Newbie (well, a returning user)
>
> Setup:
> 20 nodes
> -Dspark.executor.memory=40g, essentially tons of space for my use case
>
> Pretty straightforward join between two inputs:
> - 17GB (distributed in 10 equally sized 1.7GB files)
> - 49MB (1 file)
> I just need to join based on the keys and write out the values from both as tuples.
> ==================================================================
>
> val XA = sc.textFile("path-to-17GB") // assume this to be a tab-separated key/value pair per line
> val XAMap = XA.map(x => {
>   val arr = x.split("\t")
>   (arr(0), arr(1))
> })
>
> val XY = sc.textFile("pathTo49MB") // assume it to be a tab-separated key/value pair per line
> val XYMap = XY.map(x => {
>   val arr = x.split("\t")
>   (arr(0), arr(1))
> }).collect.toMap
>
> val bc = sc.broadcast(XYMap)
>
> val joined = XAMap.map(v => {
>   (bc.value(v._1), v._2)
> })
>
> joined.saveAsTextFile("path-to-output")
> ==================================================================
>
> When I try to save the text file, it throws an OOME and my shell quits.
> Any clues?
>
> scala> joined.saveAsTextFile("path-to-output")
> Uncaught error from thread [spark-akka.actor.default-dispatcher-4] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[spark]
> java.lang.OutOfMemoryError: Java heap space
>         at java.util.Arrays.copyOf(Arrays.java:2786)
>         at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:94)
>         at java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1823)
>         at java.io.ObjectOutputStream.write(ObjectOutputStream.java:670)
>         at org.apache.spark.scheduler.ResultTask.writeExternal(ResultTask.scala:116)
>         at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1429)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1398)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330)
>         at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:27)
>         at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:47)
>         at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:78)
>         at org.apache.spark.scheduler.cluster.ClusterTaskSetManager.resourceOffer(ClusterTaskSetManager.scala:375)
>         at org.apache.spark.scheduler.cluster.ClusterScheduler$$anonfun$resourceOffers$3$$anonfun$apply$3$$anonfun$apply$1.apply$mcVI$sp(ClusterScheduler.scala:215)
>         at org.apache.spark.scheduler.cluster.ClusterScheduler$$anonfun$resourceOffers$3$$anonfun$apply$3.apply(ClusterScheduler.scala:212)
>         at org.apache.spark.scheduler.cluster.ClusterScheduler$$anonfun$resourceOffers$3$$anonfun$apply$3.apply(ClusterScheduler.scala:209)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>         at scala.collection.Iterator$$anon$19.foreach(Iterator.scala:399)
>         at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>         at scala.Enumeration$ValueSet.foreach(Enumeration.scala:234)
>         at org.apache.spark.scheduler.cluster.ClusterScheduler$$anonfun$resourceOffers$3.apply(ClusterScheduler.scala:209)
>         at org.apache.spark.scheduler.cluster.ClusterScheduler$$anonfun$resourceOffers$3.apply(ClusterScheduler.scala:209)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at org.apache.spark.scheduler.cluster.ClusterScheduler.resourceOffers(ClusterScheduler.scala:209)
>         at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend$DriverActor.makeOffers(StandaloneSchedulerBackend.scala:113)
>         at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend$DriverActor$$anonfun$receive$1.apply(StandaloneSchedulerBackend.scala:91)
>         at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend$DriverActor$$anonfun$receive$1.apply(StandaloneSchedulerBackend.scala:64)
>         at akka.actor.Actor$class.apply(Actor.scala:318)
>         at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend$DriverActor.apply(StandaloneSchedulerBackend.scala:47)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:626)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:197)
>
>
>
> --
> Prashant
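
For anyone skimming the thread, here is a minimal, self-contained sketch of the broadcast map-side join described above. The file paths are the placeholders from the original mail; the local master, the app name, and the getOrElse fallback (with its "MISSING" sentinel) are my additions for illustration, not part of the original code:

import org.apache.spark.SparkContext

object BroadcastJoinSketch {
  def main(args: Array[String]): Unit = {
    // Local stand-in for the 20-node cluster context used in the thread.
    val sc = new SparkContext("local[4]", "broadcast-join-sketch")

    // Large side (17GB): tab-separated key/value lines, kept distributed.
    val XAMap = sc.textFile("path-to-17GB").map { line =>
      val arr = line.split("\t")
      (arr(0), arr(1))
    }

    // Small side (~49MB): collected to the driver, then broadcast once
    // to each executor rather than shipped inside every task closure.
    val XYMap = sc.textFile("pathTo49MB").map { line =>
      val arr = line.split("\t")
      (arr(0), arr(1))
    }.collect().toMap

    val bc = sc.broadcast(XYMap)

    // Map-side join: look each large-side key up in the broadcast map.
    // getOrElse (my addition) avoids a NoSuchElementException when a key
    // from the large side is absent from the small side; "MISSING" is an
    // arbitrary sentinel, not anything from the original code.
    val joined = XAMap.map { case (k, v) =>
      (bc.value.getOrElse(k, "MISSING"), v)
    }

    joined.saveAsTextFile("path-to-output")
    sc.stop()
  }
}

Note that the original lookup, bc.value(v._1), throws for any large-side key that is missing from the 49MB map; the getOrElse guard trades that exception for a sentinel value, and filtering such keys out first would instead give inner-join semantics.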
