Just for the record, the Spark version of that works fine:

```
%spark

case class C2(x: Int)

val xs = sc.parallelize(1 to 10)
val csSpark = xs.map{C2(_)}

csSpark.collect

res3: Array[C2] = Array(C2(1), C2(2), C2(3), C2(4), C2(5), C2(6), C2(7),
C2(8), C2(9), C2(10))
```
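
For what it's worth, the outer-instance problem from the quoted thread can be reproduced with plain Java serialization, outside Flink entirely. This is a minimal sketch; `Wrapper` here is an invented stand-in for the hidden class the interpreter generates around each cell, not anything from Zeppelin or Flink:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// "Wrapper" stands in for the hidden class a REPL generates around a cell.
// An inner case class keeps a synthetic $outer reference to it.
class Wrapper {
  case class C(x: Int)
}

val w = new Wrapper
val c = w.C(1)

// Java-serializing c also tries to serialize its $outer Wrapper instance,
// which is not Serializable, so serialization fails.
val oos = new ObjectOutputStream(new ByteArrayOutputStream)
val failed =
  try { oos.writeObject(c); false }
  catch { case _: NotSerializableException => true }

println(failed) // true: the non-serializable outer instance is dragged in
```

Moving `C` to a top-level object removes the `$outer` field, which is exactly what the Flink error message asks for.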

Thanks,

Juan

On Sun, Apr 19, 2020 at 10:15 AM Juan Rodríguez Hortalá <
juan.rodriguez.hort...@gmail.com> wrote:

> Minimal reproduction:
>
>    - First option
>
> ```scala
> case class C(x: Int)
>
> val xs = benv.fromCollection(1 to 10)
> val cs = xs.map{C(_)}
>
> cs.count
> ```
>
> ```
> defined class C
> xs: org.apache.flink.api.scala.DataSet[Int] = org.apache.flink.api.scala.DataSet@39c713c6
> cs: org.apache.flink.api.scala.DataSet[C] = org.apache.flink.api.scala.DataSet@205a35a
> java.lang.IllegalArgumentException: requirement failed: The class C is an
> instance class, meaning it is not a member of a toplevel object, or of an
> object contained in a toplevel object, therefore it requires an outer
> instance to be instantiated, but we don't have a reference to the outer
> instance. Please consider changing the outer class to an object.
>   at scala.Predef$.require(Predef.scala:224)
>   at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
>   at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
>   ... 125 elided
> ```
>
>    - Second option
>
> ```scala
> object Types {
>     case class C(x: Int)
> }
>
> val cs2 = xs.map{Types.C(_)}
>
> cs2.count
> ```
>
> ```
> defined object Types
> org.apache.flink.api.common.InvalidProgramException: Task not serializable
>   at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
>   at org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
>   at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
>   at org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125)
>   at org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489)
>   at org.apache.flink.api.scala.DataSet.map(DataSet.scala:488)
>   ... 106 elided
> Caused by: java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> ```
>
> Greetings,
>
> Juan
>
>
> On Sun, Apr 19, 2020 at 10:05 AM Juan Rodríguez Hortalá <
> juan.rodriguez.hort...@gmail.com> wrote:
>
>> Hi,
>>
>> I'm using the Flink interpreter and the benv environment. I'm reading
>> some csv files using benv.readCsvFile and it works ok. I have also defined
>> a case class C for the csv records. The problem happens when I apply a
>> map operation on the DataSet of tuples returned by benv.readCsvFile, to
>> convert it into a DataSet[C].
>>
>>    - If I define the case class C in some cell I get this error:
>>
>> ```
>> java.lang.IllegalArgumentException: requirement failed: The class C is an
>> instance class, meaning it is not a member of a toplevel object, or of an
>> object contained in a toplevel object, therefore it requires an outer
>> instance to be instantiated, but we don't have a reference to the outer
>> instance. Please consider changing the outer class to an object.
>> ```
>>
>>
>>    - That sounds related to
>>    https://stackoverflow.com/questions/36042720/case-class-serialazation-in-flink,
>>    which suggests that the Zeppelin Flink interpreter is wrapping the case
>>    class definition as an inner class. I tried defining the case class C
>>    inside an object Types that I define in another cell, but with that I
>>    also get a serialization exception:
>>
>> ```
>> org.apache.flink.api.common.InvalidProgramException: Task not serializable
>>   at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
>>   at org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
>>   at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
>>   at org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125)
>>   at org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489)
>>   at org.apache.flink.api.scala.DataSet.map(DataSet.scala:488)
>>   at $line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.liftedTree1$1(<console>:135)
>>   at $line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:133)
>> ```
>>
>> I guess that didn't work because the object Types is still defined inside
>> some class implicitly defined by the interpreter.
>>
>> Any thoughts about how I can fix this? Also, I understand that $line163
>> etc. refer to the code in the cells; is there some convention I can use to
>> map those error messages back to the corresponding line in the notebook?
>>
>> Thanks in advance,
>>
>> Juan
>>
>
