+1 to what Rahul said before - KafkaIO can’t filter out Kafka topics at runtime, so you need to change your TimestampPolicy accordingly.

Please don’t hesitate to share the code of your new TimestampPolicy with us if you still have any questions or issues with this.
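For example, a policy roughly along these lines could keep an idle partition's watermark moving, as Rahul described below. This is only an untested sketch, not code from Beam itself: the class name is made up, the event-timestamp extraction is a placeholder for your own Deserializer call, and the use of getMessageBacklog()/getBacklogCheckTime() to detect an idle partition is based on my reading of TimestampPolicy.PartitionContext.

import java.util.Optional;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class IdleAwareEventTimestampPolicy extends TimestampPolicy<byte[], byte[]> {

  private final Duration maxDelay;
  private Instant maxEventTimestamp;
  private Instant currentWatermark;

  public IdleAwareEventTimestampPolicy(Duration maxDelay, Optional<Instant> previousWatermark) {
    this.maxDelay = maxDelay;
    // Resume from the watermark the runner hands back (e.g. after a restart) if there is one.
    this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
    this.maxEventTimestamp = this.currentWatermark;
  }

  @Override
  public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<byte[], byte[]> record) {
    // Placeholder extraction: uses the Kafka record timestamp. Replace this with your own
    // Deserializer.getEventTimestamp(record, myService) logic from the policy quoted below.
    Instant eventTimestamp = new Instant(record.getTimestamp());
    if (eventTimestamp.isAfter(maxEventTimestamp)) {
      maxEventTimestamp = eventTimestamp;
    }
    return eventTimestamp;
  }

  @Override
  public Instant getWatermark(PartitionContext ctx) {
    // Allow out-of-order records up to maxDelay behind the largest event time seen so far.
    Instant candidate = maxEventTimestamp.minus(maxDelay);
    if (ctx.getMessageBacklog() == 0) {
      // The partition is caught up (or has never received a record): let the watermark trail
      // the last backlog-check time by maxDelay instead of staying stuck at TIMESTAMP_MIN_VALUE.
      Instant idleCandidate = ctx.getBacklogCheckTime().minus(maxDelay);
      if (idleCandidate.isAfter(candidate)) {
        candidate = idleCandidate;
      }
    }
    // Only ever move forward, so the watermark stays monotonic.
    if (candidate.isAfter(currentWatermark)) {
      currentWatermark = candidate;
    }
    return currentWatermark;
  }
}

As Rahul pointed out, pick maxDelay larger than the worst out-of-order skew you expect, otherwise late records will be discarded by the window.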
PS: I am moving this discussion to user@ as the more relevant mailing list for these questions.

> On 29 Feb 2020, at 12:36, Maulik Soneji <[email protected]> wrote:
> 
> Hello Rahul,
> 
> Thanks again for the detailed explanation.
> 
> I require some guidance on what values to set for maxDelay and previousWatermark for CustomTimestampPolicyWithLimitedDelay.
> 
> Currently, I was providing maxDelay as Duration.ZERO and previousWatermark as Optional.empty().
> With these values I see that the getWatermark function always goes to the else block (code link) and always returns TIMESTAMP_MIN_VALUE.
> So in this case as well, I see that the watermark is returned as TIMESTAMP_MIN_VALUE for zero-throughput topics.
> 
> Please share your observations on how to tune the timestamp policy.
> 
> Thanks and regards,
> Maulik
> 
> 
> On Fri, Feb 28, 2020 at 8:46 PM rahul patwari <[email protected]> wrote:
> Hi Maulik,
> 
> Currently, I don't think it is possible to filter topics based on whether data is being produced to the topic (or) not.
> But the Watermark logic can be changed to make the pipeline work.
> 
> Since the timestamps of the records are the time when the events are pushed to Kafka, every record will have monotonically increasing timestamps except for out-of-order events.
> Instead of assigning the Watermark as BoundedWindow.TIMESTAMP_MIN_VALUE by default, we can assign [current_timestamp - some_delay] as the default, and the same can be done in the getWatermark() method, in which case the Watermark will advance even if the partition is idle.
> 
> Make sure that the timestamp of the Watermark is monotonically increasing, and choose the delay carefully in order to avoid discarding out-of-order events.
> 
> Refer to https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelay.java for an example.
> 
> Regards,
> Rahul
> 
> 
> On Fri, Feb 28, 2020 at 6:54 PM Maulik Soneji <[email protected]> wrote:
> Hi Rahul,
> 
> Thank you very much for the detailed explanation.
> 
> Since we don't know which topics have zero throughput, is there a way in which we can filter out such topics in KafkaIO?
> 
> Since KafkaIO doesn't support passing a regex to consume data from, I am getting a list of topics from Kafka and passing it.
> 
> Is there a way to filter out such topics? Also, it can happen that when the job has started the topic might have no data for a few windows and after that it can get some data, so this filter should be dynamic as well.
> 
> Please share some ideas on how we can make this work.
> 
> Community members, please share your thoughts as well on how we can achieve this.
> 
> Thanks and regards,
> Maulik
> 
> On Fri, Feb 28, 2020 at 3:03 PM rahul patwari <[email protected]> wrote:
> Hi Maulik,
> 
> This seems like an issue with Watermark.
> According to https://github.com/apache/beam/blob/f0930f958d47042948c06041e074ef9f1b0872d9/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L240,
> if there are multiple partitions (or) multiple topics, the Watermark is calculated for each partition and the minimum watermark is considered as the current Watermark.
> Assuming that no message is pushed to the topic with zero throughput, according to your logic for the watermark calculation, the watermark of each partition for this topic will be BoundedWindow.TIMESTAMP_MIN_VALUE (the smallest representable timestamp of an element - https://github.com/apache/beam/blob/f0930f958d47042948c06041e074ef9f1b0872d9/model/pipeline/src/main/proto/beam_runner_api.proto#L44).
> As the result is only emitted from GroupByKey when the Watermark crosses the end of the window, and as the watermark is BoundedWindow.TIMESTAMP_MIN_VALUE, you are not seeing the results from GroupByKey.
> 
> Regards,
> Rahul
> 
> On Fri, Feb 28, 2020 at 12:39 PM Maulik Soneji <[email protected]> wrote:
> Observations:
> If we read using KafkaIO for a list of topics where one of the topics has zero throughput, and KafkaIO is followed by a GroupByKey stage, then:
> a. No data is output from the GroupByKey stage for all the topics, not just the zero-throughput topic.
> 
> If all topics have some throughput coming in, then it works fine and we get some output from the GroupByKey stage.
> 
> Is this an issue?
> 
> Points:
> a. The output from GroupByKey comes only when all topics have some throughput.
> b. This is a problem with KafkaIO + GroupByKey; for the case where I have FileIO + GroupByKey, this issue doesn't arise. GroupByKey outputs some data even if there is no data for one of the files.
> c. Not a runner issue, since I ran it with FlinkRunner and DataflowRunner.
> d. Even if the lag is different for each topic on the list, we still get some output from GroupByKey.
> 
> Debugging:
> While debugging this issue I found that in the split function of KafkaUnboundedSource we create a KafkaUnboundedSource where the partition list is one partition for each topic.
> 
> I am not sure if this is some issue with the watermark, since the watermark for the topic with no throughput will not advance. But this looks like the most likely cause to me.
> 
> Please help me in figuring out whether this is an issue or if there is something wrong with my pipeline.
> 
> Attaching detailed pipeline information for more details:
> 
> Context:
> I am currently using KafkaIO to read data from Kafka for a list of topics with a custom timestamp policy.
> 
> Below is how I am constructing the KafkaIO reader:
> 
> return KafkaIO.<byte[], byte[]>read()
>     .withBootstrapServers(brokers)
>     .withTopics(topics)
>     .withKeyDeserializer(ByteArrayDeserializer.class)
>     .withValueDeserializer(ByteArrayDeserializer.class)
>     .withTimestampPolicyFactory((partition, previousWatermark) ->
>         new EventTimestampPolicy(godataService, previousWatermark))
>     .commitOffsetsInFinalize();
> 
> Pipeline Information:
> The pipeline consists of seven steps:
> a. Read from Kafka with a custom timestamp policy
> b. Convert KafkaRecord to a Message object
> c. Window based on a FixedWindow of 10 minutes triggering AfterWatermark
> d. PCollection<Message> to PCollection<KV<String, Message>> where the topic is the key
> e. GroupByKey.create() to get PCollection<KV<String, Iterable<Message>>>
> f. PCollection<KV<String, Iterable<Message>>> to PCollection<ComputedMetrics> for each topic
> g. Write output to Kafka
> 
> Detailed Pipeline Information
> 
> a. Read data from Kafka to get KafkaRecord<byte[], byte[]>
> Here I am using my own timestamp policy, which looks like this:
> 
> public EventTimestampPolicy(MyService myService, Optional<Instant> previousWatermark) {
>     this.myService = myService;
>     this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
> }
> 
> @Override
> public Instant getTimestampForRecord(PartitionContext context, KafkaRecord<byte[], byte[]> record) {
>     Instant eventTimestamp;
>     try {
>         eventTimestamp = Deserializer.getEventTimestamp(record, myService);
>     } catch (InvalidProtocolBufferException e) {
>         statsClient.increment("io.proto.buffer.exception");
>         throw new RuntimeException(e);
>     }
>     this.currentWatermark = eventTimestamp;
>     return this.currentWatermark;
> }
> 
> @Override
> public Instant getWatermark(PartitionContext ctx) {
>     return this.currentWatermark;
> }
> 
> The event timestamp is one of the fields in the Kafka message. It is the time when the event was pushed to Kafka.
> 
> b. DoFn to transform KafkaRecord<byte[], byte[]> to the Message class.
> The Message class contains properties like topic, partition, offset and timestamp.
> c. Windowing on a 10-minute fixed window triggering at AfterWatermark.pastEndOfWindow()
> d. PCollection<Message> to PCollection<KV<String, Message>>
> Here the key is the Kafka topic.
> 
> e. GroupByKey to get PCollection<KV<String, Iterable<Message>>>
> f. PCollection<KV<String, Iterable<Message>>> to PCollection<ComputedMetrics> for each topic
> g. Write output to Kafka
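For completeness, this is roughly how such a policy would be wired into the reader construction quoted above. Again just a sketch: brokers and topics are the values from your snippet, the 1-minute delay is only an example to tune, and the point is to pass the runner-provided previousWatermark through instead of Optional.empty().

// The factory is called once per assigned partition; previousWatermark is whatever the
// runner checkpointed for that partition, so hand it to the policy instead of ignoring it.
KafkaIO.Read<byte[], byte[]> reader =
    KafkaIO.<byte[], byte[]>read()
        .withBootstrapServers(brokers)
        .withTopics(topics)
        .withKeyDeserializer(ByteArrayDeserializer.class)
        .withValueDeserializer(ByteArrayDeserializer.class)
        .withTimestampPolicyFactory(
            (TopicPartition partition, Optional<Instant> previousWatermark) ->
                new IdleAwareEventTimestampPolicy(Duration.standardMinutes(1), previousWatermark))
        .commitOffsetsInFinalize();

CustomTimestampPolicyWithLimitedDelay with a non-zero maxDelay would cover the out-of-order part, but, as you observed, it still reports TIMESTAMP_MIN_VALUE for a partition that has never received a record, which is why the idle-advance behaviour above is needed for genuinely empty topics.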

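Finally, a rough sketch of steps c-e from your description, only to make explicit where the watermark comes in: the grouped output for a 10-minute window is emitted once the combined (minimum across all partitions) watermark passes the end of that window, which is why a single stuck partition holds back every topic. Message and its getTopic() accessor are the types from your mail; the rest is standard Beam.

import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

PCollection<Message> messages = ...; // output of the KafkaRecord -> Message DoFn (step b)

PCollection<KV<String, Iterable<Message>>> grouped =
    messages
        // Step c: 10-minute fixed windows, firing when the watermark passes the end of the window.
        .apply("FixedWindows",
            Window.<Message>into(FixedWindows.of(Duration.standardMinutes(10)))
                .triggering(AfterWatermark.pastEndOfWindow())
                .withAllowedLateness(Duration.ZERO)
                .discardingFiredPanes())
        // Step d: key each Message by its Kafka topic.
        .apply("KeyByTopic",
            MapElements.into(
                    TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptor.of(Message.class)))
                .via(message -> KV.of(message.getTopic(), message)))
        // Step e: group per topic; nothing is emitted here until the watermark advances.
        .apply("GroupByTopic", GroupByKey.create());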