Hi Rion,

Let's say you have a topic with 3 partitions, and what you want is to read
from these 3 partitions with each partition maintaining its own watermark,
instead of having a single watermark over all 3 partitions. Do I understand
this correctly?
If so, I think you need separate pipelines. If you only want to know which
records come from which partitions, ReadFromKafkaDoFn emits a KV pair where
the KafkaSourceDescriptor is the key and the KafkaRecord is the value.

On Mon, Feb 1, 2021 at 11:01 AM Rion Williams <[email protected]> wrote:

> Hi Boyuan,
>
> Do you know if it's possible to do something similar to this with a single
> topic, essentially treating records with the same keys as their own
> distinct pipelines? The challenge I'm encountering when splitting things
> downstream ends up being related to watermarking at the partition level
> (via a WatermarkPolicy), and I essentially need to track watermarks for,
> or treat records with, a particular key independently.
>
> I'd assumed that would need to be done prior to reading from Kafka, which
> is where the SDF would come in.
>
> On Feb 1, 2021, at 12:48 PM, Boyuan Zhang <[email protected]> wrote:
>
> Hi Rion,
>
> It sounds like ReadFromKafkaDoFn
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java>
> could be one of the solutions. It takes a KafkaSourceDescriptor
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescriptor.java>
> (basically a topic + partition) as input and emits KafkaRecords. Then your
> pipeline can look like:
>
> testPipeline
>   .apply(your source that generates KafkaSourceDescriptor)
>   .apply(ParDo.of(ReadFromKafkaDoFn))
>   .apply(other parts)
>
> On Mon, Feb 1, 2021 at 8:06 AM Rion Williams <[email protected]> wrote:
>
>> Hey all,
>>
>> I'm currently in a situation where I have a single Kafka topic with data
>> across multiple partitions that covers data from multiple sources. I'm
>> trying to see if there's a way to accomplish reading from these different
>> sources as different pipelines, and whether a Splittable DoFn can do this.
>>
>> Basically, what I'd like to do for a given key on a record is treat it as
>> a separate pipeline from Kafka:
>>
>> testPipeline
>>   .apply(
>>     /*
>>       Apply some function here to tell Kafka how to describe how to
>>       split up the sources that I want to read from
>>     */
>>   )
>>   .apply("Read from Kafka", KafkaIO.read(...))
>>   .apply("Remaining Pipeline Omitted for Brevity")
>>
>> Is it possible to do this? I'm trying to avoid a major architectural
>> change that would require multiple separate topics by source; however,
>> if I can guarantee that a given key (and its associated watermark) are
>> treated separately, that would be ideal.
>>
>> Any advice or recommendations for a strategy that might work would be
>> helpful!
>>
>> Thanks,
>>
>> Rion
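[Editor's note: for readers following this thread, a minimal sketch of the approach Boyuan describes, using the KafkaIO.readSourceDescriptors() transform that wraps ReadFromKafkaDoFn in the Beam Java SDK. The topic name, bootstrap server, and partition count are placeholders, and the exact arity of KafkaSourceDescriptor.of(...) varies between Beam versions, so treat this as an illustration rather than a drop-in implementation.]

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaSourceDescriptor;
import org.apache.beam.sdk.transforms.Create;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PerPartitionRead {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    // One KafkaSourceDescriptor per partition of the (placeholder) topic.
    // Offsets/read times are left null so each reader starts from the
    // consumer default; watermarks are then tracked per descriptor, i.e.
    // per partition, rather than across the whole topic.
    p.apply(Create.of(
            KafkaSourceDescriptor.of(
                new TopicPartition("my-topic", 0), null, null, null, null, null),
            KafkaSourceDescriptor.of(
                new TopicPartition("my-topic", 1), null, null, null, null, null),
            KafkaSourceDescriptor.of(
                new TopicPartition("my-topic", 2), null, null, null, null, null)))
        .apply(KafkaIO.<String, String>readSourceDescriptors()
            .withBootstrapServers("localhost:9092")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class));
        // .apply(...) remaining pipeline omitted, as in the thread above

    p.run().waitUntilFinish();
  }
}
```

Note that this splits by Kafka partition, not by record key; per-key watermarks, as Rion asks about, would still require keys to be aligned with partitions (or separate topics) upstream.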
