I think it has to do with Flink's internal use of Kryo - take a look at this: http://stackoverflow.com/questions/32453030/using-an-collectionsunmodifiablecollection-with-apache-flink I'm sure Flink committers will soon reach out to correct me if I'm missing something..
On Thu, Jun 2, 2016 at 5:01 AM Viswadeep <[email protected]> wrote: > Hi > > I am using apache beam with flink and ProtoBuf for encoding and decoding. > > The following is the method for the FlinkKafka Consumer. > > public UnboundedSource<T extends Message,UnboundedSource.CheckpointMark> > build() { > Properties p = new Properties(); > p.setProperty("zookeeper.connect", kafkaOptions.getZookeeper()); > p.setProperty("bootstrap.servers", kafkaOptions.getBroker()); > p.setProperty("group.id", kafkaOptions.getGroup()); > FlinkKafkaConsumer09<T> kafkaConsumer = new > FlinkKafkaConsumer09<>(kafkaOptions.getKafkaTopic(), > new SerializationDeserializationSchema(typedSource, > ProtoCoder.of(typedSource)), p); > return UnboundedFlinkSource.of(kafkaConsumer); > } > > and the helper for serialization and DeSerialization is this. > > public class SerializationDeserializationSchema<T> > implements SerializationSchema<T>, DeserializationSchema<T> { > > private final Class<T> tClass; > > private final Coder<T> coder; > private transient ByteArrayOutputStream out; > > public SerializationDeserializationSchema(Class<T> clazz, Coder<T> coder) > { > this.tClass = clazz; > this.coder = coder; > this.out = new ByteArrayOutputStream(); > } > > @Override > public byte[] serialize(T element) { > > if (out == null) { > out = new ByteArrayOutputStream(); > } > try { > out.reset(); > coder.encode(element, out, Coder.Context.NESTED); > } catch (IOException e) { > throw new RuntimeException("encoding failed.", e); > } > return out.toByteArray(); > } > > @Override > public T deserialize(byte[] message) throws IOException { > return coder.decode(new ByteArrayInputStream(message), > Coder.Context.NESTED); > } > > @Override > public boolean isEndOfStream(T nextElement) { > return false; > } > > @Override > public TypeInformation<T> getProducedType() { > return TypeExtractor.getForClass(tClass); > } > } > > > I am getting the following, *Kyro* exception. > > java.lang.RuntimeException: ConsumerThread threw an exception: Could not > forward element to next operator > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.run(FlinkKafkaConsumer09.java:336) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.RuntimeException: Could not forward element to > next operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) > at > org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:318) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:473) > Caused by: com.esotericsoftware.kryo.KryoException: > java.lang.UnsupportedOperationException > Serialization trace: > records_ (com.model.Employee$Packet) > at > com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) > at > com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) > at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657) > at > org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:168) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:349) > ... 3 more > Caused by: java.lang.UnsupportedOperationException > at > java.util.Collections$UnmodifiableCollection.add(Collections.java:1055) > at > com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116) > at > com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22) > at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) > at > com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) > > > I am not able to avoid this "Kryo" Exception, Thanks for any help. > > Thanks > > Viswadeep. > >
