Hi Eleanore,

Yes. To set the output topic dynamically for every record, you can use KafkaIO.writeRecords(), which takes a PCollection<ProducerRecord<K, V>> as input. For every ProducerRecord it processes, it reads the record's topic name (if one was specified there) and uses it as the output topic. This way you can set, for each record, the topic it will be published to.
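As a rough illustration of choosing a topic per record, here is a minimal plain-Java sketch of the selection logic only. The class name `TopicRouter`, the `destination` field name and the map-based message are hypothetical, since I don't know your actual message format. The Beam wiring is deliberately left out.

```java
import java.util.Map;

/** Hypothetical helper: picks the output topic for a single message. */
final class TopicRouter {
    private TopicRouter() {}

    /**
     * Returns the value of the (assumed) "destination" field, or
     * defaultTopic when that field is missing or blank.
     */
    static String topicFor(Map<String, String> message, String defaultTopic) {
        String topic = message.get("destination");
        if (topic == null || topic.trim().isEmpty()) {
            return defaultTopic;
        }
        return topic.trim();
    }

    // In the pipeline, a step placed before KafkaIO.writeRecords() would call
    // topicFor(...) and emit new ProducerRecord<>(topic, key, value).
}
```

For example, a message whose `destination` field is "orders" would be routed to the topic "orders", and a message without that field would go to whatever default you pass in.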
If I understood your question correctly, you need an intermediate PTransform before KafkaIO.writeRecords() that uses the information from a message field to determine the topic the record should be published to, and then creates a new ProducerRecord with that topic name.

> On 14 May 2020, at 07:09, Eleanore Jin <eleanore....@gmail.com> wrote:
>
> Hi all,
>
> I have a beam pipeline, which will read from kafka topic via KafkaIO, and
> based on the message field, add additional field in the message for the
> destination topic.
>
> I see KakfaIO.write can be used to publish to kafka topics.
>
> In KafkaIO.java, it construct the ProducerRecord, and getTopic() determines
> which topic to publish, and this information is passed when create
> PTransforms via KafkaIO.write.
>
> Any suggestions to dynamically set kafka topic from message field?
>
> Thanks a lot!
> Eleanore