[jira] [Commented] (SPARK-17381) Memory leak org.apache.spark.sql.execution.ui.SQLTaskMetrics

2017-06-18 Thread Deenbandhu Agarwal (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16053488#comment-16053488
 ] 

Deenbandhu Agarwal commented on SPARK-17381:


[~joaomaiaduarte] I am facing a similar issue. I am running Spark Streaming in
production with 6 executors, each with 1 GB of memory and 1 core, and a driver
with 3 GB. The Spark version is 2.0.1. Objects of some linked list accumulate
over time in the driver's JVM heap, and after 2-3 hours GC becomes very frequent
and jobs start queuing up. I tried your solution, but in vain. We are not using
linked lists anywhere.

> Memory leak  org.apache.spark.sql.execution.ui.SQLTaskMetrics
> -
>
> Key: SPARK-17381
> URL: https://issues.apache.org/jira/browse/SPARK-17381
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.0.0
> Environment: EMR 5.0.0 (submitted as yarn-client)
> Java Version 1.8.0_101 (Oracle Corporation)
> Scala Version 2.11.8
> The problem also happens when I run locally with similar versions of Java/Scala.
> OS: Ubuntu 16.04
>Reporter: Joao Duarte
>
> I am running a Spark Streaming application from a Kinesis stream. After
> running for some hours, it runs out of memory. After taking a driver heap dump,
> I found two problems:
> 1) a huge number of org.apache.spark.sql.execution.ui.SQLTaskMetrics objects
> (it seems this was a problem before:
> https://issues.apache.org/jira/browse/SPARK-11192);
> To replicate the org.apache.spark.sql.execution.ui.SQLTaskMetrics leak, I just
> needed to run the code below:
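> {code}
> import org.apache.spark.rdd.RDD
> import org.apache.spark.sql.functions.sum
> // Assumes `spark` (SparkSession), `ssc` (StreamingContext) and
> // `kinesisStreams` (a Seq of Kinesis DStreams) are defined elsewhere.
> import spark.implicits._
>
> val dstream = ssc.union(kinesisStreams)
> dstream.foreachRDD { (streamInfo: RDD[Array[Byte]]) =>
>   // Ignore the payload and build a toy three-column DataFrame per batch.
>   val toyDF = streamInfo
>     .map(_ => (1, "data", "more data "))
>     .toDF("Num", "Data", "MoreData")
>   // Each batch's action runs as a separate SQL execution on the driver.
>   toyDF.agg(sum("Num")).first().get(0)
> }
> {code}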
> 2) a huge amount of Array[Byte] data (9 GB+)
> After some analysis, I noticed that most of the Array[Byte] objects were being
> referenced by objects that were in turn referenced by SQLTaskMetrics. The
> strangest thing is that those Array[Byte] were basically text that was
> loaded in the executors, so they should never be in the driver at all!
> I still could not replicate the second problem with simple code (the original
> was complex, with data coming from S3, DynamoDB and other databases). However,
> when I debug the application, I can see that in Executor.scala, during
> reportHeartBeat(), the data that should not be sent to the driver is being
> added to "accumUpdates", which, as I understand it, will be sent to the driver
> for reporting.
> To be more precise, one of the taskRunners in the loop "for (taskRunner <-
> runningTasks.values().asScala)" contains a GenericInternalRow with a lot of
> data that should not go to the driver. In my case the path is:
> taskRunner.task.metrics.externalAccums[2]._list[0]. This data is similar (if
> not identical) to the data I see when I do a driver heap dump.
> I guess that if the org.apache.spark.sql.execution.ui.SQLTaskMetrics leak is
> fixed, I would have less of this undesirable data in the driver and could
> run my streaming app for a long period of time, but I think there will always
> be some performance loss.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17381) Memory leak org.apache.spark.sql.execution.ui.SQLTaskMetrics

2016-09-06 Thread Davies Liu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15468738#comment-15468738
 ] 

Davies Liu commented on SPARK-17381:


cc [~cloud_fan]



[jira] [Commented] (SPARK-17381) Memory leak org.apache.spark.sql.execution.ui.SQLTaskMetrics

2016-09-06 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15467322#comment-15467322
 ] 

Sean Owen commented on SPARK-17381:
---

Yeah, I didn't mean disabling particular types of stats for particular columns
so much as disabling stats altogether. I'm not sure even that's worth it.

There are probably ways to redesign your job to avoid much of this, mostly by
using fewer tasks, since it sounds like you have a lot of them. Then again, if
you have thousands of tasks per stage and the parallelism to execute them all
that way, cutting them down is probably not something you'd want to do.

Consider whether you really want each entire HTML document to be a single value
in a single column. This alone might not be a reason to reconsider it, but there
may be other reasons why changing it could improve your execution.

For really intensive processing, you may not even want to go through the
DataFrame API, and could instead operate more directly on Datasets.

For such a huge job, collecting that much data even sounds somewhat reasonable.
But you do need to keep the driver from retaining so many executions that it
runs out of memory.
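A simplified model of what capping retained executions does, written as a hypothetical Scala sketch rather than the actual code of Spark's SQL UI listener: execution records are kept in arrival order and the oldest are dropped once a cap is exceeded, so driver memory tracks the cap rather than the application's uptime. `ExecutionRecord` and `BoundedExecutionStore` are illustrative names.

```scala
import scala.collection.mutable

// Hypothetical per-execution record holding per-task stats payloads.
final case class ExecutionRecord(id: Long, taskPayloads: Seq[String])

// Keeps at most `maxRetained` executions, evicting the oldest first.
final class BoundedExecutionStore(maxRetained: Int) {
  require(maxRetained > 0, "maxRetained must be positive")
  private val records = mutable.Queue.empty[ExecutionRecord]

  def add(record: ExecutionRecord): Unit = {
    records.enqueue(record)
    // Drop the oldest records until we are back under the cap.
    while (records.size > maxRetained) records.dequeue()
  }

  def size: Int = records.size

  def retainedIds: Seq[Long] = records.map(_.id).toList

  // Total characters held across all retained payloads.
  def retainedChars: Long =
    records.iterator.flatMap(_.taskPayloads).map(_.length.toLong).sum
}
```

A FIFO queue fits because only the most recent executions matter for the UI; with a cap of 10, adding 25 executions leaves executions 16 through 25.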



[jira] [Commented] (SPARK-17381) Memory leak org.apache.spark.sql.execution.ui.SQLTaskMetrics

2016-09-06 Thread Joao Duarte (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15467224#comment-15467224
 ] 

Joao Duarte commented on SPARK-17381:
-

Oh, I see. I'll change the issue type from Bug to Improvement and leave the
suggestion that there should be an option to disable sending these strings to
the driver, or at least to truncate them to a reasonable size.

Thanks again for your help!
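One way the truncation suggestion above could be implemented, as a hypothetical plain-Scala sketch (an assumption, not a description of what Spark does): a prefix of the minimum never compares greater than the original, so it stays a valid lower bound; for the maximum, the sketch truncates and then increments the last character that can be incremented, so the result still compares greater than the original. `BoundedStats` and its methods are illustrative names, comparison uses Scala's UTF-16 `String` ordering, and surrogate-pair edge cases are ignored.

```scala
object BoundedStats {
  // A prefix of s never compares greater than s, so it remains a valid lower bound.
  def truncateLower(s: String, maxLen: Int): String = {
    require(maxLen >= 0, "maxLen must be non-negative")
    if (s.length <= maxLen) s else s.substring(0, maxLen)
  }

  // Truncate, then increment the last character that can be incremented, so the
  // result compares greater than s. Keeps s unchanged if no character can be bumped.
  def truncateUpper(s: String, maxLen: Int): String = {
    require(maxLen >= 0, "maxLen must be non-negative")
    if (s.length <= maxLen) s
    else {
      var i = maxLen - 1
      while (i >= 0 && s.charAt(i) == Char.MaxValue) i -= 1
      if (i < 0) s else s.substring(0, i) + (s.charAt(i) + 1).toChar
    }
  }
}
```

Truncating `abcdef` to three characters gives a lower bound of `abc` and an upper bound of `abd`; both still bracket the original value while holding at most three characters each.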



[jira] [Commented] (SPARK-17381) Memory leak org.apache.spark.sql.execution.ui.SQLTaskMetrics

2016-09-06 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15467108#comment-15467108
 ] 

Sean Owen commented on SPARK-17381:
---

The issue is that it's maintaining min/max stats for columns, including string
columns. If your elements are huge strings of HTML, then this will
unfortunately pull back at least two of them per task to the driver. I don't
know of a way to disable that. You might consider whether you can tune the job
a little to have fewer tasks. If it has thousands of them unnecessarily, you
might gain in several ways by tuning that down.
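As a rough illustration of that mechanism, here is a hypothetical, Spark-free Scala sketch: each task computes min, max and count for a string column over its partition and returns them, and those bounds are actual column values. `PartitionStats` and `StringColumnStats` are illustrative names and do not reproduce Spark's internal ColumnStats classes. If the column holds whole HTML pages, each task's result carries up to two full pages.

```scala
// Hypothetical model of per-task string column statistics; not Spark's ColumnStats.
final case class PartitionStats(lower: String, upper: String, count: Long)

object StringColumnStats {
  // Compute min, max and count over one partition's values, as a task might.
  def collectStats(values: Iterator[String]): Option[PartitionStats] =
    values.foldLeft(Option.empty[PartitionStats]) {
      case (None, v) => Some(PartitionStats(v, v, 1L))
      case (Some(s), v) =>
        Some(PartitionStats(
          if (v < s.lower) v else s.lower,
          if (v > s.upper) v else s.upper,
          s.count + 1L))
    }

  // Total characters the driver would hold if it kept every task's bounds.
  def retainedChars(perTask: Seq[PartitionStats]): Long =
    perTask.map(s => s.lower.length.toLong + s.upper.length.toLong).sum
}
```

For a single task whose two pages are 1,000 and 2,000 characters long, `retainedChars` reports 3,000 characters held for that one task, and the total grows with every task that is retained.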



[jira] [Commented] (SPARK-17381) Memory leak org.apache.spark.sql.execution.ui.SQLTaskMetrics

2016-09-06 Thread Joao Duarte (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15467102#comment-15467102
 ] 

Joao Duarte commented on SPARK-17381:
-

Well, the application has been stable for 24h+ (and is still running). If it is
normal that 9 GB of HTML pages processed by executors are sent to the driver in
about two hours, then the problem is solved.
Thank you for the suggestions. Should I close the issue?



[jira] [Commented] (SPARK-17381) Memory leak org.apache.spark.sql.execution.ui.SQLTaskMetrics

2016-09-05 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464744#comment-15464744
 ] 

Sean Owen commented on SPARK-17381:
---

That's it, then. It's 'normal' operation, but it does mean you need much more
driver memory if you want to store state for all of these stages and tasks (or
else keep fewer of them).

No, it's not data that is somehow supposed to be only on the executor. I
commented on this above.
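To see why driver memory scales this way, here is a back-of-the-envelope Scala sketch. It assumes each retained execution keeps about two string values per task (the min/max stats discussed in this thread), so retained bytes are roughly executions × tasks per execution × 2 × average value size. `RetentionEstimate` is an illustrative name, and the inputs in the note below are made-up numbers, not measurements from this issue.

```scala
object RetentionEstimate {
  // Rough driver-side footprint of retained per-task string stats, in bytes.
  // Assumes each task contributes `valuesPerTask` strings of `avgValueBytes` each.
  def retainedBytes(
      retainedExecutions: Long,
      tasksPerExecution: Long,
      avgValueBytes: Long,
      valuesPerTask: Long = 2L): Long =
    retainedExecutions * tasksPerExecution * valuesPerTask * avgValueBytes
}
```

For example, 1,000 retained executions (the default for spark.sql.ui.retainedExecutions) × 50 tasks × 2 values × 100,000 bytes comes to 10,000,000,000 bytes, about 10 GB, while 10 retained executions with the same inputs comes to about 100 MB.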



[jira] [Commented] (SPARK-17381) Memory leak org.apache.spark.sql.execution.ui.SQLTaskMetrics

2016-09-05 Thread Joao Duarte (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464740#comment-15464740
 ] 

Joao Duarte commented on SPARK-17381:
-

Hi Sean,

Thank you for your suggestion! Setting spark.sql.ui.retainedExecutions to a low
number seems to be a good workaround. I've been running my application for about
one hour with spark.sql.ui.retainedExecutions=10, and the number of
SQLTaskMetrics objects and the driver's heap size seem to stabilise. I'll give
an update at the end of the day or tomorrow to say whether it remains stable.
However, I find it really strange that the driver is sent data that is
supposed to be only in the executors. In my case, I am parsing HTML pages, and
some of them are being sent to the driver as part of ColumnStats (as you
mentioned in your previous comment). Are they being sent as a summary by
mistake? Does Spark really need this kind of information? The workaround lets my
application run, but sending unneeded data to the driver certainly reduces
performance (some HTML pages I parse can be really big).

Cheers



[jira] [Commented] (SPARK-17381) Memory leak org.apache.spark.sql.execution.ui.SQLTaskMetrics

2016-09-02 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15459422#comment-15459422
 ] 

Sean Owen commented on SPARK-17381:
---

The only thing I can think of that accumulates row-like data is the
ColumnStats of an InMemoryRelation. But it's only accumulating things like
max/min and count. You could see a row with a string in there, but that much is
probably normal.

What happens if you set spark.sql.ui.retainedExecutions to something low like
10? As far as I can see, it's normal to retain this data about executions, but
if you have lots of stages with lots of tasks, this could become huge fast.
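A minimal sketch of applying that suggestion when the application builds its own configuration, assuming a Spark 2.0-style `SparkSession` and a `StreamingContext` created from it. The app name and batch interval are placeholders, and the `spark.ui.retainedJobs`/`spark.ui.retainedStages` lines are an optional extra beyond what is discussed here, with illustrative values. Passing `--conf spark.sql.ui.retainedExecutions=10` to `spark-submit` should have the same effect for the first setting (its default is 1000).

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

object LowUiRetentionApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("kinesis-low-ui-retention") // hypothetical application name
      // Keep only the 10 most recent SQL executions in the driver's SQL UI state.
      .set("spark.sql.ui.retainedExecutions", "10")
      // Related retention limits for the jobs and stages UI; values are illustrative.
      .set("spark.ui.retainedJobs", "100")
      .set("spark.ui.retainedStages", "100")

    // These settings must be in place before the SparkContext is created.
    val spark = SparkSession.builder().config(conf).getOrCreate()
    val ssc = new StreamingContext(spark.sparkContext, Seconds(10)) // batch interval is an assumption

    // ... create the Kinesis DStreams and register the foreachRDD logic on `ssc`,
    // then call ssc.start() and ssc.awaitTermination().
  }
}
```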



[jira] [Commented] (SPARK-17381) Memory leak org.apache.spark.sql.execution.ui.SQLTaskMetrics

2016-09-02 Thread Joao Duarte (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15459021#comment-15459021
 ] 

Joao Duarte commented on SPARK-17381:
-

Hi Sean.

Thanks for commenting. I set it to Blocker because I can't run the app in a 
production environment.

I am not explicitly adding accumulators in my code. I am trying to replicate 
the issue with simpler code so that I can post it here, but have been unable to 
do that so far.
Basically, for each row of the DStream I collect a lot of data from different 
sources, transform the data into a DataFrame and run some ML Pipelines. Again, 
I don't explicitly collect data or use accumulators. I have started stripping down 
the code by removing all ML Pipelines, and now I only have some functions that I 
run in RDD maps to collect the data I need (so they run in the executors), 
transform that data, create a DataFrame and then perform an aggregation (just a 
dummy sum of a column). When I inspect the driver's heap, some of the data I loaded in 
the executors is there.

The driver's heap "path" that contains the "unwanted" data is 
objSQLTaskMetrics.accumulatorUpdates[2]._2, where:
- objSQLTaskMetrics is one of the several SQLTaskMetrics instances;
- _2 is a Collections$UnmodifiableRandomAccessList of size 1, which 
contains the GenericInternalRow with the "unwanted" data.
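The heap path described in this comment is consistent with an accumulator that keeps its elements in a Java list and exposes its value as a read-only copy. The sketch below uses a hypothetical `ListAccumulator` class (an assumption for illustration, not Spark's implementation) to show two JDK behaviours that would explain what the heap dump shows: `Collections.unmodifiableList` over an `ArrayList` produces a `java.util.Collections$UnmodifiableRandomAccessList`, and the copy is shallow, so it references the very same element objects rather than copies of them.

```scala
import java.util.{ArrayList, Collections, List => JList}

// Hypothetical list-backed accumulator (illustration only, not Spark's class):
// task code appends elements, and `value` returns a read-only snapshot.
final class ListAccumulator[T] {
  private val _list: JList[T] = Collections.synchronizedList(new ArrayList[T]())

  def add(v: T): Unit = { _list.add(v); () }

  // Copy under the list's own lock, then wrap the copy as unmodifiable.
  def value: JList[T] = _list.synchronized {
    Collections.unmodifiableList(new ArrayList[T](_list))
  }
}

object SnapshotDemo {
  def main(args: Array[String]): Unit = {
    val acc = new ListAccumulator[Array[Byte]]
    val row = new Array[Byte](16) // stands in for a GenericInternalRow
    acc.add(row)

    val snapshot = acc.value
    println(snapshot.getClass.getName) // java.util.Collections$UnmodifiableRandomAccessList
    println(snapshot.size)             // 1
    println(snapshot.get(0) eq row)    // true: the snapshot shares the element
  }
}
```

Because only the list is copied, any large row added to such an accumulator remains reachable through every snapshot that something keeps a reference to.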

> Memory leak  org.apache.spark.sql.execution.ui.SQLTaskMetrics
> -
>
> Key: SPARK-17381
> URL: https://issues.apache.org/jira/browse/SPARK-17381
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0
> Environment: EMR 5.0.0 (submitted as yarn-client)
> Java Version  1.8.0_101 (Oracle Corporation)
> Scala Version version 2.11.8
> Problem also happens when I run locally with similar versions of java/scala. 
> OS: Ubuntu 16.04
>Reporter: Joao Duarte
>


