Hi Viswadeep,
What Amit recommended (thanks!) is indeed an easy fix if you're using
Flink directly. However, we don't expose Flink's ExecutionConfig in
Beam, so registering custom Kryo serializers is not possible.
Two other options I see:
1) Could you try using the following in your deserialization schema?
    @Override
    public TypeInformation<T> getProducedType() {
      return new CoderTypeInformation<>(coder);
    }
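That way Flink serializes elements with your Beam coder (the
ProtoCoder) instead of falling back to Kryo's reflective
FieldSerializer, which is what chokes on the unmodifiable list. For
reference, the import you'd need (assuming the class still lives in
the Flink runner's translation.types package):

    import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;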
2) Could you avoid using Collections.UnmodifiableList in your code?
I'm not sure that is possible, though, since it appears to be a field
(records_) in your generated Protobuf class.
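(If the list were under your control, copying it into a mutable list
before it crosses an operator boundary, e.g. something like
new ArrayList<>(packet.getRecordsList()), would sidestep the wrapper;
with generated Protobuf code that isn't really an option.)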
Thanks,
Max
On Thu, Jun 2, 2016 at 4:49 PM, Viswadeep <[email protected]> wrote:
> Thanks Amit,
>
> It did not solve my issue. Flink itself has the following comment in
> ExecutionConfig.java:
>
> /**
>  * Force TypeExtractor to use Kryo serializer for POJOS even though we
>  * could analyze as POJO. In some cases this might be preferable. For
>  * example, when using interfaces with subclasses that cannot be
>  * analyzed as POJO.
>  */
>
>
> The generated Protobuf class extends GeneratedMessage and implements
> Employee.PacketOrBuilder, so it cannot be analyzed as a POJO.
>
>
> Thanks for any help.
> Viswadeep
>
>
> On Wed, Jun 1, 2016 at 11:02 PM, Amit Sela <[email protected]> wrote:
>>
>> I think it has to do with Flink's internal use of Kryo - take a look at
>> this:
>> http://stackoverflow.com/questions/32453030/using-an-collectionsunmodifiablecollection-with-apache-flink
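>> In plain Flink, the workaround discussed there would look roughly like
>> this (a sketch, untested, assuming the UnmodifiableCollectionsSerializer
>> from the de.javakaffee kryo-serializers library):
>>
>> import java.util.ArrayList;
>> import java.util.Collections;
>> import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>
>> StreamExecutionEnvironment env =
>>     StreamExecutionEnvironment.getExecutionEnvironment();
>> // Register a Kryo serializer for the Collections$Unmodifiable* wrapper
>> // classes, which Kryo's default CollectionSerializer cannot rebuild
>> // (it calls add() on the deserialized collection, hence the
>> // UnsupportedOperationException).
>> env.getConfig().addDefaultKryoSerializer(
>>     Collections.unmodifiableCollection(new ArrayList<>()).getClass(),
>>     UnmodifiableCollectionsSerializer.class);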
>> I'm sure the Flink committers will soon reach out to correct me if
>> I'm missing something.
>>
>> On Thu, Jun 2, 2016 at 5:01 AM Viswadeep <[email protected]> wrote:
>>>
>>> Hi
>>>
>>> I am using Apache Beam with Flink and Protobuf for encoding and decoding.
>>>
>>> The following is the builder method for the FlinkKafkaConsumer:
>>>
>>> // T extends Message is declared on the enclosing builder class.
>>> public UnboundedSource<T, UnboundedSource.CheckpointMark> build() {
>>>   Properties p = new Properties();
>>>   p.setProperty("zookeeper.connect", kafkaOptions.getZookeeper());
>>>   p.setProperty("bootstrap.servers", kafkaOptions.getBroker());
>>>   p.setProperty("group.id", kafkaOptions.getGroup());
>>>   FlinkKafkaConsumer09<T> kafkaConsumer = new FlinkKafkaConsumer09<>(
>>>       kafkaOptions.getKafkaTopic(),
>>>       new SerializationDeserializationSchema<>(typedSource, ProtoCoder.of(typedSource)),
>>>       p);
>>>   return UnboundedFlinkSource.of(kafkaConsumer);
>>> }
>>>
>>> and the helper class for serialization and deserialization:
>>>
>>> public class SerializationDeserializationSchema<T>
>>>     implements SerializationSchema<T>, DeserializationSchema<T> {
>>>
>>>   private final Class<T> tClass;
>>>   private final Coder<T> coder;
>>>   private transient ByteArrayOutputStream out;
>>>
>>>   public SerializationDeserializationSchema(Class<T> clazz, Coder<T> coder) {
>>>     this.tClass = clazz;
>>>     this.coder = coder;
>>>     this.out = new ByteArrayOutputStream();
>>>   }
>>>
>>>   @Override
>>>   public byte[] serialize(T element) {
>>>     if (out == null) {
>>>       out = new ByteArrayOutputStream();
>>>     }
>>>     try {
>>>       out.reset();
>>>       coder.encode(element, out, Coder.Context.NESTED);
>>>     } catch (IOException e) {
>>>       throw new RuntimeException("encoding failed.", e);
>>>     }
>>>     return out.toByteArray();
>>>   }
>>>
>>>   @Override
>>>   public T deserialize(byte[] message) throws IOException {
>>>     return coder.decode(new ByteArrayInputStream(message), Coder.Context.NESTED);
>>>   }
>>>
>>>   @Override
>>>   public boolean isEndOfStream(T nextElement) {
>>>     return false;
>>>   }
>>>
>>>   @Override
>>>   public TypeInformation<T> getProducedType() {
>>>     return TypeExtractor.getForClass(tClass);
>>>   }
>>> }
>>>
>>>
>>> I am getting the following Kryo exception:
>>>
>>> java.lang.RuntimeException: ConsumerThread threw an exception: Could not
>>> forward element to next operator
>>> at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.run(FlinkKafkaConsumer09.java:336)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>> at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.lang.RuntimeException: Could not forward element to
>>> next operator
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:318)
>>> at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:473)
>>> Caused by: com.esotericsoftware.kryo.KryoException:
>>> java.lang.UnsupportedOperationException
>>> Serialization trace:
>>> records_ (com.model.Employee$Packet)
>>> at
>>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>> at
>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>>> at
>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:168)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:349)
>>> ... 3 more
>>> Caused by: java.lang.UnsupportedOperationException
>>> at
>>> java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
>>> at
>>> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>>> at
>>> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>> at
>>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>>
>>>
>>> I am not able to avoid this Kryo exception. Thanks for any help.
>>>
>>> Thanks
>>>
>>> Viswadeep.
>>>
>
>