[jira] [Commented] (SPARK-5934) DStreamGraph.clearMetadata attempts to unpersist the same RDD multiple times

2015-11-09 Thread Nick Pritchard (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-5934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14997335#comment-14997335 ]

Nick Pritchard commented on SPARK-5934:
---

[~jerryshao] thanks for your insight, I'll close this issue.

> DStreamGraph.clearMetadata attempts to unpersist the same RDD multiple times
> 
>
> Key: SPARK-5934
> URL: https://issues.apache.org/jira/browse/SPARK-5934
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager, Streaming
>Affects Versions: 1.2.1
>Reporter: Nick Pritchard
>Priority: Minor
>
> It seems that, because DStream.clearMetadata calls itself recursively on its 
> dependencies, it attempts to unpersist the same RDD more than once, which 
> results in WARN logs like this:
> {quote}
> WARN BlockManager: Asked to remove block rdd_2_1, which does not exist
> {quote}
> or this:
> {quote}
> WARN BlockManager: Block rdd_2_1 could not be removed as it was not found in 
> either the disk, memory, or tachyon store
> {quote}
> This is preceded by logs like:
> {quote}
> DEBUG TransformedDStream: Unpersisting old RDDs: 2
> DEBUG QueueInputDStream: Unpersisting old RDDs: 2
> {quote}
> Here is a reproducible case:
> {code:scala}
> import scala.collection.mutable
> import org.apache.spark.SparkConf
> import org.apache.spark.rdd.RDD
> import org.apache.spark.streaming.{Seconds, StreamingContext}
>
> object Test {
>   def main(args: Array[String]): Unit = {
>     val conf = new SparkConf().setMaster("local[2]").setAppName("Test")
>     val ssc = new StreamingContext(conf, Seconds(1))
>     val queue = new mutable.Queue[RDD[Int]]
>     val input = ssc.queueStream(queue)
>     // identity transform on a cached input stream
>     val output = input.cache().transform(x => x)
>     output.print()
>     ssc.start()
>     // feed one single-element RDD per batch interval
>     for (i <- 1 to 5) {
>       val rdd = ssc.sparkContext.parallelize(Seq(i))
>       queue.enqueue(rdd)
>       Thread.sleep(1000)
>     }
>     ssc.stop()
>   }
> }
> {code}
> It doesn't seem to be a fatal error, but the WARN messages are a bit 
> unsettling.
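
For context, here is a simplified, self-contained sketch of the recursion the description refers to. This is not the actual Spark source; the class and method shapes are illustrative only:

{code:scala}
import org.apache.spark.rdd.RDD

// Illustrative paraphrase of the metadata-cleanup recursion, not Spark's real code.
abstract class DStreamSketch(val dependencies: List[DStreamSketch]) {
  // RDDs this stream generated for batches older than the remember window.
  def oldRDDs(time: Long): Seq[RDD[_]]

  def clearMetadata(time: Long): Unit = {
    // Each DStream first unpersists the RDDs it generated...
    oldRDDs(time).foreach(_.unpersist(blocking = false))
    // ...then recurses into its parents. If a child's generated RDD is the
    // same object as its parent's, the same rdd_<id> blocks are removed
    // twice, and the second removal triggers the WARN shown above.
    dependencies.foreach(_.clearMetadata(time))
  }
}
{code}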






[jira] [Commented] (SPARK-5934) DStreamGraph.clearMetadata attempts to unpersist the same RDD multiple times

2015-11-05 Thread Saisai Shao (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-5934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14991353#comment-14991353 ]

Saisai Shao commented on SPARK-5934:


I think the problem is this line in your code:

{code}
val output = input.cache().transform(x => x)
{code}

The transformation you applied does not actually generate a new RDD, so 
QueueInputDStream and TransformedDStream both maintain a reference to the same 
RDD (rdd_2). As a result, calling {{clearMetadata}} unpersists this RDD twice.

From my understanding, this is not a bug; it is your particular usage that 
leads to the WARN log. If you change the line to 
{{input.cache().transform(x => x.map(i => 1))}}, the warning will not occur.
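
A quick way to see this outside of streaming (a minimal sketch against a plain local SparkContext; the object and val names are illustrative): an identity function hands back the parent RDD itself, while a real transformation like {{map}} produces a new RDD with its own id, so each DStream would then have its own blocks to clear.

{code:scala}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object ReferenceCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("ReferenceCheck"))
    val parent: RDD[Int] = sc.parallelize(Seq(1, 2, 3)).cache()

    // What transform(x => x) yields: the very same RDD object.
    val same: RDD[Int] = ((x: RDD[Int]) => x)(parent)
    // What transform(x => x.map(i => 1)) yields: a distinct RDD.
    val fresh: RDD[Int] = parent.map(i => 1)

    println(same eq parent)  // true  -> two DStreams share one rdd_<id>
    println(fresh eq parent) // false -> each DStream clears its own RDD
    println(s"ids: parent=${parent.id}, fresh=${fresh.id}") // distinct ids
    sc.stop()
  }
}
{code}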




