Hi,

I think a broadcast logic itself works well in spite of input size (no idea
about its efficiency).

How about your memory size in a driver?
when you broadcast some large variables, a driver eats much memory for
splitting the variable into blocks and their serializations.

Thanks,
maropu

On Tue, Mar 8, 2016 at 9:30 AM, Arash <aras...@gmail.com> wrote:

> Hi Ankur,
>
> For this specific test, I'm only running the few lines of code that are
> pasted. Nothing else is cached in the cluster.
>
> Thanks,
> Arash
>
>
> On Mon, Mar 7, 2016 at 4:07 PM, Ankur Srivastava <
> ankur.srivast...@gmail.com> wrote:
>
>> Hi,
>>
>> We have a use case where we broadcast ~4GB of data and we are on
>> m3.2xlarge so your object size is not an issue. Also based on your
>> explanation does not look like a broadcast issue as it works when your
>> partition size is small.
>>
>> Are you caching any other data? Because boradcast variable use the cache
>> memory.
>>
>> Thanks
>> Ankur
>>
>> On Mon, Mar 7, 2016 at 3:34 PM, Jeff Zhang <zjf...@gmail.com> wrote:
>>
>>> Any reason why do you broadcast such large variable ? It doesn't make
>>> sense to me
>>>
>>> On Tue, Mar 8, 2016 at 7:29 AM, Arash <aras...@gmail.com> wrote:
>>>
>>>> Hello all,
>>>>
>>>> I'm trying to broadcast a variable of size ~1G to a cluster of 20 nodes
>>>> but haven't been able to make it work so far.
>>>>
>>>> It looks like the executors start to run out of memory during
>>>> deserialization. This behavior only shows itself when the number of
>>>> partitions is above a few 10s, the broadcast does work for 10 or 20
>>>> partitions.
>>>>
>>>> I'm using the following setup to observe the problem:
>>>>
>>>> val tuples: Array[((String, String), (String, String))]      // ~ 10M
>>>> tuples
>>>> val tuplesBc = sc.broadcast(tuples)
>>>> val numsRdd = sc.parallelize(1 to 5000, 100)
>>>> numsRdd.map(n => tuplesBc.value.head).count()
>>>>
>>>> If I set the number of partitions for numsRDD to 20, the count goes
>>>> through successfully, but at 100, I'll start to get errors such as:
>>>>
>>>> 16/03/07 19:35:32 WARN scheduler.TaskSetManager: Lost task 77.0 in
>>>> stage 1.0 (TID 1677, xxx.ec2.internal): java.lang.OutOfMemoryError: Java
>>>> heap space
>>>>         at
>>>> java.io.ObjectInputStream$HandleTable.grow(ObjectInputStream.java:3472)
>>>>         at
>>>> java.io.ObjectInputStream$HandleTable.assign(ObjectInputStream.java:3278)
>>>>         at
>>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1789)
>>>>         at
>>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>         at
>>>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>>         at
>>>> scala.collection.immutable.HashMap$SerializationProxy.readObject(HashMap.scala:516)
>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>         at
>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>         at
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>         at java.lang.reflect.Method.invoke(Method.java:606)
>>>>         at
>>>> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>>>>         at
>>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897)
>>>>         at
>>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>         at
>>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>         at
>>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>>>         at
>>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>>>         at
>>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>         at
>>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>         at
>>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>>>         at
>>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>>>         at
>>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>         at
>>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>         at
>>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>>>         at
>>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>>>         at
>>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>         at
>>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>         at
>>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>>>         at
>>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>>>         at
>>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>         at
>>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>         at
>>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>>>         at
>>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>>>
>>>>
>>>> I'm using spark 1.5.2. Cluster nodes are amazon r3.2xlarge. The spark
>>>> property maximizeResourceAllocation is set to true (executor.memory = 48G
>>>> according to spark ui environment). We're also using kryo serialization and
>>>> Yarn is the resource manager.
>>>>
>>>> Any ideas as what might be going wrong and how to debug this?
>>>>
>>>> Thanks,
>>>> Arash
>>>>
>>>>
>>>
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>
>>
>


-- 
---
Takeshi Yamamuro

Reply via email to