Re: Spark Streaming yarn-cluster Mode Off-heap Memory Is Constantly Growing

2015-06-18 Thread Ji ZHANG
Hi,

We switched from ParallelGC to CMS, and the symptom is gone.
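
For anyone hitting the same symptom, a CMS switch is typically applied through
Spark's standard extraJavaOptions settings. The sketch below is one way to do it,
not the exact configuration used in this thread; in yarn-cluster mode the driver's
JVM options have to be passed via spark-submit or spark-defaults.conf, since the
driver JVM is already running when SparkConf is read:

import org.apache.spark.SparkConf

// Minimal sketch: request CMS for the executors.
val conf = new SparkConf()
  .set("spark.executor.extraJavaOptions", "-XX:+UseConcMarkSweepGC")
// Driver side (yarn-cluster): pass it on the command line instead, e.g.
//   --conf spark.driver.extraJavaOptions=-XX:+UseConcMarkSweepGC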

On Thu, Jun 4, 2015 at 3:37 PM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 I set spark.shuffle.io.preferDirectBufs to false in SparkConf, and this
 setting can be seen in the Web UI's Environment tab. But it still eats memory,
 i.e. -Xmx is set to 512M but RES grows to 1.5G in half a day.


 On Wed, Jun 3, 2015 at 12:02 PM, Shixiong Zhu zsxw...@gmail.com wrote:

 Could you set spark.shuffle.io.preferDirectBufs to false to turn off
 the off-heap allocation of netty?

 Best Regards,
 Shixiong Zhu

 2015-06-03 11:58 GMT+08:00 Ji ZHANG zhangj...@gmail.com:

 Hi,

 Thanks for your information. I'll give Spark 1.4 a try when it's released.

 On Wed, Jun 3, 2015 at 11:31 AM, Tathagata Das t...@databricks.com
 wrote:

 Could you try it out with Spark 1.4 RC3?

 Also pinging Cloudera folks; they may be aware of something.

 BTW, the way I have debugged memory leaks in the past is as follows.

 Run with a small driver memory, say 1 GB. Periodically (maybe with a script)
 take snapshots of the histogram and also do memory dumps, say every hour.
 Then compare two histos/dumps that are a few hours apart (the more, the
 better). Diffing histos is easy. Diffing two dumps can be done in JVisualVM;
 it will show the objects that got added in the later dump. That makes it easy
 to see what is not getting cleaned up.

 TD


 On Tue, Jun 2, 2015 at 7:33 PM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 Thanks for your reply. Here are the top 30 entries of the jmap -histo:live
 result:

  num     #instances         #bytes  class name
 -----------------------------------------------
    1:         40802      145083848  [B
    2:         99264       12716112  methodKlass
    3:         99264       12291480  constMethodKlass
    4:          8472        9144816  constantPoolKlass
    5:          8472        7625192  instanceKlassKlass
    6:           186        6097824  [Lscala.concurrent.forkjoin.ForkJoinTask;
    7:          7045        4804832  constantPoolCacheKlass
    8:        139168        4453376  java.util.HashMap$Entry
    9:          9427        3542512  methodDataKlass
   10:        141312        3391488  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
   11:        135491        3251784  java.lang.Long
   12:         26192        2765496  [C
   13:           813        1140560  [Ljava.util.HashMap$Entry;
   14:          8997        1061936  java.lang.Class
   15:         16022         851384  [[I
   16:         16447         789456  java.util.zip.Inflater
   17:         13855         723376  [S
   18:         17282         691280  java.lang.ref.Finalizer
   19:         25725         617400  java.lang.String
   20:           320         570368  [Lio.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry;
   21:         16066         514112  java.util.concurrent.ConcurrentHashMap$HashEntry
   22:         12288         491520  org.jboss.netty.util.internal.ConcurrentIdentityHashMap$Segment
   23:         13343         426976  java.util.concurrent.locks.ReentrantLock$NonfairSync
   24:         12288         396416  [Lorg.jboss.netty.util.internal.ConcurrentIdentityHashMap$HashEntry;
   25:         16447         394728  java.util.zip.ZStreamRef
   26:           565         370080  [I
   27:           508         272288  objArrayKlassKlass
   28:         16233         259728  java.lang.Object
   29:           771         209232  [Ljava.util.concurrent.ConcurrentHashMap$HashEntry;
   30:          2524         192312  [Ljava.lang.Object;

 But as I mentioned above, the heap memory seems OK; the extra memory is
 consumed by some off-heap data, and I can't find a way to figure out what is
 in there.

 Besides, I did some extra experiments, i.e. ran the same program in
 different environments to test whether it has the off-heap memory issue:

 spark 1.0 + standalone => no
 spark 1.0 + yarn => no
 spark 1.3 + standalone => no
 spark 1.3 + yarn => yes

 I'm using CDH 5.1, so Spark 1.0 is provided by CDH, and
 spark-1.3.1-bin-hadoop2.3 is downloaded from the official website.

 I could use Spark 1.0 + YARN, but I can't find a way to control the log
 level and rolling, so the logs will fill up the hard drive.

 For now I'll stick to Spark 1.0 + standalone, until our ops team
 decides to upgrade CDH.



 On Tue, Jun 2, 2015 at 2:58 PM, Tathagata Das t...@databricks.com
 wrote:

 While the job is running, is it possible for you to log into the YARN node
 and get histograms of live objects using jmap -histo:live? That may
 reveal something.


 On Thursday, May 28, 2015, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 Unfortunately, they're still growing, both driver and executors.

 I ran the same job in local mode and everything is fine.

 On Thu, May 28, 2015 at 5:26 PM, Akhil Das 
 ak...@sigmoidanalytics.com wrote:

 Can you replace your counting part with this?

 logs.filter(_.s_id > 0).foreachRDD(rdd => logger.info(rdd.count()))



 Thanks
 Best Regards

 On Thu, May 28, 

Re: Spark Streaming yarn-cluster Mode Off-heap Memory Is Constantly Growing

2015-06-18 Thread Tathagata Das
Glad to hear that. :)

On Thu, Jun 18, 2015 at 6:25 AM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 We switched from ParallelGC to CMS, and the symptom is gone.

 On Thu, Jun 4, 2015 at 3:37 PM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 I set spark.shuffle.io.preferDirectBufs to false in SparkConf, and this
 setting can be seen in the Web UI's Environment tab. But it still eats memory,
 i.e. -Xmx is set to 512M but RES grows to 1.5G in half a day.


 On Wed, Jun 3, 2015 at 12:02 PM, Shixiong Zhu zsxw...@gmail.com wrote:

 Could you set spark.shuffle.io.preferDirectBufs to false to turn off
 the off-heap allocation of netty?

 Best Regards,
 Shixiong Zhu

 2015-06-03 11:58 GMT+08:00 Ji ZHANG zhangj...@gmail.com:

 Hi,

 Thanks for your information. I'll give Spark 1.4 a try when it's released.

 On Wed, Jun 3, 2015 at 11:31 AM, Tathagata Das t...@databricks.com
 wrote:

 Could you try it out with Spark 1.4 RC3?

 Also pinging Cloudera folks; they may be aware of something.

 BTW, the way I have debugged memory leaks in the past is as follows.

 Run with a small driver memory, say 1 GB. Periodically (maybe with a script)
 take snapshots of the histogram and also do memory dumps, say every hour.
 Then compare two histos/dumps that are a few hours apart (the more, the
 better). Diffing histos is easy. Diffing two dumps can be done in JVisualVM;
 it will show the objects that got added in the later dump. That makes it easy
 to see what is not getting cleaned up.

 TD


 On Tue, Jun 2, 2015 at 7:33 PM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 Thanks for your reply. Here are the top 30 entries of the jmap -histo:live
 result:

  num     #instances         #bytes  class name
 -----------------------------------------------
    1:         40802      145083848  [B
    2:         99264       12716112  methodKlass
    3:         99264       12291480  constMethodKlass
    4:          8472        9144816  constantPoolKlass
    5:          8472        7625192  instanceKlassKlass
    6:           186        6097824  [Lscala.concurrent.forkjoin.ForkJoinTask;
    7:          7045        4804832  constantPoolCacheKlass
    8:        139168        4453376  java.util.HashMap$Entry
    9:          9427        3542512  methodDataKlass
   10:        141312        3391488  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
   11:        135491        3251784  java.lang.Long
   12:         26192        2765496  [C
   13:           813        1140560  [Ljava.util.HashMap$Entry;
   14:          8997        1061936  java.lang.Class
   15:         16022         851384  [[I
   16:         16447         789456  java.util.zip.Inflater
   17:         13855         723376  [S
   18:         17282         691280  java.lang.ref.Finalizer
   19:         25725         617400  java.lang.String
   20:           320         570368  [Lio.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry;
   21:         16066         514112  java.util.concurrent.ConcurrentHashMap$HashEntry
   22:         12288         491520  org.jboss.netty.util.internal.ConcurrentIdentityHashMap$Segment
   23:         13343         426976  java.util.concurrent.locks.ReentrantLock$NonfairSync
   24:         12288         396416  [Lorg.jboss.netty.util.internal.ConcurrentIdentityHashMap$HashEntry;
   25:         16447         394728  java.util.zip.ZStreamRef
   26:           565         370080  [I
   27:           508         272288  objArrayKlassKlass
   28:         16233         259728  java.lang.Object
   29:           771         209232  [Ljava.util.concurrent.ConcurrentHashMap$HashEntry;
   30:          2524         192312  [Ljava.lang.Object;

 But as I mentioned above, the heap memory seems OK; the extra memory is
 consumed by some off-heap data, and I can't find a way to figure out what is
 in there.

 Besides, I did some extra experiments, i.e. ran the same program in
 different environments to test whether it has the off-heap memory issue:

 spark 1.0 + standalone => no
 spark 1.0 + yarn => no
 spark 1.3 + standalone => no
 spark 1.3 + yarn => yes

 I'm using CDH 5.1, so Spark 1.0 is provided by CDH, and
 spark-1.3.1-bin-hadoop2.3 is downloaded from the official website.

 I could use Spark 1.0 + YARN, but I can't find a way to control the log
 level and rolling, so the logs will fill up the hard drive.

 For now I'll stick to Spark 1.0 + standalone, until our ops team
 decides to upgrade CDH.



 On Tue, Jun 2, 2015 at 2:58 PM, Tathagata Das t...@databricks.com
 wrote:

 While the job is running, is it possible for you to log into the YARN node
 and get histograms of live objects using jmap -histo:live? That may
 reveal something.


 On Thursday, May 28, 2015, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 Unfortunately, they're still growing, both driver and executors.

 I ran the same job in local mode and everything is fine.

 On Thu, May 28, 2015 at 5:26 PM, Akhil Das 
 ak...@sigmoidanalytics.com wrote:

 Can you replace your counting part with this?

 

Re: Spark Streaming yarn-cluster Mode Off-heap Memory Is Constantly Growing

2015-06-04 Thread Ji ZHANG
Hi,

I set spark.shuffle.io.preferDirectBufs to false in SparkConf, and this
setting can be seen in the Web UI's Environment tab. But it still eats memory,
i.e. -Xmx is set to 512M but RES grows to 1.5G in half a day.
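
For reference, the setting described above looks like this in SparkConf (a minimal
sketch; the rest of the configuration is omitted):

import org.apache.spark.SparkConf

// Tell the shuffle/block transfer service not to prefer Netty's off-heap
// (direct) buffers.
val conf = new SparkConf()
  .set("spark.shuffle.io.preferDirectBufs", "false")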


On Wed, Jun 3, 2015 at 12:02 PM, Shixiong Zhu zsxw...@gmail.com wrote:

 Could you set spark.shuffle.io.preferDirectBufs to false to turn off the
 off-heap allocation of netty?

 Best Regards,
 Shixiong Zhu

 2015-06-03 11:58 GMT+08:00 Ji ZHANG zhangj...@gmail.com:

 Hi,

 Thanks for your information. I'll give Spark 1.4 a try when it's released.

 On Wed, Jun 3, 2015 at 11:31 AM, Tathagata Das t...@databricks.com
 wrote:

 Could you try it out with Spark 1.4 RC3?

 Also pinging Cloudera folks; they may be aware of something.

 BTW, the way I have debugged memory leaks in the past is as follows.

 Run with a small driver memory, say 1 GB. Periodically (maybe with a script)
 take snapshots of the histogram and also do memory dumps, say every hour.
 Then compare two histos/dumps that are a few hours apart (the more, the
 better). Diffing histos is easy. Diffing two dumps can be done in JVisualVM;
 it will show the objects that got added in the later dump. That makes it easy
 to see what is not getting cleaned up.

 TD


 On Tue, Jun 2, 2015 at 7:33 PM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 Thanks for your reply. Here are the top 30 entries of the jmap -histo:live
 result:

  num     #instances         #bytes  class name
 -----------------------------------------------
    1:         40802      145083848  [B
    2:         99264       12716112  methodKlass
    3:         99264       12291480  constMethodKlass
    4:          8472        9144816  constantPoolKlass
    5:          8472        7625192  instanceKlassKlass
    6:           186        6097824  [Lscala.concurrent.forkjoin.ForkJoinTask;
    7:          7045        4804832  constantPoolCacheKlass
    8:        139168        4453376  java.util.HashMap$Entry
    9:          9427        3542512  methodDataKlass
   10:        141312        3391488  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
   11:        135491        3251784  java.lang.Long
   12:         26192        2765496  [C
   13:           813        1140560  [Ljava.util.HashMap$Entry;
   14:          8997        1061936  java.lang.Class
   15:         16022         851384  [[I
   16:         16447         789456  java.util.zip.Inflater
   17:         13855         723376  [S
   18:         17282         691280  java.lang.ref.Finalizer
   19:         25725         617400  java.lang.String
   20:           320         570368  [Lio.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry;
   21:         16066         514112  java.util.concurrent.ConcurrentHashMap$HashEntry
   22:         12288         491520  org.jboss.netty.util.internal.ConcurrentIdentityHashMap$Segment
   23:         13343         426976  java.util.concurrent.locks.ReentrantLock$NonfairSync
   24:         12288         396416  [Lorg.jboss.netty.util.internal.ConcurrentIdentityHashMap$HashEntry;
   25:         16447         394728  java.util.zip.ZStreamRef
   26:           565         370080  [I
   27:           508         272288  objArrayKlassKlass
   28:         16233         259728  java.lang.Object
   29:           771         209232  [Ljava.util.concurrent.ConcurrentHashMap$HashEntry;
   30:          2524         192312  [Ljava.lang.Object;

 But as I mentioned above, the heap memory seems OK; the extra memory is
 consumed by some off-heap data, and I can't find a way to figure out what is
 in there.

 Besides, I did some extra experiments, i.e. ran the same program in
 different environments to test whether it has the off-heap memory issue:

 spark 1.0 + standalone => no
 spark 1.0 + yarn => no
 spark 1.3 + standalone => no
 spark 1.3 + yarn => yes

 I'm using CDH 5.1, so Spark 1.0 is provided by CDH, and
 spark-1.3.1-bin-hadoop2.3 is downloaded from the official website.

 I could use Spark 1.0 + YARN, but I can't find a way to control the log
 level and rolling, so the logs will fill up the hard drive.

 For now I'll stick to Spark 1.0 + standalone, until our ops team
 decides to upgrade CDH.



 On Tue, Jun 2, 2015 at 2:58 PM, Tathagata Das t...@databricks.com
 wrote:

 While the job is running, is it possible for you to log into the YARN node
 and get histograms of live objects using jmap -histo:live? That may
 reveal something.


 On Thursday, May 28, 2015, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 Unfortunately, they're still growing, both driver and executors.

 I ran the same job in local mode and everything is fine.

 On Thu, May 28, 2015 at 5:26 PM, Akhil Das 
 ak...@sigmoidanalytics.com wrote:

 Can you replace your counting part with this?

 logs.filter(_.s_id > 0).foreachRDD(rdd => logger.info(rdd.count()))



 Thanks
 Best Regards

 On Thu, May 28, 2015 at 1:02 PM, Ji ZHANG zhangj...@gmail.com
 wrote:

 Hi,

 I wrote a simple test job that only does very basic operations, for
 example:

 

Re: Spark Streaming yarn-cluster Mode Off-heap Memory Is Constantly Growing

2015-06-02 Thread Ji ZHANG
Hi,

Thanks for your information. I'll give Spark 1.4 a try when it's released.

On Wed, Jun 3, 2015 at 11:31 AM, Tathagata Das t...@databricks.com wrote:

 Could you try it out with Spark 1.4 RC3?

 Also pinging Cloudera folks; they may be aware of something.

 BTW, the way I have debugged memory leaks in the past is as follows.

 Run with a small driver memory, say 1 GB. Periodically (maybe with a script)
 take snapshots of the histogram and also do memory dumps, say every hour.
 Then compare two histos/dumps that are a few hours apart (the more, the
 better). Diffing histos is easy. Diffing two dumps can be done in JVisualVM;
 it will show the objects that got added in the later dump. That makes it easy
 to see what is not getting cleaned up.

 TD


 On Tue, Jun 2, 2015 at 7:33 PM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 Thanks for your reply. Here are the top 30 entries of the jmap -histo:live
 result:

  num     #instances         #bytes  class name
 -----------------------------------------------
    1:         40802      145083848  [B
    2:         99264       12716112  methodKlass
    3:         99264       12291480  constMethodKlass
    4:          8472        9144816  constantPoolKlass
    5:          8472        7625192  instanceKlassKlass
    6:           186        6097824  [Lscala.concurrent.forkjoin.ForkJoinTask;
    7:          7045        4804832  constantPoolCacheKlass
    8:        139168        4453376  java.util.HashMap$Entry
    9:          9427        3542512  methodDataKlass
   10:        141312        3391488  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
   11:        135491        3251784  java.lang.Long
   12:         26192        2765496  [C
   13:           813        1140560  [Ljava.util.HashMap$Entry;
   14:          8997        1061936  java.lang.Class
   15:         16022         851384  [[I
   16:         16447         789456  java.util.zip.Inflater
   17:         13855         723376  [S
   18:         17282         691280  java.lang.ref.Finalizer
   19:         25725         617400  java.lang.String
   20:           320         570368  [Lio.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry;
   21:         16066         514112  java.util.concurrent.ConcurrentHashMap$HashEntry
   22:         12288         491520  org.jboss.netty.util.internal.ConcurrentIdentityHashMap$Segment
   23:         13343         426976  java.util.concurrent.locks.ReentrantLock$NonfairSync
   24:         12288         396416  [Lorg.jboss.netty.util.internal.ConcurrentIdentityHashMap$HashEntry;
   25:         16447         394728  java.util.zip.ZStreamRef
   26:           565         370080  [I
   27:           508         272288  objArrayKlassKlass
   28:         16233         259728  java.lang.Object
   29:           771         209232  [Ljava.util.concurrent.ConcurrentHashMap$HashEntry;
   30:          2524         192312  [Ljava.lang.Object;

 But as I mentioned above, the heap memory seems OK; the extra memory is
 consumed by some off-heap data, and I can't find a way to figure out what is
 in there.

 Besides, I did some extra experiments, i.e. ran the same program in
 different environments to test whether it has the off-heap memory issue:

 spark 1.0 + standalone => no
 spark 1.0 + yarn => no
 spark 1.3 + standalone => no
 spark 1.3 + yarn => yes

 I'm using CDH 5.1, so Spark 1.0 is provided by CDH, and
 spark-1.3.1-bin-hadoop2.3 is downloaded from the official website.

 I could use Spark 1.0 + YARN, but I can't find a way to control the log
 level and rolling, so the logs will fill up the hard drive.

 For now I'll stick to Spark 1.0 + standalone, until our ops team
 decides to upgrade CDH.



 On Tue, Jun 2, 2015 at 2:58 PM, Tathagata Das t...@databricks.com
 wrote:

 While the job is running, is it possible for you to log into the YARN node
 and get histograms of live objects using jmap -histo:live? That may
 reveal something.


 On Thursday, May 28, 2015, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 Unfortunately, they're still growing, both driver and executors.

 I ran the same job in local mode and everything is fine.

 On Thu, May 28, 2015 at 5:26 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Can you replace your counting part with this?

 logs.filter(_.s_id > 0).foreachRDD(rdd => logger.info(rdd.count()))



 Thanks
 Best Regards

 On Thu, May 28, 2015 at 1:02 PM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 I wrote a simple test job that only does very basic operations, for
 example:

 val lines = KafkaUtils.createStream(ssc, zkQuorum, group,
   Map(topic -> 1)).map(_._2)
 val logs = lines.flatMap { line =>
   try {
     Some(parse(line).extract[Impression])
   } catch {
     case _: Exception => None
   }
 }

 logs.filter(_.s_id > 0).count.foreachRDD { rdd =>
   rdd.foreachPartition { iter =>
     iter.foreach(count => logger.info(count.toString))
   }
 }

 It receives messages from Kafka, parse the json, filter and count 

Re: Spark Streaming yarn-cluster Mode Off-heap Memory Is Constantly Growing

2015-06-02 Thread Tathagata Das
Could you try it out with Spark 1.4 RC3?

Also pinging Cloudera folks; they may be aware of something.

BTW, the way I have debugged memory leaks in the past is as follows.

Run with a small driver memory, say 1 GB. Periodically (maybe with a script)
take snapshots of the histogram and also do memory dumps, say every hour.
Then compare two histos/dumps that are a few hours apart (the more, the
better). Diffing histos is easy. Diffing two dumps can be done in JVisualVM;
it will show the objects that got added in the later dump. That makes it easy
to see what is not getting cleaned up.

TD
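
A rough sketch of the periodic-snapshot idea above; the object name, file naming
and one-hour interval are placeholders, and it assumes jmap is on the PATH and the
target JVM's pid is passed as an argument:

import java.io.File
import scala.sys.process._

object HistoSnapshots {
  def main(args: Array[String]): Unit = {
    val pid = args(0)
    while (true) {
      // Write one `jmap -histo:live` snapshot per hour into a timestamped file,
      // so any two files a few hours apart can be diffed later.
      val ts = System.currentTimeMillis() / 1000
      (s"jmap -histo:live $pid" #> new File(s"histo-$pid-$ts.txt")).!
      Thread.sleep(60 * 60 * 1000L)
    }
  }
}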


On Tue, Jun 2, 2015 at 7:33 PM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 Thanks for your reply. Here are the top 30 entries of the jmap -histo:live
 result:

  num     #instances         #bytes  class name
 -----------------------------------------------
    1:         40802      145083848  [B
    2:         99264       12716112  methodKlass
    3:         99264       12291480  constMethodKlass
    4:          8472        9144816  constantPoolKlass
    5:          8472        7625192  instanceKlassKlass
    6:           186        6097824  [Lscala.concurrent.forkjoin.ForkJoinTask;
    7:          7045        4804832  constantPoolCacheKlass
    8:        139168        4453376  java.util.HashMap$Entry
    9:          9427        3542512  methodDataKlass
   10:        141312        3391488  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
   11:        135491        3251784  java.lang.Long
   12:         26192        2765496  [C
   13:           813        1140560  [Ljava.util.HashMap$Entry;
   14:          8997        1061936  java.lang.Class
   15:         16022         851384  [[I
   16:         16447         789456  java.util.zip.Inflater
   17:         13855         723376  [S
   18:         17282         691280  java.lang.ref.Finalizer
   19:         25725         617400  java.lang.String
   20:           320         570368  [Lio.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry;
   21:         16066         514112  java.util.concurrent.ConcurrentHashMap$HashEntry
   22:         12288         491520  org.jboss.netty.util.internal.ConcurrentIdentityHashMap$Segment
   23:         13343         426976  java.util.concurrent.locks.ReentrantLock$NonfairSync
   24:         12288         396416  [Lorg.jboss.netty.util.internal.ConcurrentIdentityHashMap$HashEntry;
   25:         16447         394728  java.util.zip.ZStreamRef
   26:           565         370080  [I
   27:           508         272288  objArrayKlassKlass
   28:         16233         259728  java.lang.Object
   29:           771         209232  [Ljava.util.concurrent.ConcurrentHashMap$HashEntry;
   30:          2524         192312  [Ljava.lang.Object;

 But as I mentioned above, the heap memory seems OK; the extra memory is
 consumed by some off-heap data, and I can't find a way to figure out what is
 in there.

 Besides, I did some extra experiments, i.e. ran the same program in
 different environments to test whether it has the off-heap memory issue:

 spark 1.0 + standalone => no
 spark 1.0 + yarn => no
 spark 1.3 + standalone => no
 spark 1.3 + yarn => yes

 I'm using CDH 5.1, so Spark 1.0 is provided by CDH, and
 spark-1.3.1-bin-hadoop2.3 is downloaded from the official website.

 I could use Spark 1.0 + YARN, but I can't find a way to control the log
 level and rolling, so the logs will fill up the hard drive.

 For now I'll stick to Spark 1.0 + standalone, until our ops team
 decides to upgrade CDH.



 On Tue, Jun 2, 2015 at 2:58 PM, Tathagata Das t...@databricks.com wrote:

 While the job is running, is it possible for you to log into the YARN node
 and get histograms of live objects using jmap -histo:live? That may
 reveal something.


 On Thursday, May 28, 2015, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 Unfortunately, they're still growing, both driver and executors.

 I ran the same job in local mode and everything is fine.

 On Thu, May 28, 2015 at 5:26 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Can you replace your counting part with this?

 logs.filter(_.s_id > 0).foreachRDD(rdd => logger.info(rdd.count()))



 Thanks
 Best Regards

 On Thu, May 28, 2015 at 1:02 PM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 I wrote a simple test job that only does very basic operations, for
 example:

 val lines = KafkaUtils.createStream(ssc, zkQuorum, group,
   Map(topic -> 1)).map(_._2)
 val logs = lines.flatMap { line =>
   try {
     Some(parse(line).extract[Impression])
   } catch {
     case _: Exception => None
   }
 }

 logs.filter(_.s_id > 0).count.foreachRDD { rdd =>
   rdd.foreachPartition { iter =>
     iter.foreach(count => logger.info(count.toString))
   }
 }

 It receives messages from Kafka, parses the JSON, filters and counts the
 records, and then prints the count to the logs.

 Thanks.


 On Thu, May 28, 2015 at 3:07 PM, Akhil Das ak...@sigmoidanalytics.com
  wrote:

 Hi Zhang,

 Could you paste your 

Re: Spark Streaming yarn-cluster Mode Off-heap Memory Is Constantly Growing

2015-06-02 Thread Ji ZHANG
Hi,

Thanks for your reply. Here are the top 30 entries of the jmap -histo:live
result:

 num     #instances         #bytes  class name
-----------------------------------------------
   1:         40802      145083848  [B
   2:         99264       12716112  methodKlass
   3:         99264       12291480  constMethodKlass
   4:          8472        9144816  constantPoolKlass
   5:          8472        7625192  instanceKlassKlass
   6:           186        6097824  [Lscala.concurrent.forkjoin.ForkJoinTask;
   7:          7045        4804832  constantPoolCacheKlass
   8:        139168        4453376  java.util.HashMap$Entry
   9:          9427        3542512  methodDataKlass
  10:        141312        3391488  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
  11:        135491        3251784  java.lang.Long
  12:         26192        2765496  [C
  13:           813        1140560  [Ljava.util.HashMap$Entry;
  14:          8997        1061936  java.lang.Class
  15:         16022         851384  [[I
  16:         16447         789456  java.util.zip.Inflater
  17:         13855         723376  [S
  18:         17282         691280  java.lang.ref.Finalizer
  19:         25725         617400  java.lang.String
  20:           320         570368  [Lio.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry;
  21:         16066         514112  java.util.concurrent.ConcurrentHashMap$HashEntry
  22:         12288         491520  org.jboss.netty.util.internal.ConcurrentIdentityHashMap$Segment
  23:         13343         426976  java.util.concurrent.locks.ReentrantLock$NonfairSync
  24:         12288         396416  [Lorg.jboss.netty.util.internal.ConcurrentIdentityHashMap$HashEntry;
  25:         16447         394728  java.util.zip.ZStreamRef
  26:           565         370080  [I
  27:           508         272288  objArrayKlassKlass
  28:         16233         259728  java.lang.Object
  29:           771         209232  [Ljava.util.concurrent.ConcurrentHashMap$HashEntry;
  30:          2524         192312  [Ljava.lang.Object;

But as I mentioned above, the heap memory seems OK; the extra memory is
consumed by some off-heap data, and I can't find a way to figure out what is
in there.

Besides, I did some extra experiments, i.e. ran the same program in
different environments to test whether it has the off-heap memory issue:

spark 1.0 + standalone => no
spark 1.0 + yarn => no
spark 1.3 + standalone => no
spark 1.3 + yarn => yes

I'm using CDH 5.1, so Spark 1.0 is provided by CDH, and
spark-1.3.1-bin-hadoop2.3 is downloaded from the official website.

I could use Spark 1.0 + YARN, but I can't find a way to control the log
level and rolling, so the logs will fill up the hard drive.

For now I'll stick to Spark 1.0 + standalone, until our ops team decides
to upgrade CDH.



On Tue, Jun 2, 2015 at 2:58 PM, Tathagata Das t...@databricks.com wrote:

 While the job is running, is it possible for you to log into the YARN node
 and get histograms of live objects using jmap -histo:live? That may
 reveal something.


 On Thursday, May 28, 2015, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 Unfortunately, they're still growing, both driver and executors.

 I ran the same job in local mode and everything is fine.

 On Thu, May 28, 2015 at 5:26 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Can you replace your counting part with this?

 logs.filter(_.s_id > 0).foreachRDD(rdd => logger.info(rdd.count()))



 Thanks
 Best Regards

 On Thu, May 28, 2015 at 1:02 PM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 I wrote a simple test job that only does very basic operations, for
 example:

 val lines = KafkaUtils.createStream(ssc, zkQuorum, group,
   Map(topic -> 1)).map(_._2)
 val logs = lines.flatMap { line =>
   try {
     Some(parse(line).extract[Impression])
   } catch {
     case _: Exception => None
   }
 }

 logs.filter(_.s_id > 0).count.foreachRDD { rdd =>
   rdd.foreachPartition { iter =>
     iter.foreach(count => logger.info(count.toString))
   }
 }

 It receives messages from Kafka, parses the JSON, filters and counts the
 records, and then prints the count to the logs.

 Thanks.


 On Thu, May 28, 2015 at 3:07 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Hi Zhang,

 Could you paste your code in a gist? Not sure what you are doing
 inside the code to fill up memory.

 Thanks
 Best Regards

 On Thu, May 28, 2015 at 10:08 AM, Ji ZHANG zhangj...@gmail.com
 wrote:

 Hi,

 Yes, I'm using createStream, but the storageLevel param is by default
 MEMORY_AND_DISK_SER_2. Besides, the driver's memory is also growing. I
 don't think Kafka messages will be cached in driver.


 On Thu, May 28, 2015 at 12:24 AM, Akhil Das 
 ak...@sigmoidanalytics.com wrote:

 Are you using the createStream or createDirectStream API? If it's the
 former, you can try setting the StorageLevel to MEMORY_AND_DISK (it might
 slow things down though). Another way would be to try the latter.

 Thanks
 Best Regards

 On Wed, May 27, 2015 at 1:00 PM, Ji ZHANG 

Re: Spark Streaming yarn-cluster Mode Off-heap Memory Is Constantly Growing

2015-05-28 Thread Akhil Das
Hi Zhang,

Could you paste your code in a gist? Not sure what you are doing inside the
code to fill up memory.

Thanks
Best Regards

On Thu, May 28, 2015 at 10:08 AM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 Yes, I'm using createStream, but the storageLevel param is by default
 MEMORY_AND_DISK_SER_2. Besides, the driver's memory is also growing. I
 don't think Kafka messages will be cached in driver.


 On Thu, May 28, 2015 at 12:24 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Are you using the createStream or createDirectStream API? If it's the
 former, you can try setting the StorageLevel to MEMORY_AND_DISK (it might
 slow things down though). Another way would be to try the latter.

 Thanks
 Best Regards

 On Wed, May 27, 2015 at 1:00 PM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi Akhil,

 Thanks for your reply. According to the Streaming tab of the Web UI, the
 Processing Time is around 400ms, and there's no Scheduling Delay, so I
 suppose it's not the Kafka messages that eat up the off-heap memory. Or
 maybe it is, but how can I tell?

 I googled how to check the off-heap memory usage; there's a tool called
 pmap, but I don't know how to interpret the results.

 On Wed, May 27, 2015 at 3:08 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 After submitting the job, if you do a ps aux | grep spark-submit then you
 can see all the JVM params. Are you using the high-level consumer (receiver
 based) for receiving data from Kafka? In that case, if your throughput is
 high and the processing delay exceeds the batch interval, then you will hit
 this memory issue as data keeps arriving and is dumped to memory.
 You can set the StorageLevel to MEMORY_AND_DISK (but it slows things down).
 Another alternative would be to use the low-level Kafka consumer
 https://github.com/dibbhatt/kafka-spark-consumer or the
 non-receiver based directStream
 https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
 that comes with Spark.

 Thanks
 Best Regards

 On Wed, May 27, 2015 at 11:51 AM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 I'm using Spark Streaming 1.3 on CDH 5.1 with yarn-cluster mode. I find
 that YARN is killing the driver and executor processes because of excessive
 use of memory. Here's what I have tried:

 1. Xmx is set to 512M and the GC looks fine (one ygc per 10s), so the
 extra memory is not used by the heap.
 2. I set the two memoryOverhead params to 1024 (default is 384), but the
 memory just keeps growing and then hits the limit.
 3. This problem does not show up in low-throughput jobs, nor in standalone
 mode.
 4. The test job just receives messages from Kafka, with a batch interval of
 1, does some filtering and aggregation, and then prints to executor logs.
 So it's not some 3rd-party library that causes the 'leak'.

 Spark 1.3 was built by myself, with the correct Hadoop versions.

 Any ideas will be appreciated.

 Thanks.

 --
 Jerry





 --
 Jerry





 --
 Jerry



Re: Spark Streaming yarn-cluster Mode Off-heap Memory Is Constantly Growing

2015-05-28 Thread Ji ZHANG
Hi,

I wrote a simple test job that only does very basic operations, for example:

val lines = KafkaUtils.createStream(ssc, zkQuorum, group,
  Map(topic -> 1)).map(_._2)
val logs = lines.flatMap { line =>
  try {
    Some(parse(line).extract[Impression])
  } catch {
    case _: Exception => None
  }
}

logs.filter(_.s_id > 0).count.foreachRDD { rdd =>
  rdd.foreachPartition { iter =>
    iter.foreach(count => logger.info(count.toString))
  }
}

It receives messages from Kafka, parses the JSON, filters and counts the
records, and then prints the count to the logs.

Thanks.


On Thu, May 28, 2015 at 3:07 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Hi Zhang,

 Could you paste your code in a gist? Not sure what you are doing inside
 the code to fill up memory.

 Thanks
 Best Regards

 On Thu, May 28, 2015 at 10:08 AM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 Yes, I'm using createStream, but the storageLevel param is by default
 MEMORY_AND_DISK_SER_2. Besides, the driver's memory is also growing. I
 don't think Kafka messages will be cached in driver.


 On Thu, May 28, 2015 at 12:24 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Are you using the createStream or createDirectStream API? If it's the
 former, you can try setting the StorageLevel to MEMORY_AND_DISK (it might
 slow things down though). Another way would be to try the latter.

 Thanks
 Best Regards

 On Wed, May 27, 2015 at 1:00 PM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi Akhil,

 Thanks for your reply. According to the Streaming tab of the Web UI, the
 Processing Time is around 400ms, and there's no Scheduling Delay, so I
 suppose it's not the Kafka messages that eat up the off-heap memory. Or
 maybe it is, but how can I tell?

 I googled how to check the off-heap memory usage; there's a tool called
 pmap, but I don't know how to interpret the results.

 On Wed, May 27, 2015 at 3:08 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 After submitting the job, if you do a ps aux | grep spark-submit then you
 can see all the JVM params. Are you using the high-level consumer (receiver
 based) for receiving data from Kafka? In that case, if your throughput is
 high and the processing delay exceeds the batch interval, then you will hit
 this memory issue as data keeps arriving and is dumped to memory.
 You can set the StorageLevel to MEMORY_AND_DISK (but it slows things down).
 Another alternative would be to use the low-level Kafka consumer
 https://github.com/dibbhatt/kafka-spark-consumer or the
 non-receiver based directStream
 https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
 that comes with Spark.

 Thanks
 Best Regards

 On Wed, May 27, 2015 at 11:51 AM, Ji ZHANG zhangj...@gmail.com
 wrote:

 Hi,

 I'm using Spark Streaming 1.3 on CDH 5.1 with yarn-cluster mode. I find
 that YARN is killing the driver and executor processes because of excessive
 use of memory. Here's what I have tried:

 1. Xmx is set to 512M and the GC looks fine (one ygc per 10s), so the
 extra memory is not used by the heap.
 2. I set the two memoryOverhead params to 1024 (default is 384), but the
 memory just keeps growing and then hits the limit.
 3. This problem does not show up in low-throughput jobs, nor in standalone
 mode.
 4. The test job just receives messages from Kafka, with a batch interval of
 1, does some filtering and aggregation, and then prints to executor logs.
 So it's not some 3rd-party library that causes the 'leak'.

 Spark 1.3 was built by myself, with the correct Hadoop versions.

 Any ideas will be appreciated.

 Thanks.

 --
 Jerry





 --
 Jerry





 --
 Jerry





-- 
Jerry


Re: Spark Streaming yarn-cluster Mode Off-heap Memory Is Constantly Growing

2015-05-28 Thread Akhil Das
Can you replace your counting part with this?

logs.filter(_.s_id > 0).foreachRDD(rdd => logger.info(rdd.count()))



Thanks
Best Regards

On Thu, May 28, 2015 at 1:02 PM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 I wrote a simple test job that only does very basic operations, for example:

 val lines = KafkaUtils.createStream(ssc, zkQuorum, group,
   Map(topic -> 1)).map(_._2)
 val logs = lines.flatMap { line =>
   try {
     Some(parse(line).extract[Impression])
   } catch {
     case _: Exception => None
   }
 }

 logs.filter(_.s_id > 0).count.foreachRDD { rdd =>
   rdd.foreachPartition { iter =>
     iter.foreach(count => logger.info(count.toString))
   }
 }

 It receives messages from Kafka, parses the JSON, filters and counts the
 records, and then prints the count to the logs.

 Thanks.


 On Thu, May 28, 2015 at 3:07 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Hi Zhang,

 Could you paste your code in a gist? Not sure what you are doing inside
 the code to fill up memory.

 Thanks
 Best Regards

 On Thu, May 28, 2015 at 10:08 AM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 Yes, I'm using createStream, but the storageLevel param is by default
 MEMORY_AND_DISK_SER_2. Besides, the driver's memory is also growing. I
 don't think Kafka messages will be cached in driver.


 On Thu, May 28, 2015 at 12:24 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Are you using the createStream or createDirectStream API? If it's the
 former, you can try setting the StorageLevel to MEMORY_AND_DISK (it might
 slow things down though). Another way would be to try the latter.

 Thanks
 Best Regards

 On Wed, May 27, 2015 at 1:00 PM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi Akhil,

 Thanks for your reply. According to the Streaming tab of the Web UI, the
 Processing Time is around 400ms, and there's no Scheduling Delay, so I
 suppose it's not the Kafka messages that eat up the off-heap memory. Or
 maybe it is, but how can I tell?

 I googled how to check the off-heap memory usage; there's a tool called
 pmap, but I don't know how to interpret the results.

 On Wed, May 27, 2015 at 3:08 PM, Akhil Das ak...@sigmoidanalytics.com
  wrote:

 After submitting the job, if you do a ps aux | grep spark-submit then you
 can see all the JVM params. Are you using the high-level consumer (receiver
 based) for receiving data from Kafka? In that case, if your throughput is
 high and the processing delay exceeds the batch interval, then you will hit
 this memory issue as data keeps arriving and is dumped to memory.
 You can set the StorageLevel to MEMORY_AND_DISK (but it slows things down).
 Another alternative would be to use the low-level Kafka consumer
 https://github.com/dibbhatt/kafka-spark-consumer or the
 non-receiver based directStream
 https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
 that comes with Spark.

 Thanks
 Best Regards

 On Wed, May 27, 2015 at 11:51 AM, Ji ZHANG zhangj...@gmail.com
 wrote:

 Hi,

 I'm using Spark Streaming 1.3 on CDH 5.1 with yarn-cluster mode. I find
 that YARN is killing the driver and executor processes because of excessive
 use of memory. Here's what I have tried:

 1. Xmx is set to 512M and the GC looks fine (one ygc per 10s), so the
 extra memory is not used by the heap.
 2. I set the two memoryOverhead params to 1024 (default is 384), but the
 memory just keeps growing and then hits the limit.
 3. This problem does not show up in low-throughput jobs, nor in standalone
 mode.
 4. The test job just receives messages from Kafka, with a batch interval of
 1, does some filtering and aggregation, and then prints to executor logs.
 So it's not some 3rd-party library that causes the 'leak'.

 Spark 1.3 was built by myself, with the correct Hadoop versions.

 Any ideas will be appreciated.

 Thanks.

 --
 Jerry





 --
 Jerry





 --
 Jerry





 --
 Jerry



Re: Spark Streaming yarn-cluster Mode Off-heap Memory Is Constantly Growing

2015-05-28 Thread Ji ZHANG
Hi,

Unfortunately, they're still growing, both driver and executors.

I ran the same job in local mode and everything is fine.

On Thu, May 28, 2015 at 5:26 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Can you replace your counting part with this?

 logs.filter(_.s_id > 0).foreachRDD(rdd => logger.info(rdd.count()))



 Thanks
 Best Regards

 On Thu, May 28, 2015 at 1:02 PM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 I wrote a simple test job that only does very basic operations, for
 example:

 val lines = KafkaUtils.createStream(ssc, zkQuorum, group,
   Map(topic -> 1)).map(_._2)
 val logs = lines.flatMap { line =>
   try {
     Some(parse(line).extract[Impression])
   } catch {
     case _: Exception => None
   }
 }

 logs.filter(_.s_id > 0).count.foreachRDD { rdd =>
   rdd.foreachPartition { iter =>
     iter.foreach(count => logger.info(count.toString))
   }
 }

 It receives messages from Kafka, parses the JSON, filters and counts the
 records, and then prints the count to the logs.

 Thanks.


 On Thu, May 28, 2015 at 3:07 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Hi Zhang,

 Could you paste your code in a gist? Not sure what you are doing inside
 the code to fill up memory.

 Thanks
 Best Regards

 On Thu, May 28, 2015 at 10:08 AM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 Yes, I'm using createStream, but the storageLevel param is by default
 MEMORY_AND_DISK_SER_2. Besides, the driver's memory is also growing. I
 don't think Kafka messages will be cached in driver.


 On Thu, May 28, 2015 at 12:24 AM, Akhil Das ak...@sigmoidanalytics.com
  wrote:

 Are you using the createStream or createDirectStream API? If it's the
 former, you can try setting the StorageLevel to MEMORY_AND_DISK (it might
 slow things down though). Another way would be to try the latter.

 Thanks
 Best Regards

 On Wed, May 27, 2015 at 1:00 PM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi Akhil,

 Thanks for your reply. According to the Streaming tab of the Web UI, the
 Processing Time is around 400ms, and there's no Scheduling Delay, so I
 suppose it's not the Kafka messages that eat up the off-heap memory. Or
 maybe it is, but how can I tell?

 I googled how to check the off-heap memory usage; there's a tool called
 pmap, but I don't know how to interpret the results.

 On Wed, May 27, 2015 at 3:08 PM, Akhil Das 
 ak...@sigmoidanalytics.com wrote:

 After submitting the job, if you do a ps aux | grep spark-submit then you
 can see all the JVM params. Are you using the high-level consumer (receiver
 based) for receiving data from Kafka? In that case, if your throughput is
 high and the processing delay exceeds the batch interval, then you will hit
 this memory issue as data keeps arriving and is dumped to memory.
 You can set the StorageLevel to MEMORY_AND_DISK (but it slows things down).
 Another alternative would be to use the low-level Kafka consumer
 https://github.com/dibbhatt/kafka-spark-consumer or the
 non-receiver based directStream
 https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
 that comes with Spark.

 Thanks
 Best Regards

 On Wed, May 27, 2015 at 11:51 AM, Ji ZHANG zhangj...@gmail.com
 wrote:

 Hi,

 I'm using Spark Streaming 1.3 on CDH 5.1 with yarn-cluster mode. I find
 that YARN is killing the driver and executor processes because of excessive
 use of memory. Here's what I have tried:

 1. Xmx is set to 512M and the GC looks fine (one ygc per 10s), so the
 extra memory is not used by the heap.
 2. I set the two memoryOverhead params to 1024 (default is 384), but the
 memory just keeps growing and then hits the limit.
 3. This problem does not show up in low-throughput jobs, nor in standalone
 mode.
 4. The test job just receives messages from Kafka, with a batch interval of
 1, does some filtering and aggregation, and then prints to executor logs.
 So it's not some 3rd-party library that causes the 'leak'.

 Spark 1.3 was built by myself, with the correct Hadoop versions.

 Any ideas will be appreciated.

 Thanks.

 --
 Jerry





 --
 Jerry





 --
 Jerry





 --
 Jerry





-- 
Jerry


Re: Spark Streaming yarn-cluster Mode Off-heap Memory Is Constantly Growing

2015-05-27 Thread Akhil Das
After submitting the job, if you do a ps aux | grep spark-submit then you
can see all the JVM params. Are you using the high-level consumer (receiver
based) for receiving data from Kafka? In that case, if your throughput is
high and the processing delay exceeds the batch interval, then you will hit
this memory issue as data keeps arriving and is dumped to memory.
You can set the StorageLevel to MEMORY_AND_DISK (but it slows things down).
Another alternative would be to use the low-level Kafka consumer
https://github.com/dibbhatt/kafka-spark-consumer or the
non-receiver based directStream
https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
that comes with Spark.
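
A minimal sketch of that direct (receiver-less) approach on Spark 1.3; the broker
list and topic are placeholders, and ssc is the job's StreamingContext:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// No receiver: each batch's tasks read their offset ranges straight from Kafka,
// so nothing accumulates in receiver memory between batches.
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("topic")).map(_._2)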

Thanks
Best Regards

On Wed, May 27, 2015 at 11:51 AM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 I'm using Spark Streaming 1.3 on CDH 5.1 with yarn-cluster mode. I find
 that YARN is killing the driver and executor processes because of excessive
 use of memory. Here's what I have tried:

 1. Xmx is set to 512M and the GC looks fine (one ygc per 10s), so the
 extra memory is not used by the heap.
 2. I set the two memoryOverhead params to 1024 (default is 384), but the
 memory just keeps growing and then hits the limit.
 3. This problem does not show up in low-throughput jobs, nor in standalone
 mode.
 4. The test job just receives messages from Kafka, with a batch interval of
 1, does some filtering and aggregation, and then prints to executor logs.
 So it's not some 3rd-party library that causes the 'leak'.

 Spark 1.3 was built by myself, with the correct Hadoop versions.

 Any ideas will be appreciated.

 Thanks.

 --
 Jerry



Re: Spark Streaming yarn-cluster Mode Off-heap Memory Is Constantly Growing

2015-05-27 Thread Ji ZHANG
Hi,

Yes, I'm using createStream, but the storageLevel param is by default
MEMORY_AND_DISK_SER_2. Besides, the driver's memory is also growing. I
don't think Kafka messages will be cached in driver.
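
For completeness, passing an explicit storage level to the receiver-based stream,
as suggested above, would look roughly like this (zkQuorum, group and topic being
the same values used in the original job):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// Same receiver-based stream, but stored deserialized and unreplicated
// instead of the default MEMORY_AND_DISK_SER_2.
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, Map(topic -> 1),
  StorageLevel.MEMORY_AND_DISK).map(_._2)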


On Thu, May 28, 2015 at 12:24 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Are you using the createStream or createDirectStream API? If it's the
 former, you can try setting the StorageLevel to MEMORY_AND_DISK (it might
 slow things down though). Another way would be to try the latter.

 Thanks
 Best Regards

 On Wed, May 27, 2015 at 1:00 PM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi Akhil,

 Thanks for your reply. According to the Streaming tab of the Web UI, the
 Processing Time is around 400ms, and there's no Scheduling Delay, so I
 suppose it's not the Kafka messages that eat up the off-heap memory. Or
 maybe it is, but how can I tell?

 I googled how to check the off-heap memory usage; there's a tool called
 pmap, but I don't know how to interpret the results.

 On Wed, May 27, 2015 at 3:08 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 After submitting the job, if you do a ps aux | grep spark-submit then you
 can see all the JVM params. Are you using the high-level consumer (receiver
 based) for receiving data from Kafka? In that case, if your throughput is
 high and the processing delay exceeds the batch interval, then you will hit
 this memory issue as data keeps arriving and is dumped to memory.
 You can set the StorageLevel to MEMORY_AND_DISK (but it slows things down).
 Another alternative would be to use the low-level Kafka consumer
 https://github.com/dibbhatt/kafka-spark-consumer or the
 non-receiver based directStream
 https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
 that comes with Spark.

 Thanks
 Best Regards

 On Wed, May 27, 2015 at 11:51 AM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 I'm using Spark Streaming 1.3 on CDH 5.1 with yarn-cluster mode. I find
 that YARN is killing the driver and executor processes because of excessive
 use of memory. Here's what I have tried:

 1. Xmx is set to 512M and the GC looks fine (one ygc per 10s), so the
 extra memory is not used by the heap.
 2. I set the two memoryOverhead params to 1024 (default is 384), but the
 memory just keeps growing and then hits the limit.
 3. This problem does not show up in low-throughput jobs, nor in standalone
 mode.
 4. The test job just receives messages from Kafka, with a batch interval of
 1, does some filtering and aggregation, and then prints to executor logs.
 So it's not some 3rd-party library that causes the 'leak'.

 Spark 1.3 was built by myself, with the correct Hadoop versions.

 Any ideas will be appreciated.

 Thanks.

 --
 Jerry





 --
 Jerry





-- 
Jerry


Re: Spark Streaming yarn-cluster Mode Off-heap Memory Is Constantly Growing

2015-05-27 Thread Ji ZHANG
Hi Akhil,

Thanks for your reply. According to the Streaming tab of the Web UI, the
Processing Time is around 400ms, and there's no Scheduling Delay, so I
suppose it's not the Kafka messages that eat up the off-heap memory. Or
maybe it is, but how can I tell?

I googled how to check the off-heap memory usage; there's a tool called
pmap, but I don't know how to interpret the results.

On Wed, May 27, 2015 at 3:08 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 After submitting the job, if you do a ps aux | grep spark-submit then you
 can see all the JVM params. Are you using the high-level consumer (receiver
 based) for receiving data from Kafka? In that case, if your throughput is
 high and the processing delay exceeds the batch interval, then you will hit
 this memory issue as data keeps arriving and is dumped to memory.
 You can set the StorageLevel to MEMORY_AND_DISK (but it slows things down).
 Another alternative would be to use the low-level Kafka consumer
 https://github.com/dibbhatt/kafka-spark-consumer or the
 non-receiver based directStream
 https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
 that comes with Spark.

 Thanks
 Best Regards

 On Wed, May 27, 2015 at 11:51 AM, Ji ZHANG zhangj...@gmail.com wrote:

 Hi,

 I'm using Spark Streaming 1.3 on CDH 5.1 with yarn-cluster mode. I find
 that YARN is killing the driver and executor processes because of excessive
 use of memory. Here's what I have tried:

 1. Xmx is set to 512M and the GC looks fine (one ygc per 10s), so the
 extra memory is not used by the heap.
 2. I set the two memoryOverhead params to 1024 (default is 384), but the
 memory just keeps growing and then hits the limit.
 3. This problem does not show up in low-throughput jobs, nor in standalone
 mode.
 4. The test job just receives messages from Kafka, with a batch interval of
 1, does some filtering and aggregation, and then prints to executor logs.
 So it's not some 3rd-party library that causes the 'leak'.

 Spark 1.3 was built by myself, with the correct Hadoop versions.

 Any ideas will be appreciated.

 Thanks.

 --
 Jerry





-- 
Jerry