Hi Jay,

I am not quite sure about your actual problem, i.e. why the schema is not generated as expected.
I also needed to work with the Kafka keys in the business logic, so I found a way to deserialize and serialize the key along with the event itself by overriding KafkaRecord[De]SerializationSchema. I am using Flink's new Kafka Source / Sink API. The difference from your requirement is that I only use keys of type Array[Byte], but this could most certainly be adapted.

class KeyedEventSerializationSchema[A <: SpecificRecord: ClassTag](topic: String, schemaRegistryUrl: String)
    extends KafkaRecordSerializationSchema[KeyedEvent[A]]
    with KafkaContextAware[KeyedEvent[A]] {

  private val eventClass = classTag[A].runtimeClass.asInstanceOf[Class[A]]

  // Confluent-registry-backed Avro serialization for the event value
  private val valueSchema =
    ConfluentRegistryAvroSerializationSchema.forSpecific(eventClass, eventClass.getCanonicalName, schemaRegistryUrl)

  // TODO maybe you could add the keySchema here accordingly (a rough sketch is below the quoted message)

  override def serialize(element: KeyedEvent[A], context: KafkaSinkContext, timestamp: JLong): ProducerRecord[Array[Byte], Array[Byte]] =
    // The key bytes are written as-is; only the value goes through the registry serializer
    new ProducerRecord(getTargetTopic(element), element.key, valueSchema.serialize(element.value))

  override def getTargetTopic(element: KeyedEvent[A]): String = topic
}

final case class KeyedEvent[A <: SpecificRecord](key: Array[Byte], value: A)

val keyedEventSerialization = new KeyedEventSerializationSchema[A](OutputTopicName, SchemaRegistryUrl)
val kafkaSinkBuilder = KafkaSink.builder[KeyedEvent[A]]()
kafkaSinkBuilder
  .setBootstrapServers(BootstrapServers)
  .setKafkaProducerConfig(ProducerProperties)
  .setRecordSerializer(keyedEventSerialization)
  .build()

Maybe this helps.

Best
Peter

On Wed, May 18, 2022 at 7:03 PM Ghiya, Jay (GE Healthcare) <jay.gh...@ge.com> wrote:

> Also forgot to attach the information regarding how I converted the Avro
> objects to bytes in the approach that I took with the deprecated Kafka producer.
>
> protected byte[] getValueBytes(Value value) {
>     DatumWriter<Value> valWriter = new SpecificDatumWriter<Value>(Value.getSchema());
>     ByteArrayOutputStream valOut = new ByteArrayOutputStream();
>     BinaryEncoder valEncoder = EncoderFactory.get().binaryEncoder(valOut, null);
>
>     try {
>         valWriter.write(value, valEncoder);
>         valEncoder.flush();
>         valOut.close();
>     } catch (Exception e) {
>         // TODO Auto-generated catch block
>     }
>
>     return valOut.toByteArray();
> }
>
> protected byte[] getKeyBytes(Key key) {
>     DatumWriter<Key> keyWriter = new SpecificDatumWriter<Key>(key.getSchema());
>     ByteArrayOutputStream keyOut = new ByteArrayOutputStream();
>     BinaryEncoder keyEncoder = EncoderFactory.get().binaryEncoder(keyOut, null);
>
>     try {
>         keyWriter.write(key, keyEncoder);
>         keyEncoder.flush();
>         keyOut.close();
>     } catch (Exception e) {
>         // TODO Auto-generated catch block
>     }
>
>     return keyOut.toByteArray();
> }
>
> *From:* Ghiya, Jay (GE Healthcare)
> *Sent:* 18 May 2022 21:51
> *To:* user@flink.apache.org
> *Cc:* d...@flink.apache.org; Pandiaraj, Satheesh kumar (GE Healthcare) <satheeshkumar.pandia...@ge.com>; Kumar, Vipin (GE Healthcare) <vipin.s.ku...@ge.com>
> *Subject:* Kafka Sink Key and Value Avro Schema Usage Issues
>
> Hi Team,
>
> This is regarding the Flink Kafka Sink. We have a use case where we have
> headers and both key and value as Avro schemas.
> Below is the expectation in terms of intuitiveness for an Avro Kafka key and
> value:
>
> KafkaSink.<ProducerRecord<Key, Value>>builder()
>     .setBootstrapServers(cloudkafkaBrokerAPI)
>     .setRecordSerializer(
>         KafkaRecordSerializationSchema.builder()
>             .setKeySerializationSchema(
>                 ConfluentRegistryAvroSerializationSchema
>                     .forSpecific(Key.class, "Key", cloudSchemaRegistryURL))
>             .setValueSerializationSchema(
>                 ConfluentRegistryAvroSerializationSchema
>                     .forSpecific(Value.class, "val", cloudSchemaRegistryURL))
>             .setTopic(outputTopic)
>             .build())
>     .build();
>
> What I understood is that it currently does not accept both key and value as
> Avro schemas as part of the Kafka sink; it only accepts one for the value.
>
> First I tried to use the deprecated Flink Kafka Producer by implementing
> KafkaSerializationSchema and supplying the Avro ser/de properties via:
>
> cloudKafkaProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>     io.confluent.kafka.serializers.KafkaAvroSerializer.class.getName());
> cloudKafkaProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>     io.confluent.kafka.serializers.KafkaAvroSerializer.class.getName());
>
> The problem here is that I am able to run this example, but the schema that
> gets stored in the Confluent schema registry is:
>
> {
>     "subject": "ddp_out-key",
>     "version": 1,
>     "id": 1,
>     "schema": "\"bytes\""
> }
>
> Instead of the full schema, it has just recognized the whole thing as bytes.
> So for now I am looking for a solution without the Kafka sink to make it work,
> and is there a feature request on the roadmap for adding ProducerRecord
> support to the Kafka sink itself? That would be ideal: the previous operator
> could send the ProducerRecord with key, value, and headers, and it could then
> be forwarded on.
>
> -Jay
> GEHC
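P.S. Regarding the TODO in my snippet above: here is a rough, untested sketch of how the key schema could be plugged in when the key is also an Avro SpecificRecord. The names AvroKeyedEvent and AvroKeyedEventSerializationSchema are placeholders I made up, and the "<topic>-key" / "<topic>-value" subjects assume the default TopicNameStrategy, so adjust both to your setup.

import java.lang.{Long => JLong}
import scala.reflect.{ClassTag, classTag}
import org.apache.avro.specific.SpecificRecord
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema.KafkaSinkContext
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema
import org.apache.kafka.clients.producer.ProducerRecord

// Placeholder wrapper where both key and value are Avro SpecificRecords.
final case class AvroKeyedEvent[K <: SpecificRecord, V <: SpecificRecord](key: K, value: V)

class AvroKeyedEventSerializationSchema[K <: SpecificRecord: ClassTag, V <: SpecificRecord: ClassTag](
    topic: String,
    schemaRegistryUrl: String
) extends KafkaRecordSerializationSchema[AvroKeyedEvent[K, V]] {

  private val keyClass   = classTag[K].runtimeClass.asInstanceOf[Class[K]]
  private val valueClass = classTag[V].runtimeClass.asInstanceOf[Class[V]]

  // One Confluent-registry-backed Avro serializer per side. The subject names
  // assume the default TopicNameStrategy; change them if your registry uses a
  // different subject naming strategy.
  private val keySchema =
    ConfluentRegistryAvroSerializationSchema.forSpecific(keyClass, s"$topic-key", schemaRegistryUrl)
  private val valueSchema =
    ConfluentRegistryAvroSerializationSchema.forSpecific(valueClass, s"$topic-value", schemaRegistryUrl)

  override def serialize(
      element: AvroKeyedEvent[K, V],
      context: KafkaSinkContext,
      timestamp: JLong
  ): ProducerRecord[Array[Byte], Array[Byte]] =
    // Both key and value go through the registry serializers, so both subjects
    // should end up with the full Avro schema instead of just "bytes".
    new ProducerRecord(topic, keySchema.serialize(element.key), valueSchema.serialize(element.value))
}

It could then be passed to KafkaSink.builder via setRecordSerializer exactly like keyedEventSerialization in my snippet above.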