Hi Rion,

Let's say that you have a topic with 3 partitions, and what you want to do is
read from these 3 partitions with each partition maintaining its own
watermark, instead of having a single watermark over all 3 partitions. Do I
understand this correctly?

If so, I think you need separate pipelines. If you only want to know
which records come from which partitions, ReadFromKafkaDoFn emits a KV pair
where the KafkaSourceDescriptor is the key and the KafkaRecord is the value.
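To make the KV shape concrete, here is a standalone plain-Java sketch (no Beam dependency) of what "descriptor as key, record as value" lets you do downstream, e.g. keep per-partition state. The SourceDescriptor and KafkaRecordStub classes are simplified stand-ins I made up for illustration, not Beam's actual KafkaSourceDescriptor/KafkaRecord types:

```java
import java.util.*;

// Standalone illustration of the KV shape that ReadFromKafkaDoFn emits:
// each record is keyed by its source descriptor (topic + partition), so
// downstream logic can tell partitions apart and track per-partition state.
public class DescriptorKeyedRecords {

    // Simplified stand-in for KafkaSourceDescriptor: topic + partition.
    record SourceDescriptor(String topic, int partition) {}

    // Simplified stand-in for KafkaRecord: just an offset and a payload.
    record KafkaRecordStub(long offset, String value) {}

    // Group KV pairs by their descriptor key, mimicking how a downstream
    // transform could maintain independent per-partition state/watermarks.
    public static Map<SourceDescriptor, List<KafkaRecordStub>> groupByDescriptor(
            List<Map.Entry<SourceDescriptor, KafkaRecordStub>> kvPairs) {
        Map<SourceDescriptor, List<KafkaRecordStub>> grouped = new HashMap<>();
        for (var kv : kvPairs) {
            grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>())
                   .add(kv.getValue());
        }
        return grouped;
    }

    public static void main(String[] args) {
        var pairs = List.of(
            Map.entry(new SourceDescriptor("events", 0), new KafkaRecordStub(0, "a")),
            Map.entry(new SourceDescriptor("events", 1), new KafkaRecordStub(0, "b")),
            Map.entry(new SourceDescriptor("events", 0), new KafkaRecordStub(1, "c")));
        var grouped = groupByDescriptor(pairs);
        System.out.println(grouped.get(new SourceDescriptor("events", 0)).size()); // 2
        System.out.println(grouped.get(new SourceDescriptor("events", 1)).size()); // 1
    }
}
```

In a real pipeline the grouping would of course be a Beam transform keyed on the descriptor rather than an in-memory map; this just shows why the descriptor key is enough to distinguish partitions.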

On Mon, Feb 1, 2021 at 11:01 AM Rion Williams <[email protected]> wrote:

> Hi Boyuan,
>
> Do you know if it’s possible to do something similar to this with a single
> topic, essentially treating records with the same keys as their own distinct
> pipelines? The challenge I’m encountering with splitting things downstream
> is related to watermarking at the partition level (via a
> WatermarkPolicy): I essentially need to track watermarks per key, or treat
> records with a particular key together, independently of other keys.
>
> I’d assumed that would need to be done prior to reading from Kafka, which
> is where the SDF would come in.
>
> On Feb 1, 2021, at 12:48 PM, Boyuan Zhang <[email protected]> wrote:
>
> 
> Hi Rion,
>
> It sounds like ReadFromKafkaDoFn
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java>
> could be one of the solutions. It takes a KafkaSourceDescriptor
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescriptor.java>
> (basically a topic + partition) as input and emits KafkaRecords. Then your
> pipeline can look like:
> testPipeline
>   .apply(your source that generates KafkaSourceDescriptor)
>   .apply(ParDo.of(ReadFromKafkaDoFn))
>   .apply(other parts)
>
> On Mon, Feb 1, 2021 at 8:06 AM Rion Williams <[email protected]>
> wrote:
>
>> Hey all,
>>
>> I'm currently in a situation where I have a single Kafka topic whose data
>> spans multiple partitions and comes from multiple sources. I'm trying to
>> see if there's a way to read from these different sources as different
>> pipelines, and whether a Splittable DoFn can do this.
>>
>> Basically - what I'd like to do is for a given key on a record, treat
>> this as a separate pipeline from Kafka:
>>
>> testPipeline
>>     .apply(
>>         /*
>>             Apply some function here to tell Kafka how to split up
>>             the sources that I want to read from
>>          */
>>     )
>>     .apply("Read from Kafka", KafkaIO.read(...))
>>     .apply("Remaining Pipeline Omitted for Brevity")
>>
>> Is it possible to do this? I'm trying to avoid a major architectural
>> change that would require multiple separate topics by source, however if I
>> can guarantee that a given key (and its associated watermark) are treated
>> separately, that would be ideal.
>>
>> Any advice or recommendations for a strategy that might work would be
>> helpful!
>>
>> Thanks,
>>
>> Rion
>>
>