org.apache.beam.sdk.io.kafka.KafkaIO
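
For example, a minimal sketch of reading your Protobuf values with it (the
builder methods are from the early KafkaIO API and may differ in your Beam
version; Employee.Packet, kafkaOptions, and ProtoCoder are taken from the
code further down in this thread, and p is assumed to be your Pipeline):

import java.util.Arrays;

import org.apache.beam.sdk.coders.protobuf.ProtoCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// Keys stay as raw byte[]; the value coder decodes the Protobuf payload.
PCollection<KV<byte[], Employee.Packet>> packets = p.apply(
    KafkaIO.read()
        .withBootstrapServers(kafkaOptions.getBroker())
        .withTopics(Arrays.asList(kafkaOptions.getKafkaTopic()))
        .withValueCoder(ProtoCoder.of(Employee.Packet.class))
        .withoutMetadata());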

-Aljoscha

On Fri, 3 Jun 2016 at 01:03 Viswadeep <[email protected]> wrote:

> Are you suggesting this org.apache.beam.runners.spark.io.KafkaIO class,
> which is part of the Spark runner?
>
> Thanks
> Viswadeep
>
> Viswadeep Veguru.
>
>
> On Thu, Jun 2, 2016 at 2:32 PM, Aljoscha Krettek <[email protected]>
> wrote:
>
>> By the way, could you try just using KafkaIO.Read? This is the
>> official Beam Kafka source and it should work well with the Flink runner.
>>
>> On Thu, 2 Jun 2016 at 22:47 Viswadeep <[email protected]> wrote:
>>
>>> Yes! I have tried it; it works.
>>> -Viswadeep
>>>
>>> Viswadeep Veguru.
>>>
>>>
>>> On Thu, Jun 2, 2016 at 12:43 PM, Amit Sela <[email protected]> wrote:
>>>
>>>> I'm not too familiar with Flink, but is there a way to simply read bytes
>>>> off Kafka and then, in the next step (a map?), deserialize the bytes that
>>>> represent this object and transform it into a "Kryo-compliant" object?
>>>> That should avoid Kryo, right?
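>>>>
>>>> Roughly something like this (a sketch only - rawBytes is assumed to be
>>>> the PCollection<byte[]> coming out of the source, and Employee.Packet is
>>>> the generated class from the stack trace below):
>>>>
>>>> import org.apache.beam.sdk.transforms.DoFn;
>>>> import org.apache.beam.sdk.transforms.ParDo;
>>>> import org.apache.beam.sdk.values.PCollection;
>>>>
>>>> PCollection<Employee.Packet> packets = rawBytes.apply(
>>>>     ParDo.of(new DoFn<byte[], Employee.Packet>() {
>>>>         @Override
>>>>         public void processElement(ProcessContext c) throws Exception {
>>>>             // Parse the Protobuf only after the source has emitted
>>>>             // plain byte[], so no GeneratedMessage object ever crosses
>>>>             // the source boundary where Kryo gets involved.
>>>>             c.output(Employee.Packet.parseFrom(c.element()));
>>>>         }
>>>>     }));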
>>>>
>>>> On Thu, Jun 2, 2016 at 10:31 PM Viswadeep <[email protected]> wrote:
>>>>
>>>>> Hi Max,
>>>>>
>>>>> I tried your approach: (1) did not work; I am still getting the same
>>>>> exception.
>>>>> (2) is not possible unless the Protobuf compiler changes.
>>>>>
>>>>> Yes, setting that in the config did not work either.
>>>>>
>>>>> I think somewhere inside Flink it is not able to process this correctly.
>>>>>
>>>>>
>>>>> Thanks
>>>>> Viswadeep
>>>>>
>>>>>
>>>>> Viswadeep Veguru.
>>>>>
>>>>>
>>>>> On Thu, Jun 2, 2016 at 10:09 AM, Maximilian Michels <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi Viswadeep,
>>>>>>
>>>>>> What Amit recommended (thanks!) is indeed an easy fix if you're using
>>>>>> Flink. However, we don't expose the ExecutionConfig in Beam, so
>>>>>> setting custom Kryo serializers is not possible.
>>>>>>
>>>>>> Two other options I see:
>>>>>>
>>>>>> 1) Could you try using the following in your deserialization schema?
>>>>>>
>>>>>>     @Override
>>>>>>     public TypeInformation<T> getProducedType() {
>>>>>>         return new CoderTypeInformation<>(coder);
>>>>>>     }
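>>>>>>
>>>>>> (CoderTypeInformation wraps your Beam coder as a Flink TypeInformation,
>>>>>> so Flink serializes the elements with the coder instead of falling back
>>>>>> to a Kryo-based GenericTypeInfo.)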
>>>>>>
>>>>>> 2) Could you avoid using Collections.UnmodifiableList in your code?
>>>>>> Not sure if it is possible because it seems to be a field in your
>>>>>> Protobuf class.
>>>>>>
>>>>>> Thanks,
>>>>>> Max
>>>>>>
>>>>>> On Thu, Jun 2, 2016 at 4:49 PM, Viswadeep <[email protected]>
>>>>>> wrote:
>>>>>> > Thanks Amit,
>>>>>> >
>>>>>> > It did not solve my issue. Flink itself has the following comment
>>>>>> > in ExecutionConfig.java:
>>>>>> >
>>>>>> > /**
>>>>>> >  * Force TypeExtractor to use Kryo serializer for POJOS even though
>>>>>> >  * we could analyze as POJO. In some cases this might be preferable.
>>>>>> >  * For example, when using interfaces with subclasses that cannot be
>>>>>> >  * analyzed as POJO.
>>>>>> >  */
>>>>>> >
>>>>>> >
>>>>>> > This applies here because the generated Protobuf class extends
>>>>>> > GeneratedMessage and implements Employee.PacketOrBuilder, so it
>>>>>> > cannot be analyzed as a POJO.
>>>>>> >
>>>>>> >
>>>>>> > Thanks for any help.
>>>>>> > Viswadeep
>>>>>> >
>>>>>> >
>>>>>> > On Wed, Jun 1, 2016 at 11:02 PM, Amit Sela <[email protected]>
>>>>>> wrote:
>>>>>> >>
>>>>>> >> I think it has to do with Flink's internal use of Kryo - take a
>>>>>> >> look at this:
>>>>>> >> http://stackoverflow.com/questions/32453030/using-an-collectionsunmodifiablecollection-with-apache-flink
>>>>>> >> I'm sure Flink committers will soon reach out to correct me if I'm
>>>>>> >> missing something.
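>>>>>> >>
>>>>>> >> The linked answer boils down to registering a Kryo serializer that
>>>>>> >> can handle the JDK's unmodifiable wrappers. A minimal sketch with
>>>>>> >> plain Flink (assuming the de.javakaffee:kryo-serializers library is
>>>>>> >> on the classpath; note this needs direct access to Flink's
>>>>>> >> ExecutionConfig, which, as Max notes above, Beam does not expose):
>>>>>> >>
>>>>>> >> import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer;
>>>>>> >> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>> >>
>>>>>> >> StreamExecutionEnvironment env =
>>>>>> >>         StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>> >> // Collections$UnmodifiableCollection is package-private, so it has
>>>>>> >> // to be looked up by name.
>>>>>> >> env.getConfig().addDefaultKryoSerializer(
>>>>>> >>         Class.forName("java.util.Collections$UnmodifiableCollection"),
>>>>>> >>         UnmodifiableCollectionsSerializer.class);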
>>>>>> >>
>>>>>> >> On Thu, Jun 2, 2016 at 5:01 AM Viswadeep <[email protected]>
>>>>>> wrote:
>>>>>> >>>
>>>>>> >>> Hi
>>>>>> >>>
>>>>>> >>> I am using Apache Beam with Flink, and Protobuf for encoding and
>>>>>> >>> decoding.
>>>>>> >>>
>>>>>> >>> The following is the method that builds the FlinkKafkaConsumer:
>>>>>> >>>
>>>>>> >>> // (assuming <T extends Message> is declared on the enclosing
>>>>>> >>> // builder class - a bound cannot appear in the return type itself)
>>>>>> >>> public UnboundedSource<T, UnboundedSource.CheckpointMark> build() {
>>>>>> >>>     Properties p = new Properties();
>>>>>> >>>     p.setProperty("zookeeper.connect", kafkaOptions.getZookeeper());
>>>>>> >>>     p.setProperty("bootstrap.servers", kafkaOptions.getBroker());
>>>>>> >>>     p.setProperty("group.id", kafkaOptions.getGroup());
>>>>>> >>>     FlinkKafkaConsumer09<T> kafkaConsumer = new FlinkKafkaConsumer09<>(
>>>>>> >>>             kafkaOptions.getKafkaTopic(),
>>>>>> >>>             new SerializationDeserializationSchema<>(typedSource,
>>>>>> >>>                     ProtoCoder.of(typedSource)), p);
>>>>>> >>>     return UnboundedFlinkSource.of(kafkaConsumer);
>>>>>> >>> }
>>>>>> >>>
>>>>>> >>> and this is the helper for serialization and deserialization:
>>>>>> >>>
>>>>>> >>> public class SerializationDeserializationSchema<T>
>>>>>> >>>         implements SerializationSchema<T>, DeserializationSchema<T> {
>>>>>> >>>
>>>>>> >>>     private final Class<T> tClass;
>>>>>> >>>     private final Coder<T> coder;
>>>>>> >>>     private transient ByteArrayOutputStream out;
>>>>>> >>>
>>>>>> >>>     public SerializationDeserializationSchema(Class<T> clazz,
>>>>>> >>>             Coder<T> coder) {
>>>>>> >>>         this.tClass = clazz;
>>>>>> >>>         this.coder = coder;
>>>>>> >>>         this.out = new ByteArrayOutputStream();
>>>>>> >>>     }
>>>>>> >>>
>>>>>> >>>     @Override
>>>>>> >>>     public byte[] serialize(T element) {
>>>>>> >>>         if (out == null) {
>>>>>> >>>             out = new ByteArrayOutputStream();
>>>>>> >>>         }
>>>>>> >>>         try {
>>>>>> >>>             out.reset();
>>>>>> >>>             coder.encode(element, out, Coder.Context.NESTED);
>>>>>> >>>         } catch (IOException e) {
>>>>>> >>>             throw new RuntimeException("encoding failed.", e);
>>>>>> >>>         }
>>>>>> >>>         return out.toByteArray();
>>>>>> >>>     }
>>>>>> >>>
>>>>>> >>>     @Override
>>>>>> >>>     public T deserialize(byte[] message) throws IOException {
>>>>>> >>>         return coder.decode(new ByteArrayInputStream(message),
>>>>>> >>>                 Coder.Context.NESTED);
>>>>>> >>>     }
>>>>>> >>>
>>>>>> >>>     @Override
>>>>>> >>>     public boolean isEndOfStream(T nextElement) {
>>>>>> >>>         return false;
>>>>>> >>>     }
>>>>>> >>>
>>>>>> >>>     @Override
>>>>>> >>>     public TypeInformation<T> getProducedType() {
>>>>>> >>>         return TypeExtractor.getForClass(tClass);
>>>>>> >>>     }
>>>>>> >>> }
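>>>>>> >>>
>>>>>> >>> (Side note: getProducedType() above is the likely trouble spot.
>>>>>> >>> TypeExtractor.getForClass() cannot analyze the generated Protobuf
>>>>>> >>> class as a POJO, so it returns a Kryo-backed GenericTypeInfo; this
>>>>>> >>> is exactly the spot that Max's CoderTypeInformation suggestion
>>>>>> >>> above replaces.)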
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> I am getting the following Kryo exception:
>>>>>> >>>
>>>>>> >>> java.lang.RuntimeException: ConsumerThread threw an exception: Could not forward element to next operator
>>>>>> >>>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.run(FlinkKafkaConsumer09.java:336)
>>>>>> >>>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>>>>>> >>>         at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>>>>>> >>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>>>>>> >>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>> >>>         at java.lang.Thread.run(Thread.java:745)
>>>>>> >>>     Caused by: java.lang.RuntimeException: Could not forward element to next operator
>>>>>> >>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
>>>>>> >>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>>>>>> >>>         at org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:318)
>>>>>> >>>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:473)
>>>>>> >>>     Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
>>>>>> >>>     Serialization trace:
>>>>>> >>>     records_ (com.model.Employee$Packet)
>>>>>> >>>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>>>>> >>>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>>>> >>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>>>>>> >>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:168)
>>>>>> >>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:349)
>>>>>> >>>         ... 3 more
>>>>>> >>>     Caused by: java.lang.UnsupportedOperationException
>>>>>> >>>         at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
>>>>>> >>>         at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>>>>>> >>>         at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>>>>>> >>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>>>>> >>>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> I am not able to avoid this Kryo exception. Thanks for any help.
>>>>>> >>>
>>>>>> >>> Thanks
>>>>>> >>>
>>>>>> >>> Viswadeep.
>>>>>> >>>
>>>>>> >
>>>>>> >
>>>>>>
>>>>>
>>>>>
>>>
>
