Hi,
Aljoscha meant the Beam "native" KafkaIO
(org.apache.beam.sdk.io.kafka.KafkaIO), provided by the following artifact:
groupId: org.apache.beam
artifactId: beam-sdks-java-io-kafka
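For example, reading from Kafka with the native IO looks roughly like
this (just a sketch, untested; the builder methods may differ slightly
between Beam versions):

    import java.util.Collections;
    import org.apache.beam.sdk.io.kafka.KafkaIO;

    // KafkaIO.read() emits raw byte[] keys/values by default;
    // withoutMetadata() drops Kafka metadata, keeping KV<byte[], byte[]>.
    pipeline.apply(KafkaIO.read()
            .withBootstrapServers("localhost:9092")
            .withTopics(Collections.singletonList("my-topic"))
            .withoutMetadata());

You would then decode the value bytes in a downstream transform.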
Regards
JB
On 06/03/2016 01:03 AM, Viswadeep wrote:
Are you suggesting this class: org.apache.beam.runners.spark.io.KafkaIO,
which is part of the Spark runner?
Thanks
Viswadeep
Viswadeep Veguru.
On Thu, Jun 2, 2016 at 2:32 PM, Aljoscha Krettek <[email protected]> wrote:
By the way, could you try just using KafkaIO.Read? That is the
official Beam Kafka source, and it should work well with the Flink
runner.
On Thu, 2 Jun 2016 at 22:47 Viswadeep <[email protected]> wrote:
Yes! I tried it and it works.
-Viswadeep
Viswadeep Veguru.
On Thu, Jun 2, 2016 at 12:43 PM, Amit Sela <[email protected]> wrote:
I'm not too familiar with Flink, but is there a way to simply read
bytes off Kafka and, in the next step (a map?), deserialize the bytes
that represent this object and transform it into a "Kryo-compliant"
object? That should avoid Kryo, right?
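Something like this, perhaps (a sketch only; Employee.Packet stands in
for the generated protobuf class from the stack trace below, and the
annotation-based DoFn style shown may differ in older Beam versions):

    PCollection<byte[]> raw = ...; // a Kafka source configured to emit raw byte[]
    PCollection<Employee.Packet> packets = raw.apply(
        ParDo.of(new DoFn<byte[], Employee.Packet>() {
            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {
                // parseFrom(byte[]) is generated on every protobuf message class
                c.output(Employee.Packet.parseFrom(c.element()));
            }
        }));

The source itself then only ever hands byte[] to Flink, which it can
copy without ever touching the protobuf's unmodifiable collections.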
On Thu, Jun 2, 2016 at 10:31 PM Viswadeep <[email protected]> wrote:
Hi Max,
I tried your approach (1); it did not work, I am still getting the
same exception. (2) is not possible unless the protobuf compiler
changes. Setting that in the config did not work either. I think
somewhere inside Flink it is not able to process it correctly.
Thanks
Viswadeep
Viswadeep Veguru.
On Thu, Jun 2, 2016 at 10:09 AM, Maximilian Michels <[email protected]> wrote:
Hi Viswadeep,
What Amit recommended (thanks!) is indeed an easy fix if you're using
Flink directly. However, we don't expose the ExecutionConfig in Beam,
so setting custom Kryo serializers is not possible. Two other options
I see:

1) Could you try using the following in your deserialization schema?

    @Override
    public TypeInformation<T> getProducedType() {
        return new CoderTypeInformation<>(coder);
    }

2) Could you avoid using Collections.UnmodifiableList in your code?
Not sure if that is possible, because it seems to be a field in your
Protobuf class.
Thanks,
Max
On Thu, Jun 2, 2016 at 4:49 PM, Viswadeep <[email protected]> wrote:
> Thanks Amit,
>
> It did not solve my issue. Flink itself has the following comment in
> ExecutionConfig.java:
>
> /**
>  * Force TypeExtractor to use Kryo serializer for POJOS even though we
>  * could analyze as POJO. In some cases this might be preferable. For
>  * example, when using interfaces with subclasses that cannot be
>  * analyzed as POJO.
>  */
>
>
> This applies because the generated Protobuf class extends
> GeneratedMessage and implements Employee.PacketOrBuilder.
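>
> For reference, I believe that comment documents the forceKryo flag,
> which on plain Flink one would toggle roughly like this:
>
>     // env is a StreamExecutionEnvironment; Beam does not expose it
>     StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>     env.getConfig().enableForceKryo();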
>
>
> Thanks for any help.
> Viswadeep
>
>
> On Wed, Jun 1, 2016 at 11:02 PM, Amit Sela <[email protected]> wrote:
>>
>> I think it has to do with Flink's internal use of Kryo - take a look
>> at this:
>> http://stackoverflow.com/questions/32453030/using-an-collectionsunmodifiablecollection-with-apache-flink
>> I'm sure Flink committers will soon reach out to correct me if I'm
>> missing something.
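>>
>> For what it's worth, the fix from that Stack Overflow thread on plain
>> Flink is to register a Kryo serializer for the JDK's unmodifiable
>> collection types. A sketch (it needs the de.javakaffee
>> kryo-serializers dependency and direct access to Flink's
>> ExecutionConfig):
>>
>>     import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer;
>>
>>     // Class.forName is needed because the type is not publicly
>>     // accessible; it throws a checked ClassNotFoundException.
>>     Class<?> unmodColl =
>>         Class.forName("java.util.Collections$UnmodifiableCollection");
>>     env.getConfig().addDefaultKryoSerializer(
>>         unmodColl, UnmodifiableCollectionsSerializer.class);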
>>
>> On Thu, Jun 2, 2016 at 5:01 AM Viswadeep <[email protected]> wrote:
>>>
>>> Hi
>>>
>>> I am using Apache Beam with Flink and Protobuf for encoding and
>>> decoding.
>>>
>>> The following is the builder method for the Flink Kafka consumer:
>>>
>>> // note: the "T extends Message" bound belongs on the enclosing
>>> // class declaration, not on the return type
>>> public UnboundedSource<T, UnboundedSource.CheckpointMark> build() {
>>>     Properties p = new Properties();
>>>     p.setProperty("zookeeper.connect", kafkaOptions.getZookeeper());
>>>     p.setProperty("bootstrap.servers", kafkaOptions.getBroker());
>>>     p.setProperty("group.id", kafkaOptions.getGroup());
>>>     FlinkKafkaConsumer09<T> kafkaConsumer = new FlinkKafkaConsumer09<>(
>>>             kafkaOptions.getKafkaTopic(),
>>>             new SerializationDeserializationSchema<>(typedSource,
>>>                     ProtoCoder.of(typedSource)),
>>>             p);
>>>     return UnboundedFlinkSource.of(kafkaConsumer);
>>> }
>>>
>>> and this is the helper for serialization and deserialization:
>>>
>>> public class SerializationDeserializationSchema<T>
>>>         implements SerializationSchema<T>, DeserializationSchema<T> {
>>>
>>>     private final Class<T> tClass;
>>>     private final Coder<T> coder;
>>>     // transient: rebuilt lazily after Flink ships the schema to workers
>>>     private transient ByteArrayOutputStream out;
>>>
>>>     public SerializationDeserializationSchema(Class<T> clazz, Coder<T> coder) {
>>>         this.tClass = clazz;
>>>         this.coder = coder;
>>>         this.out = new ByteArrayOutputStream();
>>>     }
>>>
>>>     @Override
>>>     public byte[] serialize(T element) {
>>>         if (out == null) {
>>>             out = new ByteArrayOutputStream();
>>>         }
>>>         try {
>>>             out.reset();
>>>             coder.encode(element, out, Coder.Context.NESTED);
>>>         } catch (IOException e) {
>>>             throw new RuntimeException("encoding failed.", e);
>>>         }
>>>         return out.toByteArray();
>>>     }
>>>
>>>     @Override
>>>     public T deserialize(byte[] message) throws IOException {
>>>         return coder.decode(new ByteArrayInputStream(message),
>>>                 Coder.Context.NESTED);
>>>     }
>>>
>>>     @Override
>>>     public boolean isEndOfStream(T nextElement) {
>>>         return false;
>>>     }
>>>
>>>     @Override
>>>     public TypeInformation<T> getProducedType() {
>>>         // TypeExtractor falls back to Kryo for types it cannot analyze
>>>         // as POJOs, e.g. generated Protobuf classes
>>>         return TypeExtractor.getForClass(tClass);
>>>     }
>>> }
>>>
>>>
>>> I am getting the following Kryo exception:
>>> java.lang.RuntimeException: ConsumerThread threw an exception: Could not forward element to next operator
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.run(FlinkKafkaConsumer09.java:336)
>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>     at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.lang.RuntimeException: Could not forward element to next operator
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>>>     at org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:318)
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:473)
>>> Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
>>> Serialization trace:
>>> records_ (com.model.Employee$Packet)
>>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:168)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:349)
>>>     ... 3 more
>>> Caused by: java.lang.UnsupportedOperationException
>>>     at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
>>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>>
>>>
>>> I am not able to avoid this Kryo exception. Thanks for any help.
>>>
>>> Thanks
>>>
>>> Viswadeep.
>>>
>
>
--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com