Hi,

I think the broadcast logic itself works regardless of the input size (though I can't speak to its efficiency).

How much memory does your driver have? When you broadcast a large variable, the driver consumes a lot of memory to split the variable into blocks and serialize them.

Thanks,
maropu
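For reference, a minimal sketch of the knobs involved in maropu's point. The values are illustrative, not taken from this thread; the driver heap itself must be sized before the JVM starts (e.g. spark-submit --driver-memory 8g), while the broadcast block size can be set on the SparkConf:

    import org.apache.spark.{SparkConf, SparkContext}

    // Illustrative values only. The driver splits the broadcast variable
    // into blocks of spark.broadcast.blockSize and serializes each one,
    // so both the driver heap and the block size matter for a ~1G payload.
    val conf = new SparkConf()
      .setAppName("broadcast-test")           // hypothetical app name
      .set("spark.broadcast.blockSize", "4m") // 4m is the Spark 1.5 default
    val sc = new SparkContext(conf)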
On Tue, Mar 8, 2016 at 9:30 AM, Arash <aras...@gmail.com> wrote:

> Hi Ankur,
>
> For this specific test, I'm only running the few lines of code that are
> pasted. Nothing else is cached in the cluster.
>
> Thanks,
> Arash
>
> On Mon, Mar 7, 2016 at 4:07 PM, Ankur Srivastava
> <ankur.srivast...@gmail.com> wrote:
>
>> Hi,
>>
>> We have a use case where we broadcast ~4GB of data and we are on
>> m3.2xlarge, so your object size is not an issue. Also, based on your
>> explanation, this does not look like a broadcast issue, as it works when
>> your number of partitions is small.
>>
>> Are you caching any other data? Broadcast variables use the cache memory.
>>
>> Thanks,
>> Ankur
>>
>> On Mon, Mar 7, 2016 at 3:34 PM, Jeff Zhang <zjf...@gmail.com> wrote:
>>
>>> Any reason why you broadcast such a large variable? It doesn't make
>>> sense to me.
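For context on Jeff's suggestion: the usual alternative to broadcasting data this large is to keep it distributed and join on a key instead. A minimal sketch under that assumption (all names and sizes here are hypothetical, not from the thread):

    import org.apache.spark.{SparkConf, SparkContext}

    // Hypothetical sketch: rather than copying ~10M tuples into every
    // executor's heap via broadcast, keep them as a keyed RDD and join.
    val sc = new SparkContext(new SparkConf().setAppName("join-instead"))

    // Synthetic stand-ins for the real data.
    val tuples = sc.parallelize(1 to 1000000)
      .map(i => ((s"k$i", s"k$i"), (s"v$i", s"v$i")))
    val lookups = sc.parallelize(1 to 5000)
      .map(i => ((s"k$i", s"k$i"), i))

    // A regular shuffle join spreads the work across the cluster instead
    // of materializing the whole array on each executor.
    println(lookups.join(tuples).count())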
>>> On Tue, Mar 8, 2016 at 7:29 AM, Arash <aras...@gmail.com> wrote:
>>>
>>>> Hello all,
>>>>
>>>> I'm trying to broadcast a variable of size ~1G to a cluster of 20
>>>> nodes, but I haven't been able to make it work so far.
>>>>
>>>> It looks like the executors start to run out of memory during
>>>> deserialization. This behavior only shows itself when the number of
>>>> partitions is above a few tens; the broadcast does work for 10 or 20
>>>> partitions.
>>>>
>>>> I'm using the following setup to observe the problem:
>>>>
>>>>     val tuples: Array[((String, String), (String, String))] // ~10M tuples
>>>>     val tuplesBc = sc.broadcast(tuples)
>>>>     val numsRdd = sc.parallelize(1 to 5000, 100)
>>>>     numsRdd.map(n => tuplesBc.value.head).count()
>>>>
>>>> If I set the number of partitions for numsRdd to 20, the count goes
>>>> through successfully, but at 100, I'll start to get errors such as:
>>>>
>>>> 16/03/07 19:35:32 WARN scheduler.TaskSetManager: Lost task 77.0 in
>>>> stage 1.0 (TID 1677, xxx.ec2.internal): java.lang.OutOfMemoryError:
>>>> Java heap space
>>>>     at java.io.ObjectInputStream$HandleTable.grow(ObjectInputStream.java:3472)
>>>>     at java.io.ObjectInputStream$HandleTable.assign(ObjectInputStream.java:3278)
>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1789)
>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>>     at scala.collection.immutable.HashMap$SerializationProxy.readObject(HashMap.scala:516)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>>>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897)
>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>>>
>>>> I'm using Spark 1.5.2. The cluster nodes are Amazon r3.2xlarge. The
>>>> Spark property maximizeResourceAllocation is set to true
>>>> (executor.memory = 48G according to the Spark UI environment page).
>>>> We are also using Kryo serialization, and YARN is the resource manager.
>>>>
>>>> Any ideas as to what might be going wrong and how to debug this?
>>>>
>>>> Thanks,
>>>> Arash
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang

--
---
Takeshi Yamamuro
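For anyone trying to reproduce this, here is a self-contained version of Arash's snippet, with synthetic data standing in for the real tuples (the array is scaled down and its contents are illustrative):

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setAppName("broadcast-oom-repro"))

    // Synthetic stand-in for the ~10M-tuple array from the report,
    // scaled down so the sketch runs on a small cluster.
    val tuples: Array[((String, String), (String, String))] =
      (1 to 1000000).map(i => ((s"a$i", s"b$i"), (s"c$i", s"d$i"))).toArray

    val tuplesBc = sc.broadcast(tuples)
    val numsRdd = sc.parallelize(1 to 5000, 100) // 100 partitions, the failing case
    println(numsRdd.map(n => tuplesBc.value.head).count())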
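One detail worth noting: the stack trace is a java.io.ObjectInputStream path, i.e. Java serialization, with scala.collection.immutable.HashMap$SerializationProxy in the middle, even though the cluster is configured for Kryo. To help rule serialization in or out, Kryo can be forced and the payload class registered explicitly; a minimal sketch (whether this changes the failure here is untested):

    import org.apache.spark.SparkConf

    // Force Kryo and register the broadcast payload's class so the data
    // itself cannot silently fall back to Java serialization.
    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array(
        classOf[Array[((String, String), (String, String))]]))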