Hi Eleanore,

Yes. To set the output topic dynamically for every record, you can use 
KafkaIO.writeRecords(), which takes a PCollection<ProducerRecord<K, V>> as 
input. For every ProducerRecord it processes, it takes the topic name from the 
record (if one was specified there) and uses it as the output topic. This way 
you can set, for each record, the topic it will be published to. 

If I understood your question correctly, you need an intermediate PTransform 
before KafkaIO.writeRecords() that uses the information from a message field 
to determine the topic the record should be published to, and then creates a 
new ProducerRecord with the proper topic name.
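
As a rough sketch of the routing logic such an intermediate step could carry 
(not a definitive implementation): the class names, the "type" values and the 
topic names below are all hypothetical, and OutRecord is only a stand-in for 
Kafka's ProducerRecord so that the snippet compiles without Beam or Kafka on 
the classpath.

```java
import java.util.Map;

// Stand-in for org.apache.kafka.clients.producer.ProducerRecord (assumption:
// used here only to keep the sketch free of external dependencies).
final class OutRecord {
    final String topic;
    final String key;
    final String value;

    OutRecord(String topic, String key, String value) {
        this.topic = topic;
        this.key = key;
        this.value = value;
    }
}

final class TopicRouter {
    // Hypothetical mapping from a message field value to a destination topic.
    static final Map<String, String> TOPIC_BY_TYPE = Map.of(
            "order", "orders-topic",
            "payment", "payments-topic");

    // Hypothetical fallback topic for messages with an unknown or missing field.
    static final String DEFAULT_TOPIC = "unrouted-topic";

    // Picks the destination topic from the value of the message field.
    static String topicFor(String messageType) {
        if (messageType == null) {
            return DEFAULT_TOPIC; // immutable maps reject null lookups
        }
        return TOPIC_BY_TYPE.getOrDefault(messageType, DEFAULT_TOPIC);
    }

    // Builds the outgoing record with the topic chosen per message.
    // In a Beam pipeline this would sit in a DoFn's @ProcessElement method and
    // emit new ProducerRecord<>(topic, key, value) instead of OutRecord.
    static OutRecord toRecord(String messageType, String key, String value) {
        return new OutRecord(topicFor(messageType), key, value);
    }
}
```

In the real pipeline, the output of that step would be a 
PCollection<ProducerRecord<K, V>>, which is then passed to 
KafkaIO.<K, V>writeRecords() configured with your bootstrap servers and 
serializers.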

> On 14 May 2020, at 07:09, Eleanore Jin <eleanore....@gmail.com> wrote:
> 
> Hi all, 
> 
> I have a beam pipeline, which will read from kafka topic via KafkaIO, and 
> based on the message field, add additional field in the message for the 
> destination topic. 
> 
> I see KakfaIO.write can be used to publish to kafka topics. 
> 
> In KafkaIO.java, it construct the ProducerRecord, and getTopic() determines 
> which topic to publish, and this information is passed when create 
> PTransforms via KafkaIO.write.
> 
> Any suggestions to dynamically set kafka topic from message field? 
> 
> Thanks a lot!
> Eleanore
