Just for the record, the Spark version of that works fine:

```
%spark

case class C2(x: Int)
val xs = sc.parallelize(1 to 10)
val csSpark = xs.map{C2(_)}
csSpark.collect

res3: Array[C2] = Array(C2(1), C2(2), C2(3), C2(4), C2(5), C2(6), C2(7), C2(8), C2(9), C2(10))
```

Thanks,

Juan

On Sun, Apr 19, 2020 at 10:15 AM Juan Rodríguez Hortalá <
juan.rodriguez.hort...@gmail.com> wrote:

> Minimal reproduction:
>
> - First option
>
> ```scala
> case class C(x: Int)
>
> val xs = benv.fromCollection(1 to 10)
> val cs = xs.map{C(_)}
>
> cs.count
> ```
>
> defined class C
> xs: org.apache.flink.api.scala.DataSet[Int] = org.apache.flink.api.scala.DataSet@39c713c6
> cs: org.apache.flink.api.scala.DataSet[C] = org.apache.flink.api.scala.DataSet@205a35a
>
> java.lang.IllegalArgumentException: requirement failed: The class C is an
> instance class, meaning it is not a member of a toplevel object, or of an
> object contained in a toplevel object, therefore it requires an outer
> instance to be instantiated, but we don't have a reference to the outer
> instance. Please consider changing the outer class to an object.
>   at scala.Predef$.require(Predef.scala:224)
>   at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$.lookupConstructor(ScalaCaseClassSerializer.scala:90)
>   at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.<init>(ScalaCaseClassSerializer.scala:46)
>   ...
> 125 elided
>
> - Second option
>
> ```scala
> object Types {
>   case class C(x: Int)
> }
>
> val cs2 = xs.map{Types.C(_)}
>
> cs2.count
> ```
>
> defined object Types
>
> org.apache.flink.api.common.InvalidProgramException: Task not serializable
>   at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
>   at org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
>   at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
>   at org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125)
>   at org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489)
>   at org.apache.flink.api.scala.DataSet.map(DataSet.scala:488)
>   ... 106 elided
> Caused by: java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>
> Greetings,
>
> Juan
>
>
> On Sun, Apr 19, 2020 at 10:05 AM Juan Rodríguez Hortalá <
> juan.rodriguez.hort...@gmail.com> wrote:
>
>> Hi,
>>
>> I'm using the Flink interpreter and the benv environment. I'm reading
>> some csv files using benv.readCsvFile and it works ok. I have also
>> defined a case class C for the csv records. The problem happens when I
>> apply a map operation on the DataSet of tuples returned by
>> benv.readCsvFile, to convert it into a DataSet[C].
>>
>> - If I define the case class C in some cell I get this error:
>>
>> java.lang.IllegalArgumentException: requirement failed: The class C is
>> an instance class, meaning it is not a member of a toplevel object, or
>> of an object contained in a toplevel object, therefore it requires an
>> outer instance to be instantiated, but we don't have a reference to the
>> outer instance. Please consider changing the outer class to an object.
>>
>> - That sounds related to
>> https://stackoverflow.com/questions/36042720/case-class-serialazation-in-flink;
>> it looks like the Zeppelin Flink interpreter is wrapping the case class
>> definition as an inner class. I tried defining the case class C inside
>> an object Types that I define in another cell. With that I also get a
>> serialization exception:
>>
>> org.apache.flink.api.common.InvalidProgramException: Task not serializable
>>   org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:408)
>>   org.apache.flink.api.scala.ClosureCleaner$.org$apache$flink$api$scala$ClosureCleaner$$clean(ClosureCleaner.scala:400)
>>   org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:168)
>>   org.apache.flink.api.scala.DataSet.clean(DataSet.scala:125)
>>   org.apache.flink.api.scala.DataSet$$anon$1.<init>(DataSet.scala:489)
>>   org.apache.flink.api.scala.DataSet.map(DataSet.scala:488)
>>   $line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.liftedTree1$1(<console>:135)
>>   $line163.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$82b5b23cea489b2712a1db46c77e458$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:133)
>>
>> I guess that didn't work because the object Types is still defined
>> inside some class implicitly defined by the interpreter.
>>
>> Any thoughts about how I can fix this? Also, I understand $line163 etc.
>> refer to the code in the cells; is there some convention I can use to
>> work out which line in the notebook those error messages refer to?
>>
>> Thanks in advance,
>>
>> Juan
>>
>
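[Editor's note] The mechanics behind both failures can be reproduced in plain Scala, without Flink or Zeppelin. In the sketch below, `CellWrapper` is a hypothetical stand-in for the synthetic wrapper class (the `$iw`/`$line163.$read` nesting visible in the stack traces) that a REPL-based interpreter generates around each cell; it is not an actual Zeppelin class. The sketch shows two things: a case class declared inside a class is an inner class whose compiled constructor takes the outer instance as a hidden parameter (the `lookupConstructor` complaint), and a closure built over such a class captures the whole wrapper, so serializing it fails as soon as the wrapper holds anything non-serializable (the `Task not serializable` case).

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for the synthetic per-cell wrapper class that a
// REPL-based interpreter generates around user code.
class CellWrapper {
  val heavyState = new Object // e.g. a non-serializable DataSet reference

  // Declared inside a class, C is an *inner* class: every instance needs
  // an enclosing CellWrapper.
  case class C(x: Int)

  // `C(_)` here really means `CellWrapper.this.C(_)`, so the closure
  // captures `this` (the whole wrapper).
  def mapper: Int => C = C(_)
}

object ReplWrappingDemo {
  def main(args: Array[String]): Unit = {
    val wrapper = new CellWrapper

    // 1) The compiled constructor of the inner case class takes the outer
    //    instance as its first parameter, so a serializer cannot build a C
    //    without a wrapper reference.
    val ctor = wrapper.C(1).getClass.getDeclaredConstructors.head
    println(ctor) // the outer CellWrapper shows up as a constructor parameter

    // 2) Serializing the closure drags the non-serializable wrapper along.
    try {
      new ObjectOutputStream(new ByteArrayOutputStream).writeObject(wrapper.mapper)
      println("serialized OK")
    } catch {
      case e: NotSerializableException =>
        println(s"NotSerializableException: ${e.getMessage}")
    }
  }
}
```

This is why moving the case class to a top-level location in compiled code (e.g. a jar on the interpreter classpath) sidesteps both problems: the class then needs no outer instance, and closures over it no longer capture the cell wrapper.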