[jira] [Commented] (SPARK-26534) Closure Cleaner Bug
[ https://issues.apache.org/jira/browse/SPARK-26534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16735699#comment-16735699 ]

sam commented on SPARK-26534:
-----------------------------

[~viirya] If I change to RDD I cannot reproduce either. This is further evidence that this is indeed a bug, since serialisation of closures ought to be independent of whether we use RDD or Dataset. I have pasted the full output of my sbt console session to show the error for Dataset.

> Closure Cleaner Bug
> -------------------
>
>                 Key: SPARK-26534
>                 URL: https://issues.apache.org/jira/browse/SPARK-26534
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.3.1
>            Reporter: sam
>            Priority: Major
>
> I've found a strange combination of closures where the closure cleaner doesn't seem to be smart enough to figure out how to remove a reference that is not used. I.e. we get an `org.apache.spark.SparkException: Task not serializable` for a Task that is perfectly serializable.
>
> In the example below, the only `val` that is actually needed for the closure of the `map` is `foo`, but it tries to serialise `thingy`. What is odd is that changing this code in a number of subtle ways eliminates the error, which I've tried to highlight using comments inline.
>
> {code:java}
> import org.apache.spark.sql._
>
> object Test {
>   val sparkSession: SparkSession =
>     SparkSession.builder.master("local").appName("app").getOrCreate()
>
>   def apply(): Unit = {
>     import sparkSession.implicits._
>
>     val landedData: Dataset[String] =
>       sparkSession.sparkContext.makeRDD(Seq("foo", "bar")).toDS()
>
>     // thingy has to be in this outer scope to reproduce; if in someFunc, cannot reproduce
>     val thingy: Thingy = new Thingy
>
>     // If not wrapped in someFunc, cannot reproduce
>     val someFunc = () => {
>       // If we don't reference this foo inside the closure (e.g. just use the identity function), cannot reproduce
>       val foo: String = "foo"
>
>       thingy.run(block = () => {
>         landedData.map(r => {
>           r + foo
>         })
>           .count()
>       })
>     }
>
>     someFunc()
>   }
> }
>
> class Thingy {
>   def run[R](block: () => R): R = {
>     block()
>   }
> }
> {code}
>
> The full trace if run in `sbt console`:
> {code}
> scala> class Thingy {
>      |   def run[R](block: () => R): R = {
>      |     block()
>      |   }
>      | }
> defined class Thingy
>
> scala> object Test {
>      |   val sparkSession: SparkSession =
>      |     SparkSession.builder.master("local").appName("app").getOrCreate()
>      |
>      |   def apply(): Unit = {
>      |     import sparkSession.implicits._
>      |
>      |     val landedData: Dataset[String] = sparkSession.sparkContext.makeRDD(Seq("foo", "bar")).toDS()
>      |
>      |     // thingy has to be in this outer scope to reproduce; if in someFunc, cannot reproduce
>      |     val thingy: Thingy = new Thingy
>      |
>      |     // If not wrapped in someFunc, cannot reproduce
>      |     val someFunc = () => {
>      |       // If we don't reference this foo inside the closure (e.g. just use the identity function), cannot reproduce
>      |       val foo: String = "foo"
>      |
>      |       thingy.run(block = () => {
>      |         landedData.map(r => {
>      |           r + foo
>      |         })
>      |           .count()
>      |       })
>      |     }
>      |
>      |     someFunc()
>      |   }
>      | }
> defined object Test
>
> scala> Test.apply()
> Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
> 19/01/07 11:27:19 INFO SparkContext: Running Spark version 2.3.1
> 19/01/07 11:27:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> 19/01/07 11:27:20 INFO SparkContext: Submitted application: app
> 19/01/07 11:27:20 INFO SecurityManager: Changing view acls to: sams
> 19/01/07 11:27:20 INFO SecurityManager: Changing modify acls to: sams
> 19/01/07 11:27:20 INFO SecurityManager: Changing view acls groups to:
> 19/01/07 11:27:20 INFO SecurityManager: Changing modify acls groups to:
> 19/01/07 11:27:20 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(sams); groups with view permissions: Set(); users with modify permissions: Set(sams); groups with modify permissions: Set()
> 19/01/07 11:27:20 INFO Utils: Successfully started service 'sparkDriver' on port 54066.
> 19/01/07 11:27:20 INFO SparkEnv: Registering MapOutputTracker
> 19/01/07 11:27:20 INFO SparkEnv: Registering BlockManagerMaster
> 19/01/07 11:27:20 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
> 19/01/07 11:27:20 INFO
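For reference, a minimal side-by-side sketch of the two code paths (an illustration, not code from the ticket; it assumes Spark 2.3.x in local mode, and `MinimalPair` is a hypothetical name). As far as I can tell, `RDD.map` passes the user function through `SparkContext.clean` (the ClosureCleaner) before serialising it, while the typed `Dataset.map` embeds the function in the query plan instead; that difference would be consistent with only the Dataset variant tripping over the unused `thingy` reference:

{code:java}
import org.apache.spark.sql.SparkSession

// Same shape as the class in the report: not serializable, and never
// referenced inside the map lambdas.
class Thingy {
  def run[R](block: () => R): R = block()
}

object MinimalPair {
  val spark: SparkSession =
    SparkSession.builder.master("local").appName("minimal-pair").getOrCreate()

  def apply(): Unit = {
    import spark.implicits._

    val thingy: Thingy = new Thingy

    val someFunc = () => {
      val foo = "foo"
      thingy.run(block = () => {
        // RDD path: the lambda is cleaned via SparkContext.clean before
        // being shipped, and this count succeeds.
        spark.sparkContext.makeRDD(Seq("a", "b")).map(_ + foo).count()

        // Dataset path: the same lambda goes into the query plan uncleaned;
        // per the report, this is where "Task not serializable" surfaces
        // on 2.3.1.
        spark.sparkContext.makeRDD(Seq("a", "b")).toDS().map(_ + foo).count()
      })
    }

    someFunc()
  }
}
{code}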
[jira] [Commented] (SPARK-26534) Closure Cleaner Bug
[ https://issues.apache.org/jira/browse/SPARK-26534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16735214#comment-16735214 ]

Liang-Chi Hsieh commented on SPARK-26534:
-----------------------------------------

I think the only difference is using Dataset or RDD. Can you replace the Dataset with an RDD and see if the issue persists?
[jira] [Commented] (SPARK-26534) Closure Cleaner Bug
[ https://issues.apache.org/jira/browse/SPARK-26534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16735211#comment-16735211 ]

sam commented on SPARK-26534:
-----------------------------

[~viirya] Your version is slightly different; can you reproduce using exactly my code?
[jira] [Commented] (SPARK-26534) Closure Cleaner Bug
[ https://issues.apache.org/jira/browse/SPARK-26534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16735130#comment-16735130 ]

Liang-Chi Hsieh commented on SPARK-26534:
-----------------------------------------

I did a test as below:

{code:java}
import org.apache.spark.SparkContext

object Test {
  val sc = new SparkContext("local", "test")

  def apply(): Unit = {
    val rdd = sc.parallelize(1 to 2)

    val thingy: Thingy = new Thingy

    val someFunc = () => {
      val foo: String = "foo"
      thingy.run(block = () => {
        rdd.map(r => {
          r + foo
        }).count()
      })
    }

    someFunc()
  }
}

Test.apply()

class Thingy {
  def run[R](block: () => R): R = {
    block()
  }
}
{code}

But it can't reproduce the described error.
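In the meantime, two workarounds follow directly from the report's own inline comments (a sketch only, not a verified fix; `Workaround` is a hypothetical name): declare `thingy` inside `someFunc`, which the reporter notes makes the error disappear, or make `Thingy` serializable so a stray captured reference is harmless:

{code:java}
import org.apache.spark.sql.SparkSession

object Workaround {
  val spark: SparkSession =
    SparkSession.builder.master("local").appName("workaround").getOrCreate()

  def apply(): Unit = {
    import spark.implicits._

    val landedData = spark.sparkContext.makeRDD(Seq("foo", "bar")).toDS()

    val someFunc = () => {
      // Per the report, declaring thingy here (inside someFunc) rather than
      // in the enclosing scope avoids "Task not serializable".
      val thingy: Thingy = new Thingy
      val foo: String = "foo"

      thingy.run(block = () => {
        landedData.map(r => r + foo).count()
      })
    }

    someFunc()
  }
}

// Alternatively, making Thingy serializable sidesteps the error even when
// the reference does leak into the closure (at the cost of shipping a
// useless object to the executors).
class Thingy extends Serializable {
  def run[R](block: () => R): R = block()
}
{code}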