And the thing is, the code runs just fine if I reduce the number of rows in my data?
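That observation lines up with the frameSize error quoted later in the thread: with sc.parallelize, each partition of the matrix is pickled into the serialized task itself, so the task size grows with the row count. A quick local check (plain Python, no Spark needed; the 1000x100 and 10000x100 shapes are arbitrary, chosen only for illustration) shows the scaling:

```python
import pickle

def pickled_size(rows, cols):
    """Pickle a rows x cols matrix of floats with protocol 2 -- the same
    protocol pyspark's serializer uses in the traceback quoted below --
    and return the payload size in bytes."""
    matrix = [[0.0] * cols for _ in range(rows)]
    return len(pickle.dumps(matrix, 2))

# The payload grows roughly linearly with the row count, which is why
# dropping rows can bring a task back under the default 10 MB
# spark.akka.frameSize limit mentioned later in the thread.
small = pickled_size(1000, 100)
large = pickled_size(10000, 100)
assert large > small
```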
On Sun, Sep 14, 2014 at 8:45 PM, Chengi Liu <chengi.liu...@gmail.com> wrote:
> I am using spark1.0.2.
> This is my work cluster.. so I can't set up a new version readily...
> But right now, I am not using broadcast ..
>
> conf = SparkConf().set("spark.executor.memory", "32G").set("spark.akka.frameSize", "1000")
> sc = SparkContext(conf=conf)
> rdd = sc.parallelize(matrix, 5)
>
> from pyspark.mllib.clustering import KMeans
> from math import sqrt
> clusters = KMeans.train(rdd, 5, maxIterations=2, runs=2, initializationMode="random")
>
> def error(point):
>     center = clusters.centers[clusters.predict(point)]
>     return sqrt(sum([x**2 for x in (point - center)]))
>
> WSSSE = rdd.map(lambda point: error(point)).reduce(lambda x, y: x + y)
> print "Within Set Sum of Squared Error = " + str(WSSSE)
>
> executed by
> spark-submit --master $SPARKURL clustering_example.py --executor-memory 32G --driver-memory 60G
>
> and the error I see:
> py4j.protocol.Py4JJavaError: An error occurred while calling o26.trainKMeansModel.
> : org.apache.spark.SparkException: Job aborted due to stage failure: All masters are unresponsive! Giving up.
>   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
>   at scala.Option.foreach(Option.scala:236)
>   at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1234)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>   at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> and
> 14/09/14 20:43:30 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@hostname:7077:
> akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@hostname:7077]
> ??
> Any suggestions??
>
> On Sun, Sep 14, 2014 at 8:39 PM, Davies Liu <dav...@databricks.com> wrote:
>> Hey Chengi,
>>
>> What's the version of Spark you are using? It has big improvements
>> to broadcast in 1.1, could you try it?
>>
>> On Sun, Sep 14, 2014 at 8:29 PM, Chengi Liu <chengi.liu...@gmail.com> wrote:
>>> Any suggestions.. I am really blocked on this one
>>>
>>> On Sun, Sep 14, 2014 at 2:43 PM, Chengi Liu <chengi.liu...@gmail.com> wrote:
>>>> And when I use the spark-submit script, I get the following error:
>>>>
>>>> py4j.protocol.Py4JJavaError: An error occurred while calling o26.trainKMeansModel.
>>>> : org.apache.spark.SparkException: Job aborted due to stage failure: All masters are unresponsive! Giving up.
>>>> [stack trace identical to the one quoted above]
>>>>
>>>> My spark-submit code is
>>>> [same script as quoted above]
>>>>
>>>> which is executed as follows:
>>>> spark-submit --master $SPARKURL clustering_example.py --executor-memory 32G --driver-memory 60G
>>>>
>>>> On Sun, Sep 14, 2014 at 10:47 AM, Chengi Liu <chengi.liu...@gmail.com> wrote:
>>>>> How? Example please..
>>>>> Also, if I am running this in the pyspark shell, how do I configure spark.akka.frameSize?
>>>>>
>>>>> On Sun, Sep 14, 2014 at 7:43 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>>> When the data size is huge, you'd better use the TorrentBroadcastFactory.
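[Editor's note: as a sketch of both points above, untested here and assuming Spark 1.0.x semantics. The pyspark shell pre-creates `sc`, so one way to change `spark.akka.frameSize` (a value in MB; the default is 10) is to stop the pre-created context and build a new one; the broadcast factory can be switched to Torrent in the same SparkConf:]

```python
from pyspark import SparkConf, SparkContext

# Inside the pyspark shell, `sc` already exists; stop it first so the
# new configuration actually takes effect.
sc.stop()

conf = (SparkConf()
        .set("spark.akka.frameSize", "1000")  # MB; default is 10
        .set("spark.broadcast.factory",
             "org.apache.spark.broadcast.TorrentBroadcastFactory"))
sc = SparkContext(conf=conf)
```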
>>>>>>
>>>>>> Thanks
>>>>>> Best Regards
>>>>>>
>>>>>> On Sun, Sep 14, 2014 at 2:54 PM, Chengi Liu <chengi.liu...@gmail.com> wrote:
>>>>>>> Specifically, the error I see when I try to operate on an rdd created by the sc.parallelize method:
>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>>>> Serialized task 12:12 was 12062263 bytes which exceeds spark.akka.frameSize
>>>>>>> (10485760 bytes). Consider using broadcast variables for large values.
>>>>>>>
>>>>>>> On Sun, Sep 14, 2014 at 2:20 AM, Chengi Liu <chengi.liu...@gmail.com> wrote:
>>>>>>>> Hi,
>>>>>>>> I am trying to create an rdd out of a large matrix.... sc.parallelize
>>>>>>>> suggests to use broadcast.
>>>>>>>> But when I do
>>>>>>>>
>>>>>>>> sc.broadcast(data)
>>>>>>>>
>>>>>>>> I get this error:
>>>>>>>>
>>>>>>>> Traceback (most recent call last):
>>>>>>>>   File "<stdin>", line 1, in <module>
>>>>>>>>   File "/usr/common/usg/spark/1.0.2/python/pyspark/context.py", line 370, in broadcast
>>>>>>>>     pickled = pickleSer.dumps(value)
>>>>>>>>   File "/usr/common/usg/spark/1.0.2/python/pyspark/serializers.py", line 279, in dumps
>>>>>>>>     def dumps(self, obj): return cPickle.dumps(obj, 2)
>>>>>>>> SystemError: error return without exception set
>>>>>>>>
>>>>>>>> Help?
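[Editor's note: one thing worth flagging about the spark-submit invocation quoted earlier in the thread: spark-submit treats everything after the application file as arguments to the script itself, so --executor-memory and --driver-memory placed after clustering_example.py never reach Spark. A corrected ordering, with the same values used in the thread, would be:]

```shell
spark-submit --master $SPARKURL \
  --executor-memory 32G \
  --driver-memory 60G \
  clustering_example.py
```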