Hi all,
I've recently run into a problem where Flink SQL cannot connect to (consume from) Confluent Kafka:

Flink version: 1.12-csadh1.3.0.0

Cluster platform: Cloudera (CDP)

Kafka: Confluent Kafka

Symptom: consuming (deserializing) Kafka data with the following Flink SQL fails:

The SQL statement is as follows:
[inline screenshot: the SQL statement — attachment cid:a8372b16-f6a7-48c0-b19c-d85bf38c1163]
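For readers who cannot see the attachment: the DDL followed this general shape. This is only a sketch; the actual table, topic, field, and host names are in the screenshot, and the placeholders below are mine. The auth options shown are the ones from the Flink docs; in my failing attempt they carried a `properties.` prefix.

```sql
-- Hypothetical sketch; table/topic/field names and hosts are placeholders.
CREATE TABLE kafka_source (
  id BIGINT,
  name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'my_topic',
  'properties.bootstrap.servers' = 'broker:9092',
  'format' = 'avro-confluent',
  'avro-confluent.schema-registry.url' = 'https://registry:8081',
  -- auth options (these were prefixed with properties. in the failing attempt):
  'avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
  'avro-confluent.basic-auth.user-info' = 'user:password'
);
```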
The error log is as follows:
java.io.IOException: Could not find schema with id 79 in registry
	at org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.readSchema(ConfluentSchemaRegistryCoder.java:77)
	at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:70)
	at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:98)
	... 9 more
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
	at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:292)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:352)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:660)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:642)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:217)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:291)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaById(CachedSchemaRegistryClient.java:276)
	at io.confluent.kafka.schemaregistry.client.SchemaRegistryClient.getById(SchemaRegistryClient.java:64)
	at org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.readSchema(ConfluentSchemaRegistryCoder.java:74)
	... 11 more
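Since the root cause is a 401 from the Schema Registry itself rather than from Kafka, one check I can do outside Flink is to request that schema id from the registry's REST API with the same credentials (host, user, and password below are placeholders):

```shell
# Fetch schema id 79 directly from the Schema Registry with basic auth.
# <user>:<password> and <registry-host> are placeholders.
curl -u '<user>:<password>' https://<registry-host>:8081/schemas/ids/79
```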

At first I assumed I was specifying the options the wrong way, so I checked the Confluent Avro Format<https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/avro-confluent.html#avro-confluent-basic-auth-credentials-source> section of the official docs:
[inline screenshot: the docs page — attachment cid:7a216800-f8ba-4285-959a-3efdbe1f9531]
Removing the properties prefix from the auth-related options in the WITH clause:
[inline screenshot: the modified WITH clause — attachment cid:2950843b-2263-49c2-8d8e-62e4d1793010]
leads directly to this error:
org.apache.flink.table.api.ValidationException: Unsupported options found for connector 'kafka'.
Unsupported options:
avro-confluent.basic-auth.credentials-source
avro-confluent.basic-auth.user-info

Additional note:

With the same parameters, the DataStream API can consume the data. Other colleagues have been using this topic all along, so the data in Kafka itself should be fine.
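For comparison, the working DataStream job passes the credentials straight to the Confluent Schema Registry client, using the standard Confluent client setting names, roughly like this (host, user, and password are placeholders):

```properties
# Standard Confluent Schema Registry client settings, as used by the
# working DataStream job; values are placeholders.
schema.registry.url=https://registry:8081
basic.auth.credentials.source=USER_INFO
basic.auth.user.info=user:password
```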
