Hi, 按照现在通用的设计应该是不行的。要么用两个comsumer读取后union;要么魔改下comsumer的代码,在真正数据拉取时用不同的aksk去读。
-- Best! Xuyang 在 2024-01-09 14:49:35,"somebody someone" <1107807...@qq.com.INVALID> 写道: >问题:目前使用Flink版本1.12 >需要接入01和02两个topic,属于同一集群,但是数据方给的两个topic的 >jass的用户名username和密码password不一样,其他认证信息都一样,不想用两个Consumer去分别读取, >怎么用同一个source 方式对接这种配置文件不一样的。 > >这个上面也有人提出过,也没有想要的。 >https://stackoverflow.com/questions/38989443/flink-how-to-read-from-multiple-kafka-cluster-using-same-streamexecutionenviron > > >StreamExecutionEnvironment env = >StreamExecutionEnvironment.getExecutionEnvironment(); > Properties properties = new Properties(); > properties.setProperty("bootstrap.servers", >"101.xxx.156.xxx:9097"); > properties.setProperty("group.id", "test_zj"); > properties.setProperty("security.protocol", >"SASL_SSL"); > properties.setProperty("sasl.mechanism", "PLAIN"); > properties.setProperty("sasl.jaas.config", >"org.apache.kafka.common.security.plain.PlainLoginModule required >username=\"xxx-9dc9-xxxxxxx\" password=\"xxxx-4CdNkCo$5b=xxx";"); > properties.setProperty("ssl.truststore.location", >"jks/client.truststore.jks"); > properties.setProperty("ssl.truststore.password", >"dmxxx"); > > > FlinkKafkaConsumer<String> >stringFlinkKafkaConsumer = new FlinkKafkaConsumer<>( > >Arrays.asList("Topic01","topic02"), > new >SimpleStringSchema(), > properties > ); > > > DataStreamSource<String> source = >env.addSource(stringFlinkKafkaConsumer); > source.print(); > env.execute(); > > > > > > > > > > >somebody someone >1107807...@qq.com > > > >