Hi,

Thanks for the information. I'll give Spark 1.4 a try when it's released.
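For reference, the periodic-snapshot part of the approach TD describes below could be scripted along these lines. This is only a rough sketch: the target pid, the interval, and the output directory are placeholders, and it assumes jmap is on the PATH and allowed to attach to the process.

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import scala.sys.process._

object HistoSnapshots {
  def main(args: Array[String]): Unit = {
    val pid = args(0)                      // pid of the driver or executor JVM (placeholder)
    val outDir = Paths.get("histo-dumps")  // where snapshots are written (placeholder)
    Files.createDirectories(outDir)

    while (true) {
      // -histo:live forces a full GC first, then prints a histogram of live objects.
      val histo = Seq("jmap", "-histo:live", pid).!!
      val file = outDir.resolve(s"histo-${System.currentTimeMillis()}.txt")
      Files.write(file, histo.getBytes(StandardCharsets.UTF_8))
      Thread.sleep(60L * 60 * 1000)        // one snapshot per hour, as suggested below
    }
  }
}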
On Wed, Jun 3, 2015 at 11:31 AM, Tathagata Das <t...@databricks.com> wrote:

> Could you try it out with Spark 1.4 RC3?
>
> Also pinging Cloudera folks; they may be aware of something.
>
> BTW, the way I have debugged memory leaks in the past is as follows.
>
> Run with a small driver memory, say 1 GB. Periodically (maybe with a
> script), take snapshots of the histogram and also take memory dumps, say
> every hour. Then compare two histos/dumps that are a few hours apart (the
> more, the better). Diffing histos is easy. Diffing two dumps can be done
> in JVisualVM; it will show the diff in the objects that got added in the
> later dump. That makes it easy to debug what is not getting cleaned up.
>
> TD
>
> On Tue, Jun 2, 2015 at 7:33 PM, Ji ZHANG <zhangj...@gmail.com> wrote:
>
>> Hi,
>>
>> Thanks for your reply. Here are the top 30 entries of the
>> jmap -histo:live result:
>>
>>  num     #instances         #bytes  class name
>> ----------------------------------------------
>>    1:         40802      145083848  [B
>>    2:         99264       12716112  <methodKlass>
>>    3:         99264       12291480  <constMethodKlass>
>>    4:          8472        9144816  <constantPoolKlass>
>>    5:          8472        7625192  <instanceKlassKlass>
>>    6:           186        6097824  [Lscala.concurrent.forkjoin.ForkJoinTask;
>>    7:          7045        4804832  <constantPoolCacheKlass>
>>    8:        139168        4453376  java.util.HashMap$Entry
>>    9:          9427        3542512  <methodDataKlass>
>>   10:        141312        3391488  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
>>   11:        135491        3251784  java.lang.Long
>>   12:         26192        2765496  [C
>>   13:           813        1140560  [Ljava.util.HashMap$Entry;
>>   14:          8997        1061936  java.lang.Class
>>   15:         16022         851384  [[I
>>   16:         16447         789456  java.util.zip.Inflater
>>   17:         13855         723376  [S
>>   18:         17282         691280  java.lang.ref.Finalizer
>>   19:         25725         617400  java.lang.String
>>   20:           320         570368  [Lio.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry;
>>   21:         16066         514112  java.util.concurrent.ConcurrentHashMap$HashEntry
>>   22:         12288         491520  org.jboss.netty.util.internal.ConcurrentIdentityHashMap$Segment
>>   23:         13343         426976  java.util.concurrent.locks.ReentrantLock$NonfairSync
>>   24:         12288         396416  [Lorg.jboss.netty.util.internal.ConcurrentIdentityHashMap$HashEntry;
>>   25:         16447         394728  java.util.zip.ZStreamRef
>>   26:           565         370080  [I
>>   27:           508         272288  <objArrayKlassKlass>
>>   28:         16233         259728  java.lang.Object
>>   29:           771         209232  [Ljava.util.concurrent.ConcurrentHashMap$HashEntry;
>>   30:          2524         192312  [Ljava.lang.Object;
>>
>> But as I mentioned above, the heap memory seems OK; the extra memory is
>> consumed by some off-heap data, and I can't find a way to figure out what
>> is in there.
>>
>> Besides, I did some extra experiments, i.e. ran the same program in
>> different environments to test whether it has the off-heap memory issue:
>>
>> Spark 1.0 + standalone = no
>> Spark 1.0 + yarn       = no
>> Spark 1.3 + standalone = no
>> Spark 1.3 + yarn       = yes
>>
>> I'm using CDH 5.1, so Spark 1.0 is the one provided by CDH, and
>> spark-1.3.1-bin-hadoop2.3 is downloaded from the official website.
>>
>> I could use Spark 1.0 + YARN, but I can't find a way to handle the logs
>> (level and rolling), so it would explode the hard drive.
>>
>> For now I'll stick to Spark 1.0 + standalone, until our ops team decides
>> to upgrade CDH.
>>
>> On Tue, Jun 2, 2015 at 2:58 PM, Tathagata Das <t...@databricks.com> wrote:
>>
>>> While it is running, is it possible for you to log into the YARN node
>>> and get histograms of live objects using "jmap -histo:live"? That may
>>> reveal something.
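Since diffing histograms came up: a minimal sketch of comparing two such jmap -histo outputs, assuming the default column layout shown above and taking the two snapshot files from the command line.

import scala.io.Source

object HistoDiff {
  // Parse a jmap -histo dump into className -> bytes, skipping header/total lines.
  def parse(path: String): Map[String, Long] =
    Source.fromFile(path).getLines()
      .map(_.trim.split("\\s+"))
      .collect { case Array(rank, _, bytes, name) if rank.endsWith(":") =>
        name -> bytes.toLong
      }
      .toMap

  def main(args: Array[String]): Unit = {
    val (before, after) = (parse(args(0)), parse(args(1)))
    // Classes whose retained bytes grew the most between the two snapshots.
    (before.keySet ++ after.keySet).toSeq
      .map(name => name -> (after.getOrElse(name, 0L) - before.getOrElse(name, 0L)))
      .sortBy(-_._2)
      .take(30)
      .foreach { case (name, delta) => println(f"$delta%15d  $name") }
  }
}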
>>> On Thursday, May 28, 2015, Ji ZHANG <zhangj...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Unfortunately, they're still growing, both driver and executors.
>>>>
>>>> I ran the same job in local mode and everything is fine.
>>>>
>>>> On Thu, May 28, 2015 at 5:26 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>
>>>>> Can you replace your counting part with this?
>>>>>
>>>>> logs.filter(_.s_id > 0).foreachRDD(rdd => logger.info(rdd.count()))
>>>>>
>>>>> Thanks
>>>>> Best Regards
>>>>>
>>>>> On Thu, May 28, 2015 at 1:02 PM, Ji ZHANG <zhangj...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I wrote a simple test job that only does very basic operations, for
>>>>>> example:
>>>>>>
>>>>>> val lines = KafkaUtils.createStream(ssc, zkQuorum, group,
>>>>>>   Map(topic -> 1)).map(_._2)
>>>>>> val logs = lines.flatMap { line =>
>>>>>>   try {
>>>>>>     Some(parse(line).extract[Impression])
>>>>>>   } catch {
>>>>>>     case _: Exception => None
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> logs.filter(_.s_id > 0).count.foreachRDD { rdd =>
>>>>>>   rdd.foreachPartition { iter =>
>>>>>>     iter.foreach(count => logger.info(count.toString))
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> It receives messages from Kafka, parses the JSON, filters and counts
>>>>>> the records, and then prints the counts to the logs.
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>> On Thu, May 28, 2015 at 3:07 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>>>
>>>>>>> Hi Zhang,
>>>>>>>
>>>>>>> Could you paste your code in a gist? Not sure what you are doing
>>>>>>> inside the code to fill up memory.
>>>>>>>
>>>>>>> Thanks
>>>>>>> Best Regards
>>>>>>>
>>>>>>> On Thu, May 28, 2015 at 10:08 AM, Ji ZHANG <zhangj...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Yes, I'm using createStream, but the storageLevel param already
>>>>>>>> defaults to MEMORY_AND_DISK_SER_2. Besides, the driver's memory is
>>>>>>>> also growing, and I don't think Kafka messages would be cached in
>>>>>>>> the driver.
>>>>>>>>
>>>>>>>> On Thu, May 28, 2015 at 12:24 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>>>>>
>>>>>>>>> Are you using the createStream or createDirectStream API? If it's
>>>>>>>>> the former, you can try setting the StorageLevel to MEMORY_AND_DISK
>>>>>>>>> (it might slow things down though). Another way would be to try the
>>>>>>>>> latter.
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Best Regards
>>>>>>>>>
>>>>>>>>> On Wed, May 27, 2015 at 1:00 PM, Ji ZHANG <zhangj...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Akhil,
>>>>>>>>>>
>>>>>>>>>> Thanks for your reply. According to the Streaming tab of the Web
>>>>>>>>>> UI, the Processing Time is around 400 ms and there's no Scheduling
>>>>>>>>>> Delay, so I suppose it's not the Kafka messages that eat up the
>>>>>>>>>> off-heap memory. Or maybe it is, but how to tell?
>>>>>>>>>>
>>>>>>>>>> I googled how to check off-heap memory usage; there's a tool
>>>>>>>>>> called pmap, but I don't know how to interpret the results.
>>>>>>>>>>
>>>>>>>>>> On Wed, May 27, 2015 at 3:08 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> After submitting the job, if you do a ps aux | grep spark-submit
>>>>>>>>>>> you can see all the JVM params. Are you using the high-level
>>>>>>>>>>> consumer (receiver-based) for receiving data from Kafka?
>>>>>>>>>>> In that case, if your throughput is high and the processing
>>>>>>>>>>> delay exceeds the batch interval, you will hit this memory issue,
>>>>>>>>>>> as data keeps being received and dumped to memory. You can set
>>>>>>>>>>> the StorageLevel to MEMORY_AND_DISK (but it slows things down).
>>>>>>>>>>> Another alternative is to use the low-level Kafka consumer
>>>>>>>>>>> <https://github.com/dibbhatt/kafka-spark-consumer> or the
>>>>>>>>>>> non-receiver-based directStream
>>>>>>>>>>> <https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers>
>>>>>>>>>>> that comes with Spark.
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> Best Regards
>>>>>>>>>>>
>>>>>>>>>>> On Wed, May 27, 2015 at 11:51 AM, Ji ZHANG <zhangj...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> I'm using Spark Streaming 1.3 on CDH 5.1 in yarn-cluster mode.
>>>>>>>>>>>> I found that YARN is killing the driver and executor processes
>>>>>>>>>>>> because of excessive memory use. Here's what I tried:
>>>>>>>>>>>>
>>>>>>>>>>>> 1. Xmx is set to 512M and the GC looks fine (one young GC per
>>>>>>>>>>>> 10s), so the extra memory is not used by the heap.
>>>>>>>>>>>> 2. I set the two memoryOverhead params to 1024 (the default is
>>>>>>>>>>>> 384), but the memory just keeps growing until it hits the limit.
>>>>>>>>>>>> 3. The problem does not show up in low-throughput jobs, nor in
>>>>>>>>>>>> standalone mode.
>>>>>>>>>>>> 4. The test job just receives messages from Kafka with a batch
>>>>>>>>>>>> interval of 1 second, does some filtering and aggregation, and
>>>>>>>>>>>> prints to the executor logs, so it's not some 3rd-party library
>>>>>>>>>>>> causing the 'leak'.
>>>>>>>>>>>>
>>>>>>>>>>>> Spark 1.3 is built by myself, with the correct Hadoop version.
>>>>>>>>>>>>
>>>>>>>>>>>> Any ideas will be appreciated.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Jerry

--
Jerry
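P.S. For reference, the non-receiver-based directStream Akhil points to above might look roughly like this on Spark 1.3. This is only a minimal sketch: the broker list, topic name, and app name are placeholders, the JSON parsing from the earlier test job is left out, and it needs the spark-streaming-kafka dependency on the classpath.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DirectStreamTest {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("direct-test"), Seconds(1))

    // The direct stream talks to the Kafka brokers directly, not to ZooKeeper.
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
    val lines = KafkaUtils
      .createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, Set("impressions"))
      .map(_._2)

    // Same counting idea as the receiver-based test job earlier in the thread.
    lines.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Since there is no receiver, nothing is pre-fetched into block storage; each batch's tasks read their offset range from Kafka when they run.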