Hi Jannik,

Can you share how you've set those properties? I've been able to use
this without any problems.

Best regards,

Martijn

On Thu, Jun 1, 2023 at 2:43 PM Schmeier, Jannik <j.schme...@fraport.de>
wrote:

> Hello Thias,
>
>
>
> Thank you for your answer.
>
>
>
> We've tested registering an existing (byte-equal) schema a second time,
> but unfortunately the schema registry still denies the request.
>
>
>
> Your last suggestion sounds promising, but I think there are some edge
> cases with this approach that would still cause an error. For example,
> when writing to a new, empty topic, querying that topic beforehand won't
> return any records, and therefore the schema would never be put into the
> schemaRegistryClient cache.
>
>
>
> I would still prefer a flag for the "avro-confluent-registry" format
> that disables registering schemas; instead, the format would just try to
> get the ID for a schema string from the registry. If there is an ID for
> that schema, Flink would use it. If there is none, an exception should
> be thrown.
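>
> For illustration, a rough sketch of how
> ConfluentSchemaRegistryCoder#writeSchema could honor such a flag. The
> registerSchemas field is made up; the rest follows the current Flink
> 1.17 code, with AvroSchema being
> io.confluent.kafka.schemaregistry.avro.AvroSchema:
>
>     public void writeSchema(Schema schema, OutputStream out) throws IOException {
>         try {
>             // Hypothetical option: when false, never register; only look up
>             // the id of a schema that was registered beforehand.
>             int schemaId =
>                     registerSchemas
>                             ? schemaRegistryClient.register(subject, schema)
>                             : schemaRegistryClient.getId(subject, new AvroSchema(schema));
>             out.write(CONFLUENT_MAGIC_BYTE);
>             out.write(ByteBuffer.allocate(4).putInt(schemaId).array());
>         } catch (RestClientException e) {
>             throw new IOException("Could not resolve schema id in registry", e);
>         }
>     }
>
> In the lookup-only branch, getId would throw a RestClientException for a
> schema that is not pre-registered, which would then surface as an
> IOException just like the registration failure does today.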
>
> What do you think of that?
>
>
>
> Best regards,
>
> Jannik
>
>
>
>
>
> *From:* Schwalbe Matthias <matthias.schwa...@viseca.ch>
> *Sent:* Wednesday, May 31, 2023 1:33 PM
> *To:* Schmeier, Jannik <j.schme...@fraport.de>; user@flink.apache.org
> *Subject:* RE: Using pre-registered schemas with avro-confluent-registry
> format is not possible
>
>
>
>
>
> Hello Jannik,
>
>
>
> Some things to consider (I had a similar problem a couple of years ago):
>
>    - The schemaRegistryClient actually caches schema ids, so it will hit
>    the schema registry only once.
>    - The schema registered in the schema registry needs to be byte-equal;
>    otherwise the schema registry considers it to be a new schema (version).
>    - To the best of my knowledge, writing an existing schema to the schema
>    registry does not fail, because it is actually not written.
>       - This may not be entirely true: we had to replace the whole
>       schemaRegistryClient with our own implementation, because the
>       existing one could not be reconfigured to accept compressed answers
>       from our r/o proxy (see the sketch after this list).
>    - If you manage to fill the cache of your schemaRegistryClient with the
>    exact schema (e.g. by querying it beforehand), you might never run into
>    the trouble.
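>
> A minimal sketch of what such a lookup-only client could look like (the
> class name is made up, and wiring it in is a separate exercise, since as
> far as I can tell the format constructs its CachedSchemaRegistryClient
> internally):
>
>     import io.confluent.kafka.schemaregistry.ParsedSchema;
>     import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
>     import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
>
>     import java.io.IOException;
>
>     // Hypothetical client that never registers schemas, only looks them up.
>     public class ReadOnlySchemaRegistryClient extends CachedSchemaRegistryClient {
>
>         public ReadOnlySchemaRegistryClient(String baseUrl, int identityMapCapacity) {
>             super(baseUrl, identityMapCapacity);
>         }
>
>         @Override
>         public int register(String subject, ParsedSchema schema)
>                 throws IOException, RestClientException {
>             // Look up the id of the already-registered schema instead of
>             // writing it; this throws a RestClientException if the exact
>             // schema has not been registered under the subject.
>             return getId(subject, schema);
>         }
>     }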
>
>
>
> Hope this helps … keep us posted 😊
>
>
>
> Thias
>
>
>
>
>
>
>
>
>
> *From:* Schmeier, Jannik <j.schme...@fraport.de>
> *Sent:* Wednesday, May 31, 2023 12:44 PM
> *To:* user@flink.apache.org
> *Subject:* Using pre-registered schemas with avro-confluent-registry
> format is not possible
>
>
>
> Hello,
>
>
>
> I'm trying to use the avro-confluent-registry format with the Confluent
> Cloud Schema Registry in our company.
>
> Our schemas are managed via Terraform and global write access is denied
> for all Kafka clients in our environments (or at least in production).
>
> Therefore, when using the avro-confluent-registry format, I get the
> following error when Flink tries to serialize a row:
>
> java.lang.RuntimeException: Failed to serialize row.
>     at org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(AvroRowDataSerializationSchema.java:90) ~[?:?]
>     at org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(AvroRowDataSerializationSchema.java:40) ~[?:?]
>     at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaRecordSerializationSchema.serialize(DynamicKafkaRecordSerializationSchema.java:95) ~[?:?]
>     at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaRecordSerializationSchema.serialize(DynamicKafkaRecordSerializationSchema.java:36) ~[?:?]
>     at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:196) ~[?:?]
>     at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.table.runtime.util.StreamRecordCollector.collect(StreamRecordCollector.java:44) ~[flink-table-runtime-1.17.0.jar:1.17.0]
>     at org.apache.flink.table.runtime.operators.sink.ConstraintEnforcer.processElement(ConstraintEnforcer.java:247) ~[flink-table-runtime-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.17.0.jar:1.17.0]
>     at StreamExecCalc$2221.processElement_0_0(Unknown Source) ~[?:?]
>     at StreamExecCalc$2221.processElement_0_0_rewriteGroup22_split310(Unknown Source) ~[?:?]
>     at StreamExecCalc$2221.processElement_0_0_rewriteGroup22(Unknown Source) ~[?:?]
>     at StreamExecCalc$2221.processElement_split308(Unknown Source) ~[?:?]
>     at StreamExecCalc$2221.processElement(Unknown Source) ~[?:?]
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.table.runtime.operators.over.RowTimeRangeUnboundedPrecedingFunction.processElementsWithSameTimestamp(RowTimeRangeUnboundedPrecedingFunction.java:74) ~[flink-table-runtime-1.17.0.jar:1.17.0]
>     at org.apache.flink.table.runtime.operators.over.AbstractRowTimeUnboundedPrecedingOver.onTimer(AbstractRowTimeUnboundedPrecedingOver.java:228) ~[flink-table-runtime-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:180) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:602) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:243) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:199) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:114) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:148) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) ~[flink-dist-1.17.0.jar:1.17.0]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) ~[flink-dist-1.17.0.jar:1.17.0]
>     at java.lang.Thread.run(Unknown Source) ~[?:?]
> Caused by: org.apache.flink.util.WrappingRuntimeException: Failed to serialize schema registry.
>     at org.apache.flink.formats.avro.RegistryAvroSerializationSchema.serialize(RegistryAvroSerializationSchema.java:90) ~[?:?]
>     at org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(AvroRowDataSerializationSchema.java:88) ~[?:?]
>     ... 44 more
> Caused by: java.io.IOException: Could not register schema in registry
>     at org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.writeSchema(ConfluentSchemaRegistryCoder.java:90) ~[?:?]
>     at org.apache.flink.formats.avro.RegistryAvroSerializationSchema.serialize(RegistryAvroSerializationSchema.java:85) ~[?:?]
>     at org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(AvroRowDataSerializationSchema.java:88) ~[?:?]
>     ... 44 more
> Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: User is denied operation Write on Subject: my-topic-key; error code: 40301
>     at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:294) ~[?:?]
>     at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:364) ~[?:?]
>     at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:507) ~[?:?]
>     at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:498) ~[?:?]
>     at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:471) ~[?:?]
>     at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:221) ~[?:?]
>     at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:283) ~[?:?]
>     at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:259) ~[?:?]
>     at io.confluent.kafka.schemaregistry.client.SchemaRegistryClient.register(SchemaRegistryClient.java:42) ~[?:?]
>     at org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.writeSchema(ConfluentSchemaRegistryCoder.java:85) ~[?:?]
>     at org.apache.flink.formats.avro.RegistryAvroSerializationSchema.serialize(RegistryAvroSerializationSchema.java:85) ~[?:?]
>     at org.apache.flink.formats.avro.AvroRowDataSerializationSchema.serialize(AvroRowDataSerializationSchema.java:88) ~[?:?]
>     ... 44 more
>
>
> I've inspected the code of the avro-confluent-registry format, and it
> seems there is no way to disable this behavior. The format always tries
> to register the schema when serializing a row:
>
>
> https://github.com/apache/flink/blob/release-1.17.1/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroSerializationSchema.java#L85
>
> https://github.com/apache/flink/blob/release-1.17.1/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoder.java#L85
>
> Is there a particular reason for this, or would you be interested in
> adding a configuration option to disable this behavior?
>
>
>
> Best regards,
>
> Jannik
>
