+1 to what Rahul said before - KafkaIO can't filter out Kafka topics at runtime, 
so you need to change your TimestampPolicy accordingly.

Please don't hesitate to share the code of your new TimestampPolicy with us if 
you still have any questions or issues with this.
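
For reference, here is a rough, untested sketch of the kind of policy Rahul 
described. The class name, the delay value, and the timestamp extraction are 
placeholders - adapt them to your pipeline:

import java.util.Optional;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class IdleAwareTimestampPolicy extends TimestampPolicy<byte[], byte[]> {

    private final Duration idleDelay; // placeholder: tune to your out-of-order tolerance
    private Instant currentWatermark;

    public IdleAwareTimestampPolicy(Duration idleDelay, Optional<Instant> previousWatermark) {
        this.idleDelay = idleDelay;
        // Default to (now - delay) instead of BoundedWindow.TIMESTAMP_MIN_VALUE.
        this.currentWatermark = previousWatermark.orElse(Instant.now().minus(idleDelay));
    }

    @Override
    public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<byte[], byte[]> record) {
        // Placeholder: extract the event timestamp from your payload instead.
        Instant eventTimestamp = new Instant(record.getTimestamp());
        // Keep the watermark monotonically increasing even for out-of-order records.
        if (eventTimestamp.isAfter(currentWatermark)) {
            currentWatermark = eventTimestamp;
        }
        return eventTimestamp;
    }

    @Override
    public Instant getWatermark(PartitionContext ctx) {
        // Advance the watermark even when the partition is idle.
        Instant idleBound = Instant.now().minus(idleDelay);
        if (idleBound.isAfter(currentWatermark)) {
            currentWatermark = idleBound;
        }
        return currentWatermark;
    }
}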

PS: I'm moving this discussion to user@, as it is the more appropriate mailing 
list for these questions.

> On 29 Feb 2020, at 12:36, Maulik Soneji <[email protected]> wrote:
> 
> Hello Rahul,
> 
> Thanks again for the detailed explanation.
> 
> I need some guidance on what values to set for maxDelay and previousWatermark 
> in CustomTimestampPolicyWithLimitedDelay.
> 
> Currently I am providing maxDelay as Duration.ZERO and previousWatermark as 
> Optional.empty().
> With these values I see that the getWatermark function always goes to the else 
> block (code link) and always returns TIMESTAMP_MIN_VALUE.
> So in this case as well, the watermark is returned as TIMESTAMP_MIN_VALUE for 
> zero-throughput topics.
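> 
> For context, this is roughly how I am wiring the policy in (the timestamp 
> extraction function is simplified here):
> 
> .withTimestampPolicyFactory((partition, previousWatermark) ->
>         new CustomTimestampPolicyWithLimitedDelay<>(
>                 record -> getEventTimestamp(record), // simplified extraction
>                 Duration.ZERO,                       // maxDelay
>                 previousWatermark))                  // Optional.empty() on first start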
> 
> Please share your observations on how to tune the Timestamp Policy.
> 
> Thanks and regards,
> Maulik
> 
> 
> On Fri, Feb 28, 2020 at 8:46 PM rahul patwari <[email protected]> wrote:
> Hi Maulik,
> 
> Currently, I don't think it is possible to filter topics based on whether or 
> not data is being produced to them.
> But the Watermark logic can be changed to make the Pipeline work.
> 
> Since the timestamps of the records are the times when the events were pushed 
> to Kafka, every record will have monotonically increasing timestamps except 
> for out-of-order events.
> Instead of assigning the Watermark as BoundedWindow.TIMESTAMP_MIN_VALUE by 
> default, we can assign [current_timestamp - some_delay] as the default, and the 
> same can be done in the getWatermark() method; in that case the Watermark will 
> advance even if the partition is idle.
> 
> Make sure that the timestamp of the Watermark is monotonically increasing, and 
> choose the delay carefully in order to avoid discarding out-of-order events.
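> 
> As a minimal illustration against your current constructor (someDelay is a 
> value you would choose):
> 
> // instead of:
> this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
> // something like:
> this.currentWatermark = previousWatermark.orElse(Instant.now().minus(someDelay));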
> 
> Refer to 
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelay.java
> for an example.
> 
> Regards,
> Rahul
> 
> 
> On Fri, Feb 28, 2020 at 6:54 PM Maulik Soneji <[email protected]> wrote:
> Hi Rahul,
> 
> Thank you very much for the detailed explanation.
> 
> Since we don't know which topics have zero throughput, is there a way to 
> filter out such topics in KafkaIO?
> 
> Since KafkaIO doesn't support passing a regex to consume data from, I am 
> getting a list of topics from Kafka and passing it in.
> 
> Is there a way to filter out such topics? It can also happen that a topic has 
> no data for a few windows after the job has started and then gets some data, 
> so the filter would need to be dynamic as well.
> 
> Please share some ideas on how we can make this work.
> 
> Community members, please share your thoughts as well on how we can achieve 
> this.
> 
> Thanks and regards,
> Maulik
> 
> On Fri, Feb 28, 2020 at 3:03 PM rahul patwari <[email protected]> wrote:
> Hi Maulik,
> 
> This seems like an issue with the Watermark.
> According to 
> https://github.com/apache/beam/blob/f0930f958d47042948c06041e074ef9f1b0872d9/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L240,
> 
> If there are multiple partitions or multiple topics, a Watermark is calculated 
> for each partition, and the minimum across partitions is taken as the current 
> Watermark.
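> 
> Paraphrased as a sketch (this is not the exact Beam source - PartitionState 
> and the method names are simplified):
> 
> // the reader's watermark is the minimum over all partitions it reads
> Instant watermark = BoundedWindow.TIMESTAMP_MAX_VALUE;
> for (PartitionState p : partitionStates) {
>     Instant pw = p.getWatermark();
>     if (pw.isBefore(watermark)) {
>         watermark = pw;
>     }
> }
> return watermark;
> 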
> Assuming that no message is pushed to the zero-throughput topic, according to 
> your watermark calculation logic the watermark of each partition of this topic 
> will be BoundedWindow.TIMESTAMP_MIN_VALUE (the smallest representable 
> timestamp of an element - 
> https://github.com/apache/beam/blob/f0930f958d47042948c06041e074ef9f1b0872d9/model/pipeline/src/main/proto/beam_runner_api.proto#L44).
> 
> Since GroupByKey emits results only when the Watermark crosses the end of the 
> window, and here the watermark is stuck at BoundedWindow.TIMESTAMP_MIN_VALUE, 
> you are not seeing any results from GroupByKey.
> 
> Regards,
> Rahul 
> 
> On Fri, Feb 28, 2020 at 12:39 PM Maulik Soneji <[email protected]> wrote:
> Observations:
> If we read using KafkaIO from a list of topics where one of the topics has 
> zero throughput, and KafkaIO is followed by a GroupByKey stage, then no data 
> is output from the GroupByKey stage for any of the topics, not just the 
> zero-throughput topic.
> 
> If all topics have some throughput coming in, then it works fine and we get 
> output from the GroupByKey stage.
> 
> Is this an issue?
> 
> Points:
> a. GroupByKey produces output only when all topics have some throughput.
> b. This is a problem with KafkaIO + GroupByKey; in a case where I have FileIO 
> + GroupByKey, this issue doesn't arise - GroupByKey outputs some data even if 
> there is no data for one of the files.
> c. It is not a runner issue, since it occurs with both FlinkRunner and 
> DataflowRunner.
> d. Even if the lag is different for each topic in the list, we still get some 
> output from GroupByKey.
> 
> Debugging:
> While debugging this issue, I found that in the split function of 
> KafkaUnboundedSource we create KafkaUnboundedSources whose partition list has 
> one partition for each topic.
> 
> I am not sure if this is a watermark issue, but since the watermark for the 
> topic with no throughput will not advance, this looks like the most likely 
> cause to me.
> 
> Please help me figure out whether this is an issue or whether there is 
> something wrong with my pipeline.
> 
> Attaching detailed pipeline information for more details:
> 
> Context:
> I am currently using KafkaIO to read data from Kafka for a list of topics 
> with a custom timestamp policy.
> 
> Below is how I am constructing KafkaIO reader:
> return KafkaIO.<byte[], byte[]>read()
>         .withBootstrapServers(brokers)
>         .withTopics(topics)
>         .withKeyDeserializer(ByteArrayDeserializer.class)
>         .withValueDeserializer(ByteArrayDeserializer.class)
>         .withTimestampPolicyFactory((partition, previousWatermark) ->
>                 new EventTimestampPolicy(godataService, previousWatermark))
>         .commitOffsetsInFinalize();
> Pipeline Information:
> The pipeline consists of seven steps:
> a. Read from Kafka with a custom timestamp policy
> b. Convert KafkaRecord to a Message object
> c. Window into a FixedWindow of 10 minutes triggering at AfterWatermark
> d. PCollection<Message> to PCollection<KV<String, Message>> where the topic is the key
> e. GroupByKey.create() to get PCollection<KV<String, Iterable<Message>>>
> f. PCollection<KV<String, Iterable<Message>>> to PCollection<ComputedMetrics> 
> for each topic
> g. Write output to Kafka
> 
> Detailed Pipeline Information
> a. Read data from Kafka to get KafkaRecord<byte[], byte[]>.
> Here I am using my own timestamp policy, which looks like this:
> public EventTimestampPolicy(MyService myService, Optional<Instant> previousWatermark) {
>     this.myService = myService;
>     this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
> }
> 
> @Override
> public Instant getTimestampForRecord(PartitionContext context, KafkaRecord<byte[], byte[]> record) {
>     Instant eventTimestamp;
>     try {
>         eventTimestamp = Deserializer.getEventTimestamp(record, myService);
>     } catch (InvalidProtocolBufferException e) {
>         statsClient.increment("io.proto.buffer.exception");
>         throw new RuntimeException(e);
>     }
>     this.currentWatermark = eventTimestamp;
>     return this.currentWatermark;
> }
> 
> @Override
> public Instant getWatermark(PartitionContext ctx) {
>     return this.currentWatermark;
> }
> The event timestamp is one of the fields in the Kafka message. It is the time 
> when the event was pushed to Kafka.
> b. DoFn to transform KafkaRecord<byte[], byte[]> into the Message class.
> The Message class contains properties like topic, partition, offset and 
> timestamp.
> c. Windowing on a 10-minute fixed window triggering at 
> AfterWatermark.pastEndOfWindow().
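> 
> In code form this step is roughly as follows (the allowed lateness and pane 
> accumulation shown here are assumptions):
> 
> input.apply(Window.<Message>into(FixedWindows.of(Duration.standardMinutes(10)))
>         .triggering(AfterWatermark.pastEndOfWindow())
>         .withAllowedLateness(Duration.ZERO)
>         .discardingFiredPanes());
> 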
> d. PCollection<Message> to PCollection<KV<String, Message>>
> Here the key is the Kafka topic.
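> 
> Roughly (names assumed):
> 
> .apply(MapElements
>         .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptor.of(Message.class)))
>         .via(message -> KV.of(message.getTopic(), message)))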
> 
> e. GroupByKey to get PCollection<KV<String, Iterable<Message>>>
> f. PCollection<KV<String, Iterable<Message>>> to PCollection<ComputedMetrics> 
> for each topic
> g. Write output to Kafka.
