Hi Alex,

Thanks a lot for the information!

Best
Eleanore

On Thu, May 14, 2020 at 1:53 AM Alexey Romanenko <aromanenko....@gmail.com>
wrote:

> Hi Eleanore,
>
> Yes, to define output topic dynamically for every record, you may want to
> use KafkaIO.writeRecords() that takes PCollection<ProducerRecord<K, V> as
> an input and for every processed ProducerRecord it takes its topic name (if
> it was specified there), and use it as an output topic. So, in this way,
> you can set for every record a topic name where it will be published.
>
> If I got your question right, you need to have an intermediate PTransfrom
> before KafkaIO.writeRecords(), that will use an information from a message
> field to define a topic where your record should be published, and then
> create a new ProducerRecord with a proper topic name.
>
> > On 14 May 2020, at 07:09, Eleanore Jin <eleanore....@gmail.com> wrote:
> >
> > Hi all,
> >
> > I have a beam pipeline, which will read from kafka topic via KafkaIO,
> and based on the message field, add additional field in the message for the
> destination topic.
> >
> > I see KakfaIO.write can be used to publish to kafka topics.
> >
> > In KafkaIO.java, it construct the ProducerRecord, and getTopic()
> determines which topic to publish, and this information is passed when
> create PTransforms via KafkaIO.write.
> >
> > Any suggestions to dynamically set kafka topic from message field?
> >
> > Thanks a lot!
> > Eleanore
>
>

Reply via email to