Hi Mohil,

In Java SDK you can use “KafkaIO.writeRecords()” for that. So, you will need to 
provide a PCollection<ProducerRecord<K, V>> as an input collection where you 
set a desired output topic for every record inside ProducerRecord metadata.
It could look something like this:

PCollection<KV<Teamname, data>>  teams = ...;

PCollection<ProducerRecord<Teamname, data >> records = teams.apply(ParDo.of(new 
KV2ProducerRecord());

private static class KV2ProducerRecord
    extends DoFn<KV<Teamname, data>, ProducerRecord<Teamname, data>> {
    // create ProducerRecord and set your topic there
    ...
}

records.apply(KafkaIO.<Teamname, data>writeRecords()
    .withBootstrapServers("broker_1:9092,broker_2:9092")
    .withTopic("results”) // default sink topic 
    .withKeySerializer(...)
    .withValueSerializer(...));



> On 2 Jun 2020, at 03:27, Mohil Khare <[email protected]> wrote:
> 
> Hello everyone,
> 
> Does anyone know if  it is possible to provide a topic name embedded in a 
> PCollection object to kafkaIO while writing ?
> 
> We have a use case where we have a team specific kafka topic for eg 
> teamA_topicname, teamB_topicname.
> 
> From beam, we create PCollection<KV<Teamname, data>> and we need to send this 
> data to kafka over aforementioned team specific topics.
> Is it possible to provide topic names dynamically to 
> kafkaIO.write().withTopic() from Key  present in KV PCollection ?
> 
> Thanks and regards
> Mohil
> 
> 
> 
> 

Reply via email to