Dawid,

Is there a projected date for delivering ConfluentRegistryAvroSerializationSchema?

Thank you,
Olga

________________________________
From: Dawid Wysakowicz <wysakowicz.da...@gmail.com>
Sent: Monday, October 22, 2018 10:40 AM
To: trebl...@hotmail.com
Cc: user
Subject: Re: FlinkKafkaProducer and Confluent Schema Registry

Hi Olga,
There is an open PR[1] with some in-progress work on the corresponding
AvroSerializationSchema; you can have a look at it. The bigger issue there is
that SerializationSchema does not have access to the event's key, so using the
topic pattern might be problematic.
Best,
Dawid

[1] https://github.com/apache/flink/pull/6259

On Mon, 22 Oct 2018 at 16:51, Kostas Kloudas 
<k.klou...@data-artisans.com> wrote:
Hi Olga,

Sorry for the late reply.
I think that Gordon (cc’ed) may be able to answer your question.

Cheers,
Kostas

On Oct 13, 2018, at 3:10 PM, Olga Luganska 
<trebl...@hotmail.com> wrote:

Any suggestions?

Thank you

Sent from my iPhone

On Oct 9, 2018, at 9:28 PM, Olga Luganska 
<trebl...@hotmail.com> wrote:

Hello,

I would like to use Confluent Schema Registry in my streaming job.
I was able to make it work with the help of a generic Kafka producer and a
FlinkKafkaConsumer that uses ConfluentRegistryAvroDeserializationSchema.

FlinkKafkaConsumer011<GenericRecord> consumer = new FlinkKafkaConsumer011<>(
    MY_TOPIC,
    ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, SCHEMA_URI),
    kafkaProperties);

My question: is it possible to implement producer logic in the
FlinkKafkaProducer to serialize the message and store the schema id in the
Confluent Schema Registry?

I don't think this is going to work with the current interface, because the
schema id is created and cached in the Confluent Schema Registry with the help
of io.confluent.kafka.serializers.KafkaAvroSerializer.class, and all
FlinkKafkaProducer constructors take either a SerializationSchema or a
KeyedSerializationSchema (part of Flink's own serialization stack) as one of
their parameters.
If my assumption is wrong, could you please provide details of the implementation?
Thank you very much,
Olga
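
For readers following the thread: Confluent's serializers prefix every message with a magic byte (0) and the 4-byte big-endian schema id, followed by the Avro binary payload. A custom SerializationSchema would have to return bytes in that shape from its serialize method. The sketch below shows only the framing step and uses nothing but the JDK. The class and method names are hypothetical, and it assumes the schema id has already been obtained from the registry and the record is already Avro-encoded; it is not the implementation from the PR mentioned above.

```java
import java.nio.ByteBuffer;

// Hypothetical helper illustrating the Confluent wire format:
// [magic byte 0][4-byte big-endian schema id][Avro binary payload]
final class ConfluentWireFormat {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_SIZE = 5; // 1 magic byte + 4 bytes of schema id

    // Prepends the header to an already Avro-encoded payload.
    // Assumption: schemaId was registered or looked up elsewhere.
    static byte[] frame(int schemaId, byte[] avroPayload) {
        return ByteBuffer.allocate(HEADER_SIZE + avroPayload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId) // ByteBuffer is big-endian by default
                .put(avroPayload)
                .array();
    }

    // Reads the schema id back from a framed message.
    static int schemaIdOf(byte[] framed) {
        ByteBuffer buffer = ByteBuffer.wrap(framed);
        if (buffer.remaining() < HEADER_SIZE || buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Not a Confluent-framed message");
        }
        return buffer.getInt();
    }
}
```

In a Flink job, a call such as ConfluentWireFormat.frame(id, avroBytes) would sit at the end of a custom SerializationSchema's serialize method; registering the schema and caching its id would still have to be handled separately.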
