Oh, I misleading by the following log info, that I thought the  broadcast
variable is send back to driver. then the sending result to driver has no
relationship with the broadcast variable, but what it is , since there seem
no data will send back?

*org.apache.spark.executor.Executor - Serialized size of result for 1901 is
446*
*org.apache.spark.executor.Executor - Sending result for 1901 directly to
driver*


btw, For the 30 times. every time I just run the third or fourth iteration,
then spark get stuck, and when I use the TorrentBroadcast, it get stuck in
the second iteration , can the blockManager use the disk to store the data
when there is no  memory ? and is there any good method to see the memory
usage in  blockManager , or  maybe I can read the data from files to avoid
such problem.




On Mon, Jan 13, 2014 at 2:04 PM, Mosharaf Chowdhury <[email protected]
> wrote:

> Size calculation is correct, but broadcast happens from the driver to the
> workers.
>
> btw, your code is broadcasting 400MB 30 times, which are not being evicted
> from the cache fast enough, which, I think, is causing blockManagers to run
> out of memory.
>
>
> On Sun, Jan 12, 2014 at 9:34 PM, lihu <[email protected]> wrote:
>
>> Yes, I just using the code snippet from the broadcast example, and using
>> the spark-shell run this code.
>> I thought the broadcast is driver send to the executor, and the executor
>> will send back,  is there some wrong for  calculate the broadcast size?
>>
>> *val MAX_ITER = 30*
>> *val num = 100000000*
>> *var arr1 = new Array[Int](num)*
>> *    for (i <- 0 until arr1.length) {*
>> *      arr1(i) = i*
>> *}*
>> *for (i <- 0 until MAX_ITER) {*
>> *      println("Iteration " + i)*
>> *      println("===========")*
>> *      val startTime = System.nanoTime*
>> *      val barr1 = sc.broadcast(arr1)*
>> *      sc.parallelize(1 to 10).foreach {*
>> *        i => println(barr1.value.size)*
>> *}*
>> *   println("Iteration %d took %.0f milliseconds".format(i,
>> (System.nanoTime - startTime) / 1E6))*
>> * }*
>>
>>  I also try the TorrentBroadcast , it faster than the default, this is
>> very helpful, thanks again!
>> but I also get stuck during iteration, here is the log info from the
>> master, it seem that this is the heartbeat problem.
>>
>> *[sparkMaster-akka.actor.default-dispatcher-26] WARN
>>  org.apache.spark.deploy.master.Master - Got heartbeat from unregistered
>> worker worker-20140102202153-m023.corp.typingx.me-8139*
>> *[sparkMaster-akka.actor.default-dispatcher-26] WARN
>>  org.apache.spark.deploy.master.Master - Got heartbeat from unregistered
>> worker worker-20140102202153-m023.corp.typingx.me-40447*
>>
>>
>>
>>
>> On Mon, Jan 13, 2014 at 1:01 PM, Mosharaf Chowdhury <
>> [email protected]> wrote:
>>
>>> broadcast is supposed to send data from the driver to the executors and
>>> not the other direction. can you share the code snippet you are using to
>>> broadcast?
>>>
>>> --
>>> Mosharaf Chowdhury
>>> http://www.mosharaf.com/
>>>
>>>
>>> On Sun, Jan 12, 2014 at 8:52 PM, lihu <[email protected]> wrote:
>>>
>>>> In my opinion, the spark system is for big data, then 400M seem not big
>>>> .
>>>>
>>>> I read slides about the broadcast, in my understanding, the executor
>>>> will send the broadcast variable back to the driver. each executor own a
>>>> complete copy of the broadcast variable.
>>>>
>>>> In my experiment, I have 20 machine, each machine own 2 executor, and I
>>>> used the default parallelize, which is 8, so there  320  tasks in one stage
>>>> in total.
>>>>
>>>> then the workers will send 320*(400M/8)=16G data back to the driver,
>>>> this seem very big. but I get from log that after serialize, the data size
>>>> send back to driver is just 446 byte in each task.
>>>>
>>>> *org.apache.spark.storage.BlockManager - Found block broadcast_5
>>>> locally*
>>>> *org.apache.spark.executor.Executor - Serialized size of result for
>>>> 1901 is 446*
>>>> *org.apache.spark.executor.Executor - Sending result for 1901 directly
>>>> to driver*
>>>>
>>>> So the total data send back to driver just 320*446byte=142720byte. this
>>>> is really small in my opinion.
>>>>
>>>> ---------------
>>>> In summary
>>>>
>>>> 1.  Spark system is for big data, then 400M is not big in my opinion.
>>>> 2.  I do not sure if my understanding for the broadcast is right, then
>>>> the data send back to the driver may bigger?
>>>> 3.  I just wonder why the serialize rate is so hight, it can serialize
>>>> the 400/8=50M to just 446 byte?
>>>> 4.  If it is my fault that do not run the broadcast experiment in the
>>>> right way,  then I I hope the spark community can give more examples about
>>>> the broadcast, this may benefit many users.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, Jan 13, 2014 at 12:22 PM, Aureliano Buendia <
>>>> [email protected]> wrote:
>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Jan 13, 2014 at 4:17 AM, lihu <[email protected]> wrote:
>>>>>
>>>>>> I have occurred the same problem with you .
>>>>>> I have a node of 20 machines, and I just run the broadcast example,
>>>>>> what I do is just change the data size in the example, to 400M, this is
>>>>>> really a small data size.
>>>>>>
>>>>>
>>>>> Is 400 MB a really small size for broadcasting?
>>>>>
>>>>> I had the impression that broadcast is for object much much smaller,
>>>>> about less than 10 MB.
>>>>>
>>>>>
>>>>>> but I occurred the same problem with you .
>>>>>> *So I wonder maybe the broadcast capacity is weak in the spark
>>>>>> system?*
>>>>>>
>>>>>>
>>>>>> here is my config:
>>>>>>
>>>>>> *SPARK_MEM=12g*
>>>>>> *SPARK_MASTER_WEBUI_PORT=12306*
>>>>>> *SPARK_WORKER_MEMORY=12g*
>>>>>> *SPARK_JAVA_OPTS+="-Dspark.executor.memory=8g
>>>>>> -Dspark.akka.timeout=600  -Dspark.local.dir=/disk3/lee/tmp
>>>>>> -Dspark.worker.timeout=600 -Dspark.akka.frameSize=10000
>>>>>> -Dspark.akka.askTimeout=300
>>>>>> -Dspark.storage.blockManagerTimeoutIntervalMs=100000
>>>>>> -Dspark.akka.retry.wait=600 -Dspark.blockManagerHeartBeatMs=80000 -Xms15G
>>>>>> -Xmx15G -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit"*
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Sat, Jan 11, 2014 at 8:27 AM, Khanderao kand <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> If your object size > 10MB you may need to change
>>>>>>> spark.akka.frameSize.
>>>>>>>
>>>>>>> What is your spark, spark.akka.timeOut ?
>>>>>>>
>>>>>>> did you change   spark.akka.heartbeat.interval  ?
>>>>>>>
>>>>>>> BTW based on large size getting broadcasted across 25 nodes, you may 
>>>>>>> want to consider the frequency of such transfer and evaluate 
>>>>>>> alternative patterns.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Jan 7, 2014 at 12:55 AM, Sebastian Schelter 
>>>>>>> <[email protected]>wrote:
>>>>>>>
>>>>>>>> Spark repeatedly fails broadcast a large object on a cluster of 25
>>>>>>>> machines for me.
>>>>>>>>
>>>>>>>> I get log messages like this:
>>>>>>>>
>>>>>>>> [spark-akka.actor.default-dispatcher-4] WARN
>>>>>>>> org.apache.spark.storage.BlockManagerMasterActor - Removing
>>>>>>>> BlockManager
>>>>>>>> BlockManagerId(3, cloud-33.dima.tu-berlin.de, 42185, 0) with no
>>>>>>>> recent
>>>>>>>> heart beats: 134689ms exceeds 45000ms
>>>>>>>>
>>>>>>>> Is there something wrong with my config? Do I have to increase some
>>>>>>>> timeout?
>>>>>>>>
>>>>>>>> Thx,
>>>>>>>> Sebastian
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>
>>
>> --
>> *Best Wishes!*
>>
>> *Li Hu(李浒) | Graduate Student*
>>
>> *Institute for Interdisciplinary Information Sciences(IIIS
>> <http://iiis.tsinghua.edu.cn/>)*
>> *Tsinghua University, China*
>>
>> *Email: [email protected] <[email protected]>*
>> *Tel  : +86 15120081920 <%2B86%2015120081920>*
>> *Homepage: http://iiis.tsinghua.edu.cn/zh/lihu/
>> <http://iiis.tsinghua.edu.cn/zh/lihu/>*
>>
>>
>>
>

Reply via email to