[jira] [Commented] (SPARK-24189) Spark Structured Streaming not working with the Kafka Transactions
[ https://issues.apache.org/jira/browse/SPARK-24189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16602889#comment-16602889 ] Binzi Cao commented on SPARK-24189: ---

It seems I'm hitting a similar issue. I managed to set the Kafka isolation level with
{code:java}
.option("kafka.isolation.level", "read_committed")
{code}
and using
{code:java}
kafka-client 1.0.0
{code}
and I'm seeing this issue:
{code:java}
[error] org.apache.spark.sql.streaming.StreamingQueryException: Job aborted due to stage failure: Task 17 in stage 0.0 failed 1 times, most recent failure: Lost task 17.0 in stage 0.0 (TID 17, localhost, executor driver): java.util.concurrent.TimeoutException: Cannot fetch record for offset 145 in 2000 milliseconds
[error] at org.apache.spark.sql.kafka010.InternalKafkaConsumer.org$apache$spark$sql$kafka010$InternalKafkaConsumer$$fetchData(KafkaDataConsumer.scala:271)
[error] at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:163)
[error] at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:147)
[error] at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
[error] at org.apache.spark.sql.kafka010.InternalKafkaConsumer.runUninterruptiblyIfPossible(KafkaDataConsumer.scala:109)
[error] at org.apache.spark.sql.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:147)
[error] at org.apache.spark.sql.kafka010.KafkaDataConsumer$class.get(KafkaDataConsumer.scala:54)
[error] at org.apache.spark.sql.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.get(KafkaDataConsumer.scala:362)
[error] at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:152)
[error] at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:143)
[error] at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
[error] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
[error] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
[error] at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(generated.java:205)
[error] at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
[error] at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
[error] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
[error] at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithKeys_0$(generated.java:230)
{code}
So it looks like it is not working with a topic with Kafka transactions at all. The exception was thrown here: https://github.com/apache/spark/blob/v2.3.1/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala#L271-L272

Setting
{code:java}
failOnDataLoss=false
{code}
can't fix the issue, as the exception is never caught in the KafkaDataConsumer.scala code.

> Spark Structured Streaming not working with the Kafka Transactions
> ------------------------------
>
> Key: SPARK-24189
> URL: https://issues.apache.org/jira/browse/SPARK-24189
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.3.0
> Reporter: bharath kumar avusherla
> Priority: Major
>
> Was trying to read a Kafka transactional topic using Spark Structured Streaming 2.3.0 with the Kafka option isolation-level = "read_committed", but Spark reads the data immediately without waiting for the data in the topic to be committed. The Spark documentation says Structured Streaming supports Kafka version 0.10 or higher. I am using the below command to test the scenario.
> val df = spark
>   .readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "localhost:9092")
>   .option("subscribe", "test-topic")
>   .option("isolation-level", "read_committed")
>   .load()
>
> Can you please let me know if transactional read is supported in Spark 2.3.0 Structured Streaming, or am I missing anything?
>
> Thank you.

-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
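One detail worth noting about the snippet quoted above: the Structured Streaming Kafka source only forwards a setting to the underlying Kafka consumer when it carries the `kafka.` prefix, so `isolation-level` as written likely never reaches the consumer at all. Below is a minimal sketch of the prefixed form, assuming Spark 2.3+ with the spark-sql-kafka-0-10 connector on the classpath; the broker address and `test-topic` are the placeholders from the report, and `ReadCommittedSketch` is a hypothetical name:

```scala
import org.apache.spark.sql.SparkSession

object ReadCommittedSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("read-committed-sketch").getOrCreate()

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "test-topic")
      // Options prefixed with "kafka." are handed to the Kafka consumer config,
      // so this sets the consumer's isolation.level to read_committed.
      .option("kafka.isolation.level", "read_committed")
      .load()

    df.selectExpr("CAST(value AS STRING)")
      .writeStream
      .format("console")
      .start()
      .awaitTermination()
  }
}
```

Even with the prefixed option, the comments on this ticket suggest Spark 2.3.x can still hit the fetch timeout on topics containing transaction markers, so this addresses the option spelling, not the underlying bug.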
[jira] [Comment Edited] (SPARK-24189) Spark Structured Streaming not working with the Kafka Transactions
[ https://issues.apache.org/jira/browse/SPARK-24189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16602889#comment-16602889 ] Binzi Cao edited comment on SPARK-24189 at 9/4/18 10:49 AM:

It seems I'm hitting a similar issue. I managed to set the Kafka isolation level with
{code:java}
.option("kafka.isolation.level", "read_committed")
{code}
and using
{code:java}
kafka-client 1.0.0
Spark version: 2.3.1
{code}
and I'm seeing this issue:
{code:java}
[error] org.apache.spark.sql.streaming.StreamingQueryException: Job aborted due to stage failure: Task 17 in stage 0.0 failed 1 times, most recent failure: Lost task 17.0 in stage 0.0 (TID 17, localhost, executor driver): java.util.concurrent.TimeoutException: Cannot fetch record for offset 145 in 2000 milliseconds
[error] at org.apache.spark.sql.kafka010.InternalKafkaConsumer.org$apache$spark$sql$kafka010$InternalKafkaConsumer$$fetchData(KafkaDataConsumer.scala:271)
[error] at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:163)
[error] at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:147)
[error] at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
[error] at org.apache.spark.sql.kafka010.InternalKafkaConsumer.runUninterruptiblyIfPossible(KafkaDataConsumer.scala:109)
[error] at org.apache.spark.sql.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:147)
[error] at org.apache.spark.sql.kafka010.KafkaDataConsumer$class.get(KafkaDataConsumer.scala:54)
[error] at org.apache.spark.sql.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.get(KafkaDataConsumer.scala:362)
[error] at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:152)
[error] at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:143)
[error] at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
[error] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
[error] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
[error] at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(generated.java:205)
[error] at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
[error] at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
[error] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
[error] at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithKeys_0$(generated.java:230)
{code}
So it looks like it is not working with a topic with Kafka transactions at all. The exception was thrown here: [https://github.com/apache/spark/blob/v2.3.1/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala#L271-L272]

Setting
{code:java}
failOnDataLoss=false
{code}
can't fix the issue, as the exception is never caught in the KafkaDataConsumer.scala code.
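The "Cannot fetch record for offset 145" failure is consistent with how transactional topics are laid out: transaction commit/abort markers each occupy an offset but never yield a record, so code that insists on materializing a record at every single offset can only wait until it times out. The following is a toy model of that gap in plain Scala (no Spark involved; the offsets, values, and the `OffsetGapModel` name are made up for illustration):

```scala
object OffsetGapModel {
  // Offsets 143, 144 and 146 hold records; offset 145 is taken by a
  // transaction marker, so with read_committed no record ever appears there.
  val records: Map[Long, String] = Map(143L -> "a", 144L -> "b", 146L -> "c")

  // A fetch that demands "the record at exactly offset N".
  def fetch(offset: Long): Option[String] = records.get(offset)

  def main(args: Array[String]): Unit = {
    assert(fetch(144L).contains("b")) // normal offset: record found
    assert(fetch(145L).isEmpty)       // marker offset: nothing to return, ever
    println("offset 145 never yields a record, so a bounded wait on it must time out")
  }
}
```

Under this reading, the fix is not a longer timeout but teaching the consumer loop to tolerate offset gaps, which matches the observation that `failOnDataLoss=false` never takes effect here.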
[jira] [Created] (SPARK-20760) Memory Leak of RDD blocks
Binzi Cao created SPARK-20760:
--
Summary: Memory Leak of RDD blocks
Key: SPARK-20760
URL: https://issues.apache.org/jira/browse/SPARK-20760
Project: Spark
Issue Type: Bug
Components: Block Manager
Affects Versions: 2.1.0
Environment: Spark 2.1.0
Reporter: Binzi Cao
Priority: Critical

Memory leak of RDD blocks in a long-running RDD process.

I have a long-running application which is doing calculations on RDDs, and I found the RDD blocks keep increasing. The RDD blocks and memory usage do not match the cached RDDs and memory. It looks like Spark keeps old RDDs in memory and never releases them. In addition, I'm not seeing this issue in Spark 1.6.

Below is the minimized code; it is reproducible by just running it in local mode.

Scala file:
{code}
import scala.concurrent.duration.Duration
import scala.util.{Try, Failure, Success}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import scala.concurrent._
import ExecutionContext.Implicits.global

case class Person(id: String, name: String)

object RDDApp {
  def run(sc: SparkContext) = {
    while (true) {
      val r = scala.util.Random
      val data = (1 to r.nextInt(100)).toList.map { a =>
        Person(a.toString, a.toString)
      }
      val rdd = sc.parallelize(data)
      rdd.cache
      println("running")
      val a = (1 to 100).toList.map { x =>
        Future(rdd.filter(_.id == x.toString).collect)
      }
      a.foreach { f =>
        println(Await.ready(f, Duration.Inf).value.get)
      }
      rdd.unpersist()
    }
  }
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test")
    val sc = new SparkContext(conf)
    run(sc)
  }
}
{code}

build sbt file:
{code}
name := "RDDTest"
version := "0.1.1"
scalaVersion := "2.11.5"
libraryDependencies ++= Seq(
  "org.scalaz" %% "scalaz-core" % "7.2.0",
  "org.scalaz" %% "scalaz-concurrent" % "7.2.0",
  "org.apache.spark" % "spark-core_2.11" % "2.1.0" % "provided",
  "org.apache.spark" % "spark-hive_2.11" % "2.1.0" % "provided"
)
addCompilerPlugin("org.spire-math" %% "kind-projector" % "0.7.1")
mainClass in assembly := Some("RDDApp")
test in assembly := {}
{code}

To reproduce it, just run:
{code}
spark-2.1.0-bin-hadoop2.7/bin/spark-submit --driver-memory 4G \
--executor-memory 4G \
--executor-cores 1 \
--num-executors 1 \
--class "RDDApp" --master local[4] RDDTest-assembly-0.1.1.jar
{code}
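To put a number on "RDD blocks keep increasing" without watching the UI, the driver's own bookkeeping can be logged each iteration. The sketch below assumes the reproduction above; `getPersistentRDDs` is the public `SparkContext` method returning the RDDs the driver still tracks as persisted, while `LeakProbe` is a hypothetical helper name. If this count stays near zero while the executor-side "RDD Blocks" figure climbs, the leak is in executor block cleanup rather than driver state:

```scala
import org.apache.spark.SparkContext

object LeakProbe {
  // Logs how many RDDs the driver still considers persisted. Call this right
  // after rdd.unpersist() inside the reproduction loop above.
  def logPersisted(sc: SparkContext, iteration: Int): Unit = {
    val persisted = sc.getPersistentRDDs // Map[rddId, RDD[_]]
    println(s"iteration=$iteration persistedRDDs=${persisted.size} " +
      s"ids=${persisted.keys.toList.sorted.mkString(",")}")
  }
}
```

This only observes the mismatch; it does not fix it, but it gives a reproducible metric to attach to the report.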
[jira] [Updated] (SPARK-20760) Memory Leak of RDD blocks
[ https://issues.apache.org/jira/browse/SPARK-20760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Binzi Cao updated SPARK-20760:
--
Description:

Memory leak of RDD blocks in a long-running RDD process.

I have a long-running application which is doing computations on RDDs, and I found the RDD blocks keep increasing in the Spark UI page. The RDD blocks and memory usage do not match the cached RDDs and memory. It looks like Spark keeps old RDDs in memory and never releases them, or never gets a chance to release them. The job will eventually die of out of memory. In addition, I'm not seeing this issue in Spark 1.6.

Below is the minimized code; it is reproducible by just running it in local mode.

Scala file:
{code}
import scala.concurrent.duration.Duration
import scala.util.{Try, Failure, Success}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import scala.concurrent._
import ExecutionContext.Implicits.global

case class Person(id: String, name: String)

object RDDApp {
  def run(sc: SparkContext) = {
    while (true) {
      val r = scala.util.Random
      val data = (1 to r.nextInt(100)).toList.map { a =>
        Person(a.toString, a.toString)
      }
      val rdd = sc.parallelize(data)
      rdd.cache
      println("running")
      val a = (1 to 100).toList.map { x =>
        Future(rdd.filter(_.id == x.toString).collect)
      }
      a.foreach { f =>
        println(Await.ready(f, Duration.Inf).value.get)
      }
      rdd.unpersist()
    }
  }
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test")
    val sc = new SparkContext(conf)
    run(sc)
  }
}
{code}

build sbt file:
{code}
name := "RDDTest"
version := "0.1.1"
scalaVersion := "2.11.5"
libraryDependencies ++= Seq(
  "org.scalaz" %% "scalaz-core" % "7.2.0",
  "org.scalaz" %% "scalaz-concurrent" % "7.2.0",
  "org.apache.spark" % "spark-core_2.11" % "2.1.0" % "provided",
  "org.apache.spark" % "spark-hive_2.11" % "2.1.0" % "provided"
)
addCompilerPlugin("org.spire-math" %% "kind-projector" % "0.7.1")
mainClass in assembly := Some("RDDApp")
test in assembly := {}
{code}

To reproduce it, just run:
{code}
spark-2.1.0-bin-hadoop2.7/bin/spark-submit --driver-memory 4G \
--executor-memory 4G \
--executor-cores 1 \
--num-executors 1 \
--class "RDDApp" --master local[4] RDDTest-assembly-0.1.1.jar
{code}
[jira] [Updated] (SPARK-20760) Memory Leak of RDD blocks
[ https://issues.apache.org/jira/browse/SPARK-20760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Binzi Cao updated SPARK-20760:
--
Attachment: Screen Shot 2017-05-16 at 1.47.06 pm.png

RDD blocks are growing crazily after running for a couple of hours.
[jira] [Updated] (SPARK-20760) Memory Leak of RDD blocks
[ https://issues.apache.org/jira/browse/SPARK-20760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Binzi Cao updated SPARK-20760:
------------------------------
Attachment: (was: Screen Shot 2017-05-16 at 1.47.06 pm.png)
[jira] [Updated] (SPARK-20760) Memory Leak of RDD blocks
[ https://issues.apache.org/jira/browse/SPARK-20760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Binzi Cao updated SPARK-20760:
------------------------------
Attachment: RDD Blocks .png

RDD blocks are increasing crazily after running the app for a couple of hours, see the attached screen shot of the Spark UI page
[jira] [Issue Comment Deleted] (SPARK-20760) Memory Leak of RDD blocks
[ https://issues.apache.org/jira/browse/SPARK-20760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Binzi Cao updated SPARK-20760:
------------------------------
Comment: was deleted (was: RDD blocks are growing crazily after running for a couple of hours)
[jira] [Updated] (SPARK-20760) Memory Leak of RDD blocks
[ https://issues.apache.org/jira/browse/SPARK-20760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Binzi Cao updated SPARK-20760:
------------------------------
Description:
Memory leak of RDD blocks in a long-running RDD process.
We have a long-running application that performs computations on RDDs, and we found that the RDD block count keeps increasing on the Spark UI page. The RDD blocks and memory usage do not match the cached RDDs and their memory. It looks like Spark keeps old RDDs in memory and never releases them, or never gets a chance to release them. The job will eventually die of an out-of-memory error.
In addition, I'm not seeing this issue in Spark 1.6. We are seeing the same issue in Yarn cluster mode, in both Kafka streaming and batch applications. The issue in streaming is similar; however, the RDD blocks seem to grow a bit more slowly than in batch jobs.
The sample code, build sbt file, and reproduction command are unchanged from the original description above; the issue is reproducible by just running the code in local mode.
[jira] [Commented] (SPARK-20760) Memory Leak of RDD blocks
[ https://issues.apache.org/jira/browse/SPARK-20760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16012161#comment-16012161 ] Binzi Cao commented on SPARK-20760:
---
[~srowen] Thank you for the update. Some more information:
1. The issue did not happen in Spark 1.6.
2. The RDD cache and unpersist calls are in the same thread; only the computations are in other threads. The Future/Await is just there to simulate a production application. The unpersist call happens after all the computations are done, so there are no multiple cache/unpersist calls running in multiple threads.
3. Each cache and unpersist is for a different RDD with a different id; there are no multiple threads caching or unpersisting the same RDD.
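The sequencing described in point 2 can be sketched as follows. This is a minimal illustration only, assuming the Spark 2.x Scala API (`persist`, `count`, and `unpersist(blocking = ...)` are existing RDD methods; the `runOnce` wrapper and the eager `count()` before the concurrent reads are additions for illustration, not part of the reporter's code):

```scala
import scala.concurrent._
import scala.concurrent.duration.Duration
import ExecutionContext.Implicits.global
import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel

case class Person(id: String, name: String)

// Hypothetical variant of the repro loop body: materialize the cache with a
// single action in the driver thread before any concurrent reads start, then
// block until the executors have actually dropped the blocks.
def runOnce(sc: SparkContext): Unit = {
  val data = (1 to 100).toList.map(a => Person(a.toString, a.toString))
  val rdd = sc.parallelize(data).persist(StorageLevel.MEMORY_ONLY)
  rdd.count() // force caching once, here, so the futures only read cached blocks
  val futures = (1 to 100).toList.map { x =>
    Future(rdd.filter(_.id == x.toString).collect())
  }
  futures.foreach(f => Await.ready(f, Duration.Inf))
  rdd.unpersist(blocking = true) // wait for block removal instead of fire-and-forget
}
```

If the block count still grows with this ordering, that would point at the block manager itself rather than at a cache/unpersist race between threads.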
[jira] [Comment Edited] (SPARK-20760) Memory Leak of RDD blocks
[ https://issues.apache.org/jira/browse/SPARK-20760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16012161#comment-16012161 ] Binzi Cao edited comment on SPARK-20760 at 5/16/17 11:19 AM:
---
[~srowen] Thank you for the update. Some more information:
1. The issue did not happen in Spark 1.6.
2. The RDD cache and unpersist calls are in the same thread; only the computations are in other threads. The Future/Await is just there to simulate a production application. The unpersist call happens after all the computations are done. So is there a bug in unpersist?
3. Each cache and unpersist is for a different RDD with a different id; there are no multiple threads caching or unpersisting the same RDD. Is it because each computation causes the RDD to be cached separately, so the RDD ends up cached in multiple copies? Would that mean RDDs are no longer thread-safe?
[jira] [Commented] (SPARK-20760) Memory Leak of RDD blocks
[ https://issues.apache.org/jira/browse/SPARK-20760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16012176#comment-16012176 ] Binzi Cao commented on SPARK-20760:
---
Ah, I see. I did something like rdd.count to trigger the cache first, before the computations; however, that did not work. Is there a workaround for this?
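One way to check whether unpersist is being honoured at all is to ask the driver which RDDs it still considers persistent. A diagnostic sketch, not a fix: `SparkContext.getPersistentRDDs` is an existing method of the Scala API, but the `reportPersistent` helper around it is hypothetical.

```scala
import org.apache.spark.SparkContext

// After each cache/compute/unpersist iteration, list the RDDs the driver
// still tracks as persistent. If this map keeps growing across iterations,
// RDDs are never being deregistered, and the UI block count grows with it.
def reportPersistent(sc: SparkContext): Unit = {
  val persistent = sc.getPersistentRDDs // Map[Int, RDD[_]] keyed by RDD id
  println(s"persistent RDDs still registered: ${persistent.size}")
  persistent.keys.toSeq.sorted.foreach(id => println(s"  rdd id: $id"))
}
```

Calling this at the bottom of the repro loop would distinguish "driver forgot to deregister" from "executors never dropped the blocks".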
[jira] [Commented] (SPARK-20760) Memory Leak of RDD blocks
[ https://issues.apache.org/jira/browse/SPARK-20760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16012191#comment-16012191 ] Binzi Cao commented on SPARK-20760:
---
More information: I just did a quick test after removing the Future/Await lines, and the issue is still happening. The RDD block count increased to over 14 thousand after 8 minutes, and it is still increasing. Code as below:
{code}
def run(sc: SparkContext) = {
  while (true) {
    val r = scala.util.Random
    val data = (1 to r.nextInt(100)).toList.map { a =>
      Person(a.toString, a.toString)
    }
    val rdd = sc.parallelize(data)
    rdd.cache
    println("running")
    val a = (1 to 1000).toList.map { x =>
      rdd.filter(_.id == x.toString).collect
    }
    rdd.unpersist()
  }
}
{code}
[jira] [Commented] (SPARK-20760) Memory Leak of RDD blocks
[ https://issues.apache.org/jira/browse/SPARK-20760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16012213#comment-16012213 ] Binzi Cao commented on SPARK-20760:
---
You might need to wait a bit longer to reproduce it. This time in my test, the RDD block count stayed at 64 for about 10 minutes; later it was over 100, and after that it increased quickly. Same code as in the previous comment, without any multi-threading lines.
[jira] [Commented] (SPARK-20760) Memory Leak of RDD blocks
[ https://issues.apache.org/jira/browse/SPARK-20760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16040112#comment-16040112 ] Binzi Cao commented on SPARK-20760:
---
Hi David,
Thanks very much for the message. I ran a test with Spark 2.1.1 in local mode. The issue still seems to happen, although it is much better than Spark 2.0: the RDD block count grows much more slowly. After running the task for 2 hours, I had around 6000 RDD blocks in memory. I attached the screenshots for 2.1.1.
Binzi
[jira] [Updated] (SPARK-20760) Memory Leak of RDD blocks
[ https://issues.apache.org/jira/browse/SPARK-20760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Binzi Cao updated SPARK-20760:
---
Attachment: Storage in spark 2.1.1.png
            RDD blocks in spark 2.1.1.png
Spark 2.1.1 Test Result
[jira] [Comment Edited] (SPARK-20760) Memory Leak of RDD blocks
[ https://issues.apache.org/jira/browse/SPARK-20760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16040112#comment-16040112 ] Binzi Cao edited comment on SPARK-20760 at 6/7/17 3:47 AM:
---
Hi David,
Thanks very much for the message. I ran a test with Spark 2.1.1 in local mode. The issue still seems to happen, although it is much better than Spark 2.1.0: the RDD block count grows much more slowly. After running the task for 2 hours, I had around 6000 RDD blocks in memory. I attached the screenshots for 2.1.1.
Binzi

was (Author: caobinzi):
Hi David, Thanks very much for the message, I did a test with spark 2.1.1 in local mode. The issue seems still happening, while it seems much better than spark 2.0 as the RDD blocks grows much slower. After running the task for 2 hours, I got around 6000 rdd blocks in memory. I attached the screenshots for the 2.1.1 Binzi
[jira] [Commented] (SPARK-20760) Memory Leak of RDD blocks
[ https://issues.apache.org/jira/browse/SPARK-20760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16042009#comment-16042009 ] Binzi Cao commented on SPARK-20760:
---
Hi Jose,
Thanks very much for your detailed explanation. I have some questions:
1. If the RDD block count in the UI just reflects how many RDDs have been created, why does the number of RDD blocks go up and down during the test?
2. What about the Storage page? I can see a list of RDDs on that page; does it also show something wrong?
3. In this case the code flow is: create an RDD, cache it, do the computations, and unpersist, all in serial. I don't quite understand how RDD blocks can be created faster than they can be unpersisted. In addition, the issue did not happen in Spark 1.6.
Regards
Binzi
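One thing that may be worth testing alongside these questions (my own assumption, not a confirmed fix): the driver's ContextCleaner only reclaims block metadata for RDDs that a GC has already found unreferenced, so with a large, rarely-collected driver heap, cleanup can lag the creation loop. This stock setting could be tuned to check that hypothesis:

```
# spark-defaults.conf — hypothetical tuning to test, not a confirmed fix
# Default is 30min; a shorter interval triggers a periodic driver GC sooner,
# giving the ContextCleaner more chances to reclaim dead RDD blocks.
spark.cleaner.periodicGC.interval   10min
```

If the block count stops growing with this in place, that would point at cleaner lag rather than a genuine leak.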