I am using Spark 1.0.2. This is my work cluster, so I can't readily set up a new version. But right now, I am not using broadcast.
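For context on the frameSize errors quoted further down in this thread: every serialized task Spark ships must fit within spark.akka.frameSize (10485760 bytes, i.e. 10 MB, by default in 1.0.x), and sc.parallelize of a large matrix pickles slices of it into tasks. A minimal stdlib-only sketch of the arithmetic — the matrix dimensions here are made-up illustrative values, since the real matrix is never shown in the thread:

```python
import pickle

# Illustrative stand-in for the matrix (dimensions are assumptions,
# not taken from the thread): 2000 rows x 1000 columns of floats.
matrix = [[float(i + j) for j in range(1000)] for i in range(2000)]

# PySpark 1.0.x serializes with pickle protocol 2; each float costs
# roughly 9 bytes, so this is on the order of 18 MB serialized.
serialized = pickle.dumps(matrix, 2)

# Default spark.akka.frameSize is 10 MB; a task carrying a slice this
# large would trip the "Serialized task ... exceeds spark.akka.frameSize"
# error quoted below in the thread.
frame_size_default = 10 * 1024 * 1024
print(len(serialized) > frame_size_default)
```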
conf = SparkConf().set("spark.executor.memory", "32G").set("spark.akka.frameSize", "1000")
sc = SparkContext(conf=conf)
rdd = sc.parallelize(matrix, 5)

from pyspark.mllib.clustering import KMeans
from math import sqrt

clusters = KMeans.train(rdd, 5, maxIterations=2, runs=2, initializationMode="random")

def error(point):
    center = clusters.centers[clusters.predict(point)]
    return sqrt(sum([x**2 for x in (point - center)]))

WSSSE = rdd.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print "Within Set Sum of Squared Error = " + str(WSSSE)

Executed by:

spark-submit --master $SPARKURL clustering_example.py --executor-memory 32G --driver-memory 60G

and the error I see:

py4j.protocol.Py4JJavaError: An error occurred while calling o26.trainKMeansModel.
: org.apache.spark.SparkException: Job aborted due to stage failure: All masters are unresponsive! Giving up.
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1234)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

and

14/09/14 20:43:30 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@hostname:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@hostname:7077]

Any suggestions?

On Sun, Sep 14, 2014 at 8:39 PM, Davies Liu <dav...@databricks.com> wrote:
> Hey Chengi,
>
> What's the version of Spark you are using? It has big improvements
> in broadcast in 1.1, could you try it?
>
> On Sun, Sep 14, 2014 at 8:29 PM, Chengi Liu <chengi.liu...@gmail.com> wrote:
> > Any suggestions.. I am really blocked on this one
> >
> > On Sun, Sep 14, 2014 at 2:43 PM, Chengi Liu <chengi.liu...@gmail.com> wrote:
> >> And when I use the spark-submit script, I get the following error:
> >>
> >> py4j.protocol.Py4JJavaError: An error occurred while calling o26.trainKMeansModel.
> >> : org.apache.spark.SparkException: Job aborted due to stage failure: All
> >> masters are unresponsive! Giving up.
> >> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
> >> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
> >> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
> >> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> >> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> >> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
> >> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
> >> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
> >> at scala.Option.foreach(Option.scala:236)
> >> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635)
> >> at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1234)
> >> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> >> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> >> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> >> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> >> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> >> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> >>
> >> My spark-submit code is:
> >>
> >> conf = SparkConf().set("spark.executor.memory", "32G").set("spark.akka.frameSize", "1000")
> >> sc = SparkContext(conf=conf)
> >> rdd = sc.parallelize(matrix, 5)
> >>
> >> from pyspark.mllib.clustering import KMeans
> >> from math import sqrt
> >>
> >> clusters = KMeans.train(rdd, 5, maxIterations=2, runs=2, initializationMode="random")
> >>
> >> def error(point):
> >>     center = clusters.centers[clusters.predict(point)]
> >>     return sqrt(sum([x**2 for x in (point - center)]))
> >>
> >> WSSSE = rdd.map(lambda point: error(point)).reduce(lambda x, y: x + y)
> >> print "Within Set Sum of Squared Error = " + str(WSSSE)
> >>
> >> Which is executed as follows:
> >>
> >> spark-submit --master $SPARKURL clustering_example.py --executor-memory 32G --driver-memory 60G
> >>
> >> On Sun, Sep 14, 2014 at 10:47 AM, Chengi Liu <chengi.liu...@gmail.com> wrote:
> >>> How? Example please..
> >>> Also, if I am running this in the pyspark shell, how do I configure spark.akka.frameSize?
> >>>
> >>> On Sun, Sep 14, 2014 at 7:43 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
> >>>> When the data size is huge, you are better off using the torrentBroadcastFactory.
> >>>>
> >>>> Thanks
> >>>> Best Regards
> >>>>
> >>>> On Sun, Sep 14, 2014 at 2:54 PM, Chengi Liu <chengi.liu...@gmail.com> wrote:
> >>>>> Specifically, the error I see when I try to operate on an rdd created by the sc.parallelize method:
> >>>>>
> >>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
> >>>>> Serialized task 12:12 was 12062263 bytes which exceeds spark.akka.frameSize
> >>>>> (10485760 bytes). Consider using broadcast variables for large values.
> >>>>>
> >>>>> On Sun, Sep 14, 2014 at 2:20 AM, Chengi Liu <chengi.liu...@gmail.com> wrote:
> >>>>>> Hi,
> >>>>>> I am trying to create an rdd out of a large matrix.... sc.parallelize
> >>>>>> suggests using broadcast.
> >>>>>> But when I do
> >>>>>>
> >>>>>> sc.broadcast(data)
> >>>>>>
> >>>>>> I get this error:
> >>>>>>
> >>>>>> Traceback (most recent call last):
> >>>>>>   File "<stdin>", line 1, in <module>
> >>>>>>   File "/usr/common/usg/spark/1.0.2/python/pyspark/context.py", line 370, in broadcast
> >>>>>>     pickled = pickleSer.dumps(value)
> >>>>>>   File "/usr/common/usg/spark/1.0.2/python/pyspark/serializers.py", line 279, in dumps
> >>>>>>     def dumps(self, obj): return cPickle.dumps(obj, 2)
> >>>>>> SystemError: error return without exception set
> >>>>>>
> >>>>>> Help?
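Besides the broadcast route suggested in this thread, one way around the frameSize limit is to give sc.parallelize enough partitions that each serialized slice stays under the frame size. A hedged, stdlib-only sketch — min_partitions is an illustrative helper of my own, not a Spark API, and it assumes the driver can pickle the dataset at all (which the SystemError traceback above shows may itself fail for very large objects):

```python
import pickle

def min_partitions(data, frame_size=10 * 1024 * 1024, safety=0.8):
    """Estimate a partition count for sc.parallelize(data, n) so that
    each serialized slice stays comfortably under spark.akka.frameSize.

    frame_size defaults to the 10485760-byte Spark 1.0.x default;
    safety leaves headroom for task metadata.
    """
    total = len(pickle.dumps(data, 2))  # pickle protocol 2, as PySpark uses
    budget = int(frame_size * safety)
    return max(1, -(-total // budget))  # ceiling division
```

Usage would then be along the lines of `rdd = sc.parallelize(matrix, min_partitions(matrix))`, under the stated assumption that the whole matrix is picklable on the driver.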