Hi Stephan! Here's the trace (flink 0.9.1 + scala 2.10.5)
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36) at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) at akka.dispatch.Mailbox.run(Mailbox.scala:221) at akka.dispatch.Mailbox.exec(Mailbox.scala:231) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.RuntimeException: Cannot instantiate StreamRecord. at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.createInstance(StreamRecordSerializer.java:63) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(OneInputStreamTask.java:66) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:101) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.RuntimeException: Cannot instantiate class. at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.createInstance(PojoSerializer.java:225) at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.createInstance(StreamRecordSerializer.java:60) ... 4 more Caused by: java.lang.IllegalArgumentException: Can not set int field org.myorg.quickstart.Test$X.id to org.myorg.quickstart.Test$Id at sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:167) at sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:171) at sun.reflect.UnsafeIntegerFieldAccessorImpl.set(UnsafeIntegerFieldAccessorImpl.java:98) at java.lang.reflect.Field.set(Field.java:764) at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.initializeFields(PojoSerializer.java:232) at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.createInstance(PojoSerializer.java:221) ... 5 more > On 23 Sep 2015, at 14:37, Stephan Ewen <se...@apache.org> wrote: > > Hi Jack! > > This should be supported, there is no strict requirement for mutable types. > > The POJO rules apply only if you want to use the "by-field-name" addressing > for keys. In Scala, you should be able to use case classes as well, even if > they are immutable. > > Can you post the exception that you get? > > Greetings, > Stephan > > >> On Wed, Sep 23, 2015 at 1:29 PM, Jack <jack-kn...@marmelandia.com> wrote: >> Hi, >> >> I'm having trouble integrating existing Scala code with Flink, due to >> POJO-only requirement. >> >> We're using AnyVal heavily for type safety, and immutable classes as a >> default. For example, the following does not work: >> >> object Test { >> class Id(val underlying: Int) extends AnyVal >> >> class X(var id: Id) { >> def this() { this(new Id(0)) } >> } >> >> class MySource extends SourceFunction[X] { >> def run(ctx: SourceFunction.SourceContext[X]) { >> ctx.collect(new X(new Id(1))) >> } >> def cancel() {} >> } >> >> def main(args: Array[String]) { >> val env = StreamExecutionContext.getExecutionContext >> env.addSource(new MySource).print >> env.execute("Test") >> } >> } >> >> Currently I'm thinking that I would need to have duplicate classes and code >> for Flint and for non-Flint code, or somehow use immutable interfaces for >> non-Flint code. Both ways are expensive in terms of development time. >> >> Would you have any guidance on how to integrate Flink with a code base that >> has immutability as a norm? >> >> Thanks >