Hello Hao Ren,
Doesn't the code...
val add = udf {
(a: Int) => a + notSer.value
}
mean that the UDF is simply a function of type Int => Int?
Thanks,
Muthu
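For reference, here is a minimal plain-Scala sketch (no Spark involved) of what the literal passed to udf is. Its static type is indeed Int => Int, but the function object also closes over notSer. Class A mirrors the one in the quoted code. ClosureTypeSketch and makeAdd are hypothetical names used only for illustration.

```scala
// Mirrors class A from the quoted code: a plain class that is NOT Serializable.
class A(val value: Int)

object ClosureTypeSketch {
  // Hypothetical helper that builds the same lambda the quoted code passes to udf.
  // Its static type is Int => Int, but the resulting function object keeps a
  // reference to notSer so it can read notSer.value on every call.
  def makeAdd(notSer: A): Int => Int = (a: Int) => a + notSer.value

  def main(args: Array[String]): Unit = {
    val add: Int => Int = makeAdd(new A(2))
    println(add(1)) // prints 3
  }
}
```

So the type alone does not reveal the problem. What Spark has to ship to executors is the function object together with everything it captures.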
On Sun, Aug 7, 2016 at 2:31 PM, Hao Ren <[email protected]> wrote:
> I am playing with Spark 2.0.
> What I tried to test is:
>
> Create a UDF that captures a non-serializable object.
> What I expected is that when this UDF is called while materializing the
> DataFrame that uses it in "select", a "Task not serializable"
> exception should be thrown.
> It also depends on which "action" is called on that DataFrame.
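The expectation above can be approximated outside Spark. Before shipping task closures to executors, Spark checks that they can be Java-serialized and reports "Task not serializable" when that fails. The sketch below is only an approximation, not Spark's actual code path: it Java-serializes the same kind of lambda into memory. CaptureCheckSketch, isJavaSerializable and addCapturing are hypothetical names.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Mirrors class A from the quoted code: deliberately NOT Serializable.
class A(val value: Int)

object CaptureCheckSketch {
  // Hypothetical helper: Java-serializes obj into an in-memory buffer and
  // reports whether that succeeded. This only approximates Spark's check.
  def isJavaSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }

  // Same shape as the quoted UDF body: the lambda captures notSer itself.
  def addCapturing(notSer: A): Int => Int = (a: Int) => a + notSer.value

  def main(args: Array[String]): Unit = {
    val add = addCapturing(new A(2))
    println(add(1))                  // prints 3: calling it locally is fine
    println(isJavaSerializable(add)) // prints false: A cannot be serialized
  }
}
```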
>
> Here is the code to reproduce the problem:
>
> ============
> import org.apache.spark.sql.SparkSession
>
> object DataFrameSerDeTest extends App {
>
>   class A(val value: Int) // It is not serializable
>
>   def run(): Unit = {
>     val spark = SparkSession
>       .builder()
>       .appName("DataFrameSerDeTest")
>       .master("local[*]")
>       .getOrCreate()
>
>     import org.apache.spark.sql.functions.udf
>     import spark.implicits._
>
>     val notSer = new A(2)
>     // The closure passed to udf captures notSer, which is not serializable
>     val add = udf {
>       (a: Int) => a + notSer.value
>     }
>     val df = spark.createDataFrame(Seq(
>       (1, 2),
>       (2, 2),
>       (3, 2),
>       (4, 2)
>     )).toDF("key", "value")
>       .select($"key", add($"value").as("added"))
>
>     // *Should NOT work because the UDF captures a non-serializable object,
>     // but it works*
>     df.show()
>
>     // *Fails as expected:
>     // org.apache.spark.SparkException: Task not serializable*
>     df.filter($"key" === 2).show()
>   }
>
>   run()
> }
> ============
>
> I also tried collect(), count(), first() and limit(). All of them worked
> without a serialization exception.
> It seems that only filter() throws it. Is this a feature or a bug?
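Whichever action ends up shipping the UDF to executors, one common way to make it safe for all of them is to avoid capturing notSer at all. You can either copy the Int out first or make A extend Serializable. Below is a plain-Scala sketch of the first option. It approximates Spark's check by Java-serializing the lambda into memory rather than running Spark. SafeCaptureSketch, isJavaSerializable, unsafeAdd and safeAdd are hypothetical names.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Mirrors class A from the quoted code: deliberately NOT Serializable.
class A(val value: Int)

// Extends Serializable so that, even if a compiler version made the lambdas
// below capture this enclosing object, only what they capture explicitly matters.
object SafeCaptureSketch extends Serializable {
  // Hypothetical helper: true if obj can be Java-serialized into memory.
  def isJavaSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }

  // As in the quoted code: the lambda captures the whole non-serializable A.
  def unsafeAdd(notSer: A): Int => Int = (a: Int) => a + notSer.value

  // Workaround: read the field once, so the lambda captures only an Int.
  def safeAdd(notSer: A): Int => Int = {
    val v = notSer.value
    (a: Int) => a + v
  }

  def main(args: Array[String]): Unit = {
    val notSer = new A(2)
    println(isJavaSerializable(unsafeAdd(notSer))) // prints false
    println(isJavaSerializable(safeAdd(notSer)))   // prints true
    println(safeAdd(notSer)(1))                    // prints 3
  }
}
```

In the quoted repro this would correspond to something like val v = notSer.value followed by udf { (a: Int) => a + v }. Whether that alone removes the filter() failure in Spark 2.0 is not verified here.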
>
> Any ideas? Or did I just mess something up?
> Any help is highly appreciated.
>
> --
> Hao Ren
>
> Data Engineer @ leboncoin
>
> Paris, France
>