Using side inputs is fine and is a common pattern. You should take a look at "slowly changing side inputs"[1] as there is some example code there.
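Roughly, the pattern on that page boils down to the following (a trimmed sketch, not your pipeline: the 5 minute refresh interval, the Map<String, Long> type and loadLatestScores() are placeholders for whatever lookup or recomputation you need; standard Beam SDK and Joda imports are omitted, as in your snippets):

PCollectionView<Map<String, Long>> scoresView = p
    // A periodic "tick" drives the refresh of the side input.
    .apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(5)))
    .apply(ParDo.of(new DoFn<Long, Map<String, Long>>() {
        @ProcessElement
        public void process(@Element Long tick, OutputReceiver<Map<String, Long>> out) {
            // Placeholder: re-read / recompute the latest per-key values here.
            out.output(loadLatestScores());
        }
    }))
    // Keep everything in the global window and re-fire on each refresh.
    .apply(Window.<Map<String, Long>>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
        .discardingFiredPanes())
    .apply(Latest.globally())
    .apply(View.asSingleton());

Your main stream then consumes it exactly the way you already consume your View.asMap() view, i.e. ParDo.of(...).withSideInputs(scoresView).
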
1: https://beam.apache.org/documentation/patterns/side-inputs/#slowly-updating-side-input-using-windowing

On Mon, Jun 1, 2020 at 8:27 PM Mohil Khare <mo...@prosimo.io> wrote:

> Thanks Luke for your reply.
> I see. I am trying to recall why I added allowedLateness as 360 days.
> Anyways, I will try without that.
>
> But do you think the approach I am using (keeping a running score in a
> sliding window and then using it as a side input to decorate the main
> log) is correct? Or can I achieve the same thing in a much better and
> more optimized way?
>
> Thanks again
> Mohil
>
> On Mon, Jun 1, 2020 at 3:38 PM Luke Cwik <lc...@google.com> wrote:
>
>> Your allowed lateness is 360 days, and since the trigger you have doesn't
>> emit speculative results, you'll have to wait until the watermark advances
>> to the end-of-window timestamp + 360 days before anything is output from
>> the grouping aggregation / becomes available in the side input.
>>
>> On Sat, May 30, 2020 at 12:17 PM Mohil Khare <mo...@prosimo.io> wrote:
>>
>>> Hello all,
>>>
>>> Any suggestions? Where am I going wrong, or is there a better way of
>>> achieving this so that I can do a replay as well?
>>>
>>> Thanks
>>> Mohil
>>>
>>> On Wed, May 27, 2020 at 11:40 AM Mohil Khare <mo...@prosimo.io> wrote:
>>>
>>>> Hi everyone,
>>>> I need a suggestion regarding usage of the side input pattern and a
>>>> sliding window, especially while replaying old Kafka logs/offsets.
>>>>
>>>> FYI: I am running Beam 2.19 on Google Dataflow.
>>>>
>>>> I have a use case where I read a continuous stream of data from Kafka
>>>> and need to calculate one score (apart from other calculations) per key,
>>>> based on the number of such requests received for that key in the last
>>>> one hour.
>>>>
>>>> Roughly my code looks like the following:
>>>>
>>>> PCollection<POJO> input = p
>>>>     .apply("Read__Logs_From_Kafka", KafkaIO.<String, byte[]>read()
>>>>         .withBootstrapServers(String.join(",", bootstrapServerToConnectTo))
>>>>         .withTopic("app_access_stats")
>>>>         .withKeyDeserializer(StringDeserializer.class)
>>>>         .withValueDeserializer(ByteArrayDeserializer.class)
>>>>         .withConsumerConfigUpdates(kafkaConsumerProperties)
>>>>         .withConsumerFactoryFn(consumerFactoryObj)
>>>>         .commitOffsetsInFinalize())
>>>>     .apply("Applying_Fixed_Window_Logs",
>>>>         Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
>>>>             .triggering(Repeatedly.forever(
>>>>                 AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1))))
>>>>             .withAllowedLateness(Duration.standardDays(380))
>>>>             .discardingFiredPanes())
>>>>     .apply("Convert_KafkaRecord_To_PCollection<POJO>",
>>>>         ParDo.of(new ParseKafkaLogs()));
>>>>
>>>> /*** Class that handles the incoming PCollection<POJO> and calculates the score ***/
>>>> /** Assume input = the incoming PCollection<POJO> created above. **/
>>>>
>>>> PCollectionView<Map<KEY, Long>> slidingWindowHourlyUserRequestsPerKeyView =
>>>>     input.apply("Calculate_Total_UserRequests_Past_1Hr",
>>>>             new WindowedNumUserRequestsPerKey())
>>>>         .apply(View.asMap());
>>>>
>>>> /** Calculate a running sum of the number of requests in a sliding window.
>>>>     A 1 hr sliding window starts every 1 sec so that we get an accurate
>>>>     result for the past 1 hr. **/
>>>>
>>>> private static class WindowedNumUserRequestsPerKey extends
>>>>         PTransform<PCollection<POJO>, PCollection<KV<KEY, Long>>> {
>>>>
>>>>     @Override
>>>>     public PCollection<KV<KEY, Long>> expand(PCollection<POJO> input) {
>>>>         return input
>>>>             .apply("Applying_Sliding_Window_1Hr_Every1sec",
>>>>                 Window.<POJO>into(SlidingWindows.of(Duration.standardHours(1)).every(Duration.standardSeconds(1)))
>>>>                     .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
>>>>                     .withAllowedLateness(Duration.standardDays(360))
>>>>                     .discardingFiredPanes())
>>>>             .apply("Grouping_per_Key", ParDo.of(new GroupByAggregationKey()))
>>>>             .apply("Total_Requests_Per_Key", Combine.perKey(new CalculateTotalUserRequestsPerKey()));
>>>>     }
>>>>
>>>>     private static class GroupByAggregationKey extends DoFn<POJO, KV<KEY, POJO>> {
>>>>         @ProcessElement
>>>>         public void processElement(@Element POJO input, OutputReceiver<KV<KEY, POJO>> out) {
>>>>             /** code that emits the required KV **/
>>>>         }
>>>>     }
>>>>
>>>>     private static class CalculateTotalUserRequestsPerKey extends
>>>>             Combine.CombineFn<POJO, CalculateTotalUserRequestsPerKey.TotalRequestsAccumulator, Long> {
>>>>
>>>>         private static class TotalRequestsAccumulator implements Serializable {
>>>>             private long num_requests_running_sum = 0;
>>>>
>>>>             TotalRequestsAccumulator(long num_requests_running_sum) {
>>>>                 this.num_requests_running_sum = num_requests_running_sum;
>>>>             }
>>>>
>>>>             @Override
>>>>             public boolean equals(Object o) {
>>>>                 if (this == o) return true;
>>>>                 if (!(o instanceof TotalRequestsAccumulator)) return false;
>>>>                 TotalRequestsAccumulator that = (TotalRequestsAccumulator) o;
>>>>                 return num_requests_running_sum == that.num_requests_running_sum;
>>>>             }
>>>>
>>>>             @Override
>>>>             public int hashCode() {
>>>>                 return Objects.hash(num_requests_running_sum);
>>>>             }
>>>>         }
>>>>
>>>>         @Override
>>>>         public TotalRequestsAccumulator createAccumulator() {
>>>>             return new TotalRequestsAccumulator(0);
>>>>         }
>>>>
>>>>         @Override
>>>>         public TotalRequestsAccumulator addInput(TotalRequestsAccumulator mutableAccumulator, POJO input) {
>>>>             mutableAccumulator.num_requests_running_sum++;
>>>>             return mutableAccumulator;
>>>>         }
>>>>
>>>>         @Override
>>>>         public TotalRequestsAccumulator mergeAccumulators(Iterable<TotalRequestsAccumulator> accumulators) {
>>>>             TotalRequestsAccumulator merged = createAccumulator();
>>>>             for (TotalRequestsAccumulator accumulator : accumulators) {
>>>>                 merged.num_requests_running_sum += accumulator.num_requests_running_sum;
>>>>             }
>>>>             return merged;
>>>>         }
>>>>
>>>>         @Override
>>>>         public Long extractOutput(TotalRequestsAccumulator accumulator) {
>>>>             return accumulator.num_requests_running_sum;
>>>>         }
>>>>     }
>>>> }
>>>>
>>>> Now I calculate the score in the incoming POJO by using
>>>> slidingWindowHourlyUserRequestsPerKeyView as a side input.
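>>>> Roughly, AddScore just looks up the key's hourly count from the side input
>>>> and derives the score from it (simplified sketch; getKey(), withScore() and
>>>> computeScore() are placeholders for my actual POJO accessors and scoring
>>>> formula):
>>>>
>>>> // Assumes slidingWindowHourlyUserRequestsPerKeyView is in scope
>>>> // (e.g. a field of the enclosing class).
>>>> class AddScore extends DoFn<POJO, POJO> {
>>>>     @ProcessElement
>>>>     public void processElement(ProcessContext c) {
>>>>         Map<KEY, Long> hourlyRequestsPerKey =
>>>>             c.sideInput(slidingWindowHourlyUserRequestsPerKeyView);
>>>>         POJO element = c.element();
>>>>         Long count = hourlyRequestsPerKey.get(element.getKey());  // placeholder accessor
>>>>         long numRequestsLastHour = (count == null) ? 0L : count;
>>>>         // Placeholder: the actual scoring formula is omitted here.
>>>>         c.output(element.withScore(computeScore(numRequestsLastHour)));
>>>>     }
>>>> }
>>>>
>>>> It is wired in like this: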
>>>> input.apply("Add_Score", ParDo.of(new AddScore())
>>>>     .withSideInputs(slidingWindowHourlyUserRequestsPerKeyView));
>>>>
>>>> The above seems to be working fine, though I would appreciate a suggestion
>>>> if there is a better way of achieving this.
>>>>
>>>> Also, I start getting problems when we have to stop the Beam job for a
>>>> couple of hours for maintenance or some other issue while data is
>>>> continuously being pumped into Kafka.
>>>>
>>>> Problem:
>>>> When the job resumes after a couple of hours, the sliding window suddenly
>>>> gets bombarded with log messages, and instead of honoring each log's
>>>> timestamp it treats all of them as having arrived in the current sliding
>>>> window, thereby giving the wrong score. For example, if the job was
>>>> stopped between 9 am and 11 am, and there were 20 msgs between 9-10 am
>>>> and 30 msgs between 10-11 am, then on resuming at 11 the job will treat
>>>> all 50 msgs as received in the last one hour and use that as the side
>>>> input while processing all log messages between 9 am and 11 am.
>>>>
>>>> To overcome this, I tried a few things, but every approach failed:
>>>>
>>>> 1. In the transformation that reads messages from Kafka and creates the
>>>> PCollection<POJO>, I tried outputWithTimestamp(event timestamp), but it
>>>> didn't work, as I believe you can't output data with an older timestamp
>>>> into a live window while reading a real-time data stream from Kafka.
>>>>
>>>> 2. Since the stage that adds the score is not honoring the event's
>>>> timestamp (as evident from printing window.startTime in the DoFn), I
>>>> added a custom timestamp policy while reading logs from Kafka, i.e.
>>>> something like this:
>>>>
>>>> KafkaIO.read.withTimestampPolicyFactory((tp, previousWatermark) ->
>>>>     new CustomFieldTimePolicy(previousWatermark))
>>>>
>>>> where CustomFieldTimePolicy sets the record timestamp based on the
>>>> received record's timestamp (a rough sketch of that policy class is at
>>>> the end of this mail).
>>>> After doing this, window.startTime printed a fairly accurate time, close
>>>> to the event timestamp; however, the "WindowedNumUserRequestsPerKey"
>>>> transform didn't emit any output. It just stalled. My print statements
>>>> were showing up in the aforementioned GroupByAggregationKey(), but then
>>>> no output was emitted, as if the pipeline was stuck at that stage. I
>>>> couldn't find any log in GCP's Stackdriver indicating the reason for the
>>>> stalled pipeline.
>>>>
>>>> Any help/suggestion for solving this case? It will be very useful in our
>>>> replay jobs, where for some reason our data sink such as Elasticsearch
>>>> gets corrupted and we want to read all the old Kafka offsets again and
>>>> recreate the data in a new ES cluster.
>>>>
>>>> Thanks and Regards
>>>> Mohil
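>>>> P.S. For reference, CustomFieldTimePolicy is roughly the following
>>>> (simplified sketch; I use the Kafka record's timestamp here, but it could
>>>> equally be a field parsed from the payload):
>>>>
>>>> public class CustomFieldTimePolicy extends TimestampPolicy<String, byte[]> {
>>>>     private Instant currentWatermark;
>>>>
>>>>     public CustomFieldTimePolicy(Optional<Instant> previousWatermark) {
>>>>         this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
>>>>     }
>>>>
>>>>     @Override
>>>>     public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<String, byte[]> record) {
>>>>         // Use the record's own timestamp as the event time and track it for the watermark.
>>>>         currentWatermark = new Instant(record.getTimestamp());
>>>>         return currentWatermark;
>>>>     }
>>>>
>>>>     @Override
>>>>     public Instant getWatermark(PartitionContext ctx) {
>>>>         return currentWatermark;
>>>>     }
>>>> }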