flink sql 消费鉴权的kafka,是怎么消费的呢?在kafka未添加鉴权之前,flink SQL消费kafka是正常的

1、版本说明
flink版本:1.10.2
kafka版本:1.1.0

2、kafka鉴权说明
仅使用了sasl鉴权方式
在kafka客户端有配置 kafka_server-jass.conf、
server.properties、producer.properties、consumer.properties

3、主要配置参数
sasl.mechanism=PLAIN
security.protocol=SASL_PLAINTEXT
sasl.jaas.config=org.apache.kafka.comon.security.plain.PlainLoginModule
required username="xx" password="xx-secret";
当前配置方式,通过java程序来消费鉴权后的kafka,可以正常消费。

4、用于flink SQL连接的jar包
flink-sql-connector-kafka_2.11-1.10.2.jar
flink-jdbc_2.11-1.10.2.jar
flink-csv-1.10.2-sql-jar.jar


5、我的思路
类比java程序中添加kafka认证的相关参数(3中的参数,添加完毕即可),我直接在flink SQL中建立kafka
table时,就指定了相关的3中所说明的参数。但是用flink SQL客户端来实现消费kafka,则会报错。

6、启动客户端
./bin/sql-client.sh embedded -l sql_lib/
其中sql_lib文件夹放的是4中提及的flink-connector相关的jar包


7、建表语句:
create table test_hello (
name string
) with (
...
...
'connector.properties.sasl.mechanism' = 'PLAIN',
'connector.properties.security.protocol' = 'SASL_PLAINTEXT',
'connector.properties.sasl.jaas.config' =
'org.apache.kafka.comon.security.plain.PlainLoginModule required
username="xx" password="xx-secret";',
'format.type' = 'csv'
);

建表没有问题,可以正常建表。

查询表的时候,就会报错,select * from test_hello;
报错如下:
could not execute sql statement. Reason:
javax.security.auth.login.loginException: unable to find loginModule class:
org.apache.kafka.common.security.plain.PlainLoginModule
但是这个类我在flink-sql-connector-kafka_2.11-1.10.2.jar包中是可以发现的,所以不知道什么原因?

kafka添加鉴权后,我使用flink SQL的方式不对吗?希望有知道能帮忙解答一下,谢谢。

回复