yes!, i have tried it works. -Viswadeep Viswadeep Veguru.
On Thu, Jun 2, 2016 at 12:43 PM, Amit Sela <[email protected]> wrote: > I'm not too familiar with Flink but is there a way to simply read bytes of > Kafka and in the next step (map ?) to Deserialize the bytes that represent > this object and transform it into a "Kryo-compliant" object ? This should > avoid Kryo, right ? > > On Thu, Jun 2, 2016 at 10:31 PM Viswadeep <[email protected]> wrote: > >> Hi Max, >> >> I tried your approach (1) it did not work. I am still getting the same >> exception. >> (2) is not possible, unless protobuff complier changes. >> >> Yes setting that in config also did not work as well. >> >> I think some where inside flink it is not able to process correctly. >> >> >> Thanks >> Viswadeep >> >> >> Viswadeep Veguru. >> >> >> On Thu, Jun 2, 2016 at 10:09 AM, Maximilian Michels <[email protected]> >> wrote: >> >>> Hi Viswadeep, >>> >>> What Amit recommended (thanks!) is indeed an easy fix if you're using >>> Flink. However, we don't expose the ExecutionConfig in Beam. So >>> setting custom Kryo serializers is not possible. >>> >>> Two other options I see: >>> >>> 1) Could you try using the following in your deserialization schema? >>> >>> @Override >>> public TypeInformation<T> getProducedType() { >>> return new CoderTypeInformation<>(coder); >>> } >>> >>> 2) Could you avoid using Collections.UnmodifiableList in your code? >>> Not sure if it is possible because it seems to be a field in your >>> Protobuf class. >>> >>> Thanks, >>> Max >>> >>> On Thu, Jun 2, 2016 at 4:49 PM, Viswadeep <[email protected]> wrote: >>> > Thanks Amit, >>> > >>> > It did not solve my issue, Flink it self has the following comment in >>> > ExecutionConfig.java >>> > >>> > /** >>> > * Force TypeExtractor to use Kryo serializer for POJOS even though we >>> could >>> > analyze as POJO. >>> > * In some cases this might be preferable. For example, when using >>> > interfaces >>> > * with subclasses that cannot be analyzed as POJO. >>> > */ >>> > >>> > >>> > Because generated Protobuf class extends GeneratedMessage implements >>> > Employee.PacketOrBuilder >>> > >>> > >>> > Thanks for any help. >>> > Viswadeep >>> > >>> > >>> > On Wed, Jun 1, 2016 at 11:02 PM, Amit Sela <[email protected]> >>> wrote: >>> >> >>> >> I think it has to do with Flink's internal use of Kryo - take a look >>> at >>> >> this: >>> >> >>> http://stackoverflow.com/questions/32453030/using-an-collectionsunmodifiablecollection-with-apache-flink >>> >> I'm sure Flink committers will soon reach out to correct me if I'm >>> missing >>> >> something.. >>> >> >>> >> On Thu, Jun 2, 2016 at 5:01 AM Viswadeep <[email protected]> wrote: >>> >>> >>> >>> Hi >>> >>> >>> >>> I am using apache beam with flink and ProtoBuf for encoding and >>> decoding. >>> >>> >>> >>> The following is the method for the FlinkKafka Consumer. >>> >>> >>> >>> public UnboundedSource<T extends >>> Message,UnboundedSource.CheckpointMark> >>> >>> build() { >>> >>> Properties p = new Properties(); >>> >>> p.setProperty("zookeeper.connect", kafkaOptions.getZookeeper()); >>> >>> p.setProperty("bootstrap.servers", kafkaOptions.getBroker()); >>> >>> p.setProperty("group.id", kafkaOptions.getGroup()); >>> >>> FlinkKafkaConsumer09<T> kafkaConsumer = new >>> >>> FlinkKafkaConsumer09<>(kafkaOptions.getKafkaTopic(), >>> >>> new SerializationDeserializationSchema(typedSource, >>> >>> ProtoCoder.of(typedSource)), p); >>> >>> return UnboundedFlinkSource.of(kafkaConsumer); >>> >>> } >>> >>> >>> >>> and the helper for serialization and DeSerialization is this. >>> >>> >>> >>> public class SerializationDeserializationSchema<T> >>> >>> implements SerializationSchema<T>, DeserializationSchema<T> { >>> >>> >>> >>> private final Class<T> tClass; >>> >>> >>> >>> private final Coder<T> coder; >>> >>> private transient ByteArrayOutputStream out; >>> >>> >>> >>> public SerializationDeserializationSchema(Class<T> clazz, >>> Coder<T> >>> >>> coder) { >>> >>> this.tClass = clazz; >>> >>> this.coder = coder; >>> >>> this.out = new ByteArrayOutputStream(); >>> >>> } >>> >>> >>> >>> @Override >>> >>> public byte[] serialize(T element) { >>> >>> >>> >>> if (out == null) { >>> >>> out = new ByteArrayOutputStream(); >>> >>> } >>> >>> try { >>> >>> out.reset(); >>> >>> coder.encode(element, out, Coder.Context.NESTED); >>> >>> } catch (IOException e) { >>> >>> throw new RuntimeException("encoding failed.", e); >>> >>> } >>> >>> return out.toByteArray(); >>> >>> } >>> >>> >>> >>> @Override >>> >>> public T deserialize(byte[] message) throws IOException { >>> >>> return coder.decode(new ByteArrayInputStream(message), >>> >>> Coder.Context.NESTED); >>> >>> } >>> >>> >>> >>> @Override >>> >>> public boolean isEndOfStream(T nextElement) { >>> >>> return false; >>> >>> } >>> >>> >>> >>> @Override >>> >>> public TypeInformation<T> getProducedType() { >>> >>> return TypeExtractor.getForClass(tClass); >>> >>> } >>> >>> } >>> >>> >>> >>> >>> >>> I am getting the following, Kyro exception. >>> >>> >>> >>> java.lang.RuntimeException: ConsumerThread threw an exception: Could >>> not >>> >>> forward element to next operator >>> >>> at >>> >>> >>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.run(FlinkKafkaConsumer09.java:336) >>> >>> at >>> >>> >>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78) >>> >>> at >>> >>> >>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56) >>> >>> at >>> >>> >>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225) >>> >>> at >>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) >>> >>> at java.lang.Thread.run(Thread.java:745) >>> >>> Caused by: java.lang.RuntimeException: Could not forward element >>> to >>> >>> next operator >>> >>> at >>> >>> >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) >>> >>> at >>> >>> >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) >>> >>> at >>> >>> >>> org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:318) >>> >>> at >>> >>> >>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:473) >>> >>> Caused by: com.esotericsoftware.kryo.KryoException: >>> >>> java.lang.UnsupportedOperationException >>> >>> Serialization trace: >>> >>> records_ (com.model.Employee$Packet) >>> >>> at >>> >>> >>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) >>> >>> at >>> >>> >>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) >>> >>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657) >>> >>> at >>> >>> >>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:168) >>> >>> at >>> >>> >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:349) >>> >>> ... 3 more >>> >>> Caused by: java.lang.UnsupportedOperationException >>> >>> at >>> >>> >>> java.util.Collections$UnmodifiableCollection.add(Collections.java:1055) >>> >>> at >>> >>> >>> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116) >>> >>> at >>> >>> >>> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22) >>> >>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) >>> >>> at >>> >>> >>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) >>> >>> >>> >>> >>> >>> I am not able to avoid this "Kryo" Exception, Thanks for any help. >>> >>> >>> >>> Thanks >>> >>> >>> >>> Viswadeep. >>> >>> >>> > >>> > >>> >> >>
