Hi, 夏帅的方案是ok的,因为Kafka 默认支持写入topic不存在时自动创建[1], 这个配置是默认开启的,所以只用实现下自定义KafkaSerializationSchema就可以满足你的需求。
祝好, Leonard Xu [1] https://docs.confluent.io/current/installation/configuration/broker-configs.html#auto.create.topics.enable <https://docs.confluent.io/current/installation/configuration/broker-configs.html#auto.create.topics.enable> > 在 2020年7月8日,11:08,flink小猪 <[email protected]> 写道: > > 兄弟,感谢 > > > > > > > > > > > > > > > > > 在 2020-07-08 11:04:40,"夏帅" <[email protected]> 写道: > > 你好,可以尝试自定义KafkaSerializationSchema来实现你的业务场景 > class DemoSerializationSchema extends KafkaSerializationSchema[DemoBean] { > override def serialize(element: DemoBean, timestamp: lang.Long): > ProducerRecord[Array[Byte], Array[Byte]] = { > new ProducerRecord[Array[Byte], Array[Byte]](element.getTopic, > element.getValue) > } > } > ------------------------------------------------------------------ > 发件人:[email protected] <[email protected]> > 发送时间:2020年7月8日(星期三) 10:59 > 收件人:user-zh <[email protected]> > 主 题:FlinkKafkaProducer没有写入多个topic的功能 > > > 我有一个需求是通过读取一个kafka的主题的数据经过flink处理再写入到多个kafka的主题中(写入的主题是动态的,数据中能解析到需要写入到的目的地主题), > 但是FlinkKafkaProducer好像只能写入一个主题里面? > > > > [email protected] > >
