Hi Akhil,

So with your config (specifically with set("spark.akka.frameSize ", "10000000")), I see the error:

org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 0:0 was 401970046 bytes which exceeds spark.akka.frameSize (10485760 bytes). Consider using broadcast variables for large values.
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
at org.apache.spark
So, I changed set("spark.akka.frameSize ", "10000000") to set("spark.akka.frameSize ", "1000000000"), but now I get the following error:

py4j.protocol.Py4JJavaError: An error occurred while calling o28.trainKMeansModel.
: org.apache.spark.SparkException: Job aborted due to stage failure: All masters are unresponsive! Giving up.
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGSched

along with the following:

14/09/15 01:44:11 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
14/09/15 01:44:21 INFO AppClient$ClientActor: Connecting to master spark://host:7077...
14/09/15 01:44:21 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@host:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@host:7077]
14/09/15 01:44:21 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@host:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@host:7077]
14/09/15 01:44:21 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@host:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@host:7077]
14/09/15 01:44:21 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@host:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@host:7077]
14/09/15 01:44:26 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
14/09/15 01:44:41 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
14/09/15 01:44:41 ERROR SparkDeploySchedulerBackend: Application has been killed. Reason: All masters are unresponsive! Giving up.
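[Editor's note] Two details in the config above are worth calling out: spark.akka.frameSize is specified in megabytes, not bytes (the 10485760 bytes in the first error is just the 10 MB default), and the key as typed contains a trailing space, so Spark treats it as an unrelated setting and the default stays in effect. A quick pure-Python sketch of both points; the dict here only mimics an exact-match config lookup and is not Spark's API:

```python
# The frame size in the error message is the default 10 MB, expressed in bytes.
default_frame_size_mb = 10
default_frame_size_bytes = default_frame_size_mb * 1024 * 1024
print(default_frame_size_bytes)               # 10485760, matching the error message
print(401970046 > default_frame_size_bytes)   # True: the serialized task is ~38x over the limit

# Config lookup is an exact string match, so a key with a trailing space
# ("spark.akka.frameSize ") never overrides "spark.akka.frameSize".
settings = {"spark.akka.frameSize ": "10000000"}      # key exactly as typed above
frame_size_mb = int(settings.get("spark.akka.frameSize", "10"))
print(frame_size_mb)                          # 10 -> the 10 MB default still applies
```

So making the value bigger without removing the trailing space would not be expected to change the frame-size behaviour at all.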
:-(

On Mon, Sep 15, 2014 at 1:20 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
> Can you give this a try:
>
> conf = SparkConf().set("spark.executor.memory", "32G").set("spark.akka.frameSize", "10000000").set("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory")
> sc = SparkContext(conf = conf)
> rdd = sc.parallelize(matrix,5)
> from pyspark.mllib.clustering import KMeans
> from math import sqrt
> clusters = KMeans.train(rdd, 5, maxIterations=2, runs=2, initializationMode="random")
> def error(point):
>     center = clusters.centers[clusters.predict(point)]
>     return sqrt(sum([x**2 for x in (point - center)]))
> WSSSE = rdd.map(lambda point: error(point)).reduce(lambda x, y: x + y)
> print "Within Set Sum of Squared Error = " + str(WSSSE)
>
> Thanks
> Best Regards
>
> On Mon, Sep 15, 2014 at 9:16 AM, Chengi Liu <chengi.liu...@gmail.com> wrote:
>> And the thing is, the code runs just fine if I reduce the number of rows in my data?
>>
>> On Sun, Sep 14, 2014 at 8:45 PM, Chengi Liu <chengi.liu...@gmail.com> wrote:
>>> I am using Spark 1.0.2.
>>> This is my work cluster, so I can't set up a new version readily...
>>> But right now, I am not using broadcast..
>>> conf = SparkConf().set("spark.executor.memory", "32G").set("spark.akka.frameSize", "1000")
>>> sc = SparkContext(conf = conf)
>>> rdd = sc.parallelize(matrix,5)
>>>
>>> from pyspark.mllib.clustering import KMeans
>>> from math import sqrt
>>> clusters = KMeans.train(rdd, 5, maxIterations=2, runs=2, initializationMode="random")
>>> def error(point):
>>>     center = clusters.centers[clusters.predict(point)]
>>>     return sqrt(sum([x**2 for x in (point - center)]))
>>>
>>> WSSSE = rdd.map(lambda point: error(point)).reduce(lambda x, y: x + y)
>>> print "Within Set Sum of Squared Error = " + str(WSSSE)
>>>
>>> executed by:
>>> spark-submit --master $SPARKURL clustering_example.py --executor-memory 32G --driver-memory 60G
>>>
>>> and the error I see:
>>>
>>> py4j.protocol.Py4JJavaError: An error occurred while calling o26.trainKMeansModel.
>>> : org.apache.spark.SparkException: Job aborted due to stage failure: All masters are unresponsive! Giving up.
>>> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
>>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
>>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
>>> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
>>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
>>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
>>> at scala.Option.foreach(Option.scala:236)
>>> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635)
>>> at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1234)
>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>
>>> and
>>>
>>> 14/09/14 20:43:30 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@hostname:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@hostname:7077]
>>>
>>> ??
>>> Any suggestions??
>>>
>>> On Sun, Sep 14, 2014 at 8:39 PM, Davies Liu <dav...@databricks.com> wrote:
>>>> Hey Chengi,
>>>>
>>>> What's the version of Spark you are using? There are big improvements to broadcast in 1.1; could you try it?
>>>>
>>>> On Sun, Sep 14, 2014 at 8:29 PM, Chengi Liu <chengi.liu...@gmail.com> wrote:
>>>> > Any suggestions.. I am really blocked on this one
>>>> >
>>>> > On Sun, Sep 14, 2014 at 2:43 PM, Chengi Liu <chengi.liu...@gmail.com> wrote:
>>>> >> And when I use the spark-submit script, I get the following error:
>>>> >>
>>>> >> py4j.protocol.Py4JJavaError: An error occurred while calling o26.trainKMeansModel.
>>>> >> : org.apache.spark.SparkException: Job aborted due to stage failure: All masters are unresponsive! Giving up.
>>>> >> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
>>>> >> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
>>>> >> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
>>>> >> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>> >> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>> >> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
>>>> >> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
>>>> >> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
>>>> >> at scala.Option.foreach(Option.scala:236)
>>>> >> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635)
>>>> >> at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1234)
>>>> >> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>>> >> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>>> >> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>>> >> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>>> >> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>>> >> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>> >> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>> >> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>> >> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>> >>
>>>> >> My spark-submit code is:
>>>> >>
>>>> >> conf = SparkConf().set("spark.executor.memory", "32G").set("spark.akka.frameSize", "1000")
>>>> >> sc = SparkContext(conf = conf)
>>>> >> rdd = sc.parallelize(matrix,5)
>>>> >>
>>>> >> from pyspark.mllib.clustering import KMeans
>>>> >> from math import sqrt
>>>> >> clusters = KMeans.train(rdd, 5, maxIterations=2, runs=2, initializationMode="random")
>>>> >> def error(point):
>>>> >>     center = clusters.centers[clusters.predict(point)]
>>>> >>     return sqrt(sum([x**2 for x in (point - center)]))
>>>> >>
>>>> >> WSSSE = rdd.map(lambda point: error(point)).reduce(lambda x, y: x + y)
>>>> >> print "Within Set Sum of Squared Error = " + str(WSSSE)
>>>> >>
>>>> >> which is executed as follows:
>>>> >> spark-submit --master $SPARKURL clustering_example.py --executor-memory 32G --driver-memory 60G
>>>> >>
>>>> >> On Sun, Sep 14, 2014 at 10:47 AM, Chengi Liu <chengi.liu...@gmail.com> wrote:
>>>> >>> How? Example please..
>>>> >>> Also, if I am running this in the pyspark shell, how do I configure spark.akka.frameSize?
>>>> >>>
>>>> >>> On Sun, Sep 14, 2014 at 7:43 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>> >>>> When the data size is huge, you are better off using the TorrentBroadcastFactory.
>>>> >>>>
>>>> >>>> Thanks
>>>> >>>> Best Regards
>>>> >>>>
>>>> >>>> On Sun, Sep 14, 2014 at 2:54 PM, Chengi Liu <chengi.liu...@gmail.com> wrote:
>>>> >>>>> Specifically, the error I see when I try to operate on an rdd created by the sc.parallelize method:
>>>> >>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 12:12 was 12062263 bytes which exceeds spark.akka.frameSize (10485760 bytes). Consider using broadcast variables for large values.
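[Editor's note] To make the "consider using broadcast variables" suggestion in the error above concrete: a broadcast ships the value to each executor once (block by block when TorrentBroadcastFactory is used), and the tasks themselves then carry only a tiny handle instead of the full data. This is an illustrative pure-Python sketch of that size difference, not Spark's actual serialization path, and the handle name is hypothetical:

```python
import pickle

# Stand-in for the large matrix that was blowing past the Akka frame size.
matrix = [[float(j) for j in range(100)] for i in range(1000)]

# Roughly what gets serialized when the data rides along inside each task...
task_payload = pickle.dumps(matrix, 2)

# ...versus a broadcast, where the task only carries a small reference
# (the string "broadcast_0" here is purely illustrative).
handle_payload = pickle.dumps("broadcast_0", 2)

print(len(task_payload) > 1000 * len(handle_payload))  # True: the handle is tiny
```

That is why the same job can succeed with fewer rows: the per-task payload shrinks below the frame-size limit, while a broadcast keeps it small regardless of the data size.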
>>>> >>>>> On Sun, Sep 14, 2014 at 2:20 AM, Chengi Liu <chengi.liu...@gmail.com> wrote:
>>>> >>>>>> Hi,
>>>> >>>>>> I am trying to create an rdd out of a large matrix... sc.parallelize suggests to use broadcast.
>>>> >>>>>> But when I do
>>>> >>>>>>
>>>> >>>>>> sc.broadcast(data)
>>>> >>>>>>
>>>> >>>>>> I get this error:
>>>> >>>>>>
>>>> >>>>>> Traceback (most recent call last):
>>>> >>>>>>   File "<stdin>", line 1, in <module>
>>>> >>>>>>   File "/usr/common/usg/spark/1.0.2/python/pyspark/context.py", line 370, in broadcast
>>>> >>>>>>     pickled = pickleSer.dumps(value)
>>>> >>>>>>   File "/usr/common/usg/spark/1.0.2/python/pyspark/serializers.py", line 279, in dumps
>>>> >>>>>>     def dumps(self, obj): return cPickle.dumps(obj, 2)
>>>> >>>>>> SystemError: error return without exception set
>>>> >>>>>>
>>>> >>>>>> Help?
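[Editor's note] For anyone landing on this thread: a bare `SystemError: error return without exception set` out of cPickle.dumps is the kind of failure cPickle can produce when the serialized payload gets extremely large (for example multi-gigabyte strings on Python 2), rather than a problem with the data values themselves. One hedged workaround sketch, assuming the matrix is a list of rows, is to pickle it in chunks so no single dumps() call has to produce a giant string; the chunk size here is arbitrary and tiny for illustration:

```python
import pickle

def chunked(rows, n):
    """Yield successive n-row slices of the matrix."""
    for i in range(0, len(rows), n):
        yield rows[i:i + n]

# Tiny stand-in matrix; in practice each chunk stays well under pickle's limits.
matrix = [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]
chunks = [pickle.dumps(c, 2) for c in chunked(matrix, 2)]
print(len(chunks))  # 2 pickled chunks for 3 rows with a chunk size of 2

# Reassembly recovers the original matrix.
restored = [row for c in chunks for row in pickle.loads(c)]
print(restored == matrix)  # True
```

Each chunk could then be broadcast (or parallelized) separately instead of serializing the whole matrix in one call.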