Oh, I was misled by the following log info, which made me think the broadcast variable was being sent back to the driver. So the result sent to the driver has nothing to do with the broadcast variable, but then what is it, since there seems to be no data to send back?

    org.apache.spark.executor.Executor - Serialized size of result for 1901 is 446
    org.apache.spark.executor.Executor - Sending result for 1901 directly to driver

Btw, about the 30 iterations: every run gets stuck at the third or fourth iteration, and when I use TorrentBroadcast it gets stuck at the second iteration. Can the blockManager use the disk to store the data when there is no memory left? Is there a good way to see the memory usage in the blockManager? Or maybe I can read the data from files to avoid this problem.
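For the read-from-files idea, something like the following might work; this is a minimal sketch for the 0.8-era spark-shell used in this thread, where sc is the shell's SparkContext and the input path is hypothetical. StorageLevel.MEMORY_AND_DISK lets cached blocks spill to disk instead of failing when memory runs out.

    import org.apache.spark.storage.StorageLevel

    // Hypothetical input file with one integer per line.
    val ints = sc.textFile("/path/to/ints.txt").map(_.trim.toInt)

    // MEMORY_AND_DISK lets the blockManager spill partitions to local disk
    // when they no longer fit in memory, instead of dropping or failing.
    ints.persist(StorageLevel.MEMORY_AND_DISK)

    // Materialize the cache; the application web UI's Storage tab then
    // shows per-RDD memory and disk usage in the block managers.
    println(ints.count())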
On Mon, Jan 13, 2014 at 2:04 PM, Mosharaf Chowdhury <[email protected]> wrote:

> The size calculation is correct, but the broadcast happens from the driver to the workers.
>
> Btw, your code is broadcasting 400MB 30 times, and those copies are not being evicted from the cache fast enough, which, I think, is causing the blockManagers to run out of memory.
>
> On Sun, Jan 12, 2014 at 9:34 PM, lihu <[email protected]> wrote:
>
>> Yes, I am just using the code snippet from the broadcast example, run in the spark-shell.
>> I thought the broadcast was sent from the driver to the executors and that the executors would send it back; is there something wrong with how I calculated the broadcast size?
>>
>>     val MAX_ITER = 30
>>     val num = 100000000
>>     val arr1 = new Array[Int](num)
>>     for (i <- 0 until arr1.length) {
>>       arr1(i) = i
>>     }
>>     for (i <- 0 until MAX_ITER) {
>>       println("Iteration " + i)
>>       println("===========")
>>       val startTime = System.nanoTime
>>       val barr1 = sc.broadcast(arr1)
>>       sc.parallelize(1 to 10).foreach { i =>
>>         println(barr1.value.size)
>>       }
>>       println("Iteration %d took %.0f milliseconds".format(i, (System.nanoTime - startTime) / 1E6))
>>     }
>>
>> I also tried TorrentBroadcast; it is faster than the default, which is very helpful, thanks again!
>> But I still get stuck during the iterations. Here is the log info from the master; it seems to be a heartbeat problem:
>>
>>     [sparkMaster-akka.actor.default-dispatcher-26] WARN org.apache.spark.deploy.master.Master - Got heartbeat from unregistered worker worker-20140102202153-m023.corp.typingx.me-8139
>>     [sparkMaster-akka.actor.default-dispatcher-26] WARN org.apache.spark.deploy.master.Master - Got heartbeat from unregistered worker worker-20140102202153-m023.corp.typingx.me-40447
>>
>> On Mon, Jan 13, 2014 at 1:01 PM, Mosharaf Chowdhury <[email protected]> wrote:
>>
>>> broadcast is supposed to send data from the driver to the executors, not in the other direction. Can you share the code snippet you are using to broadcast?
>>>
>>> --
>>> Mosharaf Chowdhury
>>> http://www.mosharaf.com/
>>>
>>> On Sun, Jan 12, 2014 at 8:52 PM, lihu <[email protected]> wrote:
>>>
>>>> In my opinion, the Spark system is for big data, so 400M does not seem big.
>>>>
>>>> I read some slides about broadcast; my understanding was that each executor owns a complete copy of the broadcast variable, and that the executors send the broadcast variable back to the driver.
>>>>
>>>> In my experiment I have 20 machines, each machine runs 2 executors, and I used the default parallelism, which is 8, so there are 320 tasks in one stage in total.
>>>>
>>>> Then the workers would send 320 * (400M / 8) = 16G of data back to the driver, which seems very big. But the log shows that, after serialization, the result size sent back to the driver is just 446 bytes per task:
>>>>
>>>>     org.apache.spark.storage.BlockManager - Found block broadcast_5 locally
>>>>     org.apache.spark.executor.Executor - Serialized size of result for 1901 is 446
>>>>     org.apache.spark.executor.Executor - Sending result for 1901 directly to driver
>>>>
>>>> So the total data sent back to the driver is just 320 * 446 bytes = 142,720 bytes, which is really small in my opinion.
>>>>
>>>> ---------------
>>>> In summary:
>>>>
>>>> 1. The Spark system is for big data, so 400M is not big in my opinion.
>>>> 2. I am not sure my understanding of broadcast is right; if it is, shouldn't the data sent back to the driver be much bigger?
>>>> 3. I wonder why the serialization rate is so high; how can it serialize 400M / 8 = 50M down to just 446 bytes?
>>>> 4. If it is my fault for not running the broadcast experiment the right way, then I hope the Spark community can provide more examples about broadcast; this would benefit many users.
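A note on points 2 and 3 in the quoted summary: the tiny result size is most likely not extreme serialization at all. The closure passed to foreach returns Unit, so each task ships back only an empty result plus a little serialized status metadata, presumably the 446 bytes in the log; the broadcast value itself never travels back to the driver. A small sketch of the difference, assuming the barr1 broadcast from the snippet above is still in scope:

    // foreach: runs on the executors and returns Unit, so the per-task
    // "result" sent to the driver is just serialized status metadata.
    sc.parallelize(1 to 10).foreach { i =>
      println(barr1.value.size) // appears in the executor logs, not on the driver
    }

    // collect: each task's return value is real data, so this actually
    // ships the mapped values back to the driver.
    val sizes: Array[Int] = sc.parallelize(1 to 10).map(i => barr1.value.size).collect()
    println(sizes.mkString(", "))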
>>>> On Mon, Jan 13, 2014 at 12:22 PM, Aureliano Buendia <[email protected]> wrote:
>>>>
>>>>> On Mon, Jan 13, 2014 at 4:17 AM, lihu <[email protected]> wrote:
>>>>>
>>>>>> I have run into the same problem as you.
>>>>>> I have a cluster of 20 machines, and I just ran the broadcast example; all I changed was the data size in the example, to 400M, which is really a small data size.
>>>>>
>>>>> Is 400 MB really a small size for broadcasting?
>>>>>
>>>>> I had the impression that broadcast is for much, much smaller objects, less than about 10 MB.
>>>>>
>>>>>> But I ran into the same problem as you.
>>>>>> So I wonder: maybe broadcast capacity is a weak point in the Spark system?
>>>>>>
>>>>>> Here is my config:
>>>>>>
>>>>>>     SPARK_MEM=12g
>>>>>>     SPARK_MASTER_WEBUI_PORT=12306
>>>>>>     SPARK_WORKER_MEMORY=12g
>>>>>>     SPARK_JAVA_OPTS+="-Dspark.executor.memory=8g -Dspark.akka.timeout=600 -Dspark.local.dir=/disk3/lee/tmp -Dspark.worker.timeout=600 -Dspark.akka.frameSize=10000 -Dspark.akka.askTimeout=300 -Dspark.storage.blockManagerTimeoutIntervalMs=100000 -Dspark.akka.retry.wait=600 -Dspark.blockManagerHeartBeatMs=80000 -Xms15G -Xmx15G -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit"
>>>>>>
>>>>>> On Sat, Jan 11, 2014 at 8:27 AM, Khanderao kand <[email protected]> wrote:
>>>>>>
>>>>>>> If your object size is > 10MB you may need to change spark.akka.frameSize.
>>>>>>>
>>>>>>> What is your spark.akka.timeout?
>>>>>>>
>>>>>>> Did you change spark.akka.heartbeat.interval?
>>>>>>>
>>>>>>> BTW, given that a large object is being broadcast across 25 nodes, you may want to consider the frequency of such transfers and evaluate alternative patterns.
>>>>>>>
>>>>>>> On Tue, Jan 7, 2014 at 12:55 AM, Sebastian Schelter <[email protected]> wrote:
>>>>>>>
>>>>>>>> Spark repeatedly fails to broadcast a large object on a cluster of 25 machines for me.
>>>>>>>>
>>>>>>>> I get log messages like this:
>>>>>>>>
>>>>>>>>     [spark-akka.actor.default-dispatcher-4] WARN org.apache.spark.storage.BlockManagerMasterActor - Removing BlockManager BlockManagerId(3, cloud-33.dima.tu-berlin.de, 42185, 0) with no recent heart beats: 134689ms exceeds 45000ms
>>>>>>>>
>>>>>>>> Is there something wrong with my config? Do I have to increase some timeout?
>>>>>>>>
>>>>>>>> Thx,
>>>>>>>> Sebastian
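On the timeout question: in the 0.8-era Spark discussed here, such settings were passed as Java system properties before the SparkContext was created. A hedged sketch follows; the property names are taken from the config quoted above plus the TorrentBroadcast factory mentioned earlier in the thread, the master URL is hypothetical, and exact names vary across Spark versions, so treat this as illustrative rather than definitive.

    import org.apache.spark.SparkContext

    // Illustrative values only; property names follow the config quoted
    // above and may differ in other Spark versions.
    System.setProperty("spark.storage.blockManagerTimeoutIntervalMs", "100000")
    System.setProperty("spark.blockManagerHeartBeatMs", "80000")
    System.setProperty("spark.worker.timeout", "600")
    System.setProperty("spark.akka.timeout", "600")

    // Optional: switch to the BitTorrent-style broadcast mentioned in this thread.
    System.setProperty("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory")

    // Properties must be set before the context is constructed.
    val sc = new SparkContext("spark://master:7077", "BroadcastTest")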
>>
>> --
>> Best Wishes!
>>
>> Li Hu(李浒) | Graduate Student
>> Institute for Interdisciplinary Information Sciences (IIIS, http://iiis.tsinghua.edu.cn/)
>> Tsinghua University, China
>>
>> Email: [email protected]
>> Tel: +86 15120081920
>> Homepage: http://iiis.tsinghua.edu.cn/zh/lihu/
