Re: ClosureCleaner does not work for java code
The difference is that Java and Scala generate different code here. In Java, your anonymous subclass of Ops, defined in (a method of) AbstractTest, captures a reference to the enclosing AbstractTest instance. That much is 'correct', in that it is how Java is supposed to work, and AbstractTest is indeed not serializable, since you didn't declare it to be. However, the reference is never actually used, and Spark tries to remove such references for you where possible. IIRC it can't always do so (e.g. nulling some fields would mutate objects in unpredictable ways), and I think that is what happens here. Better to avoid the hidden reference in the first place, for instance by making the subclass a static nested class; a sketch follows the quoted message below. There are probably several ways to rewrite this. Scala simply generates different code.

On Mon, Aug 10, 2015 at 4:32 PM, Hao Ren wrote:

> Consider the two code snippets below:
>
> // Java code:
>
> abstract class Ops implements Serializable {
>
>     public abstract Integer apply(Integer x);
>
>     public void doSomething(JavaRDD<Integer> rdd) {
>         rdd.map(x -> x + apply(x))
>            .collect()
>            .forEach(System.out::println);
>     }
> }
>
> public class AbstractTest {
>
>     public static void main(String[] args) {
>         new AbstractTest().job();
>     }
>
>     public void job() {
>         SparkConf conf = new SparkConf()
>             .setAppName(AbstractTest.class.getName())
>             .setMaster("local[*]");
>         JavaSparkContext jsc = new JavaSparkContext(conf);
>
>         JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6));
>
>         Ops ops = new Ops() {
>             @Override
>             public Integer apply(Integer x) {
>                 return x + 1;
>             }
>         };
>
>         ops.doSomething(rdd);
>     }
> }
>
> // Scala code:
>
> abstract class Ops extends Serializable {
>
>   def apply(x: Int): Int
>
>   def doSomething(rdd: RDD[Int]): Unit = {
>     rdd.map(x => apply(x)).collect foreach println
>   }
> }
>
> class AbstractTest {
>   def job(): Unit = {
>     val conf = new SparkConf()
>       .setAppName(this.getClass.getName)
>       .setMaster("local[*]")
>     val sc = new SparkContext(conf)
>
>     val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7))
>
>     val ops = new Ops() {
>       override def apply(x: Int): Int = x + 1
>     }
>
>     ops.doSomething(rdd)
>   }
> }
>
> object AbstractTest {
>   def main(args: Array[String]): Unit = {
>     new AbstractTest().job()
>   }
> }
>
> They are actually very similar and do the same thing, yet the Scala version
> works fine while the Java version does not. A Task not serializable exception
> is thrown when the Java code is executed; here is the stack trace:
>
> Exception in thread "main" org.apache.spark.SparkException: Task not serializable
>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
>     at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
>     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
>     at org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
>     at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:294)
>     at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:293)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
>     at org.apache.spark.rdd.RDD.map(RDD.scala:293)
>     at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:90)
>     at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:47)
>     at fr.leboncoin.etl.jobs.test.Ops.doSomething(AbstractTest.java:24)
>     at fr.leboncoin.etl.jobs.test.AbstractTest.job(AbstractTest.java:52)
>     at fr.leboncoin.etl.jobs.test.AbstractTest.main(AbstractTest.java:33)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:497)
>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
> Caused by: java.io.NotSerializableException: fr.leboncoin.etl.jobs.test.AbstractTest
>
> Serialization stack:
>     - object not serializable (class: test.AbstractTest, value: test.AbstractTest@61d84e08)
>     - field (class: test.AbstractTest$1, name: this$0, type: class test.AbstractTest)
>     - object (class test.AbstractTest$1, test.AbstractTest$1@476e8796)
>     - element of array (index: 0)
>     - array (class [Ljava.lang.Object;, size 1)
>     - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
>     - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class fr.leboncoin.etl.jobs.test.Ops, functionalInterfaceMethod=org/apache/spark/api/java/function/Function.call:(Ljava/lang/Object;)Ljava/lang/Ob
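To make the suggested rewrite concrete, here is a minimal, untested sketch of the static-nested-class version (AddOne is just an illustrative name; it reuses the Ops class from the snippet above). A static nested class has no compiler-generated this$0 field, so nothing from AbstractTest is dragged into the closure:

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class AbstractTest {

    // Static nested class: unlike the anonymous subclass, it holds no hidden
    // reference to an AbstractTest instance, so only Ops (which is
    // Serializable) is serialized with the task.
    static class AddOne extends Ops {
        @Override
        public Integer apply(Integer x) {
            return x + 1;
        }
    }

    public static void main(String[] args) {
        new AbstractTest().job();
    }

    public void job() {
        SparkConf conf = new SparkConf()
            .setAppName(AbstractTest.class.getName())
            .setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6));

        Ops ops = new AddOne();  // no reference to AbstractTest is retained
        ops.doSomething(rdd);

        jsc.stop();
    }
}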
ClosureCleaner does not work for java code
Consider the two code snippets below:

// Java code:

abstract class Ops implements Serializable {

    public abstract Integer apply(Integer x);

    public void doSomething(JavaRDD<Integer> rdd) {
        rdd.map(x -> x + apply(x))
           .collect()
           .forEach(System.out::println);
    }
}

public class AbstractTest {

    public static void main(String[] args) {
        new AbstractTest().job();
    }

    public void job() {
        SparkConf conf = new SparkConf()
            .setAppName(AbstractTest.class.getName())
            .setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6));

        Ops ops = new Ops() {
            @Override
            public Integer apply(Integer x) {
                return x + 1;
            }
        };

        ops.doSomething(rdd);
    }
}

// Scala code:

abstract class Ops extends Serializable {

  def apply(x: Int): Int

  def doSomething(rdd: RDD[Int]): Unit = {
    rdd.map(x => apply(x)).collect foreach println
  }
}

class AbstractTest {
  def job(): Unit = {
    val conf = new SparkConf()
      .setAppName(this.getClass.getName)
      .setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7))

    val ops = new Ops() {
      override def apply(x: Int): Int = x + 1
    }

    ops.doSomething(rdd)
  }
}

object AbstractTest {
  def main(args: Array[String]): Unit = {
    new AbstractTest().job()
  }
}

They are actually very similar and do the same thing, yet the Scala version works fine while the Java version does not. A Task not serializable exception is thrown when the Java code is executed; here is the stack trace:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:294)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:293)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
    at org.apache.spark.rdd.RDD.map(RDD.scala:293)
    at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:90)
    at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:47)
    at fr.leboncoin.etl.jobs.test.Ops.doSomething(AbstractTest.java:24)
    at fr.leboncoin.etl.jobs.test.AbstractTest.job(AbstractTest.java:52)
    at fr.leboncoin.etl.jobs.test.AbstractTest.main(AbstractTest.java:33)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: java.io.NotSerializableException: fr.leboncoin.etl.jobs.test.AbstractTest

Serialization stack:
    - object not serializable (class: test.AbstractTest, value: test.AbstractTest@61d84e08)
    - field (class: test.AbstractTest$1, name: this$0, type: class test.AbstractTest)
    - object (class test.AbstractTest$1, test.AbstractTest$1@476e8796)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 1)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class fr.leboncoin.etl.jobs.test.Ops, functionalInterfaceMethod=org/apache/spark/api/java/function/Function.call:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeSpecial fr/leboncoin/etl/jobs/test/Ops.lambda$doSomething$6d6228b6$1:(Ljava/lang/Integer;)Ljava/lang/Integer;, instantiatedMethodType=(Ljava/lang/Integer;)Ljava/lang/Integer;, numCaptured=1])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)
    - object (class fr.leboncoin.etl.jobs.test.Ops$$Lambda$8/476868388, fr.leboncoin.etl.jobs.test.Ops$$Lambda$8/476868388@65753040)
    - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
    - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
    ... 19 more

It seems that, in ord
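The serialization stack above makes the mechanism visible: the lambda's capturedArgs holds the anonymous Ops subclass (test.AbstractTest$1), whose compiler-generated this$0 field points at the non-serializable AbstractTest. The same failure can be reproduced with plain JDK serialization, no Spark involved. Below is a minimal sketch under stated assumptions: the names are hypothetical, and the behavior shown is that of Java 8, where javac always gives an anonymous class created in an instance context a this$0 field (newer compilers may omit an unused enclosing-instance reference):

import java.io.ByteArrayOutputStream;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical demo class, deliberately NOT Serializable, mirroring AbstractTest.
public class CaptureDemo {

    interface Op extends Serializable {
        int apply(int x);
    }

    Op makeOp() {
        // Compiles to CaptureDemo$1 with a hidden this$0 field referencing
        // the enclosing CaptureDemo instance (on Java 8), even though the
        // body never uses it.
        return new Op() {
            @Override
            public int apply(int x) { return x + 1; }
        };
    }

    // Static nested class: no this$0 field, so it serializes cleanly.
    static class StaticOp implements Op {
        @Override
        public int apply(int x) { return x + 1; }
    }

    public static void main(String[] args) throws Exception {
        Op captured = new CaptureDemo().makeOp();

        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(captured);  // fails: this$0 -> CaptureDemo
        } catch (NotSerializableException e) {
            System.out.println("Failed as expected: " + e);
        }

        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(new StaticOp());
            System.out.println("Static nested class serialized OK");
        }
    }
}

This is the same chain the SerializationDebugger prints in the trace above, just stripped down to the JDK; hoisting the anonymous subclass into a static nested class removes the this$0 link entirely.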