[ https://issues.apache.org/jira/browse/BEAM-1255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Alexey Diomin reassigned BEAM-1255: ----------------------------------- Assignee: Alexey Diomin (was: Maximilian Michels) > java.io.NotSerializableException in flink on UnboundedSource > ------------------------------------------------------------ > > Key: BEAM-1255 > URL: https://issues.apache.org/jira/browse/BEAM-1255 > Project: Beam > Issue Type: Bug > Components: runner-flink > Affects Versions: 0.5.0 > Reporter: Alexey Diomin > Assignee: Alexey Diomin > > After introduce new Coders with TypeDescriptor on flink runner we have issue: > {code} > Caused by: java.io.NotSerializableException: > sun.reflect.generics.reflectiveObjects.TypeVariableImpl > - element of array (index: 0) > - array (class "[Ljava.lang.Object;", size: 2) > - field (class > "com.google.common.collect.ImmutableList$SerializedForm", name: "elements", > type: "class [Ljava.lang.Object;") > - object (class > "com.google.common.collect.ImmutableList$SerializedForm", > com.google.common.collect.ImmutableList$SerializedForm@30af5b6b) > - field (class "com.google.common.reflect.Types$ParameterizedTypeImpl", > name: "argumentsList", type: "class com.google.common.collect.ImmutableList") > - object (class > "com.google.common.reflect.Types$ParameterizedTypeImpl", > org.apache.beam.sdk.io.UnboundedSource<OutputT, CheckpointMarkT>) > - field (class "com.google.common.reflect.TypeToken", name: > "runtimeType", type: "interface java.lang.reflect.Type") > - object (class "com.google.common.reflect.TypeToken$SimpleTypeToken", > org.apache.beam.sdk.io.UnboundedSource<OutputT, CheckpointMarkT>) > - field (class "org.apache.beam.sdk.values.TypeDescriptor", name: > "token", type: "class com.google.common.reflect.TypeToken") > - object (class > "org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper$1", > org.apache.beam.sdk.io.UnboundedSource<OutputT, CheckpointMarkT>) > - field (class "org.apache.beam.sdk.coders.SerializableCoder", name: > "typeDescriptor", type: "class org.apache.beam.sdk.values.TypeDescriptor") > - object (class "org.apache.beam.sdk.coders.SerializableCoder", > SerializableCoder) > - field (class "org.apache.beam.sdk.coders.KvCoder", name: "keyCoder", > type: "interface org.apache.beam.sdk.coders.Coder") > - object (class "org.apache.beam.sdk.coders.KvCoder", > KvCoder(SerializableCoder,AvroCoder)) > - field (class "org.apache.beam.sdk.coders.IterableLikeCoder", name: > "elementCoder", type: "interface org.apache.beam.sdk.coders.Coder") > - object (class "org.apache.beam.sdk.coders.ListCoder", > ListCoder(KvCoder(SerializableCoder,AvroCoder))) > - field (class > "org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper", > name: "checkpointCoder", type: "class org.apache.beam.sdk.coders.ListCoder") > - root object (class > "org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper", > > org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper@b2c5e07) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1182) > {code} > bug introduced after commit: > 7b98fa08d14e8121e8885f00a9a9a878b73f81a6 > pull request: > https://github.com/apache/beam/pull/1537 > Code for reproduce error > {code} > import com.google.common.collect.ImmutableList; > import org.apache.beam.runners.flink.FlinkPipelineOptions; > import org.apache.beam.runners.flink.FlinkRunner; > import org.apache.beam.sdk.Pipeline; > import org.apache.beam.sdk.io.kafka.KafkaIO; > import org.apache.beam.sdk.options.PipelineOptionsFactory; > public class FlinkSerialisationError { > public static void main(String[] args) { > FlinkPipelineOptions options = > PipelineOptionsFactory.as(FlinkPipelineOptions.class); > options.setRunner(FlinkRunner.class); > options.setStreaming(true); > Pipeline pipeline = Pipeline.create(options); > pipeline.apply( > KafkaIO.read() > .withBootstrapServers("localhost:9092") > .withTopics(ImmutableList.of("test")) > // set ConsumerGroup > .withoutMetadata()); > pipeline.run(); > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)