Re: ClosureCleaner does not work for java code

2015-08-10 Thread Sean Owen
The difference is really that Java and Scala work differently. In
Java, your anonymous subclass of Ops defined in (a method of)
AbstractTest captures a reference to the enclosing AbstractTest
instance. That much is 'correct' in that it's how Java is supposed to
work, and AbstractTest is indeed not serializable since you didn't
declare it so.

However, the reference isn't actually used, and Spark's ClosureCleaner
tries to remove such references for you where possible. It can't
always do it IIRC (e.g. nulling some fields would mutate objects in
unpredictable ways), and I think that's what happens here.

In the first place, you want to avoid having this hidden reference at
all by making, for instance, a static nested class or something along
those lines. There are probably a lot of ways to rewrite this; one
possible rewrite is sketched below.
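
For illustration only, here is one way that rewrite could look. It is
an untested sketch that reuses the Ops class from the original mail
(assumed to be in the same package); AddOne is a made-up name. Because
a static nested class has no enclosing-instance field, the closure only
has to serialize the AddOne object, which is Serializable through Ops:

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class AbstractTest {

  // Static nested class: no hidden this$0 pointing at AbstractTest,
  // and it is Serializable through Ops.
  static class AddOne extends Ops {
    @Override
    public Integer apply(Integer x) {
      return x + 1;
    }
  }

  public static void main(String[] args) {
    new AbstractTest().job();
  }

  public void job() {
    SparkConf conf = new SparkConf()
      .setAppName(AbstractTest.class.getName())
      .setMaster("local[*]");
    JavaSparkContext jsc = new JavaSparkContext(conf);

    JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6));

    // The lambda inside Ops.doSomething now only captures the AddOne
    // instance, which serializes cleanly.
    new AddOne().doSomething(rdd);
  }
}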

Scala just works differently in the code that's generated.

On Mon, Aug 10, 2015 at 4:32 PM, Hao Ren  wrote:
> [...]

ClosureCleaner does not work for java code

2015-08-10 Thread Hao Ren
Consider the following two code snippets:

// Java code:

abstract class Ops implements Serializable {

  public abstract Integer apply(Integer x);

  public void doSomething(JavaRDD<Integer> rdd) {
    rdd.map(x -> x + apply(x))
       .collect()
       .forEach(System.out::println);
  }
}

public class AbstractTest {

  public static void main(String[] args) {
    new AbstractTest().job();
  }

  public void job() {
    SparkConf conf = new SparkConf()
      .setAppName(AbstractTest.class.getName())
      .setMaster("local[*]");
    JavaSparkContext jsc = new JavaSparkContext(conf);

    JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6));

    Ops ops = new Ops() {
      @Override
      public Integer apply(Integer x) {
        return x + 1;
      }
    };

    ops.doSomething(rdd);
  }
}


// Scala code:

abstract class Ops extends Serializable {

  def apply(x: Int): Int

  def doSomething(rdd: RDD[Int]): Unit = {
    rdd.map(x => apply(x)).collect foreach println
  }
}

class AbstractTest {
  def job(): Unit = {
    val conf = new SparkConf()
      .setAppName(this.getClass.getName)
      .setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7))

    val ops = new Ops() {
      override def apply(x: Int): Int = x + 1
    }

    ops.doSomething(rdd)
  }
}

object AbstractTest {
  def main(args: Array[String]): Unit = {
    new AbstractTest().job()
  }
}

The two are very similar and do the same thing, yet the Scala one
works fine while the Java one does not. A "Task not serializable"
exception is thrown when the Java code is executed; here is the stack
trace:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:294)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:293)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
at org.apache.spark.rdd.RDD.map(RDD.scala:293)
at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:90)
at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:47)
at fr.leboncoin.etl.jobs.test.Ops.doSomething(AbstractTest.java:24)
at fr.leboncoin.etl.jobs.test.AbstractTest.job(AbstractTest.java:52)
at fr.leboncoin.etl.jobs.test.AbstractTest.main(AbstractTest.java:33)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: java.io.NotSerializableException: fr.leboncoin.etl.jobs.test.AbstractTest

Serialization stack:
- object not serializable (class: test.AbstractTest, value: test.AbstractTest@61d84e08)
- field (class: test.AbstractTest$1, name: this$0, type: class test.AbstractTest)
- object (class test.AbstractTest$1, test.AbstractTest$1@476e8796)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 1)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class fr.leboncoin.etl.jobs.test.Ops, functionalInterfaceMethod=org/apache/spark/api/java/function/Function.call:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeSpecial fr/leboncoin/etl/jobs/test/Ops.lambda$doSomething$6d6228b6$1:(Ljava/lang/Integer;)Ljava/lang/Integer;, instantiatedMethodType=(Ljava/lang/Integer;)Ljava/lang/Integer;, numCaptured=1])
- writeReplace data (class: java.lang.invoke.SerializedLambda)
- object (class fr.leboncoin.etl.jobs.test.Ops$$Lambda$8/476868388, fr.leboncoin.etl.jobs.test.Ops$$Lambda$8/476868388@65753040)
- field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
- object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, )
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
... 19 more

It seems that, in ord