Thanks Amit. It did not solve my issue. Flink itself has the following comment in ExecutionConfig.java:
/**
 * Force TypeExtractor to use Kryo serializer for POJOS even though we could analyze as POJO.
 * In some cases this might be preferable. For example, when using interfaces
 * with subclasses that cannot be analyzed as POJO.
 */

This seems relevant because the generated Protobuf class extends GeneratedMessage and implements Employee.PacketOrBuilder.

Thanks for any help.
Viswadeep

On Wed, Jun 1, 2016 at 11:02 PM, Amit Sela <[email protected]> wrote:

> I think it has to do with Flink's internal use of Kryo - take a look at
> this:
> http://stackoverflow.com/questions/32453030/using-an-collectionsunmodifiablecollection-with-apache-flink
> I'm sure Flink committers will soon reach out to correct me if I'm missing
> something..
>
> On Thu, Jun 2, 2016 at 5:01 AM Viswadeep <[email protected]> wrote:
>
>> Hi
>>
>> I am using apache beam with flink and ProtoBuf for encoding and decoding.
>>
>> The following is the method for the FlinkKafka Consumer.
>>
>> public UnboundedSource<T extends Message,UnboundedSource.CheckpointMark> build() {
>>     Properties p = new Properties();
>>     p.setProperty("zookeeper.connect", kafkaOptions.getZookeeper());
>>     p.setProperty("bootstrap.servers", kafkaOptions.getBroker());
>>     p.setProperty("group.id", kafkaOptions.getGroup());
>>     FlinkKafkaConsumer09<T> kafkaConsumer = new FlinkKafkaConsumer09<>(
>>         kafkaOptions.getKafkaTopic(),
>>         new SerializationDeserializationSchema(typedSource, ProtoCoder.of(typedSource)),
>>         p);
>>     return UnboundedFlinkSource.of(kafkaConsumer);
>> }
>>
>> and the helper for serialization and DeSerialization is this.
>>
>> public class SerializationDeserializationSchema<T>
>>     implements SerializationSchema<T>, DeserializationSchema<T> {
>>
>>     private final Class<T> tClass;
>>
>>     private final Coder<T> coder;
>>     private transient ByteArrayOutputStream out;
>>
>>     public SerializationDeserializationSchema(Class<T> clazz, Coder<T> coder) {
>>         this.tClass = clazz;
>>         this.coder = coder;
>>         this.out = new ByteArrayOutputStream();
>>     }
>>
>>     @Override
>>     public byte[] serialize(T element) {
>>
>>         if (out == null) {
>>             out = new ByteArrayOutputStream();
>>         }
>>         try {
>>             out.reset();
>>             coder.encode(element, out, Coder.Context.NESTED);
>>         } catch (IOException e) {
>>             throw new RuntimeException("encoding failed.", e);
>>         }
>>         return out.toByteArray();
>>     }
>>
>>     @Override
>>     public T deserialize(byte[] message) throws IOException {
>>         return coder.decode(new ByteArrayInputStream(message), Coder.Context.NESTED);
>>     }
>>
>>     @Override
>>     public boolean isEndOfStream(T nextElement) {
>>         return false;
>>     }
>>
>>     @Override
>>     public TypeInformation<T> getProducedType() {
>>         return TypeExtractor.getForClass(tClass);
>>     }
>> }
>>
>>
>> I am getting the following *Kryo* exception.
>>
>> java.lang.RuntimeException: ConsumerThread threw an exception: Could not forward element to next operator
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.run(FlinkKafkaConsumer09.java:336)
>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>     at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.RuntimeException: Could not forward element to next operator
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>>     at org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:318)
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:473)
>> Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
>> Serialization trace:
>> records_ (com.model.Employee$Packet)
>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:168)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:349)
>>     ... 3 more
>> Caused by: java.lang.UnsupportedOperationException
>>     at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>
>>
>> I am not able to avoid this "Kryo" Exception, Thanks for any help.
>>
>> Thanks
>>
>> Viswadeep.
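
For reference, the innermost cause in the trace above (CollectionSerializer.read calling Collections$UnmodifiableCollection.add) can be reproduced without Flink or Kryo. The sketch below is only an illustration using the JDK: the class name UnmodifiableCopyDemo and the helpers fillByAdd and addIsRejected are hypothetical stand-ins, not Kryo's actual code. It assumes that a generic collection copier rebuilds the target by calling add() for each element, which is what the stack trace suggests happens to the records_ field.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class UnmodifiableCopyDemo {

    // Hypothetical stand-in for a generic collection copier: it fills the
    // target collection by calling add() once per source element.
    public static <T> void fillByAdd(Collection<T> target, Collection<T> source) {
        for (T element : source) {
            target.add(element);
        }
    }

    // Returns true if filling the given target through add() is rejected
    // with an UnsupportedOperationException, false if it succeeds.
    public static boolean addIsRejected(Collection<String> target) {
        try {
            fillByAdd(target, Arrays.asList("a", "b"));
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // A plain ArrayList accepts add(), so the copy goes through.
        List<String> mutable = new ArrayList<String>();
        // An unmodifiable wrapper rejects add(), as in the trace above.
        List<String> readOnly = Collections.unmodifiableList(new ArrayList<String>());

        System.out.println("ArrayList rejected add: " + addIsRejected(mutable));
        System.out.println("Unmodifiable list rejected add: " + addIsRejected(readOnly));
    }
}
```

If this is indeed what happens here, the failure would come from the generic field-by-field copy of the Protobuf message rather than from the Coder-based encode/decode in the schema class. A possible direction would be to keep Flink from falling back to the generic Kryo path for this type. Whether that is reachable through the Beam runner is not something this sketch establishes.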
