Using side inputs is fine and is a common pattern. You should take a look
at "slowly changing side inputs"[1] as there is some example code there.

1:
https://beam.apache.org/documentation/patterns/side-inputs/#slowly-updating-side-input-using-windowing

On Mon, Jun 1, 2020 at 8:27 PM Mohil Khare <mo...@prosimo.io> wrote:

> Thanks Luke for your reply.
> I see. I am trying to recall why I added allowedLateness as 360 days.
> Anyways I will try without that.
>
> But do you think the approach I am using to keep getting a running score
> in a sliding window and then using it as a side input to decorate the main
> log  is correct ? Or I can achieve same thing is a much better and
> optimized way.
>
> Thanks again
> Mohil
>
> On Mon, Jun 1, 2020 at 3:38 PM Luke Cwik <lc...@google.com> wrote:
>
>> Your allowed lateness is 360 days and since the trigger you have doesn't
>> emit speculative results, you'll have to wait till the watermark advances
>> to the end of windows timestamp + 360 days before something is output from
>> the grouping aggregation/available at the side input.
>>
>>
>> On Sat, May 30, 2020 at 12:17 PM Mohil Khare <mo...@prosimo.io> wrote:
>>
>>> Hello all,
>>>
>>> Any suggestions? Where am I going wrong or is there any better way of
>>> achieving this so that I can do replay as well ?
>>>
>>> Thanks
>>> Mohil
>>>
>>> On Wed, May 27, 2020 at 11:40 AM Mohil Khare <mo...@prosimo.io> wrote:
>>>
>>>> Hi everyone,
>>>> I need a suggestion regarding usage of the side input pattern and
>>>> sliding window, especially while replaying old kafka logs/offsets.
>>>>
>>>> FYI: I am running beam 2.19 on google dataflow.
>>>>
>>>> I have a use case where I read a continuous stream of data from Kafka
>>>> and need to calculate one score (apart from other calculations) per key
>>>> which is based on the number of such requests that are received per key in
>>>> the last one hour.
>>>>
>>>> Roughly my code looks like following:
>>>>
>>>> PCollection<POJO> = p
>>>>     .apply("Read__Logs_From_Kafka", KafkaIO.<String, byte[]>read()
>>>>         .withBootstrapServers(String.join(",", bootstrapServerToConnectTo))
>>>>         .withTopic("app_access_stats")
>>>>         .withKeyDeserializer(StringDeserializer.class)
>>>>         .withValueDeserializer(ByteArrayDeserializer.class)
>>>>         .withConsumerConfigUpdates(kafkaConsumerProperties)
>>>>         .withConsumerFactoryFn(consumerFactoryObj)
>>>>         .commitOffsetsInFinalize())
>>>>     .apply("Applying_Fixed_Window_Logs", Window.<KafkaRecord<String, 
>>>> byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
>>>>         
>>>> .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1))))
>>>>         .withAllowedLateness(Duration.standardDays(380))
>>>>         .discardingFiredPanes())
>>>>     .apply("Convert_KafkaRecord_To_PCollection<POJO>",
>>>>         ParDo.of(new ParseKafkaLogs()));
>>>>
>>>>
>>>> /*** Class that handles incoming PCollection<POJO> and calculate score ***/
>>>>
>>>> /**. Assume    input = incoming PCollection<POJO> as created above
>>>>
>>>> PCollectionView<Map<Key, Long>> slidingWindowHourlyUserRequestsPerKeyView
>>>>
>>>>    = input.apply("Calculate_Total_UserRequests_Past_1Hr", new 
>>>> WindowedNumUserRequestsPerKey()).apply(View.asMap());
>>>>
>>>> /**Calculate Running sum of num of reqs in sliding window
>>>>
>>>>     Starting sliding window of duration 1 hr every 1 sec so that we can 
>>>> get accurate result of past 1 hr
>>>>
>>>> **/
>>>>
>>>>
>>>> private static class WindowedNumUserRequestsPerKey extends 
>>>> PTransform<PCollection<POJO>, PCollection<KV<KEY, Long>>> {
>>>>
>>>>     @Override
>>>>     public PCollection<KV<KEY, Long>> expand(PCollection<POJO> input) {
>>>>
>>>>         return input
>>>>             .apply("Applying_Sliding_Window_1Hr_Every1sec", 
>>>> Window.<POJO>into(SlidingWindows.of(Duration.standardHours(1)).every(Duration.standardSeconds(1)))
>>>>                 
>>>> .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow())).withAllowedLateness(Duration.standardDays(360)).discardingFiredPanes())
>>>>             .apply("Grouping_per_Key", ParDo.of(new 
>>>> GroupByAggregationKey()))
>>>>             .apply("Total_Requests_Per_Key", Combine.perKey(new 
>>>> CalculateTotalUserRequestsPerKey()));
>>>>     }
>>>>
>>>>     private static class GroupByAggregationKey extends DoFn<POJO, KV<KEY, 
>>>> POJO>> {
>>>>         @ProcessElement
>>>>         public void processElement(@Element POJO input, 
>>>> OutputReceiver<KV<KEY, POJO>> out) {
>>>>             /** code that emits required KV ****/
>>>>
>>>>         }
>>>>     }
>>>>
>>>>     private static class CalculateTotalUserRequestsPerKey extends 
>>>> Combine.CombineFn<POJO, 
>>>> CalculateTotalUserRequestsPerKey.TotalRequestsAccumulator, Long> {
>>>>         private static class TotalRequestsAccumulator implements 
>>>> Serializable {
>>>>             private long num_requests_running_sum = 0;
>>>>
>>>>             TotalRequestsAccumulator(long num_requests_running_sum) {
>>>>                 this.num_requests_running_sum = num_requests_running_sum;
>>>>             }
>>>>
>>>>             @Override
>>>>             public boolean equals(Object o) {
>>>>                 if (this == o) return true;
>>>>                 if (!(o instanceof TotalRequestsAccumulator)) return false;
>>>>                 TotalRequestsAccumulator that = (TotalRequestsAccumulator) 
>>>> o;
>>>>                 return num_requests_running_sum == 
>>>> that.num_requests_running_sum;
>>>>             }
>>>>
>>>>             @Override
>>>>             public int hashCode() {
>>>>                 return Objects.hash(num_requests_running_sum);
>>>>             }
>>>>         }
>>>>
>>>>         @Override
>>>>         public TotalRequestsAccumulator createAccumulator() {
>>>>             return new TotalRequestsAccumulator(0);
>>>>         }
>>>>
>>>>         @Override
>>>>         public TotalRequestsAccumulator addInput(TotalRequestsAccumulator 
>>>> mutableAccumulator, POJO input) {
>>>>             mutableAccumulator.num_requests_running_sum++;
>>>>             return mutableAccumulator;
>>>>         }
>>>>
>>>>         @Override
>>>>         public TotalRequestsAccumulator 
>>>> mergeAccumulators(Iterable<TotalRequestsAccumulator> accumulators) {
>>>>             TotalRequestsAccumulator merged = createAccumulator();
>>>>             for (TotalRequestsAccumulator accumulator : accumulators) {
>>>>                 merged.num_requests_running_sum += 
>>>> accumulator.num_requests_running_sum;
>>>>             }
>>>>             return merged;
>>>>         }
>>>>
>>>>         @Override
>>>>         public Long extractOutput(TotalRequestsAccumulator accumulator) {
>>>>             Long totalUserRequestsPerKey = 
>>>> accumulator.num_requests_running_sum;
>>>>             return totalUserRequestsPerKey;
>>>>         }
>>>>     }
>>>> }
>>>>
>>>> Now I calculate the score in the incoming POJO by using
>>>> slidingWindowHourlyUserRequestsPerKeyView as side input.
>>>>
>>>> input.apply("Add_Score", ParDo.of(new AddScore())
>>>>     .withSideInputs(slidingWindowHourlyUserRequestsPerKeyView));
>>>>
>>>>
>>>> Above seems to be working fine, though I need a suggestion if there is
>>>> a better way of achieving this?
>>>>
>>>> Also, I start getting problems when we have to stop the beam for a
>>>> couple of hours for maintenance or some other issue while data is
>>>> continuously being pumped in kafka.
>>>>
>>>> Problem:
>>>> When the beam resumes after a couple of hours, suddenly the above
>>>> sliding window gets bombarded with log messages and instead of honoring
>>>> log's timestamp, it just treats all the log messages received in the beam's
>>>> sliding window, thereby giving the wrong score. For eg, if the beam was
>>>> stopped  between 9 am and 11 am and there was 20 msgs between 9-10 am and
>>>> 30 msgs between 10-11 am, beam on resuming at 11, will consider the total
>>>> 50 msgs received in the last one hour as side input while processing all
>>>> log messages between 9am and 11 am.
>>>>
>>>> To overcome this, I tried a few things , but every approach failed:
>>>>
>>>> *1. In the transformation which read message from kafka and create
>>>> PCollection<POJO>, I tried outWithTimestamp(event timestamp), but it didn't
>>>> work as I believe you can't output data with older timestamp in a live
>>>> window while reading real time data stream from kafka.*
>>>>
>>>> *2. I thought, since the stage that add score is not honoring event's
>>>> timestamp (as evident by printing window.startTime in DoFn), I added custom
>>>> timestamp policy while reading logs from kafka i.e something like this:*
>>>> *    KafkaIO.read.withTimestampPolicyFactory((tp, previousWatermark) ->
>>>> new CustomFieldTimePolicy(previousWatermark))*
>>>>
>>>> *where CustomFieldTimePolicy set time record timestamp based on
>>>> received record's timestamp.*
>>>> *  On doing this, through the window.startTime printed a somewhat
>>>> accurate time which was close to even't timestamp, however,
>>>> "WindowedNumUserRequestsPerKey" Transformation didn't emit any output. It
>>>> just stalled. My print statements were showing up in
>>>> aforementioned GroupByAggregationKey(), but then no output was emitted as
>>>> if the pipeline was stuck at that stage. I couldn't find any log in GCP's
>>>> stackdriver indicating the reason for the stalled pipeline.*
>>>>
>>>> *Any help/suggestion for solving this case. This will be very useful in
>>>> our replay jobs where for some reason our data sink such as elastic search
>>>> gets corrupted and we want to read again all the old kafka offsets and
>>>> recreate the data in the new ES cluster.*
>>>>
>>>>
>>>> Thanks and Regards
>>>> Mohil
>>>>
>>>>
>>>>
>>>>

Reply via email to