Hi,

Thanks for your information. I'll give Spark 1.4 a try when it's released.

On Wed, Jun 3, 2015 at 11:31 AM, Tathagata Das <t...@databricks.com> wrote:

> Could you try it out with Spark 1.4 RC3?
>
> Also pinging Cloudera folks; they may be aware of something.
>
> BTW, the way I have debugged memory leaks in the past is as follows.
>
> Run with a small driver memory, say 1 GB. Periodically (maybe with a script),
> take histogram snapshots and also heap dumps, say every hour. Then compare
> two histograms or dumps taken a few hours apart (the further apart, the
> better). Diffing histograms is easy. Two dumps can be diffed in JVisualVM,
> which shows the objects that were added in the later dump. That makes it
> easy to see what is not getting cleaned up.
>
> TD
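
A minimal sketch of the histogram diffing described above, assuming two snapshots were saved to text files with jmap -histo:live. The names HistoDiff, Row, parse and diff are made up for illustration and are not part of jmap or Spark. It also assumes that a class name wrapped onto its own line (as in the paste further down this thread) belongs to the row just before it.

```scala
// Sketch: compare two `jmap -histo` snapshots and list the classes that grew the most.
// HistoDiff, Row, parse and diff are illustrative names, not part of jmap or Spark.
object HistoDiff {
  final case class Row(instances: Long, bytes: Long)

  // Matches rows such as "1:   40802   145083848  [B". The class name group is
  // null when a mail client wrapped the name onto the next line.
  private val RowPattern = """\s*\d+:\s+(\d+)\s+(\d+)\s*(\S.*)?""".r

  def parse(lines: Seq[String]): Map[String, Row] = {
    val rows = scala.collection.mutable.Map.empty[String, Row]
    var pending: Option[Row] = None // row still waiting for its class name
    for (raw <- lines) {
      val line = raw.trim
      line match {
        case RowPattern(inst, bytes, name) =>
          val row = Row(inst.toLong, bytes.toLong)
          if (name == null) pending = Some(row)
          else { rows(name.trim) = row; pending = None }
        case _ =>
          // A non-row line right after a nameless row is taken as its class name.
          pending.foreach(r => if (line.nonEmpty) rows(line) = r)
          pending = None
      }
    }
    rows.toMap
  }

  // (class, instance delta, byte delta), ordered by growth in bytes, unchanged classes dropped.
  def diff(before: Map[String, Row], after: Map[String, Row]): Seq[(String, Long, Long)] = {
    val zero = Row(0L, 0L)
    (before.keySet ++ after.keySet).toSeq
      .map { cls =>
        val b = before.getOrElse(cls, zero)
        val a = after.getOrElse(cls, zero)
        (cls, a.instances - b.instances, a.bytes - b.bytes)
      }
      .filter(_._3 != 0L)
      .sortBy(t => -t._3)
  }

  def main(args: Array[String]): Unit = {
    require(args.length == 2, "usage: HistoDiff <earlier-histo.txt> <later-histo.txt>")
    def read(path: String): Seq[String] = {
      val src = scala.io.Source.fromFile(path)
      try src.getLines().toList finally src.close()
    }
    diff(parse(read(args(0))), parse(read(args(1)))).take(30).foreach {
      case (cls, dInst, dBytes) => println(f"$dBytes%+14d bytes $dInst%+10d instances  $cls")
    }
  }
}
```

Classes that keep appearing at the top across several hourly pairs are the ones worth looking up in the heap dump diff. Note that this only covers on-heap objects.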
>
>
> On Tue, Jun 2, 2015 at 7:33 PM, Ji ZHANG <zhangj...@gmail.com> wrote:
>
>> Hi,
>>
>> Thanks for your reply. Here are the top 30 entries of the jmap -histo:live
>> result:
>>
>>  num     #instances         #bytes  class name
>> ----------------------------------------------
>>    1:         40802      145083848  [B
>>    2:         99264       12716112  <methodKlass>
>>    3:         99264       12291480  <constMethodKlass>
>>    4:          8472        9144816  <constantPoolKlass>
>>    5:          8472        7625192  <instanceKlassKlass>
>>    6:           186        6097824  [Lscala.concurrent.forkjoin.ForkJoinTask;
>>    7:          7045        4804832  <constantPoolCacheKlass>
>>    8:        139168        4453376  java.util.HashMap$Entry
>>    9:          9427        3542512  <methodDataKlass>
>>   10:        141312        3391488  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
>>   11:        135491        3251784  java.lang.Long
>>   12:         26192        2765496  [C
>>   13:           813        1140560  [Ljava.util.HashMap$Entry;
>>   14:          8997        1061936  java.lang.Class
>>   15:         16022         851384  [[I
>>   16:         16447         789456  java.util.zip.Inflater
>>   17:         13855         723376  [S
>>   18:         17282         691280  java.lang.ref.Finalizer
>>   19:         25725         617400  java.lang.String
>>   20:           320         570368  [Lio.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry;
>>   21:         16066         514112  java.util.concurrent.ConcurrentHashMap$HashEntry
>>   22:         12288         491520  org.jboss.netty.util.internal.ConcurrentIdentityHashMap$Segment
>>   23:         13343         426976  java.util.concurrent.locks.ReentrantLock$NonfairSync
>>   24:         12288         396416  [Lorg.jboss.netty.util.internal.ConcurrentIdentityHashMap$HashEntry;
>>   25:         16447         394728  java.util.zip.ZStreamRef
>>   26:           565         370080  [I
>>   27:           508         272288  <objArrayKlassKlass>
>>   28:         16233         259728  java.lang.Object
>>   29:           771         209232  [Ljava.util.concurrent.ConcurrentHashMap$HashEntry;
>>   30:          2524         192312  [Ljava.lang.Object;
>>
>> But as I mentioned above, the heap memory seems OK; the extra memory is
>> consumed by some off-heap data, and I can't find a way to figure out what
>> is in there.
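
One partial way to look at off-heap usage from inside a JVM is the standard BufferPoolMXBean (Java 7 and later), sketched below. The names BufferPools, PoolUsage and snapshot are made up for illustration. This is only an assumption about where the memory might be: it reports NIO direct and mapped buffers only, so memory taken by native libraries, thread stacks or other raw allocations will not show up here. It also only sees the JVM it runs in, so it would have to be called from within the driver or executor (for example, logged once per batch), or the same MXBean read remotely over JMX.

```scala
import java.lang.management.{BufferPoolMXBean, ManagementFactory}
import scala.collection.JavaConverters._

// Sketch: report the NIO buffer pools ("direct" and "mapped") of the current JVM.
// BufferPools, PoolUsage and snapshot are illustrative names.
object BufferPools {
  final case class PoolUsage(name: String, count: Long, usedBytes: Long, capacityBytes: Long)

  // One entry per buffer pool known to the platform MBean server.
  def snapshot(): Seq[PoolUsage] =
    ManagementFactory.getPlatformMXBeans(classOf[BufferPoolMXBean]).asScala.toList.map { p =>
      PoolUsage(p.getName, p.getCount, p.getMemoryUsed, p.getTotalCapacity)
    }

  def main(args: Array[String]): Unit =
    snapshot().foreach { u =>
      println(s"${u.name}: count=${u.count} used=${u.usedBytes}B capacity=${u.capacityBytes}B")
    }
}
```

If the "direct" pool stays small while the process's resident size keeps growing, the growth is coming from somewhere this bean does not cover.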
>>
>> Besides, I did some extra experiments, i.e. I ran the same program in
>> different environments to test whether it has the off-heap memory issue:
>>
>> spark1.0 + standalone = no
>> spark1.0 + yarn = no
>> spark1.3 + standalone = no
>> spark1.3 + yarn = yes
>>
>> I'm using CDH5.1, so spark1.0 is the one provided by CDH, and
>> spark-1.3.1-bin-hadoop2.3 was downloaded from the official website.
>>
>> I could use spark1.0 + yarn, but I can't find a way to control the log
>> level and log rolling, so the logs would fill up the hard drive.
>>
>> Currently I'll stick to spark1.0 + standalone, until our ops team decides
>> to upgrade cdh.
>>
>>
>>
>> On Tue, Jun 2, 2015 at 2:58 PM, Tathagata Das <t...@databricks.com>
>> wrote:
>>
>>> While the job is running, is it possible for you to log in to the YARN
>>> node and get histograms of live objects using "jmap -histo:live"? That
>>> may reveal something.
>>>
>>>
>>> On Thursday, May 28, 2015, Ji ZHANG <zhangj...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Unfortunately, they're still growing, both driver and executors.
>>>>
>>>> I ran the same job in local mode, and everything is fine.
>>>>
>>>> On Thu, May 28, 2015 at 5:26 PM, Akhil Das <ak...@sigmoidanalytics.com>
>>>> wrote:
>>>>
>>>>> Can you replace your counting part with this?
>>>>>
>>>>> logs.filter(_.s_id > 0).foreachRDD(rdd => logger.info(rdd.count()))
>>>>>
>>>>>
>>>>>
>>>>> Thanks
>>>>> Best Regards
>>>>>
>>>>> On Thu, May 28, 2015 at 1:02 PM, Ji ZHANG <zhangj...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I wrote a simple test job that only does very basic operations, for
>>>>>> example:
>>>>>>
>>>>>>     val lines = KafkaUtils.createStream(ssc, zkQuorum, group,
>>>>>>       Map(topic -> 1)).map(_._2)
>>>>>>     val logs = lines.flatMap { line =>
>>>>>>       try {
>>>>>>         Some(parse(line).extract[Impression])
>>>>>>       } catch {
>>>>>>         case _: Exception => None
>>>>>>       }
>>>>>>     }
>>>>>>
>>>>>>     logs.filter(_.s_id > 0).count.foreachRDD { rdd =>
>>>>>>       rdd.foreachPartition { iter =>
>>>>>>         iter.foreach(count => logger.info(count.toString))
>>>>>>       }
>>>>>>     }
>>>>>>
>>>>>> It receives messages from Kafka, parses the JSON, filters and counts
>>>>>> the records, and then prints the count to the logs.
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>>
>>>>>> On Thu, May 28, 2015 at 3:07 PM, Akhil Das <
>>>>>> ak...@sigmoidanalytics.com> wrote:
>>>>>>
>>>>>>> Hi Zhang,
>>>>>>>
>>>>>>> Could you paste your code in a gist? Not sure what you are doing
>>>>>>> inside the code to fill up memory.
>>>>>>>
>>>>>>> Thanks
>>>>>>> Best Regards
>>>>>>>
>>>>>>> On Thu, May 28, 2015 at 10:08 AM, Ji ZHANG <zhangj...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Yes, I'm using createStream, but the storageLevel param is
>>>>>>>> MEMORY_AND_DISK_SER_2 by default. Besides, the driver's memory is also
>>>>>>>> growing, and I don't think Kafka messages would be cached in the driver.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, May 28, 2015 at 12:24 AM, Akhil Das <
>>>>>>>> ak...@sigmoidanalytics.com> wrote:
>>>>>>>>
>>>>>>>>> Are you using the createStream or the createDirectStream API? If it's
>>>>>>>>> the former, you can try setting the StorageLevel to MEMORY_AND_DISK
>>>>>>>>> (it might slow things down though). Another way would be to try the
>>>>>>>>> latter.
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Best Regards
>>>>>>>>>
>>>>>>>>> On Wed, May 27, 2015 at 1:00 PM, Ji ZHANG <zhangj...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Akhil,
>>>>>>>>>>
>>>>>>>>>> Thanks for your reply. According to the Streaming tab of the Web UI,
>>>>>>>>>> the Processing Time is around 400ms, and there's no Scheduling Delay,
>>>>>>>>>> so I suppose it's not the Kafka messages that eat up the off-heap
>>>>>>>>>> memory. Or maybe it is, but how can I tell?
>>>>>>>>>>
>>>>>>>>>> I googled how to check off-heap memory usage and found a tool called
>>>>>>>>>> pmap, but I don't know how to interpret its results.
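
One way to make pmap output easier to read is to total the resident size per mapping name and compare the totals over time. The sketch below assumes the column layout of Linux procps "pmap -x <pid>" (Address, Kbytes, RSS, Dirty, Mode, Mapping); other pmap versions print different columns. PmapSummary and rssByMapping are made-up names.

```scala
// Sketch: total resident memory (kB) per mapping name from `pmap -x <pid>` output.
// Assumes the procps column layout: Address Kbytes RSS Dirty Mode Mapping.
object PmapSummary {
  // Data rows start with a hex address followed by three numeric columns;
  // the header, the separator and the "total kB" line do not match.
  private val RowPattern = """([0-9a-fA-F]+)\s+(\d+)\s+(\d+)\s+(\d+)\s+(\S+)\s*(.*)""".r

  // Resident kB per mapping name, largest first.
  def rssByMapping(lines: Seq[String]): Seq[(String, Long)] =
    lines.map(_.trim)
      .collect { case RowPattern(_, _, rss, _, _, mapping) => (mapping.trim, rss.toLong) }
      .groupBy(_._1)
      .map { case (mapping, rows) => (mapping, rows.map(_._2).sum) }
      .toSeq
      .sortBy(t => -t._2)
}
```

In a JVM, the Java heap itself also shows up as anonymous mappings, so a single snapshot says little. What matters is which total keeps growing between snapshots while the heap size stays fixed.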
>>>>>>>>>>
>>>>>>>>>> On Wed, May 27, 2015 at 3:08 PM, Akhil Das <
>>>>>>>>>> ak...@sigmoidanalytics.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> After submitting the job, if you do a ps aux | grep spark-submit
>>>>>>>>>>> then you can see all the JVM params. Are you using the high-level
>>>>>>>>>>> consumer (receiver based) for receiving data from Kafka? In that
>>>>>>>>>>> case, if your throughput is high and the processing delay exceeds
>>>>>>>>>>> the batch interval, you will hit this memory issue, as data keeps
>>>>>>>>>>> being received and is dumped to memory. You can set the StorageLevel
>>>>>>>>>>> to MEMORY_AND_DISK (but it slows things down). Another alternative
>>>>>>>>>>> would be to use the low-level kafka consumer
>>>>>>>>>>> <https://github.com/dibbhatt/kafka-spark-consumer> or the
>>>>>>>>>>> non-receiver based directStream
>>>>>>>>>>> <https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers>
>>>>>>>>>>> that comes with Spark.
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> Best Regards
>>>>>>>>>>>
>>>>>>>>>>> On Wed, May 27, 2015 at 11:51 AM, Ji ZHANG <zhangj...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> I'm using Spark Streaming 1.3 on CDH5.1 in yarn-cluster mode. I
>>>>>>>>>>>> found that YARN is killing the driver and executor processes
>>>>>>>>>>>> because of excessive memory use. Here's what I tried and observed:
>>>>>>>>>>>>
>>>>>>>>>>>> 1. Xmx is set to 512M and the GC looks fine (one ygc per 10s),
>>>>>>>>>>>> so the extra memory is not used by heap.
>>>>>>>>>>>> 2. I set the two memoryOverhead params to 1024 (default is
>>>>>>>>>>>> 384), but the memory just keeps growing and then hits the limit.
>>>>>>>>>>>> 3. The problem does not show up in low-throughput jobs, nor in
>>>>>>>>>>>> standalone mode.
>>>>>>>>>>>> 4. The test job just receives messages from Kafka, with a batch
>>>>>>>>>>>> interval of 1, does some filtering and aggregation, and then prints
>>>>>>>>>>>> to the executor logs. So it's not some 3rd party library that causes
>>>>>>>>>>>> the 'leak'.
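
The numbers in items 1 and 2 can be put together as below. This is a sketch under two assumptions: the container size requested from YARN is the JVM heap plus the memoryOverhead setting, and the scheduler may round that request up to a multiple of some allocation increment. ContainerMemory, requestMb and incrementMb are made-up names, and the 1024 MB increment in the comments is only an example value.

```scala
// Sketch: container size (MB) that the process must stay under to avoid being killed.
// Assumption: request = heap + memoryOverhead, rounded up to a multiple of incrementMb.
object ContainerMemory {
  def requestMb(heapMb: Int, overheadMb: Int, incrementMb: Int = 1): Int = {
    val raw = heapMb + overheadMb
    ((raw + incrementMb - 1) / incrementMb) * incrementMb // round up to the increment
  }

  def main(args: Array[String]): Unit = {
    println(requestMb(512, 384))        // 896  (default overhead from item 2)
    println(requestMb(512, 1024))       // 1536 (overhead raised to 1024)
    println(requestMb(512, 1024, 1024)) // 2048 (with an example 1024 MB increment)
  }
}
```

Since the heap is capped at 512 MB, everything between that and the container size has to hold all off-heap usage. Raising the overhead only delays the kill if that usage grows without bound.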
>>>>>>>>>>>>
>>>>>>>>>>>> I built Spark 1.3 myself, with the correct Hadoop versions.
>>>>>>>>>>>>
>>>>>>>>>>>> Any ideas will be appreciated.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Jerry
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Jerry
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Jerry
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Jerry
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Jerry
>>>>
>>>
>>
>>
>> --
>> Jerry
>>
>
>


-- 
Jerry
