Hi Mohil,
In Java SDK you can use “KafkaIO.writeRecords()” for that. So, you will need to
provide a PCollection<ProducerRecord<K, V>> as an input collection where you
set a desired output topic for every record inside ProducerRecord metadata.
It could look something like this:
PCollection<KV<Teamname, data>> teams = ...;
PCollection<ProducerRecord<Teamname, data >> records = teams.apply(ParDo.of(new
KV2ProducerRecord());
private static class KV2ProducerRecord
extends DoFn<KV<Teamname, data>, ProducerRecord<Teamname, data>> {
// create ProducerRecord and set your topic there
...
}
records.apply(KafkaIO.<Teamname, data>writeRecords()
.withBootstrapServers("broker_1:9092,broker_2:9092")
.withTopic("results”) // default sink topic
.withKeySerializer(...)
.withValueSerializer(...));
> On 2 Jun 2020, at 03:27, Mohil Khare <[email protected]> wrote:
>
> Hello everyone,
>
> Does anyone know if it is possible to provide a topic name embedded in a
> PCollection object to kafkaIO while writing ?
>
> We have a use case where we have a team specific kafka topic for eg
> teamA_topicname, teamB_topicname.
>
> From beam, we create PCollection<KV<Teamname, data>> and we need to send this
> data to kafka over aforementioned team specific topics.
> Is it possible to provide topic names dynamically to
> kafkaIO.write().withTopic() from Key present in KV PCollection ?
>
> Thanks and regards
> Mohil
>
>
>
>