[jira] [Commented] (SPARK-26534) Closure Cleaner Bug

2019-01-07 Thread sam (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-26534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16735699#comment-16735699 ]

sam commented on SPARK-26534:
-

[~viirya]

If I change to an RDD, I cannot reproduce it either. This is further evidence 
that this is indeed a bug, since serialisation of closures ought to be 
independent of whether we use an RDD or a Dataset.
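
For reference, here is a sketch of the RDD-based variant referred to above 
(same structure and the same Thingy class as in the issue; an illustrative 
sketch rather than the exact code from my session):

{code:java}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object TestRdd {
  val sc: SparkContext =
    new SparkContext(new SparkConf().setMaster("local").setAppName("app"))

  def apply(): Unit = {
    // Same shape as the Dataset example, but backed by a plain RDD
    val landedData: RDD[String] = sc.makeRDD(Seq("foo", "bar"))

    // thingy still lives in the outer scope, as in the original reproduction
    val thingy: Thingy = new Thingy

    val someFunc = () => {
      val foo: String = "foo"

      thingy.run(block = () => {
        // Only `foo` is needed here; with an RDD the "Task not serializable"
        // error does not appear
        landedData.map(r => r + foo).count()
      })
    }

    someFunc()
  }
}

class Thingy {
  def run[R](block: () => R): R = block()
}
{code}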

I have pasted the full output of my sbt console session to show the error in 
the Dataset case.

> Closure Cleaner Bug
> ---
>
> Key: SPARK-26534
> URL: https://issues.apache.org/jira/browse/SPARK-26534
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.1
>Reporter: sam
>Priority: Major
>
> I've found a strange combination of closures where the closure cleaner 
> doesn't seem to be smart enough to figure out how to remove a reference that 
> is not used, i.e. we get an `org.apache.spark.SparkException: Task not 
> serializable` for a task that is perfectly serializable.
>  
> In the example below, the only `val` that is actually needed for the closure 
> of the `map` is `foo`, but it tries to serialise `thingy`. What is odd is 
> that changing this code in a number of subtle ways eliminates the error, 
> which I've tried to highlight with inline comments.
>  
> {code:java}
> import org.apache.spark.sql._
> 
> object Test {
>   val sparkSession: SparkSession =
>     SparkSession.builder.master("local").appName("app").getOrCreate()
> 
>   def apply(): Unit = {
>     import sparkSession.implicits._
> 
>     val landedData: Dataset[String] = sparkSession.sparkContext.makeRDD(Seq("foo", "bar")).toDS()
> 
>     // thingy has to be in this outer scope to reproduce; if declared inside someFunc, cannot reproduce
>     val thingy: Thingy = new Thingy
> 
>     // If not wrapped in someFunc, cannot reproduce
>     val someFunc = () => {
>       // If foo is not referenced inside the closure (e.g. just use the identity function), cannot reproduce
>       val foo: String = "foo"
> 
>       thingy.run(block = () => {
>         landedData.map(r => {
>           r + foo
>         })
>         .count()
>       })
>     }
> 
>     someFunc()
>   }
> }
> 
> class Thingy {
>   def run[R](block: () => R): R = {
>     block()
>   }
> }
> {code}
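>  
> For comparison, a sketch of one of the variants mentioned in the comments 
> above that does not reproduce the error, with `thingy` declared inside 
> `someFunc` rather than in the outer scope (an illustrative sketch of that 
> change, not verbatim code from the issue; it reuses `landedData` and `Thingy` 
> from the example above):
> {code:java}
> // Variant sketch: thingy moved from the outer scope into someFunc.
> // Per the inline comments above, this version does not hit the
> // "Task not serializable" error; everything else is unchanged.
> val someFunc = () => {
>   val thingy: Thingy = new Thingy   // now local to the closure
>   val foo: String = "foo"
> 
>   thingy.run(block = () => {
>     landedData.map(r => r + foo).count()
>   })
> }
> 
> someFunc()
> {code}
>  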
> The full trace when run in `sbt console`:
> {code}
> scala> class Thingy {
>      |   def run[R](block: () => R): R = {
>      |     block()
>      |   }
>      | }
> defined class Thingy
> 
> scala> object Test {
>      |   val sparkSession: SparkSession =
>      |     SparkSession.builder.master("local").appName("app").getOrCreate()
>      |
>      |   def apply(): Unit = {
>      |     import sparkSession.implicits._
>      |
>      |     val landedData: Dataset[String] = sparkSession.sparkContext.makeRDD(Seq("foo", "bar")).toDS()
>      |
>      |     // thingy has to be in this outer scope to reproduce, if in someFunc, cannot reproduce
>      |     val thingy: Thingy = new Thingy
>      |
>      |     // If not wrapped in someFunc cannot reproduce
>      |     val someFunc = () => {
>      |       // If don't reference this foo inside the closure (e.g. just use identity function) cannot reproduce
>      |       val foo: String = "foo"
>      |
>      |       thingy.run(block = () => {
>      |         landedData.map(r => {
>      |           r + foo
>      |         })
>      |         .count()
>      |       })
>      |     }
>      |
>      |     someFunc()
>      |
>      |   }
>      | }
> defined object Test
> 
> scala> Test.apply()
> Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
> 19/01/07 11:27:19 INFO SparkContext: Running Spark version 2.3.1
> 19/01/07 11:27:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> 19/01/07 11:27:20 INFO SparkContext: Submitted application: app
> 19/01/07 11:27:20 INFO SecurityManager: Changing view acls to: sams
> 19/01/07 11:27:20 INFO SecurityManager: Changing modify acls to: sams
> 19/01/07 11:27:20 INFO SecurityManager: Changing view acls groups to: 
> 19/01/07 11:27:20 INFO SecurityManager: Changing modify acls groups to: 
> 19/01/07 11:27:20 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(sams); groups with view permissions: Set(); users with modify permissions: Set(sams); groups with modify permissions: Set()
> 19/01/07 11:27:20 INFO Utils: Successfully started service 'sparkDriver' on port 54066.
> 19/01/07 11:27:20 INFO SparkEnv: Registering MapOutputTracker
> 19/01/07 11:27:20 INFO SparkEnv: Registering BlockManagerMaster
> 19/01/07 11:27:20 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
> 19/01/07 11:27:20 INFO 

[jira] [Commented] (SPARK-26534) Closure Cleaner Bug

2019-01-06 Thread Liang-Chi Hsieh (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-26534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16735214#comment-16735214 ]

Liang-Chi Hsieh commented on SPARK-26534:
-

I think the only difference is using a Dataset or an RDD. Can you replace the 
Dataset with an RDD and see if the issue still occurs?




[jira] [Commented] (SPARK-26534) Closure Cleaner Bug

2019-01-06 Thread sam (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-26534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16735211#comment-16735211 ]

sam commented on SPARK-26534:
-

[~viirya] Your version is slightly different; can you reproduce it using 
exactly my code?




[jira] [Commented] (SPARK-26534) Closure Cleaner Bug

2019-01-06 Thread Liang-Chi Hsieh (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-26534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16735130#comment-16735130 ]

Liang-Chi Hsieh commented on SPARK-26534:
-

I did a test as below:
{code:java}
import org.apache.spark.SparkContext

object Test {
  val sc = new SparkContext("local", "test")

  def apply(): Unit = {
val rdd = sc.parallelize(1 to 2)
val thingy: Thingy = new Thingy

val someFunc = () => {
  val foo: String = "foo"

  thingy.run(block = () => {
rdd.map(r => {
  r + foo
}).count()
  })
}
someFunc()
  }
}

Test.apply()

class Thingy {
  def run[R](block: () => R): R = {
block()
  }
}
{code}
But it can't reproduce the described error.
