Hi Stephan!

Here's the trace (Flink 0.9.1 + Scala 2.10.5):

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
	at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
	at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
	at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
	at akka.dispatch.Mailbox.run(Mailbox.scala:221)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Cannot instantiate StreamRecord.
	at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.createInstance(StreamRecordSerializer.java:63)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(OneInputStreamTask.java:66)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:101)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Cannot instantiate class.
	at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.createInstance(PojoSerializer.java:225)
	at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.createInstance(StreamRecordSerializer.java:60)
	... 4 more
Caused by: java.lang.IllegalArgumentException: Can not set int field org.myorg.quickstart.Test$X.id to org.myorg.quickstart.Test$Id
	at sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:167)
	at sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:171)
	at sun.reflect.UnsafeIntegerFieldAccessorImpl.set(UnsafeIntegerFieldAccessorImpl.java:98)
	at java.lang.reflect.Field.set(Field.java:764)
	at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.initializeFields(PojoSerializer.java:232)
	at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.createInstance(PojoSerializer.java:221)
	... 5 more
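
The last "Caused by" looks like value-class erasure: on the JVM, the field X.id is an int (Id extends AnyVal is unboxed), but the PojoSerializer reflectively creates a boxed Test$Id and tries to assign it to that int field. One workaround I'm considering (just a sketch, not yet tested against 0.9.1; Test2 and the names below mirror my earlier example) is to drop AnyVal and use plain case classes, so Flink's Scala case-class serializer handles the type instead of the POJO path:

```scala
// Assumes the Scala DataStream API is on the classpath; importing
// org.apache.flink.streaming.api.scala._ lets Flink derive case-class
// TypeInformation implicitly, so no default constructor is required
// and the fields can stay immutable (val, not var).
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.source.SourceFunction

object Test2 {
  // Case classes instead of an AnyVal wrapper: serialized field-by-field
  // by the case-class serializer, never instantiated via reflection.
  case class Id(underlying: Int)
  case class X(id: Id)

  class MySource extends SourceFunction[X] {
    override def run(ctx: SourceFunction.SourceContext[X]): Unit =
      ctx.collect(X(Id(1)))
    override def cancel(): Unit = {}
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.addSource(new MySource).print()
    env.execute("Test2")
  }
}
```

The cost is losing the AnyVal allocation-free wrapping, but it keeps immutability and by-field-name keys.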

> On 23 Sep 2015, at 14:37, Stephan Ewen <se...@apache.org> wrote:
> 
> Hi Jack!
> 
> This should be supported, there is no strict requirement for mutable types.
> 
> The POJO rules apply only if you want to use the "by-field-name" addressing 
> for keys. In Scala, you should be able to use case classes as well, even if 
> they are immutable.
> 
> Can you post the exception that you get?
> 
> Greetings,
> Stephan
> 
> 
>> On Wed, Sep 23, 2015 at 1:29 PM, Jack <jack-kn...@marmelandia.com> wrote:
>> Hi,
>> 
>> I'm having trouble integrating existing Scala code with Flink, due to 
>> POJO-only requirement.
>> 
>> We're using AnyVal heavily for type safety, and immutable classes as a 
>> default. For example, the following does not work:
>> 
>> object Test {
>>   class Id(val underlying: Int) extends AnyVal
>> 
>>   class X(var id: Id) {
>>     def this() { this(new Id(0)) }
>>   }
>> 
>>   class MySource extends SourceFunction[X] {
>>     def run(ctx: SourceFunction.SourceContext[X]) {
>>       ctx.collect(new X(new Id(1)))
>>     }
>>     def cancel() {}
>>   }
>> 
>>   def main(args: Array[String]) {
>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>     env.addSource(new MySource).print
>>     env.execute("Test")
>>   }
>> }
>> 
>> Currently I'm thinking that I would need to have duplicate classes and code 
>> for Flink and for non-Flink code, or somehow use immutable interfaces for 
>> non-Flink code. Both ways are expensive in terms of development time.
>> 
>> Would you have any guidance on how to integrate Flink with a code base that 
>> has immutability as a norm?
>> 
>> Thanks
> 
