Are you suggesting this org.apache.beam.runners.spark.io.KafkaIO class? Isn't that part of the Spark runner?
Thanks,
Viswadeep

Viswadeep Veguru

On Thu, Jun 2, 2016 at 2:32 PM, Aljoscha Krettek <[email protected]> wrote:

> By the way, could you try just using KafkaIO.Read? This is the official
> Beam Kafka source and it should work well with the Flink runner.
>
> On Thu, 2 Jun 2016 at 22:47 Viswadeep <[email protected]> wrote:
>
>> Yes, I have tried it and it works.
>>
>> -Viswadeep
>>
>> Viswadeep Veguru
>>
>> On Thu, Jun 2, 2016 at 12:43 PM, Amit Sela <[email protected]> wrote:
>>
>>> I'm not too familiar with Flink, but is there a way to simply read bytes
>>> off Kafka and, in the next step (a map?), deserialize the bytes that
>>> represent this object and transform it into a "Kryo-compliant" object?
>>> That should avoid Kryo, right?
>>>
>>> On Thu, Jun 2, 2016 at 10:31 PM Viswadeep <[email protected]> wrote:
>>>
>>>> Hi Max,
>>>>
>>>> I tried your approach (1) and it did not work; I am still getting the
>>>> same exception. (2) is not possible unless the Protobuf compiler
>>>> changes.
>>>>
>>>> Setting that in the config did not work either.
>>>>
>>>> I think somewhere inside Flink it is not able to process this
>>>> correctly.
>>>>
>>>> Thanks,
>>>> Viswadeep
>>>>
>>>> Viswadeep Veguru
>>>>
>>>> On Thu, Jun 2, 2016 at 10:09 AM, Maximilian Michels <[email protected]> wrote:
>>>>
>>>>> Hi Viswadeep,
>>>>>
>>>>> What Amit recommended (thanks!) is indeed an easy fix if you're using
>>>>> Flink directly. However, we don't expose the ExecutionConfig in Beam,
>>>>> so setting custom Kryo serializers is not possible.
>>>>>
>>>>> Two other options I see:
>>>>>
>>>>> 1) Could you try using the following in your deserialization schema?
>>>>>
>>>>>     @Override
>>>>>     public TypeInformation<T> getProducedType() {
>>>>>         return new CoderTypeInformation<>(coder);
>>>>>     }
>>>>>
>>>>> 2) Could you avoid using Collections.UnmodifiableList in your code?
>>>>> Not sure if that is possible, because it seems to be a field in your
>>>>> Protobuf class.
>>>>>
>>>>> Thanks,
>>>>> Max
>>>>>
>>>>> On Thu, Jun 2, 2016 at 4:49 PM, Viswadeep <[email protected]> wrote:
>>>>>
>>>>>> Thanks Amit,
>>>>>>
>>>>>> It did not solve my issue. Flink itself has the following comment in
>>>>>> ExecutionConfig.java:
>>>>>>
>>>>>>     /**
>>>>>>      * Force TypeExtractor to use Kryo serializer for POJOS even
>>>>>>      * though we could analyze as POJO. In some cases this might be
>>>>>>      * preferable. For example, when using interfaces with subclasses
>>>>>>      * that cannot be analyzed as POJO.
>>>>>>      */
>>>>>>
>>>>>> This applies here because the generated Protobuf class extends
>>>>>> GeneratedMessage and implements Employee.PacketOrBuilder.
>>>>>>
>>>>>> Thanks for any help,
>>>>>> Viswadeep
>>>>>>
>>>>>> On Wed, Jun 1, 2016 at 11:02 PM, Amit Sela <[email protected]> wrote:
>>>>>>
>>>>>>> I think it has to do with Flink's internal use of Kryo - take a look
>>>>>>> at this:
>>>>>>> http://stackoverflow.com/questions/32453030/using-an-collectionsunmodifiablecollection-with-apache-flink
>>>>>>> I'm sure the Flink committers will soon reach out to correct me if
>>>>>>> I'm missing something.
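The fix behind that Stack Overflow link is to register a Protobuf-aware Kryo serializer on Flink's ExecutionConfig. As Max notes above, Beam does not expose the ExecutionConfig, but the registration is available when running Flink directly. A minimal sketch, assuming the com.twitter:chill-protobuf artifact is on the classpath (ProtobufSerializer comes from that library, not from this thread):

    import com.model.Employee;
    import com.twitter.chill.protobuf.ProtobufSerializer;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class KryoProtobufSetup {
        public static void main(String[] args) {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            // Route Kryo through Protobuf's own toByteArray()/parseFrom()
            // for the generated class, instead of reflective field-by-field
            // copying (which is what ends up calling add() on the
            // Collections$UnmodifiableCollection in the stack trace below).
            env.getConfig().registerTypeWithKryoSerializer(
                    Employee.Packet.class, ProtobufSerializer.class);
            // ... build and execute the job as usual ...
        }
    }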
>>>>>>>
>>>>>>> On Thu, Jun 2, 2016 at 5:01 AM Viswadeep <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I am using Apache Beam with Flink and Protobuf for encoding and
>>>>>>>> decoding.
>>>>>>>>
>>>>>>>> The following is the builder method for the Flink Kafka consumer:
>>>>>>>>
>>>>>>>>     // the bound T extends Message is declared on the enclosing builder
>>>>>>>>     public UnboundedSource<T, UnboundedSource.CheckpointMark> build() {
>>>>>>>>         Properties p = new Properties();
>>>>>>>>         p.setProperty("zookeeper.connect", kafkaOptions.getZookeeper());
>>>>>>>>         p.setProperty("bootstrap.servers", kafkaOptions.getBroker());
>>>>>>>>         p.setProperty("group.id", kafkaOptions.getGroup());
>>>>>>>>         FlinkKafkaConsumer09<T> kafkaConsumer = new FlinkKafkaConsumer09<>(
>>>>>>>>                 kafkaOptions.getKafkaTopic(),
>>>>>>>>                 new SerializationDeserializationSchema<>(typedSource, ProtoCoder.of(typedSource)),
>>>>>>>>                 p);
>>>>>>>>         return UnboundedFlinkSource.of(kafkaConsumer);
>>>>>>>>     }
>>>>>>>>
>>>>>>>> and this is the helper for serialization and deserialization:
>>>>>>>>
>>>>>>>>     public class SerializationDeserializationSchema<T>
>>>>>>>>             implements SerializationSchema<T>, DeserializationSchema<T> {
>>>>>>>>
>>>>>>>>         private final Class<T> tClass;
>>>>>>>>         private final Coder<T> coder;
>>>>>>>>         private transient ByteArrayOutputStream out;
>>>>>>>>
>>>>>>>>         public SerializationDeserializationSchema(Class<T> clazz, Coder<T> coder) {
>>>>>>>>             this.tClass = clazz;
>>>>>>>>             this.coder = coder;
>>>>>>>>             this.out = new ByteArrayOutputStream();
>>>>>>>>         }
>>>>>>>>
>>>>>>>>         @Override
>>>>>>>>         public byte[] serialize(T element) {
>>>>>>>>             if (out == null) {
>>>>>>>>                 // "out" is transient, so it is null after the schema
>>>>>>>>                 // itself has been shipped to a task and deserialized
>>>>>>>>                 out = new ByteArrayOutputStream();
>>>>>>>>             }
>>>>>>>>             try {
>>>>>>>>                 out.reset();
>>>>>>>>                 coder.encode(element, out, Coder.Context.NESTED);
>>>>>>>>             } catch (IOException e) {
>>>>>>>>                 throw new RuntimeException("encoding failed.", e);
>>>>>>>>             }
>>>>>>>>             return out.toByteArray();
>>>>>>>>         }
>>>>>>>>
>>>>>>>>         @Override
>>>>>>>>         public T deserialize(byte[] message) throws IOException {
>>>>>>>>             return coder.decode(new ByteArrayInputStream(message), Coder.Context.NESTED);
>>>>>>>>         }
>>>>>>>>
>>>>>>>>         @Override
>>>>>>>>         public boolean isEndOfStream(T nextElement) {
>>>>>>>>             return false;
>>>>>>>>         }
>>>>>>>>
>>>>>>>>         @Override
>>>>>>>>         public TypeInformation<T> getProducedType() {
>>>>>>>>             return TypeExtractor.getForClass(tClass);
>>>>>>>>         }
>>>>>>>>     }
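Amit's suggestion further up the thread amounts to replacing this schema with a plain byte[] pass-through on the read path, so the source's output type is a primitive array that Flink serializes natively and Kryo never inspects. A minimal sketch, assuming the Flink 1.0-era package layout that goes with FlinkKafkaConsumer09; the class name RawBytesSchema is illustrative, not from this thread:

    import java.io.IOException;
    import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.util.serialization.DeserializationSchema;

    // Hand the raw Kafka payload through untouched; Flink has a built-in
    // serializer for byte[], so Kryo is never consulted for this type.
    public class RawBytesSchema implements DeserializationSchema<byte[]> {
        @Override
        public byte[] deserialize(byte[] message) throws IOException {
            return message;
        }

        @Override
        public boolean isEndOfStream(byte[] nextElement) {
            return false;
        }

        @Override
        public TypeInformation<byte[]> getProducedType() {
            return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
        }
    }

A downstream map (or, in Beam, a ParDo) would then rebuild the message with Employee.Packet.parseFrom(bytes), so the reflective Kryo path is never exercised.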
>>>>>>>> I am getting the following Kryo exception:
>>>>>>>>
>>>>>>>>     java.lang.RuntimeException: ConsumerThread threw an exception: Could not forward element to next operator
>>>>>>>>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.run(FlinkKafkaConsumer09.java:336)
>>>>>>>>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>>>>>>>>         at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>>>>>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>>>>>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>>>>     Caused by: java.lang.RuntimeException: Could not forward element to next operator
>>>>>>>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
>>>>>>>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>>>>>>>>         at org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:318)
>>>>>>>>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:473)
>>>>>>>>     Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
>>>>>>>>     Serialization trace:
>>>>>>>>     records_ (com.model.Employee$Packet)
>>>>>>>>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>>>>>>>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>>>>>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>>>>>>>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:168)
>>>>>>>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:349)
>>>>>>>>         ... 3 more
>>>>>>>>     Caused by: java.lang.UnsupportedOperationException
>>>>>>>>         at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
>>>>>>>>         at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>>>>>>>>         at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>>>>>>>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>>>>>>>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>>>>>>>
>>>>>>>> I am not able to avoid this Kryo exception. Thanks for any help.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Viswadeep
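For reference, the KafkaIO.Read route that Aljoscha recommends at the top of the thread (and that Viswadeep confirms works) skips the Flink DeserializationSchema and its TypeExtractor call entirely: the source reads raw bytes under Beam's own coders, and the Protobuf parsing happens in a ParDo. A rough sketch against an existing pipeline and the thread's kafkaOptions object; KafkaIO's builder methods have shifted across Beam releases, so treat the exact names as illustrative:

    import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.Values;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    // Read raw bytes with Beam's own Kafka source, then parse with Protobuf.
    PCollection<Employee.Packet> packets = pipeline
            .apply(KafkaIO.<byte[], byte[]>read()
                    .withBootstrapServers(kafkaOptions.getBroker())
                    .withTopic(kafkaOptions.getKafkaTopic())
                    .withKeyDeserializer(ByteArrayDeserializer.class)
                    .withValueDeserializer(ByteArrayDeserializer.class)
                    .withoutMetadata())                 // PCollection<KV<byte[], byte[]>>
            .apply(Values.create())                     // keep only the payload bytes
            .apply(ParDo.of(new DoFn<byte[], Employee.Packet>() {
                @ProcessElement
                public void processElement(ProcessContext c) throws Exception {
                    // The generated Protobuf code does the parsing;
                    // Kryo is never involved on the read path.
                    c.output(Employee.Packet.parseFrom(c.element()));
                }
            }))
            .setCoder(ProtoCoder.of(Employee.Packet.class)); // Beam coder, not Kryo

Because the elements cross operator boundaries under Beam's ProtoCoder rather than Flink's Kryo fallback, the Collections$UnmodifiableCollection problem above never arises.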
