Re: Spark Streaming yarn-cluster Mode Off-heap Memory Is Constantly Growing
Hi,

We switched from ParallelGC to CMS, and the symptom is gone.
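(For reference, one way such a collector switch can be applied; these flags are an assumption and are not quoted in the thread, e.g. as spark-defaults.conf entries:)

spark.driver.extraJavaOptions -XX:+UseConcMarkSweepGC
spark.executor.extraJavaOptions -XX:+UseConcMarkSweepGC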
Re: Spark Streaming yarn-cluster Mode Off-heap Memory Is Constantly Growing
Hi,

I set spark.shuffle.io.preferDirectBufs to false in SparkConf, and this setting can be seen in the web UI's Environment tab. But it still eats memory, i.e. -Xmx is set to 512M but RES grows to 1.5G in half a day.

On Wed, Jun 3, 2015 at 12:02 PM, Shixiong Zhu zsxw...@gmail.com wrote:

Could you set spark.shuffle.io.preferDirectBufs to false to turn off the off-heap allocation of netty?

Best Regards,
Shixiong Zhu
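(For reference, a minimal sketch of applying that setting in SparkConf; only the conf key comes from this thread, the rest is a placeholder:)

import org.apache.spark.SparkConf

// Turn off Netty's off-heap (direct) buffers for shuffle I/O.
val conf = new SparkConf()
  .setAppName("streaming-app") // placeholder
  .set("spark.shuffle.io.preferDirectBufs", "false")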
Re: Spark Streaming yarn-cluster Mode Off-heap Memory Is Constantly Growing
Hi,

Thanks for your information. I'll give Spark 1.4 a try when it's released.

On Wed, Jun 3, 2015 at 11:31 AM, Tathagata Das t...@databricks.com wrote:

Could you try it out with Spark 1.4 RC3? Also pinging Cloudera folks, they may be aware of something.

BTW, the way I have debugged memory leaks in the past is as follows. Run with a small driver memory, say 1 GB. Periodically (maybe with a script), take snapshots of the histogram and also do memory dumps, say every hour. Then compare the difference between two histos/dumps that are a few hours apart (the more the better). Diffing histos is easy. Diffing two dumps can be done in JVisualVM, which will show the diff in the objects that got added in the later dump. That makes it easy to debug what is not getting cleaned.

TD
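(For reference, a minimal sketch of that periodic-snapshot routine — not from the original thread — using Scala's sys.process to drive the JDK's jmap; the pid comes from the command line and the interval is the suggested hour:)

import java.io.File
import scala.sys.process._

object HistoSnapshot extends App {
  val pid = args(0) // pid of the driver or executor JVM
  while (true) {
    val out = new File(s"histo-$pid-${System.currentTimeMillis}.txt")
    (s"jmap -histo:live $pid" #> out).! // jmap ships with the JDK
    Thread.sleep(60L * 60 * 1000)       // snapshot hourly, as suggested above
  }
}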
Re: Spark Streaming yarn-cluster Mode Off-heap Memory Is Constantly Growing
Hi,

Thanks for your reply. Here are the top 30 entries of the jmap -histo:live result:

 num     #instances         #bytes  class name
----------------------------------------------
   1:         40802      145083848  [B
   2:         99264       12716112  methodKlass
   3:         99264       12291480  constMethodKlass
   4:          8472        9144816  constantPoolKlass
   5:          8472        7625192  instanceKlassKlass
   6:           186        6097824  [Lscala.concurrent.forkjoin.ForkJoinTask;
   7:          7045        4804832  constantPoolCacheKlass
   8:        139168        4453376  java.util.HashMap$Entry
   9:          9427        3542512  methodDataKlass
  10:        141312        3391488  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
  11:        135491        3251784  java.lang.Long
  12:         26192        2765496  [C
  13:           813        1140560  [Ljava.util.HashMap$Entry;
  14:          8997        1061936  java.lang.Class
  15:         16022         851384  [[I
  16:         16447         789456  java.util.zip.Inflater
  17:         13855         723376  [S
  18:         17282         691280  java.lang.ref.Finalizer
  19:         25725         617400  java.lang.String
  20:           320         570368  [Lio.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry;
  21:         16066         514112  java.util.concurrent.ConcurrentHashMap$HashEntry
  22:         12288         491520  org.jboss.netty.util.internal.ConcurrentIdentityHashMap$Segment
  23:         13343         426976  java.util.concurrent.locks.ReentrantLock$NonfairSync
  24:         12288         396416  [Lorg.jboss.netty.util.internal.ConcurrentIdentityHashMap$HashEntry;
  25:         16447         394728  java.util.zip.ZStreamRef
  26:           565         370080  [I
  27:           508         272288  objArrayKlassKlass
  28:         16233         259728  java.lang.Object
  29:           771         209232  [Ljava.util.concurrent.ConcurrentHashMap$HashEntry;
  30:          2524         192312  [Ljava.lang.Object;

But as I mentioned above, the heap memory seems OK; the extra memory is consumed by some off-heap data, and I can't find a way to figure out what is in there.

Besides, I did some extra experiments, i.e. ran the same program in different environments to test whether it has the off-heap memory issue:

spark1.0 + standalone => no
spark1.0 + yarn => no
spark1.3 + standalone => no
spark1.3 + yarn => yes

I'm using CDH5.1, so the spark1.0 is provided by CDH, and spark-1.3.1-bin-hadoop2.3 is downloaded from the official website. I could use spark1.0 + yarn, but I can't find a way to handle the logs (level and rolling), so it would explode the hard drive. Currently I'll stick to spark1.0 + standalone until our ops team decides to upgrade CDH.

On Tue, Jun 2, 2015 at 2:58 PM, Tathagata Das t...@databricks.com wrote:

While you are running, is it possible for you to log into the YARN node and get histograms of live objects using jmap -histo:live? That may reveal something.
Re: Spark Streaming yarn-cluster Mode Off-heap Memory Is Constantly Growing
Hi,

I wrote a simple test job; it only does very basic operations, for example:

val lines = KafkaUtils.createStream(ssc, zkQuorum, group, Map(topic -> 1)).map(_._2)

val logs = lines.flatMap { line =>
  try {
    Some(parse(line).extract[Impression])
  } catch {
    case _: Exception => None
  }
}

logs.filter(_.s_id > 0).count.foreachRDD { rdd =>
  rdd.foreachPartition { iter =>
    iter.foreach(count => logger.info(count.toString))
  }
}

It receives messages from Kafka, parses the JSON, filters and counts the records, and then prints the counts to the logs. Thanks.

On Thu, May 28, 2015 at 3:07 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

Hi Zhang,

Could you paste your code in a gist? Not sure what you are doing inside the code to fill up memory.

Thanks
Best Regards
Re: Spark Streaming yarn-cluster Mode Off-heap Memory Is Constantly Growing
Hi,

Unfortunately, they're still growing, both driver and executors. I ran the same job in local mode, and everything is fine.

On Thu, May 28, 2015 at 5:26 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

Can you replace your counting part with this?

logs.filter(_.s_id > 0).foreachRDD(rdd => logger.info(rdd.count()))

Thanks
Best Regards
Re: Spark Streaming yarn-cluster Mode Off-heap Memory Is Constantly Growing
Hi,

Yes, I'm using createStream, but the storageLevel param is by default MEMORY_AND_DISK_SER_2. Besides, the driver's memory is also growing. I don't think Kafka messages will be cached in the driver.

On Thu, May 28, 2015 at 12:24 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

Are you using the createStream or createDirectStream api? If it's the former, you can try setting the StorageLevel to MEMORY_AND_DISK (it might slow things down though). Another way would be to try the latter one.

Thanks
Best Regards
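(For reference, a hedged sketch of the explicit StorageLevel overload being discussed; ssc, zkQuorum, group and topic are assumed from the test job quoted earlier in the thread:)

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// createStream with an explicit StorageLevel instead of the
// MEMORY_AND_DISK_SER_2 default mentioned above.
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, Map(topic -> 1),
  StorageLevel.MEMORY_AND_DISK).map(_._2)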
Re: Spark Streaming yarn-cluster Mode Off-heap Memory Is Constantly Growing
Hi Akhil,

Thanks for your reply. According to the Streaming tab of the Web UI, the Processing Time is around 400ms and there's no Scheduling Delay, so I suppose it's not the Kafka messages that eat up the off-heap memory. Or maybe it is, but how to tell?

I googled how to check off-heap memory usage; there's a tool called pmap, but I don't know how to interpret the results.

On Wed, May 27, 2015 at 3:08 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

After submitting the job, if you do a ps aux | grep spark-submit then you can see all the JVM params. Are you using the high-level consumer (receiver based) for receiving data from Kafka? In that case, if your throughput is high and the processing delay exceeds the batch interval, then you will hit this memory issue, as the data keeps being received and is dumped to memory. You can set StorageLevel to MEMORY_AND_DISK (but it slows things down). Another alternative is to use the low-level kafka consumer (https://github.com/dibbhatt/kafka-spark-consumer) or the non-receiver based directStream (https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers) that comes with Spark.

Thanks
Best Regards

On Wed, May 27, 2015 at 11:51 AM, Ji ZHANG zhangj...@gmail.com wrote:

Hi,

I'm using Spark Streaming 1.3 on CDH5.1 with yarn-cluster mode. I found that YARN is killing the driver and executor processes because of excessive use of memory. Here's what I tried:

1. Xmx is set to 512M and the GC looks fine (one ygc per 10s), so the extra memory is not used by the heap.
2. I set the two memoryOverhead params to 1024 (default is 384), but the memory just keeps growing and then hits the limit.
3. This problem does not show up in low-throughput jobs, nor in standalone mode.
4. The test job just receives messages from Kafka, with a batch interval of 1, does some filtering and aggregation, and then prints to the executor logs. So it's not some 3rd-party library that causes the 'leak'. Spark 1.3 is built by myself, with correct hadoop versions.

Any ideas will be appreciated. Thanks.

-- Jerry
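(For reference, the two overhead params mentioned in point 2 — assuming the YARN-mode keys of that Spark generation, e.g. as spark-defaults.conf entries; the values are the ones from the post:)

spark.yarn.driver.memoryOverhead 1024
spark.yarn.executor.memoryOverhead 1024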
Re: Join DStream With Other Datasets
Hi Sean,

Thanks for your advice, a normal 'val' will suffice. But will it be serialized and transferred every batch and every partition? That's why broadcast exists, right? For now I'm going to use 'val', but I'm still looking for a broadcast-based solution.

On Sun, Jan 18, 2015 at 5:36 PM, Sean Owen so...@cloudera.com wrote:

I think that this problem is not Spark-specific since you are simply side-loading some data into memory. Therefore you do not need an answer that uses Spark. Simply load the data and then poll for an update each time it is accessed? Or at some reasonable interval? This is just something you write in Java/Scala.

-- Jerry
Re: Join DStream With Other Datasets
Hi,

After some experiments, there are three methods that work for this 'join a DStream with another dataset that is updated periodically' problem.

1. Create an RDD in the transform operation:

val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))
val filtered = words transform { rdd =>
  val spam = ssc.sparkContext.textFile("spam.txt").collect.toSet
  rdd.filter(!spam(_))
}

The caveat is 'spam.txt' will be read in every batch.

2. Use a var for the broadcast variable:

var bc = ssc.sparkContext.broadcast(getSpam)
val filtered = words.filter(!bc.value(_))

val pool = Executors.newSingleThreadScheduledExecutor
pool.scheduleAtFixedRate(new Runnable {
  def run(): Unit = {
    val obc = bc
    bc = ssc.sparkContext.broadcast(getSpam)
    obc.unpersist
  }
}, 0, 5, TimeUnit.SECONDS)

I'm surprised to come up with this solution, but I don't like the var, and the unpersist thing looks evil.

3. Use an accumulator:

val spam = ssc.sparkContext.accumulableCollection(getSpam.to[mutable.HashSet])
val filtered = words.filter(!spam.value(_))

def run(): Unit = {
  spam.setValue(getSpam.to[mutable.HashSet])
}

Now it looks less ugly... Anyway, I still hope there's a better solution.

On Sun, Jan 18, 2015 at 2:12 AM, Jörn Franke jornfra...@gmail.com wrote:

Can't you send a special event through spark streaming once the list is updated? So you have your normal events and a special reload event.

-- Jerry
Join DStream With Other Datasets
Hi,

I want to join a DStream with some other dataset, e.g. join a click stream with a spam IP list. I can think of two possible solutions: one is to use a broadcast variable, and the other is to use the transform operation as described in the manual.

But the problem is the spam IP list will be updated outside of the Spark Streaming program, so how can the program be notified to reload the list? For broadcast variables, they are immutable. For the transform operation, is it costly to reload the RDD on every batch? If it is, and I use RDD.persist(), does it mean I need to launch a thread to regularly unpersist it so that it can get the updates?

Any ideas will be appreciated. Thanks.

-- Jerry
Spark SQL 1.2 with CDH 4, Hive UDF is not working.
Hi,

Recently I'm migrating from Shark 0.9 to Spark SQL 1.2; my CDH version is 4.5, Hive 0.11. I've managed to set up the Spark SQL Thriftserver, and normal queries work fine, but custom UDFs are not usable. The symptom is that when executing CREATE TEMPORARY FUNCTION, the query hangs on a lock request:

14/12/22 14:41:57 DEBUG ClientCnxn: Reading reply sessionid:0x34a6121e6d93e74, packet:: clientPath:null serverPath:null finished:false header:: 289,8 replyHeader:: 289,51540866762,0 request:: '/hive_zookeeper_namespace_hive1/default,F response:: v{'sample_07,'LOCK-EXCLUSIVE-0001565612,'LOCK-EXCLUSIVE-0001565957}
14/12/22 14:41:57 ERROR ZooKeeperHiveLockManager: conflicting lock present for default mode EXCLUSIVE

Is it a compatibility issue because Spark SQL 1.2 is based on Hive 0.13? Is there a workaround other than upgrading CDH or forbidding UDFs on Spark SQL? Thanks.

-- Jerry
Re: Executor Log Rotation Is Not Working?
Hi,

I figured out that in standalone mode these configuration options should be added to the worker process's configs, e.g. by adding the following line to spark-env.sh:

SPARK_WORKER_OPTS="-Dspark.executor.logs.rolling.strategy=time -Dspark.executor.logs.rolling.time.interval=daily -Dspark.executor.logs.rolling.maxRetainedFiles=3"

Maybe in yarn mode spark-defaults.conf would be sufficient, but I've not tested it.

-- Jerry
Executor Log Rotation Is Not Working?
Hi,

I'm using Spark Streaming 1.1, and I have the following log that keeps growing:

/opt/spark-1.1.0-bin-cdh4/work/app-20141029175309-0005/2/stderr

I think it is the executor log, so I set up the following options in spark-defaults.conf:

spark.executor.logs.rolling.strategy time
spark.executor.logs.rolling.time.interval daily
spark.executor.logs.rolling.maxRetainedFiles 10

I can see these options on the Web UI, so I suppose they are effective. However, the stderr is still not rotated. Am I doing something wrong? Thanks.

-- Jerry
Implement Count by Minute in Spark Streaming
Hi,

Suppose I have a stream of logs and I want to count them by minute. The result is like:

2014-10-26 18:38:00 100
2014-10-26 18:39:00 150
2014-10-26 18:40:00 200

One way to do this is to set the batch interval to 1 min, but each batch would be quite large. Or I can use updateStateByKey where the key is like '2014-10-26 18:38:00', but then I have two questions:

1. How to persist the result to MySQL? Do I need to flush it every batch?

2. How to delete the old state? For example, now it is 18:50 but 18:40's state is still in Spark. One solution is to set the key's state to None when there's no data for this key in the current batch. But what if the log volume is low, and some batches get zero logs? For instance:

18:40:00~18:40:10 has 10 logs -> key 18:40's value is set to 10
18:40:10~18:40:20 has no log -> key 18:40 is deleted
18:40:20~18:40:30 has 5 logs -> key 18:40's value is set to 5

You can see the result is wrong. Maybe I can use an 'update' approach when flushing, i.e. check MySQL whether there's already an entry for 18:40 and add the result to it. But what about a unique count? I can't store all unique values in MySQL per se.

So I'm looking for a better way to store count-by-minute results into an RDBMS (or NoSQL?). Any idea would be appreciated. Thanks.

-- Jerry
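(For what it's worth, a minimal self-contained sketch of the updateStateByKey approach described above; the socket source, checkpoint path and app name are placeholders, and key eviction would need the iterator-based updateStateByKey variant that exposes the key:)

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object CountByMinute extends App {
  val ssc = new StreamingContext(new SparkConf().setAppName("CountByMinute"), Seconds(2))
  ssc.checkpoint("/tmp/checkpoint") // updateStateByKey requires a checkpoint dir

  // assume each line starts with a "yyyy-MM-dd HH:mm:ss" timestamp
  val lines = ssc.socketTextStream("localhost", 9999) // placeholder source
  val byMinute = lines.map(line => (line.take(16) + ":00", 1L))

  val counts = byMinute.updateStateByKey[Long] { (values: Seq[Long], state: Option[Long]) =>
    Some(state.getOrElse(0L) + values.sum) // returning None here evicts the key
  }
  counts.foreachRDD(rdd => rdd.collect().foreach(println)) // per-batch flush point, e.g. to MySQL

  ssc.start()
  ssc.awaitTermination()
}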
Spark Streaming: Calculate PV/UV by Minute and by Day?
Hi,

I'm using Spark Streaming 1.0. Say I have a source of website click stream, like the following:

('2014-09-19 00:00:00', '192.168.1.1', 'home_page')
('2014-09-19 00:00:01', '192.168.1.2', 'list_page')
...

And I want to calculate the page views (PV, number of logs) and unique users (UV, identified by IP) every minute, so the result is like:

('2014-09-19 00:00:00', 'pv', 100)
('2014-09-19 00:00:00', 'uv', 50)
('2014-09-19 00:01:00', 'pv', 120)
('2014-09-19 00:01:00', 'uv', 60)

Also, the total pv/uv by minute, like:

('2014-09-19 00:00:00', 'total pv', 100)
('2014-09-19 00:00:00', 'total uv', 50)
('2014-09-19 00:01:00', 'total pv', 220) // 100 + 120
('2014-09-19 00:01:00', 'total uv', 80) // 50 unique users + 60 unique users - duplicate ones

There are also some prerequisites:

* The stream may not be fluent, so at 12:00 you may still be receiving 11:55's messages. Put another way, this program should support both stream processing and batch processing, i.e. feed it a whole day's data and it will output the same result as the streaming one.
* The stream is partitioned, i.e. it may not be ordered. E.g. you may receive 12:00:00, 12:00:05, 12:00:04, 12:00:06, but the time difference shouldn't be too big.
* The final result will be written to MySQL, with schema (created datetime, category varchar(255), data bigint), just like the above result.

For the 'every minute' one, I can use updateStateByKey. Here's an example of calculating pv (batch duration is 2 secs):

val logsByMin = logs map { log =>
  val date = new SimpleDateFormat("yyyy-MM-dd HH:mm:00").format(log.serverTime)
  date -> 1L
}

val numLogsByMin = logsByMin.updateStateByKey((values: Seq[Long], state: Option[Long]) => {
  Some(state.getOrElse(0L) + values.sum)
})

numLogsByMin foreach { rdd => savePv(rdd.collect) }

This should meet the prerequisites, but with one major problem: the outdated keys are not evicted. So I came up with the idea of expirable data - retain the calculated data for 2 minutes, and within those 2 minutes, flush it into MySQL after every batch. The code is here: https://github.com/jizhang/spark-hello/blob/eb138e24b1e72e89bf3fa7e66c6ae7106853e5e8/src/main/scala/com/anjuke/dw/spark_hello/ActionLogProcessor.scala#L80

Maybe there's a better way to achieve this?

As for the total pv/uv, I can set the key to the date (e.g. '2014-09-19'), but how to save it to MySQL every minute? Especially for uv, it cannot be summed, so I need to save it every minute, but how?

Any ideas will be appreciated. Thanks.

Jerry
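(On the uv question: since UVs can't be summed, one hedged sketch — Click and uvByMinute are hypothetical names — keeps the IP set itself as the state and takes its size per batch:)

import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream

case class Click(time: String, ip: String, page: String) // mirrors the tuples above

def uvByMinute(clicks: DStream[Click]): DStream[(String, Int)] =
  clicks
    .map(c => (c.time.take(16) + ":00", c.ip))
    .updateStateByKey { (ips: Seq[String], state: Option[Set[String]]) =>
      Some(state.getOrElse(Set.empty[String]) ++ ips) // exact distinct across batches
    }
    .mapValues(_.size)

The state grows with the number of distinct IPs per minute, which is the same trade-off the MySQL question above runs into.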
How to Exclude Spark Dependencies from spark-streaming-kafka?
Hi,

I'm developing an application with spark-streaming-kafka, which depends on spark-streaming and kafka. Since spark-streaming is provided at runtime, I want to exclude its jars from the assembly. I tried the following configuration:

libraryDependencies ++= {
  val sparkVersion = "1.0.2"
  Seq(
    "org.apache.spark" %% "spark-streaming-kafka" % sparkVersion,
    "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided"
  )
}

Not working. I also tried:

libraryDependencies ++= {
  val sparkVersion = "1.0.2"
  Seq(
    ("org.apache.spark" %% "spark-streaming-kafka" % sparkVersion).
      exclude("org.apache.spark", "spark-streaming"),
    "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided"
  )
}

It still packages all the jars of spark-streaming. Finally I came up with:

libraryDependencies ++= {
  val sparkVersion = "1.0.2"
  Seq(
    "org.apache.spark" %% "spark-streaming-kafka" % sparkVersion intransitive(),
    "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
    ("org.apache.kafka" %% "kafka" % "0.8.0").
      exclude("com.sun.jmx", "jmxri").
      exclude("com.sun.jdmk", "jmxtools").
      exclude("net.sf.jopt-simple", "jopt-simple").
      exclude("org.slf4j", "slf4j-simple").
      exclude("org.apache.zookeeper", "zookeeper")
  )
}

Wordy, but it works. So I'm wondering whether there's a better way. Thanks.

-- Jerry
Do I Need to Set Checkpoint Interval for Every DStream?
Hi,

I'm using Spark Streaming 1.0. I create a dstream with KafkaUtils and apply some operations on it. There's a reduceByWindow operation at the end, so I suppose the checkpoint interval should be automatically set to more than 10 seconds. But what I see is that it still checkpoints every 2 seconds (my batch interval), and in the log I see:

[2014-09-17 16:43:25,096] INFO Checkpoint interval automatically set to 12000 ms (org.apache.spark.streaming.dstream.ReducedWindowedDStream)
[2014-09-17 16:43:25,105] INFO Checkpoint interval = null (org.apache.spark.streaming.kafka.KafkaInputDStream)
[2014-09-17 16:43:25,107] INFO Checkpoint interval = null (org.apache.spark.streaming.dstream.MappedDStream)
[2014-09-17 16:43:25,108] INFO Checkpoint interval = null (org.apache.spark.streaming.dstream.MappedDStream)
[2014-09-17 16:43:25,108] INFO Checkpoint interval = null (org.apache.spark.streaming.dstream.FilteredDStream)
[2014-09-17 16:43:25,109] INFO Checkpoint interval = null (org.apache.spark.streaming.dstream.FlatMappedDStream)
[2014-09-17 16:43:25,110] INFO Checkpoint interval = null (org.apache.spark.streaming.dstream.FlatMappedDStream)
[2014-09-17 16:43:25,110] INFO Checkpoint interval = null (org.apache.spark.streaming.dstream.ShuffledDStream)
[2014-09-17 16:43:25,111] INFO Checkpoint interval = 12000 ms (org.apache.spark.streaming.dstream.ReducedWindowedDStream)
[2014-09-17 16:43:25,111] INFO Checkpoint interval = null (org.apache.spark.streaming.dstream.ForEachDStream)

So does it mean I have to set the checkpoint interval for all the dstreams? Thanks.
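(For what it's worth, DStream.checkpoint(interval) can be set per stream; a hedged fragment, assuming stream is a DStream[Long] built as described above, with placeholder window sizes:)

import org.apache.spark.streaming.Seconds

// Set the checkpoint interval explicitly on the windowed stream.
val windowed = stream.reduceByWindow(_ + _, Seconds(30), Seconds(10))
windowed.checkpoint(Seconds(12)) // matches the 12000 ms auto-set value in the log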