Hi,

Aljoscha meant the Beam "native" KafkaIO (org.apache.beam.sdk.io.kafka.KafkaIO), provided by the following artifact:

groupId: org.apache.beam.io
artifactId: kafka

Regards
JB

On 06/03/2016 01:03 AM, Viswadeep wrote:
Are you suggesting the org.apache.beam.runners.spark.io.KafkaIO class?
Isn't that part of the Spark runner?

Thanks
Viswadeep

Viswadeep Veguru.

On Thu, Jun 2, 2016 at 2:32 PM, Aljoscha Krettek <[email protected]> wrote:

    By the way, could you try just using KafkaIO.Read? This is the
    official Beam Kafka source, and it should work well with the
    Flink runner.

    On Thu, 2 Jun 2016 at 22:47 Viswadeep <[email protected]> wrote:

        Yes! I have tried it, and it works.
        -Viswadeep

        Viswadeep Veguru.

        On Thu, Jun 2, 2016 at 12:43 PM, Amit Sela <[email protected]> wrote:

            I'm not too familiar with Flink, but is there a way to simply
            read bytes from Kafka and, in the next step (a map?),
            deserialize the bytes that represent this object and
            transform it into a "Kryo-compliant" object? This should
            avoid Kryo, right?
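Amit's suggestion can be sketched in plain Java with no Beam or Flink dependency. The first stage only hands out byte[] values, which can be copied without reflecting over message fields, and a separate map step turns them into the message type. RawThenDecode and its methods are hypothetical: parseRecords stands in for a generated decoder such as Employee.Packet.parseFrom(bytes), and in a real pipeline the two stages would presumably be the Kafka source and a downstream map/ParDo.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

final class RawThenDecode {
    // Stage 1 (hypothetical source): emit each record value as raw bytes only.
    // A byte[] can be copied between operators without touching message fields.
    static List<byte[]> readRaw(List<String> payloads) {
        List<byte[]> raw = new ArrayList<>(payloads.size());
        for (String payload : payloads) {
            raw.add(payload.getBytes(StandardCharsets.UTF_8));
        }
        return raw;
    }

    // Stage 2 (the "map" step): decode each byte[] into the message type.
    static <T> List<T> decodeAll(List<byte[]> raw, Function<byte[], T> decoder) {
        List<T> decoded = new ArrayList<>(raw.size());
        for (byte[] bytes : raw) {
            decoded.add(decoder.apply(bytes));
        }
        return decoded;
    }

    // Hypothetical decoder standing in for Employee.Packet.parseFrom(bytes):
    // like a generated repeated field, the result is an unmodifiable list.
    static List<String> parseRecords(byte[] bytes) {
        String text = new String(bytes, StandardCharsets.UTF_8);
        return Collections.unmodifiableList(Arrays.asList(text.split(",")));
    }
}
```

For example, decodeAll(readRaw(Arrays.asList("a,b")), RawThenDecode::parseRecords) returns one unmodifiable list holding "a" and "b".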

            On Thu, Jun 2, 2016 at 10:31 PM Viswadeep <[email protected]> wrote:

                Hi Max,

                I tried your approach (1), and it did not work; I am still
                getting the same exception. (2) is not possible unless the
                Protobuf compiler changes.

                Yes, setting that in the config did not work either.

                I think somewhere inside Flink it is not able to process
                this correctly.


                Thanks
                Viswadeep


                Viswadeep Veguru.

                On Thu, Jun 2, 2016 at 10:09 AM, Maximilian Michels <[email protected]> wrote:

                    Hi Viswadeep,

                    What Amit recommended (thanks!) is indeed an easy
                    fix if you're using Flink directly. However, we don't
                    expose the ExecutionConfig in Beam, so setting custom
                    Kryo serializers is not possible.

                    Two other options I see:

                    1) Could you try using the following in your
                    deserialization schema?

                        @Override
                        public TypeInformation<T> getProducedType() {
                            return new CoderTypeInformation<>(coder);
                        }

                    2) Could you avoid using
                    Collections.UnmodifiableList in your code? I'm not
                    sure that is possible, because it seems to be a field
                    in your Protobuf class.
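Option 1 hands copying over to the Beam coder instead of Kryo. Assuming the coder-backed serializer copies an element by encoding it and decoding a fresh instance (our reading of the approach, not verified against the runner's source), the effect can be modelled in plain Java. SimpleCoder is a hypothetical interface, not Beam's Coder API, and RECORDS_CODER is a hypothetical stand-in for ProtoCoder on the records_ field. Because decode() fills a mutable list before wrapping it, add() is never called on an unmodifiable collection.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified coder interface; not Beam's Coder API.
interface SimpleCoder<T> {
    void encode(T value, DataOutputStream out) throws IOException;

    T decode(DataInputStream in) throws IOException;
}

final class CoderCopy {
    // Copy by round trip: encode to bytes, then decode a new instance.
    static <T> T copy(SimpleCoder<T> coder, T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            coder.encode(value, out);
            out.flush();
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            return coder.decode(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Hypothetical coder for an unmodifiable list of records,
    // standing in for ProtoCoder and the repeated records_ field.
    static final SimpleCoder<List<String>> RECORDS_CODER = new SimpleCoder<List<String>>() {
        @Override
        public void encode(List<String> value, DataOutputStream out) throws IOException {
            out.writeInt(value.size());
            for (String record : value) {
                out.writeUTF(record);
            }
        }

        @Override
        public List<String> decode(DataInputStream in) throws IOException {
            int size = in.readInt();
            List<String> records = new ArrayList<>(size);
            for (int i = 0; i < size; i++) {
                records.add(in.readUTF());
            }
            // Fill a mutable list first, then wrap it, as a message builder would.
            return Collections.unmodifiableList(records);
        }
    };
}
```

The copy is a new, equal, and still unmodifiable list, which is the property a field-by-field Kryo copy fails to preserve here.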

                    Thanks,
                    Max

                    On Thu, Jun 2, 2016 at 4:49 PM, Viswadeep <[email protected]> wrote:
                     > Thanks Amit,
                     >
                     > It did not solve my issue. Flink itself has the following comment in
                     > ExecutionConfig.java:
                     >
                     > /**
                     >  * Force TypeExtractor to use Kryo serializer for POJOS even though we could analyze as POJO.
                     >  * In some cases this might be preferable. For example, when using interfaces
                     >  * with subclasses that cannot be analyzed as POJO.
                     >  */
                     >
                     >
                     > This is relevant because the generated Protobuf class extends GeneratedMessage
                     > and implements Employee.PacketOrBuilder.
                     >
                     >
                     > Thanks for any help.
                     > Viswadeep
                     >
                     >
                     > On Wed, Jun 1, 2016 at 11:02 PM, Amit Sela <[email protected]> wrote:
                     >>
                     >> I think it has to do with Flink's internal use of Kryo; take a look at
                     >> this:
                     >> http://stackoverflow.com/questions/32453030/using-an-collectionsunmodifiablecollection-with-apache-flink
                     >> I'm sure Flink committers will soon reach out to correct me if I'm missing
                     >> something.
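The bottom of the quoted stack trace shows Kryo's CollectionSerializer.read calling add() on a Collections$UnmodifiableCollection, which always throws UnsupportedOperationException. A simplified model of that read path (the hypothetical rebuild method below, not Kryo's actual code) reproduces the failure with the JDK alone:

```java
import java.util.Collection;
import java.util.function.Supplier;

final class RebuildByAdd {
    // Simplified model of a reflective collection deserializer:
    // create an instance of the recorded collection type, then
    // re-insert every element through add().
    static <T> Collection<T> rebuild(Supplier<? extends Collection<T>> newInstance, Iterable<T> elements) {
        Collection<T> target = newInstance.get();
        for (T element : elements) {
            target.add(element); // throws UnsupportedOperationException for unmodifiable wrappers
        }
        return target;
    }
}
```

Rebuilding into ArrayList::new succeeds, while a supplier that returns Collections.unmodifiableList(new ArrayList<>()) fails on the first add(), which is the same exception that appears at the root of the trace.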
                     >>
                     >> On Thu, Jun 2, 2016 at 5:01 AM Viswadeep <[email protected]> wrote:
                     >>>
                     >>> Hi
                     >>>
                     >>> I am using Apache Beam with Flink, and Protobuf for encoding and decoding.
                     >>>
                     >>> The following is the method that creates the Flink Kafka consumer:
                     >>>
                     >>> // Assumes T is a type parameter of the enclosing class (T extends Message).
                     >>> public UnboundedSource<T, UnboundedSource.CheckpointMark> build() {
                     >>>     Properties p = new Properties();
                     >>>     p.setProperty("zookeeper.connect", kafkaOptions.getZookeeper());
                     >>>     p.setProperty("bootstrap.servers", kafkaOptions.getBroker());
                     >>>     p.setProperty("group.id", kafkaOptions.getGroup());
                     >>>     FlinkKafkaConsumer09<T> kafkaConsumer = new FlinkKafkaConsumer09<>(
                     >>>             kafkaOptions.getKafkaTopic(),
                     >>>             new SerializationDeserializationSchema<>(typedSource, ProtoCoder.of(typedSource)),
                     >>>             p);
                     >>>     return UnboundedFlinkSource.of(kafkaConsumer);
                     >>> }
                     >>>
                     >>> And this is the helper for serialization and deserialization:
                     >>>
                     >>> public class SerializationDeserializationSchema<T>
                     >>>         implements SerializationSchema<T>, DeserializationSchema<T> {
                     >>>
                     >>>     private final Class<T> tClass;
                     >>>
                     >>>     private final Coder<T> coder;
                     >>>     private transient ByteArrayOutputStream out;
                     >>>
                     >>>     public SerializationDeserializationSchema(Class<T> clazz, Coder<T> coder) {
                     >>>         this.tClass = clazz;
                     >>>         this.coder = coder;
                     >>>         this.out = new ByteArrayOutputStream();
                     >>>     }
                     >>>
                     >>>     @Override
                     >>>     public byte[] serialize(T element) {
                     >>>
                     >>>         if (out == null) {
                     >>>             out = new ByteArrayOutputStream();
                     >>>         }
                     >>>         try {
                     >>>             out.reset();
                     >>>             coder.encode(element, out, Coder.Context.NESTED);
                     >>>         } catch (IOException e) {
                     >>>             throw new RuntimeException("encoding failed.", e);
                     >>>         }
                     >>>         return out.toByteArray();
                     >>>     }
                     >>>
                     >>>     @Override
                     >>>     public T deserialize(byte[] message) throws IOException {
                     >>>         return coder.decode(new ByteArrayInputStream(message), Coder.Context.NESTED);
                     >>>     }
                     >>>
                     >>>     @Override
                     >>>     public boolean isEndOfStream(T nextElement) {
                     >>>         return false;
                     >>>     }
                     >>>
                     >>>     @Override
                     >>>     public TypeInformation<T> getProducedType() {
                     >>>         return TypeExtractor.getForClass(tClass);
                     >>>     }
                     >>> }
                     >>>
                     >>>
                     >>> I am getting the following Kryo exception:
                     >>>
                     >>> java.lang.RuntimeException: ConsumerThread threw an exception: Could not forward element to next operator
                     >>>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.run(FlinkKafkaConsumer09.java:336)
                     >>>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
                     >>>         at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
                     >>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
                     >>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
                     >>>         at java.lang.Thread.run(Thread.java:745)
                     >>>     Caused by: java.lang.RuntimeException: Could not forward element to next operator
                     >>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
                     >>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
                     >>>         at org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:318)
                     >>>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:473)
                     >>>     Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
                     >>>     Serialization trace:
                     >>>     records_ (com.model.Employee$Packet)
                     >>>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
                     >>>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
                     >>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
                     >>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:168)
                     >>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:349)
                     >>>         ... 3 more
                     >>>     Caused by: java.lang.UnsupportedOperationException
                     >>>         at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
                     >>>         at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
                     >>>         at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
                     >>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
                     >>>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
                     >>>
                     >>>
                     >>> I am not able to avoid this Kryo exception. Thanks for any help.
                     >>>
                     >>> Thanks
                     >>>
                     >>> Viswadeep.
                     >>>
                     >
                     >





--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com
