Hi Nick,

At the moment, Flink uses Java serialization to ship the UDFs to the
cluster, so closures must contain only Serializable objects. The
serializer registration applies only to the data processed by the Flink
job, not to the closures themselves. For now, I would try to get rid of
the ColumnInfo object in your closure.
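
One way to keep the object out of the closure is to copy the fields the lambda needs into Serializable values before defining it. The sketch below uses only the JDK. `ColumnMeta` is a hypothetical stand-in for a non-serializable type such as ColumnInfo, and `SerializableFunction` is a hypothetical interface that mimics a Serializable function type. None of these names come from Flink or Phoenix.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class ClosureSerializationSketch {

    // Hypothetical stand-in for a type that does not implement Serializable.
    static final class ColumnMeta {
        final String name;
        ColumnMeta(String name) { this.name = name; }
    }

    // Hypothetical function type that is Serializable, so lambdas targeting it
    // are written with Java serialization together with their captured values.
    interface SerializableFunction<T, R> extends Function<T, R>, Serializable {}

    // Returns true if Java serialization can write the object.
    static boolean isJavaSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) { // includes NotSerializableException
            return false;
        }
    }

    // Captures the list of ColumnMeta objects directly: not serializable.
    static SerializableFunction<String, String> capturingObjects(List<ColumnMeta> columns) {
        return value -> value + ":" + columns.size();
    }

    // Copies the needed data into Strings first, so the lambda only
    // captures a list of Serializable values.
    static SerializableFunction<String, String> capturingNames(List<ColumnMeta> columns) {
        List<String> names = new ArrayList<>();
        for (ColumnMeta c : columns) {
            names.add(c.name);
        }
        return value -> value + ":" + names.size();
    }

    public static void main(String[] args) {
        List<ColumnMeta> columns =
                new ArrayList<>(Arrays.asList(new ColumnMeta("id"), new ColumnMeta("ts")));
        System.out.println(isJavaSerializable(capturingObjects(columns))); // prints false
        System.out.println(isJavaSerializable(capturingNames(columns)));   // prints true
    }
}
```

In a real job the same idea would apply to the lambda passed to the map function: extract what is needed from the ColumnInfo objects up front and let the lambda capture only those values.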

Cheers,
Till

On Mon, Dec 7, 2015 at 10:02 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:

> Hello,
>
> I've implemented a (streaming) flow using the Java API and Java 8 lambdas
> for various map functions. When I try to run the flow, job submission fails
> because of an unserializable type. This is not a type of data used within
> the flow, but rather a small collection of objects captured in the closure
> context of one of my lambdas. I've implemented and registered a Kryo
> serializer for this type with the environment; however, it's apparently
> not used when serializing the lambdas. It seems like the same serialization
> configuration and tools of the environment should be used when preparing
> the job for submission. Am I missing something?
>
> Thanks,
> Nick
>
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:512)
>         at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
>         at org.apache.flink.client.program.Client.runBlocking(Client.java:252)
>         at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:675)
>         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
>         at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:977)
>         at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1027)
> Caused by: org.apache.flink.api.common.InvalidProgramException: Object ImportFlow$$Lambda$11/1615389290@44286963 not serializable
>         at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
>         at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59)
>         at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1228)
>         at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:149)
>         at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:550)
>         at ImportFlow.assembleImportFlow(ImportFlow.java:111)
>         at ImportFlow.main(ImportFlow.java:178)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:497)
>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
>         ... 6 more
> Caused by: java.io.NotSerializableException: org.apache.phoenix.util.ColumnInfo
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>         at java.util.ArrayList.writeObject(ArrayList.java:762)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:497)
>         at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>         at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>         at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:307)
>         at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:95)
>         ... 17 more
>
