Hi Ghiy,

I am not quite sure what your actual problem is, i.e. why the schema is not
generated as expected.

I also needed to work with the Kafka keys in the business logic, so I found
a way to deserialize and serialize the key along with the event itself by
overriding KafkaRecord[De]SerializationSchema. I am using Flink's new Kafka
Source / Sink API. The difference from your requirement is that I am only
using keys of type Array[Byte]. But this could most certainly be patched.


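import java.lang.{Long => JLong}

import scala.reflect.{ClassTag, classTag}

import org.apache.avro.specific.SpecificRecord
import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink}
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema.KafkaSinkContext
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema
import org.apache.flink.streaming.connectors.kafka.KafkaContextAware
import org.apache.kafka.clients.producer.ProducerRecord

class KeyedEventSerializationSchema[A <: SpecificRecord: ClassTag](
    topic: String,
    schemaRegistryUrl: String)
    extends KafkaRecordSerializationSchema[KeyedEvent[A]]
    with KafkaContextAware[KeyedEvent[A]] {

  private val eventClass = classTag[A].runtimeClass.asInstanceOf[Class[A]]

  // Avro serializer for the value; the class name is used as the registry subject.
  private val valueSchema = ConfluentRegistryAvroSerializationSchema.forSpecific(
    eventClass, eventClass.getCanonicalName, schemaRegistryUrl)
  // TODO maybe you could add the keySchema here accordingly...

  // The key is passed through as raw bytes; only the value is Avro-encoded.
  override def serialize(
      element: KeyedEvent[A],
      context: KafkaSinkContext,
      timestamp: JLong): ProducerRecord[Array[Byte], Array[Byte]] =
    new ProducerRecord(getTargetTopic(element), element.key,
      valueSchema.serialize(element.value))

  override def getTargetTopic(element: KeyedEvent[A]): String = topic
}

final case class KeyedEvent[A <: SpecificRecord](key: Array[Byte], value: A)

val keyedEventSerialization =
  new KeyedEventSerializationSchema[A](OutputTopicName, SchemaRegistryUrl)
val kafkaSinkBuilder = KafkaSink.builder[KeyedEvent[A]]()
kafkaSinkBuilder
  .setBootstrapServers(BootstrapServers)
  .setKafkaProducerConfig(ProducerProperties)
  .setRecordSerializer(keyedEventSerialization)
  .build()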
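
For Avro keys, the TODO above could be filled in roughly as follows. This is only a sketch and I have not run it against your setup: AvroKeyedEvent, AvroKeyedEventSerializationSchema and the Subjects helper are hypothetical names, and the "<topic>-key" / "<topic>-value" subjects are an assumption that mirrors Confluent's default TopicNameStrategy (it matches the "ddp_out-key" subject in your registry output).

```scala
import java.lang.{Long => JLong}

import scala.reflect.{ClassTag, classTag}

import org.apache.avro.specific.SpecificRecord
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema.KafkaSinkContext
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema
import org.apache.kafka.clients.producer.ProducerRecord

// Hypothetical helper: subject names following the "<topic>-key" / "<topic>-value" convention.
object Subjects {
  def key(topic: String): String = s"$topic-key"
  def value(topic: String): String = s"$topic-value"
}

// Hypothetical variant of KeyedEvent whose key is an Avro record as well.
final case class AvroKeyedEvent[K <: SpecificRecord, V <: SpecificRecord](key: K, value: V)

class AvroKeyedEventSerializationSchema[K <: SpecificRecord: ClassTag, V <: SpecificRecord: ClassTag](
    topic: String,
    schemaRegistryUrl: String)
    extends KafkaRecordSerializationSchema[AvroKeyedEvent[K, V]] {

  private val keyClass = classTag[K].runtimeClass.asInstanceOf[Class[K]]
  private val valueClass = classTag[V].runtimeClass.asInstanceOf[Class[V]]

  // One registry-aware Avro serializer per side, each with its own subject.
  private val keySchema = ConfluentRegistryAvroSerializationSchema.forSpecific(
    keyClass, Subjects.key(topic), schemaRegistryUrl)
  private val valueSchema = ConfluentRegistryAvroSerializationSchema.forSpecific(
    valueClass, Subjects.value(topic), schemaRegistryUrl)

  // Both key and value are Avro-encoded here, so the sink only ever sees byte arrays.
  override def serialize(
      element: AvroKeyedEvent[K, V],
      context: KafkaSinkContext,
      timestamp: JLong): ProducerRecord[Array[Byte], Array[Byte]] =
    new ProducerRecord[Array[Byte], Array[Byte]](
      topic,
      keySchema.serialize(element.key),
      valueSchema.serialize(element.value))
}
```

It would be plugged into the sink builder the same way as above, via setRecordSerializer. Since the Avro encoding happens inside the schema, the producer properties should not need KafkaAvroSerializer at all.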

Maybe this helps.

Best,
Peter

On Wed, May 18, 2022 at 7:03 PM Ghiya, Jay (GE Healthcare) <jay.gh...@ge.com>
wrote:

> I also forgot to attach how I converted the Avro objects to bytes in the
> approach that I took with the deprecated Kafka producer.
>
>     protected byte[] getValueBytes(Value value) {
>         DatumWriter<Value> valWriter =
>             new SpecificDatumWriter<Value>(value.getSchema());
>         ByteArrayOutputStream valOut = new ByteArrayOutputStream();
>         BinaryEncoder valEncoder =
>             EncoderFactory.get().binaryEncoder(valOut, null);
>         try {
>             valWriter.write(value, valEncoder);
>             valEncoder.flush();
>             valOut.close();
>         } catch (Exception e) {
>             // Fail loudly instead of silently returning partial bytes.
>             throw new IllegalStateException("Failed to serialize Avro value", e);
>         }
>         return valOut.toByteArray();
>     }
>
>     protected byte[] getKeyBytes(Key key) {
>         DatumWriter<Key> keyWriter =
>             new SpecificDatumWriter<Key>(key.getSchema());
>         ByteArrayOutputStream keyOut = new ByteArrayOutputStream();
>         BinaryEncoder keyEncoder =
>             EncoderFactory.get().binaryEncoder(keyOut, null);
>         try {
>             keyWriter.write(key, keyEncoder);
>             keyEncoder.flush();
>             keyOut.close();
>         } catch (Exception e) {
>             // Fail loudly instead of silently returning partial bytes.
>             throw new IllegalStateException("Failed to serialize Avro key", e);
>         }
>         return keyOut.toByteArray();
>     }
>
> *From:* Ghiya, Jay (GE Healthcare)
> *Sent:* 18 May 2022 21:51
> *To:* user@flink.apache.org
> *Cc:* d...@flink.apache.org; Pandiaraj, Satheesh kumar (GE Healthcare) <
> satheeshkumar.pandia...@ge.com>; Kumar, Vipin (GE Healthcare) <
> vipin.s.ku...@ge.com>
> *Subject:* Kafka Sink Key and Value Avro Schema Usage Issues
>
>
>
> Hi Team,
>
>
>
> This is regarding the Flink Kafka sink. We have a use case with headers where
> both the key and the value use Avro schemas.
>
> Below is what we would intuitively expect to write for an Avro Kafka key and
> value:
>
>
>
> KafkaSink.<ProducerRecord<Key, Value>>builder()
>     .setBootstrapServers(cloudkafkaBrokerAPI)
>     .setRecordSerializer(
>         KafkaRecordSerializationSchema.builder()
>             .setKeySerializationSchema(
>                 ConfluentRegistryAvroSerializationSchema.forSpecific(
>                     Key.class, "Key", cloudSchemaRegistryURL))
>             .setValueSerializationSchema(
>                 ConfluentRegistryAvroSerializationSchema.forSpecific(
>                     Value.class, "val", cloudSchemaRegistryURL))
>             .setTopic(outputTopic)
>             .build())
>     .build();
>
>
>
> As I understand it, the Kafka sink currently does not accept both the key and
> the value as Avro schemas. It only accepts sink.
>
>
>
> First I tried to use the deprecated Flink Kafka Producer by implementing
> KafkaSerializationSchema and supplying the Avro serializer classes via:
>
>
> cloudKafkaProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class.getName());
>
>
> cloudKafkaProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class.getName());
>
>
>
>
>
> The problem is that, although I am able to run this example, the schema that
> gets stored in the Confluent Schema Registry is:
>
> {
>     "subject": "ddp_out-key",
>     "version": 1,
>     "id": 1,
>     "schema": "\"bytes\""
> }
>
>
>
> Instead of the full schema, it has just recognized the whole thing as bytes.
> So, for now, I am looking for a solution that works without the Kafka sink.
> Is there also a feature request on the roadmap for adding ProducerRecord
> support to the Kafka sink itself? That would be ideal: the previous operator
> could emit the ProducerRecord with key, value and headers, and the sink could
> then forward it.
>
>
>
> -Jay
>
> GEHC
>
>
>
>
>
