Are you suggesting the org.apache.beam.runners.spark.io.KafkaIO class?
Isn't that one part of the Spark runner?

Thanks
Viswadeep

Viswadeep Veguru.


On Thu, Jun 2, 2016 at 2:32 PM, Aljoscha Krettek <[email protected]>
wrote:

> By the way, could you try just using KafkaIO.Read? That is the
> official Beam Kafka source and it should work well with the Flink runner.
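>
> For example, something along these lines - just a sketch, and the exact
> builder methods may differ in your Beam version:
>
>     p.apply(KafkaIO.read()
>             .withBootstrapServers(kafkaOptions.getBroker())
>             .withTopics(ImmutableList.of(kafkaOptions.getKafkaTopic()))
>             .withValueCoder(ProtoCoder.of(Employee.Packet.class))
>             .withoutMetadata());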
>
> On Thu, 2 Jun 2016 at 22:47 Viswadeep <[email protected]> wrote:
>
>> Yes! I have tried it and it works.
>> -Viswadeep
>>
>> Viswadeep Veguru.
>>
>>
>> On Thu, Jun 2, 2016 at 12:43 PM, Amit Sela <[email protected]> wrote:
>>
>>> I'm not too familiar with Flink, but is there a way to simply read bytes
>>> off Kafka and then, in the next step (a map?), deserialize the bytes that
>>> represent this object and transform it into a "Kryo-compliant" object?
>>> This should avoid Kryo, right?
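>>>
>>> Roughly something like this - a sketch with made-up names, untested:
>>>
>>> // A Flink schema that hands the raw Kafka bytes through untouched, so
>>> // Flink only ever carries byte[] and never Kryo-copies the protobuf type.
>>> public class RawBytesSchema extends AbstractDeserializationSchema<byte[]> {
>>>     @Override
>>>     public byte[] deserialize(byte[] message) {
>>>         return message;
>>>     }
>>> }
>>>
>>> // ...and then parse the protobuf in the next step:
>>> .apply(ParDo.of(new DoFn<byte[], Employee.Packet>() {
>>>     @Override
>>>     public void processElement(ProcessContext c) throws Exception {
>>>         c.output(Employee.Packet.parseFrom(c.element()));
>>>     }
>>> }))
>>> .setCoder(ProtoCoder.of(Employee.Packet.class));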
>>>
>>> On Thu, Jun 2, 2016 at 10:31 PM Viswadeep <[email protected]> wrote:
>>>
>>>> Hi Max,
>>>>
>>>> I tried your approach (1); it did not work, I am still getting the same
>>>> exception.
>>>> (2) is not possible unless the protobuf compiler changes.
>>>>
>>>> Yes, setting that in the config did not work either.
>>>>
>>>> I think somewhere inside Flink it is not able to process this correctly.
>>>>
>>>>
>>>> Thanks
>>>> Viswadeep
>>>>
>>>>
>>>> Viswadeep Veguru.
>>>>
>>>>
>>>> On Thu, Jun 2, 2016 at 10:09 AM, Maximilian Michels <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi Viswadeep,
>>>>>
>>>>> What Amit recommended (thanks!) is indeed an easy fix if you're using
>>>>> Flink. However, we don't expose the ExecutionConfig in Beam. So
>>>>> setting custom Kryo serializers is not possible.
>>>>>
>>>>> Two other options I see:
>>>>>
>>>>> 1) Could you try using the following in your deserialization schema?
>>>>>
>>>>>     @Override
>>>>>     public TypeInformation<T> getProducedType() {
>>>>>         return new CoderTypeInformation<>(coder);
>>>>>     }
>>>>>
>>>>> 2) Could you avoid using Collections.UnmodifiableList in your code?
>>>>> Not sure if it is possible because it seems to be a field in your
>>>>> Protobuf class.
>>>>>
>>>>> Thanks,
>>>>> Max
>>>>>
>>>>> On Thu, Jun 2, 2016 at 4:49 PM, Viswadeep <[email protected]> wrote:
>>>>> > Thanks Amit,
>>>>> >
>>>>> > It did not solve my issue. Flink itself has the following comment in
>>>>> > ExecutionConfig.java:
>>>>> >
>>>>> > /**
>>>>> >  * Force TypeExtractor to use Kryo serializer for POJOS even though we
>>>>> >  * could analyze as POJO.
>>>>> >  * In some cases this might be preferable. For example, when using
>>>>> >  * interfaces with subclasses that cannot be analyzed as POJO.
>>>>> >  */
>>>>> >
>>>>> >
>>>>> > This is because the generated Protobuf class extends GeneratedMessage
>>>>> > and implements Employee.PacketOrBuilder.
>>>>> >
>>>>> >
>>>>> > Thanks for any help.
>>>>> > Viswadeep
>>>>> >
>>>>> >
>>>>> > On Wed, Jun 1, 2016 at 11:02 PM, Amit Sela <[email protected]>
>>>>> wrote:
>>>>> >>
>>>>> >> I think it has to do with Flink's internal use of Kryo - take a
>>>>> >> look at this:
>>>>> >> http://stackoverflow.com/questions/32453030/using-an-collectionsunmodifiablecollection-with-apache-flink
>>>>> >> I'm sure Flink committers will soon reach out to correct me if I'm
>>>>> >> missing something.
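>>>>> >>
>>>>> >> For plain Flink (where you can reach the ExecutionConfig) the
>>>>> >> workaround from that thread looks roughly like this, using the
>>>>> >> de.javakaffee:kryo-serializers library. Just a sketch - "env" is
>>>>> >> assumed to be your StreamExecutionEnvironment, and I haven't
>>>>> >> verified it here:
>>>>> >>
>>>>> >> // The java.util.Collections$Unmodifiable* wrappers are package-private,
>>>>> >> // so look the base class up reflectively.
>>>>> >> Class<?> unmodifiableCollection =
>>>>> >>         Class.forName("java.util.Collections$UnmodifiableCollection");
>>>>> >> // Use a Kryo serializer that knows how to rebuild these wrappers
>>>>> >> // instead of calling add() on them.
>>>>> >> env.getConfig().addDefaultKryoSerializer(
>>>>> >>         unmodifiableCollection, UnmodifiableCollectionsSerializer.class);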
>>>>> >>
>>>>> >> On Thu, Jun 2, 2016 at 5:01 AM Viswadeep <[email protected]>
>>>>> wrote:
>>>>> >>>
>>>>> >>> Hi
>>>>> >>>
>>>>> >>> I am using Apache Beam with Flink, and Protobuf for encoding and
>>>>> >>> decoding.
>>>>> >>>
>>>>> >>> The following is the method that builds the FlinkKafkaConsumer:
>>>>> >>>
>>>>> >>> // T is declared on the enclosing builder as <T extends Message>.
>>>>> >>> public UnboundedSource<T, UnboundedSource.CheckpointMark> build() {
>>>>> >>>     Properties p = new Properties();
>>>>> >>>     p.setProperty("zookeeper.connect", kafkaOptions.getZookeeper());
>>>>> >>>     p.setProperty("bootstrap.servers", kafkaOptions.getBroker());
>>>>> >>>     p.setProperty("group.id", kafkaOptions.getGroup());
>>>>> >>>     FlinkKafkaConsumer09<T> kafkaConsumer = new FlinkKafkaConsumer09<>(
>>>>> >>>             kafkaOptions.getKafkaTopic(),
>>>>> >>>             new SerializationDeserializationSchema<>(typedSource,
>>>>> >>>                     ProtoCoder.of(typedSource)), p);
>>>>> >>>     return UnboundedFlinkSource.of(kafkaConsumer);
>>>>> >>> }
>>>>> >>>
>>>>> >>> and this is the helper for serialization and deserialization:
>>>>> >>>
>>>>> >>> public class SerializationDeserializationSchema<T>
>>>>> >>>         implements SerializationSchema<T>, DeserializationSchema<T> {
>>>>> >>>
>>>>> >>>     private final Class<T> tClass;
>>>>> >>>     private final Coder<T> coder;
>>>>> >>>     private transient ByteArrayOutputStream out;
>>>>> >>>
>>>>> >>>     public SerializationDeserializationSchema(Class<T> clazz, Coder<T> coder) {
>>>>> >>>         this.tClass = clazz;
>>>>> >>>         this.coder = coder;
>>>>> >>>         this.out = new ByteArrayOutputStream();
>>>>> >>>     }
>>>>> >>>
>>>>> >>>     @Override
>>>>> >>>     public byte[] serialize(T element) {
>>>>> >>>         // "out" is transient, so it is null after the schema instance
>>>>> >>>         // is deserialized on the workers.
>>>>> >>>         if (out == null) {
>>>>> >>>             out = new ByteArrayOutputStream();
>>>>> >>>         }
>>>>> >>>         try {
>>>>> >>>             out.reset();
>>>>> >>>             coder.encode(element, out, Coder.Context.NESTED);
>>>>> >>>         } catch (IOException e) {
>>>>> >>>             throw new RuntimeException("encoding failed.", e);
>>>>> >>>         }
>>>>> >>>         return out.toByteArray();
>>>>> >>>     }
>>>>> >>>
>>>>> >>>     @Override
>>>>> >>>     public T deserialize(byte[] message) throws IOException {
>>>>> >>>         return coder.decode(new ByteArrayInputStream(message), Coder.Context.NESTED);
>>>>> >>>     }
>>>>> >>>
>>>>> >>>     @Override
>>>>> >>>     public boolean isEndOfStream(T nextElement) {
>>>>> >>>         return false;
>>>>> >>>     }
>>>>> >>>
>>>>> >>>     @Override
>>>>> >>>     public TypeInformation<T> getProducedType() {
>>>>> >>>         // This lets Flink's TypeExtractor analyze the protobuf class,
>>>>> >>>         // which is what ends up falling back to Kryo.
>>>>> >>>         return TypeExtractor.getForClass(tClass);
>>>>> >>>     }
>>>>> >>> }
>>>>> >>>
>>>>> >>>
>>>>> >>> I am getting the following Kryo exception:
>>>>> >>>
>>>>> >>> java.lang.RuntimeException: ConsumerThread threw an exception: Could not forward element to next operator
>>>>> >>>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.run(FlinkKafkaConsumer09.java:336)
>>>>> >>>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>>>>> >>>         at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>>>>> >>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>>>>> >>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>> >>>         at java.lang.Thread.run(Thread.java:745)
>>>>> >>>     Caused by: java.lang.RuntimeException: Could not forward element to next operator
>>>>> >>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
>>>>> >>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>>>>> >>>         at org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:318)
>>>>> >>>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:473)
>>>>> >>>     Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
>>>>> >>>     Serialization trace:
>>>>> >>>     records_ (com.model.Employee$Packet)
>>>>> >>>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>>>> >>>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>>> >>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>>>>> >>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:168)
>>>>> >>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:349)
>>>>> >>>         ... 3 more
>>>>> >>>     Caused by: java.lang.UnsupportedOperationException
>>>>> >>>         at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
>>>>> >>>         at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>>>>> >>>         at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>>>>> >>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>>>> >>>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>>>> >>>
>>>>> >>> I am not able to avoid this Kryo exception. Thanks for any help.
>>>>> >>>
>>>>> >>> Thanks
>>>>> >>>
>>>>> >>> Viswadeep.
>>>>> >>>
>>>>> >
>>>>> >
>>>>>
>>>>
>>>>
>>
