Consider the following two code snippets:
// Java code:

import java.io.Serializable;
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

abstract class Ops implements Serializable {

    public abstract Integer apply(Integer x);

    public void doSomething(JavaRDD<Integer> rdd) {
        rdd.map(x -> x + apply(x))
           .collect()
           .forEach(System.out::println);
    }
}

public class AbstractTest {

    public static void main(String[] args) {
        new AbstractTest().job();
    }

    public void job() {
        SparkConf conf = new SparkConf()
            .setAppName(AbstractTest.class.getName())
            .setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6));

        Ops ops = new Ops() {
            @Override
            public Integer apply(Integer x) {
                return x + 1;
            }
        };

        ops.doSomething(rdd);
    }
}

// Scala code:

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

abstract class Ops extends Serializable {

  def apply(x: Int): Int

  def doSomething(rdd: RDD[Int]): Unit = {
    rdd.map(x => apply(x)).collect.foreach(println)
  }
}

class AbstractTest {

  def job(): Unit = {
    val conf = new SparkConf()
      .setAppName(this.getClass.getName)
      .setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7))

    val ops = new Ops {
      override def apply(x: Int): Int = x + 1
    }

    ops.doSomething(rdd)
  }
}

object AbstractTest {
  def main(args: Array[String]): Unit = {
    new AbstractTest().job()
  }
}

The two snippets are very similar and do the same thing, yet the Scala one works fine while the Java one does not: a Task not serializable exception is thrown when the Java code is executed. Here is the stack trace:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:294)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:293)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
    at org.apache.spark.rdd.RDD.map(RDD.scala:293)
    at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:90)
    at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:47)
    at fr.leboncoin.etl.jobs.test.Ops.doSomething(AbstractTest.java:24)
    at fr.leboncoin.etl.jobs.test.AbstractTest.job(AbstractTest.java:52)
    at fr.leboncoin.etl.jobs.test.AbstractTest.main(AbstractTest.java:33)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: java.io.NotSerializableException: fr.leboncoin.etl.jobs.test.AbstractTest
Serialization stack:
    - object not serializable (class: test.AbstractTest, value: test.AbstractTest@61d84e08)
    - field (class: test.AbstractTest$1, name: this$0, type: class test.AbstractTest)
    - object (class test.AbstractTest$1, test.AbstractTest$1@476e8796)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 1)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class fr.leboncoin.etl.jobs.test.Ops, functionalInterfaceMethod=org/apache/spark/api/java/function/Function.call:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeSpecial fr/leboncoin/etl/jobs/test/Ops.lambda$doSomething$6d6228b6$1:(Ljava/lang/Integer;)Ljava/lang/Integer;, instantiatedMethodType=(Ljava/lang/Integer;)Ljava/lang/Integer;, numCaptured=1])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)
    - object (class fr.leboncoin.etl.jobs.test.Ops$$Lambda$8/476868388, fr.leboncoin.etl.jobs.test.Ops$$Lambda$8/476868388@65753040)
    - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
    - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
    ... 19 more

It seems that, in order to serialize the anonymous class `test.AbstractTest$1` (the `ops` instance), Spark has to serialize the enclosing `test.AbstractTest` instance first, via the anonymous class's hidden `this$0` field, even though `AbstractTest` should not need to be serialized at all.
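The outer-instance capture can be reproduced without Spark at all. Below is a minimal sketch using plain JDK serialization (`CaptureDemo` and `Op` are made-up names for illustration); at least with the Java 8 compiler used here, an anonymous class declared in an instance method always keeps a hidden `this$0` field, even if its body never touches the enclosing instance:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CaptureDemo {

    interface Op extends Serializable {
        int apply(int x);
    }

    Op makeOp() {
        // Compiled as CaptureDemo$1, with a hidden this$0 field pointing
        // at the enclosing CaptureDemo instance, although the body below
        // never uses that instance.
        return new Op() {
            @Override
            public int apply(int x) {
                return x + 1;
            }
        };
    }

    public static void main(String[] args) throws IOException {
        Op op = new CaptureDemo().makeOp();
        // Throws java.io.NotSerializableException: CaptureDemo, because
        // serializing op follows this$0 to the non-serializable outer
        // instance: the same shape of failure as in the Spark job above.
        new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(op);
    }
}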
One difference is the type of RDD involved: the Java code goes through JavaRDD. I am wondering whether the ClosureCleaner works as well with JavaRDD as it does with the Scala RDD. According to the Spark source, JavaRDD apparently just delegates to the Scala API:

def map[R](f: JFunction[T, R]): JavaRDD[R] =
  new JavaRDD(rdd.map(f)(fakeClassTag))(fakeClassTag)

You can reproduce this issue easily; any help is appreciated.
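For what it's worth, taking the concrete subclass out of `AbstractTest` seems to work around the problem, presumably because a top-level class carries no hidden reference to an enclosing instance. A minimal sketch (`AddOne` is a name made up for illustration):

// Top-level class: extends the serializable Ops, holds no this$0 field.
class AddOne extends Ops {
    @Override
    public Integer apply(Integer x) {
        return x + 1;
    }
}

// In job(), replace the anonymous subclass with:
Ops ops = new AddOne();
ops.doSomething(rdd);

With that change, the lambda in `doSomething` captures only the `AddOne` instance, which is serializable through `Ops`, and the job completes. But I would still like to understand why the anonymous-class version behaves differently in Java and Scala.

--
Hao Ren
Data Engineer @ leboncoin
Paris, France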