[jira] [Commented] (SPARK-24189) Spark Strcutured Streaming not working with the Kafka Transactions

2018-09-04 Thread Binzi Cao (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16602889#comment-16602889
 ] 

Binzi Cao commented on SPARK-24189:
---

It seems I'm hitting a similar issue. I managed to set the Kafka isolation 
level with

{code:java}
.option("kafka.isolation.level", "read_committed")
{code}

and using 
{code:java}
kafka-client 1.0.0
Spark version: 2.3.1
{code}
 and I'm seeing this issue: 


{code:java}
[error] org.apache.spark.sql.streaming.StreamingQueryException: Job aborted due 
to stage failure: Task 17 in stage 0.0 failed 1 times, most recent failure: 
Lost task 17.0 in stage 0.0 (TID 17, localhost, executor driver): 
java.util.concurrent.TimeoutException: Cannot fetch record for offset 145 in 
2000 milliseconds
[error] at 
org.apache.spark.sql.kafka010.InternalKafkaConsumer.org$apache$spark$sql$kafka010$InternalKafkaConsumer$$fetchData(KafkaDataConsumer.scala:271)
[error] at 
org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:163)
[error] at 
org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:147)
[error] at 
org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
[error] at 
org.apache.spark.sql.kafka010.InternalKafkaConsumer.runUninterruptiblyIfPossible(KafkaDataConsumer.scala:109)
[error] at 
org.apache.spark.sql.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:147)
[error] at 
org.apache.spark.sql.kafka010.KafkaDataConsumer$class.get(KafkaDataConsumer.scala:54)
[error] at 
org.apache.spark.sql.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.get(KafkaDataConsumer.scala:362)
[error] at 
org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:152)
[error] at 
org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:143)
[error] at 
org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
[error] at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
[error] at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
[error] at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(generated.java:205)
[error] at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
[error] at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
[error] at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
[error] at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithKeys_0$(generated.java:230)


{code}


So it looks like it does not work at all with a topic that uses Kafka transactions. 

The exception was thrown here:
https://github.com/apache/spark/blob/v2.3.1/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala#L271-L272

Setting
{code:java}
 failOnDataLoss=false
{code}
 can't fix the issue, as the exception is never caught in the 
KafkaDataConsumer.scala code. 
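For reference, a minimal end-to-end sketch of the reader discussed above (the topic name, bootstrap servers, and console sink are placeholder choices, not taken from the original report; this needs a running Spark and Kafka setup):

```scala
import org.apache.spark.sql.SparkSession

// Sketch of a Structured Streaming reader for a transactional topic.
// "test-topic" and the bootstrap server address are placeholders.
object ReadCommittedSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("read-committed-sketch").getOrCreate()

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "test-topic")
      // "kafka."-prefixed options are forwarded to the underlying Kafka
      // consumer, so this sets the consumer's isolation.level.
      .option("kafka.isolation.level", "read_committed")
      // The option discussed above; per the comment, it does not catch the
      // TimeoutException thrown in KafkaDataConsumer.scala.
      .option("failOnDataLoss", "false")
      .load()

    df.writeStream.format("console").start().awaitTermination()
  }
}
```

This is only a sketch of the configuration being described, not a workaround for the reported failure.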





> Spark Strcutured Streaming not working with the Kafka Transactions
> --
>
> Key: SPARK-24189
> URL: https://issues.apache.org/jira/browse/SPARK-24189
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: bharath kumar avusherla
>Priority: Major
>
> Was trying to read a Kafka transactional topic using Spark Structured Streaming 
> 2.3.0 with the Kafka option isolation-level = "read_committed", but Spark 
> reads the data immediately without waiting for the data in the topic to be 
> committed. The Spark documentation mentions that Structured Streaming 
> supports Kafka version 0.10 or higher. I am using the command below to test the 
> scenario.
> val df = spark
>  .readStream
>  .format("kafka")
>  .option("kafka.bootstrap.servers", "localhost:9092")
>  .option("subscribe", "test-topic")
>  .option("isolation-level","read_committed")
>  .load()
> Can you please let me know if transactional read is supported in Spark 
> 2.3.0 Structured Streaming or am I missing anything.
>  
> Thank you.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org




[jira] [Created] (SPARK-20760) Memory Leak of RDD blocks

2017-05-15 Thread Binzi Cao (JIRA)
Binzi Cao created SPARK-20760:
-

 Summary: Memory Leak of RDD blocks 
 Key: SPARK-20760
 URL: https://issues.apache.org/jira/browse/SPARK-20760
 Project: Spark
  Issue Type: Bug
  Components: Block Manager
Affects Versions: 2.1.0
 Environment: Spark 2.1.0
Reporter: Binzi Cao
Priority: Critical


Memory leak of RDD blocks in a long-running RDD process. I have a long-running 
application that does calculations on RDDs, and I found that the RDD blocks keep 
increasing. The RDD blocks and memory usage do not match the cached RDDs and 
memory. It looks like Spark keeps old RDDs in memory and never releases them. In 
addition, I'm not seeing this issue in Spark 1.6. 

The code below is minimized; the issue is reproducible by just running it in 
local mode. 
Scala file:
{code}
import scala.concurrent.duration.Duration
import scala.util.{Try, Failure, Success}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import scala.concurrent._
import ExecutionContext.Implicits.global

case class Person(id: String, name: String)

object RDDApp {
  def run(sc: SparkContext) = {
    while (true) {
      val r = scala.util.Random
      val data = (1 to r.nextInt(100)).toList.map { a =>
        Person(a.toString, a.toString)
      }
      val rdd = sc.parallelize(data)
      rdd.cache
      println("running")
      val a = (1 to 100).toList.map { x =>
        Future(rdd.filter(_.id == x.toString).collect)
      }
      a.foreach { f =>
        println(Await.ready(f, Duration.Inf).value.get)
      }
      rdd.unpersist()
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test")
    val sc   = new SparkContext(conf)
    run(sc)
  }
}
{code}
build sbt file:
{code}
name := "RDDTest"
version := "0.1.1"

scalaVersion := "2.11.5"

libraryDependencies ++= Seq(
  "org.scalaz" %% "scalaz-core" % "7.2.0",
  "org.scalaz" %% "scalaz-concurrent" % "7.2.0",
  "org.apache.spark" % "spark-core_2.11" % "2.1.0" % "provided",
  "org.apache.spark" % "spark-hive_2.11" % "2.1.0" % "provided"
)

addCompilerPlugin("org.spire-math" %% "kind-projector" % "0.7.1")
mainClass in assembly := Some("RDDApp")
test in assembly := {}
{code}

To reproduce it, just run:
{code}
spark-2.1.0-bin-hadoop2.7/bin/spark-submit --driver-memory 4G \
  --executor-memory 4G \
  --executor-cores 1 \
  --num-executors 1 \
  --class "RDDApp" --master local[4] RDDTest-assembly-0.1.1.jar
{code}




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-20760) Memory Leak of RDD blocks

2017-05-15 Thread Binzi Cao (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Binzi Cao updated SPARK-20760:
--
Description: 
Memory leak of RDD blocks in a long-running RDD process.

I have a long-running application that does computations on RDDs, and I found 
that the RDD blocks keep increasing on the Spark UI page. The RDD blocks and 
memory usage do not match the cached RDDs and memory. It looks like Spark keeps 
old RDDs in memory and never releases them, or never gets a chance to release 
them. The job will eventually die of an out-of-memory error. 

In addition, I'm not seeing this issue in Spark 1.6. 

The code below is minimized; the issue is reproducible by just running it in 
local mode. 
Scala file:
{code}
import scala.concurrent.duration.Duration
import scala.util.{Try, Failure, Success}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import scala.concurrent._
import ExecutionContext.Implicits.global
case class Person(id: String, name: String)
object RDDApp {
  def run(sc: SparkContext) = {
    while (true) {
      val r = scala.util.Random
      val data = (1 to r.nextInt(100)).toList.map { a =>
        Person(a.toString, a.toString)
      }
      val rdd = sc.parallelize(data)
      rdd.cache
      println("running")
      val a = (1 to 100).toList.map { x =>
        Future(rdd.filter(_.id == x.toString).collect)
      }
      a.foreach { f =>
        println(Await.ready(f, Duration.Inf).value.get)
      }
      rdd.unpersist()
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test")
    val sc   = new SparkContext(conf)
    run(sc)
  }
}

{code}
build sbt file:
{code}
name := "RDDTest"
version := "0.1.1"

scalaVersion := "2.11.5"

libraryDependencies ++= Seq(
  "org.scalaz" %% "scalaz-core" % "7.2.0",
  "org.scalaz" %% "scalaz-concurrent" % "7.2.0",
  "org.apache.spark" % "spark-core_2.11" % "2.1.0" % "provided",
  "org.apache.spark" % "spark-hive_2.11" % "2.1.0" % "provided"
)

addCompilerPlugin("org.spire-math" %% "kind-projector" % "0.7.1")
mainClass in assembly := Some("RDDApp")
test in assembly := {}
{code}

To reproduce it, just run:
{code}
spark-2.1.0-bin-hadoop2.7/bin/spark-submit --driver-memory 4G \
  --executor-memory 4G \
  --executor-cores 1 \
  --num-executors 1 \
  --class "RDDApp" --master local[4] RDDTest-assembly-0.1.1.jar
{code}
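One way to observe whether cached blocks actually accumulate is to poll the driver's map of persistent RDDs while the loop above runs. This is a sketch: `SparkContext.getPersistentRDDs` is part of the public Spark API, but the helper name and where you call it are my own choices, not part of the original reproduction.

```scala
import org.apache.spark.SparkContext

// Hypothetical helper: report how many RDDs the driver still tracks as
// persistent. If unpersist() takes effect, the count should stay near 1;
// a steadily growing count matches the leak described above.
def reportPersistentRDDs(sc: SparkContext): Unit = {
  val persisted = sc.getPersistentRDDs // Map[Int, RDD[_]] of cached RDDs
  println(s"persistent RDDs tracked by driver: ${persisted.size}")
}
```

Calling this once per iteration of the `while (true)` loop would show whether the block count grows even though `rdd.unpersist()` is invoked.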



[jira] [Updated] (SPARK-20760) Memory Leak of RDD blocks

2017-05-15 Thread Binzi Cao (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Binzi Cao updated SPARK-20760:
--
Attachment: Screen Shot 2017-05-16 at 1.47.06 pm.png

RDD blocks are growing crazily after running for a couple of hours

> Memory Leak of RDD blocks 
> --
>
> Key: SPARK-20760
> URL: https://issues.apache.org/jira/browse/SPARK-20760
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager
>Affects Versions: 2.1.0
> Environment: Spark 2.1.0
>Reporter: Binzi Cao
>Priority: Critical
> Attachments: Screen Shot 2017-05-16 at 1.47.06 pm.png
>
>
> Memory leak of RDD blocks in a long-running RDD process.
> I have a long-running application that performs computations over RDDs, and 
> I found that the RDD block count keeps increasing on the Spark UI page. The 
> RDD blocks and memory usage do not match the cached RDDs and their memory. 
> It looks like Spark keeps old RDDs in memory and never releases them, or 
> never gets a chance to release them. The job will eventually die of an 
> OutOfMemoryError. 
> In addition, I'm not seeing this issue in Spark 1.6. 
> Below is the minimized code; the issue is reproducible by simply running it 
> in local mode. 
> Scala file:
> {code}
> import scala.concurrent.duration.Duration
> import scala.util.{Try, Failure, Success}
> import org.apache.spark.SparkConf
> import org.apache.spark.SparkContext
> import org.apache.spark.rdd.RDD
> import scala.concurrent._
> import ExecutionContext.Implicits.global
> case class Person(id: String, name: String)
> object RDDApp {
>   def run(sc: SparkContext) = {
> while (true) {
>   val r = scala.util.Random
>   val data = (1 to r.nextInt(100)).toList.map { a =>
> Person(a.toString, a.toString)
>   }
>   val rdd = sc.parallelize(data)
>   rdd.cache
>   println("running")
>   val a = (1 to 100).toList.map { x =>
> Future(rdd.filter(_.id == x.toString).collect)
>   }
>   a.foreach { f =>
> println(Await.ready(f, Duration.Inf).value.get)
>   }
>   rdd.unpersist()
> }
>   }
>   def main(args: Array[String]): Unit = {
>     val conf = new SparkConf().setAppName("test")
>     val sc   = new SparkContext(conf)
>     run(sc)
>   }
> }
> {code}
> build sbt file:
> {code}
> name := "RDDTest"
> version := "0.1.1"
> scalaVersion := "2.11.5"
> libraryDependencies ++= Seq (
> "org.scalaz" %% "scalaz-core" % "7.2.0",
> "org.scalaz" %% "scalaz-concurrent" % "7.2.0",
> "org.apache.spark" % "spark-core_2.11" % "2.1.0" % "provided",
> "org.apache.spark" % "spark-hive_2.11" % "2.1.0" % "provided"
>   )
> addCompilerPlugin("org.spire-math" %% "kind-projector" % "0.7.1")
> mainClass in assembly := Some("RDDApp")
> test in assembly := {}
> {code}
> To reproduce it, just run: 
> {code}
> spark-2.1.0-bin-hadoop2.7/bin/spark-submit   --driver-memory 4G \
> --executor-memory 4G \
> --executor-cores 1 \
> --num-executors 1 \
> --class "RDDApp" --master local[4] RDDTest-assembly-0.1.1.jar
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-20760) Memory Leak of RDD blocks

2017-05-15 Thread Binzi Cao (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Binzi Cao updated SPARK-20760:
--
Attachment: (was: Screen Shot 2017-05-16 at 1.47.06 pm.png)




[jira] [Updated] (SPARK-20760) Memory Leak of RDD blocks

2017-05-15 Thread Binzi Cao (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Binzi Cao updated SPARK-20760:
--
Attachment: RDD Blocks .png

RDD blocks increase rapidly after the app has run for a couple of hours; see 
the attached screenshot of the Spark UI page.




[jira] [Issue Comment Deleted] (SPARK-20760) Memory Leak of RDD blocks

2017-05-15 Thread Binzi Cao (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Binzi Cao updated SPARK-20760:
--
Comment: was deleted

(was: RDD blocks are growing crazily after running for a couple of hours)




[jira] [Updated] (SPARK-20760) Memory Leak of RDD blocks

2017-05-15 Thread Binzi Cao (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Binzi Cao updated SPARK-20760:
--
Description: 
Memory leak of RDD blocks in a long-running RDD process.

We have a long-running application that performs computations over RDDs, and 
we found that the RDD block count keeps increasing on the Spark UI page. The 
RDD blocks and memory usage do not match the cached RDDs and their memory. It 
looks like Spark keeps old RDDs in memory and never releases them, or never 
gets a chance to release them. The job will eventually die of an 
OutOfMemoryError. 

In addition, I'm not seeing this issue in Spark 1.6. We are seeing the same 
issue in Yarn cluster mode in both Kafka streaming and batch applications. The 
issue in streaming is similar; however, the RDD block count seems to grow a 
bit more slowly than in batch jobs. 

Below is the sample code; the issue is reproducible by simply running it in 
local mode. 
Scala file:
{code}
import scala.concurrent.duration.Duration
import scala.util.{Try, Failure, Success}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import scala.concurrent._
import ExecutionContext.Implicits.global
case class Person(id: String, name: String)
object RDDApp {
  def run(sc: SparkContext) = {
while (true) {
  val r = scala.util.Random
  val data = (1 to r.nextInt(100)).toList.map { a =>
Person(a.toString, a.toString)
  }
  val rdd = sc.parallelize(data)
  rdd.cache
  println("running")
  val a = (1 to 100).toList.map { x =>
Future(rdd.filter(_.id == x.toString).collect)
  }
  a.foreach { f =>
println(Await.ready(f, Duration.Inf).value.get)
  }
  rdd.unpersist()
}

  }
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test")
    val sc   = new SparkContext(conf)
    run(sc)
  }
}

{code}
build sbt file:
{code}
name := "RDDTest"
version := "0.1.1"


scalaVersion := "2.11.5"

libraryDependencies ++= Seq (
"org.scalaz" %% "scalaz-core" % "7.2.0",
"org.scalaz" %% "scalaz-concurrent" % "7.2.0",
"org.apache.spark" % "spark-core_2.11" % "2.1.0" % "provided",
"org.apache.spark" % "spark-hive_2.11" % "2.1.0" % "provided"
  )

addCompilerPlugin("org.spire-math" %% "kind-projector" % "0.7.1")
mainClass in assembly := Some("RDDApp")
test in assembly := {}
{code}

To reproduce it, just run: 
{code}

spark-2.1.0-bin-hadoop2.7/bin/spark-submit   --driver-memory 4G \
--executor-memory 4G \
--executor-cores 1 \
--num-executors 1 \
--class "RDDApp" --master local[4] RDDTest-assembly-0.1.1.jar
{code}



[jira] [Commented] (SPARK-20760) Memory Leak of RDD blocks

2017-05-16 Thread Binzi Cao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16012161#comment-16012161
 ] 

Binzi Cao commented on SPARK-20760:
---

[~srowen] Thank you for the update. 
Some more information:
1. The issue did not happen in Spark 1.6.

2. The rdd cache and unpersist calls are on the same thread; only the 
computations run on other threads. The Future/Await is just there to simulate 
a production application. The unpersist call happens after all the 
computations are done, so there are no concurrent cache/unpersist calls 
across multiple threads.

3. Each cache/unpersist pair is for a different RDD with a different id; no 
threads cache or unpersist the same RDD concurrently.






[jira] [Comment Edited] (SPARK-20760) Memory Leak of RDD blocks

2017-05-16 Thread Binzi Cao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16012161#comment-16012161
 ] 

Binzi Cao edited comment on SPARK-20760 at 5/16/17 11:19 AM:
-

[~srowen] Thank you for the update. 
Some more information:
1. The issue did not happen in Spark 1.6.

2. The rdd cache and unpersist calls are on the same thread; only the 
computations run on other threads. The Future/Await is just there to simulate 
a production application. The unpersist call happens after all the 
computations are done. So is there a bug in unpersist?

3. Each cache/unpersist pair is for a different RDD with a different id; no 
threads cache or unpersist the same RDD concurrently.

Could it be that each computation causes the RDD to be cached separately, so 
the RDD ends up cached as multiple copies? Would that mean RDDs are no longer 
thread-safe?
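To reason about that hypothesis, here is a plain-Scala toy model of such a race (no Spark involved; the block store, block ids, and the straggler/driver timing are all made up for illustration, not Spark internals). "Task" threads register blocks lazily, and the "driver" unpersists by RDD id; a straggler that registers its block only after unpersist has run leaves an orphaned block behind:

```scala
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch}

object BlockLeakModel {
  // blockId -> rddId, a toy stand-in for a block manager's registry
  val store = new ConcurrentHashMap[String, Int]()

  // Remove every currently known block of the given RDD, like unpersist(rddId)
  def unpersist(rddId: Int): Unit = {
    val it = store.keySet.iterator()
    while (it.hasNext) if (it.next().startsWith(s"rdd_${rddId}_")) it.remove()
  }

  def main(args: Array[String]): Unit = {
    val unpersisted = new CountDownLatch(1)
    val finished    = new CountDownLatch(1)

    // Fast task: its block is registered before unpersist runs.
    store.put("rdd_1_0", 1)

    // Straggler task: registers its block only after unpersist has run.
    val straggler = new Thread(new Runnable {
      def run(): Unit = {
        unpersisted.await()
        store.put("rdd_1_1", 1) // arrives too late to be cleaned up
        finished.countDown()
      }
    })
    straggler.start()

    unpersist(1)            // the "driver" drops every block it knows about
    unpersisted.countDown() // now let the straggler finish
    finished.await()
    straggler.join()

    // The straggler's block survived unpersist: a leaked entry.
    println(s"blocks left after unpersist: ${store.size}") // prints 1
  }
}
```

This only models the suspected behaviour, not Spark's BlockManager; it just shows that lazy block registration plus an unsynchronized unpersist is enough to leak entries.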










[jira] [Commented] (SPARK-20760) Memory Leak of RDD blocks

2017-05-16 Thread Binzi Cao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16012176#comment-16012176
 ] 

Binzi Cao commented on SPARK-20760:
---

Ah, I see. I tried calling rdd.count to trigger the cache before the 
computations, but that did not help. Is there a workaround for this? 
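The mitigation usually suggested for this pattern is to force full materialization first (e.g. rdd.count()) and then call rdd.unpersist(blocking = true) so the driver waits for block removal; treat that as a suggestion only, since the comment above reports it did not help on Spark 2.1.0. As a hedged illustration of why it should work in principle, here is a plain-Scala toy model (no Spark; all names are invented) in which the driver joins every task thread before unpersisting, so no block can be registered afterwards:

```scala
import java.util.concurrent.ConcurrentHashMap

object SafeUnpersistModel {
  // blockId -> rddId, a toy stand-in for a block manager's registry
  val store = new ConcurrentHashMap[String, Int]()

  // Remove every currently known block of the given RDD
  def unpersist(rddId: Int): Unit = {
    val it = store.keySet.iterator()
    while (it.hasNext) if (it.next().startsWith(s"rdd_${rddId}_")) it.remove()
  }

  def main(args: Array[String]): Unit = {
    // Four "tasks", each materializing one block of rdd 1.
    val tasks = (0 until 4).map { part =>
      new Thread(new Runnable {
        def run(): Unit = { store.put(s"rdd_1_$part", 1); () }
      })
    }
    tasks.foreach(_.start())
    tasks.foreach(_.join()) // wait for every task, analogous to running
                            // count() first and unpersisting with blocking

    unpersist(1)

    // No task can register a block after unpersist, so nothing leaks here.
    println(s"blocks left after unpersist: ${store.size}") // prints 0
  }
}
```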




[jira] [Commented] (SPARK-20760) Memory Leak of RDD blocks

2017-05-16 Thread Binzi Cao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16012191#comment-16012191
 ] 

Binzi Cao commented on SPARK-20760:
---

More information: I just did a quick test with the Future/Await lines removed, 
and the issue still happens. 

RDD blocks increased to over 14 thousand after 8 minutes, and the count is 
still increasing. 

Code as below:

{code}
def run(sc: SparkContext) = {
  while (true) {
    val r = scala.util.Random
    val data = (1 to r.nextInt(100)).toList.map { a =>
      Person(a.toString, a.toString)
    }
    val rdd = sc.parallelize(data)
    rdd.cache
    println("running")
    val a = (1 to 1000).toList.map { x =>
      rdd.filter(_.id == x.toString).collect
    }
    rdd.unpersist()
  }
}
{code}




[jira] [Commented] (SPARK-20760) Memory Leak of RDD blocks

2017-05-16 Thread Binzi Cao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16012213#comment-16012213
 ] 

Binzi Cao commented on SPARK-20760:
---

You might need to wait a bit longer to reproduce it. This time in my test, the 
RDD block count stayed at 64 for about 10 minutes, later went over 100, and 
after that it increased quickly. Same code as in the previous comments, without 
any multi-threading lines.

> Memory Leak of RDD blocks 
> --
>
> Key: SPARK-20760
> URL: https://issues.apache.org/jira/browse/SPARK-20760
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager
>Affects Versions: 2.1.0
> Environment: Spark 2.1.0
>Reporter: Binzi Cao
> Attachments: RDD Blocks .png
>
>
> Memory leak of RDD blocks in a long-running RDD process.
> We have a long-running application that does computations on RDDs, and we 
> found that the RDD blocks keep increasing on the Spark UI page. 
> The RDD blocks and memory usage do not match the cached RDDs and memory. It 
> looks like Spark keeps old RDDs in memory and never releases them, or never 
> gets a chance to release them. The job will eventually die of an 
> OutOfMemoryError. 
> In addition, I'm not seeing this issue in Spark 1.6. We are seeing the same 
> issue in YARN cluster mode in both Kafka streaming and batch applications. 
> The issue in streaming is similar; however, the RDD blocks seem to grow a 
> bit slower than in batch jobs. 
> The code below is a sample; the issue is reproducible by just running it in 
> local mode. 
> Scala file:
> {code}
> import scala.concurrent.duration.Duration
> import scala.util.{Try, Failure, Success}
> import org.apache.spark.SparkConf
> import org.apache.spark.SparkContext
> import org.apache.spark.rdd.RDD
> import scala.concurrent._
> import ExecutionContext.Implicits.global
>
> case class Person(id: String, name: String)
>
> object RDDApp {
>   def run(sc: SparkContext) = {
>     while (true) {
>       val r = scala.util.Random
>       val data = (1 to r.nextInt(100)).toList.map { a =>
>         Person(a.toString, a.toString)
>       }
>       val rdd = sc.parallelize(data)
>       rdd.cache
>       println("running")
>       val a = (1 to 100).toList.map { x =>
>         Future(rdd.filter(_.id == x.toString).collect)
>       }
>       a.foreach { f =>
>         println(Await.ready(f, Duration.Inf).value.get)
>       }
>       rdd.unpersist()
>     }
>   }
>
>   def main(args: Array[String]): Unit = {
>     val conf = new SparkConf().setAppName("test")
>     val sc   = new SparkContext(conf)
>     run(sc)
>   }
> }
> {code}
> build.sbt file:
> {code}
> name := "RDDTest"
>
> version := "0.1.1"
>
> scalaVersion := "2.11.5"
>
> libraryDependencies ++= Seq(
>   "org.scalaz" %% "scalaz-core" % "7.2.0",
>   "org.scalaz" %% "scalaz-concurrent" % "7.2.0",
>   "org.apache.spark" % "spark-core_2.11" % "2.1.0" % "provided",
>   "org.apache.spark" % "spark-hive_2.11" % "2.1.0" % "provided"
> )
>
> addCompilerPlugin("org.spire-math" %% "kind-projector" % "0.7.1")
>
> mainClass in assembly := Some("RDDApp")
>
> test in assembly := {}
> {code}
> To reproduce it, just run:
> {code}
> spark-2.1.0-bin-hadoop2.7/bin/spark-submit \
>   --driver-memory 4G \
>   --executor-memory 4G \
>   --executor-cores 1 \
>   --num-executors 1 \
>   --class "RDDApp" --master local[4] RDDTest-assembly-0.1.1.jar
> {code}
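As a side note for anyone reproducing this: a quick way to check from the driver whether Spark itself still considers old RDDs persisted is to poll `SparkContext.getPersistentRDDs`. This is a diagnostic sketch, not part of the original report; the helper name is made up, and it assumes a live `SparkContext` called `sc`:

```scala
import org.apache.spark.SparkContext

// Diagnostic sketch (hypothetical helper): if this count stays small while
// the "RDD Blocks" number in the UI keeps growing, the blocks are being
// retained by block-manager/UI accounting rather than by RDDs that were
// never unpersisted.
def logPersistentRdds(sc: SparkContext): Unit = {
  val persisted = sc.getPersistentRDDs // Map[rddId, RDD[_]] of cached RDDs
  println(s"driver tracks ${persisted.size} persistent RDD(s): " +
    persisted.keys.toSeq.sorted.mkString(", "))
}
```

Calling this at the end of each loop iteration in the sample above would normally print 0 or 1 once `unpersist()` has completed, which makes it easy to see whether the leak is in the RDDs themselves or in the UI's block accounting.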






[jira] [Commented] (SPARK-20760) Memory Leak of RDD blocks

2017-06-06 Thread Binzi Cao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16040112#comment-16040112
 ] 

Binzi Cao commented on SPARK-20760:
---

Hi David, 

Thanks very much for the message. I did a test with Spark 2.1.1 in local mode. 
The issue still seems to be happening, though it is much better than Spark 2.0, 
as the RDD blocks grow much more slowly. After running the task for 2 hours, I 
got around 6000 RDD blocks in memory. 

I attached the screenshots for 2.1.1.

Binzi

> Memory Leak of RDD blocks (full description and reproduction code quoted in 
> the earlier message above)



[jira] [Updated] (SPARK-20760) Memory Leak of RDD blocks

2017-06-06 Thread Binzi Cao (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Binzi Cao updated SPARK-20760:
--
Attachment: Storage in spark 2.1.1.png
RDD blocks in spark 2.1.1.png

Spark 2.1.1 Test Result

> Memory Leak of RDD blocks (full description and reproduction code quoted in 
> the earlier message above)



[jira] [Comment Edited] (SPARK-20760) Memory Leak of RDD blocks

2017-06-06 Thread Binzi Cao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16040112#comment-16040112
 ] 

Binzi Cao edited comment on SPARK-20760 at 6/7/17 3:47 AM:
---

Hi David, 

Thanks very much for the message. I did a test with Spark 2.1.1 in local mode. 
The issue still seems to be happening, though it is much better than Spark 2.1.0, 
as the RDD blocks grow much more slowly. After running the task for 2 hours, I 
got around 6000 RDD blocks in memory. 

I attached the screenshots for 2.1.1.

Binzi


was (Author: caobinzi):
Hi David, 

Thanks very much for the message. I did a test with Spark 2.1.1 in local mode. 
The issue still seems to be happening, though it is much better than Spark 2.0, 
as the RDD blocks grow much more slowly. After running the task for 2 hours, I 
got around 6000 RDD blocks in memory. 

I attached the screenshots for 2.1.1.

Binzi

> Memory Leak of RDD blocks (full description and reproduction code quoted in 
> the earlier message above)



[jira] [Commented] (SPARK-20760) Memory Leak of RDD blocks

2017-06-07 Thread Binzi Cao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16042009#comment-16042009
 ] 

Binzi Cao commented on SPARK-20760:
---

Hi Jose, 

Thanks very much for your detailed explanation. I have some questions:

1. If the RDD blocks count in the UI just reflects how many RDDs have been 
created, why does the number of RDD blocks go up and down during the test?

2. What about the Storage page? I can see a list of RDDs on that page; does 
that page also show something wrong?

3. In this case, the code flow is: create an RDD, cache it, do the 
computations, and unpersist, all in serial. I don't quite understand how the 
RDD blocks could be created faster than they can be unpersisted.

In addition, the issue did not happen in Spark 1.6. 

Regards

Binzi
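On the "created faster than unpersisted" point, one thing worth ruling out (a sketch, not a confirmed fix) is whether the unpersist is actually synchronous. `RDD.unpersist` takes a `blocking` flag; in Spark 2.x it defaults to `true`, so writing it explicitly mainly documents intent, but if blocks still accumulate with a blocking unpersist, asynchronous removal cannot be the whole explanation:

```scala
import org.apache.spark.SparkContext

// Hypothetical rewrite of one loop iteration from the sample code: each pass
// waits until the previous RDD's blocks are actually dropped before the next
// RDD is cached.
def oneIteration(sc: SparkContext, i: Int): Unit = {
  val rdd = sc.parallelize(Seq(i)).cache()
  rdd.count()                      // materialize the cached blocks
  rdd.unpersist(blocking = true)   // returns only after block removal completes
}
```

If the RDD block count on the UI still grows without bound under this variant, that would point at an accounting or cleanup bug rather than at removal falling behind creation.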

> Memory Leak of RDD blocks (full description and reproduction code quoted in 
> the earlier message above)