Re: Need suggestion/help for use case (usage of the side input pattern and sliding window)
Cool, thanks again for your help and suggestions, Luke.

Regards,
Mohil

On Tue, Jun 2, 2020 at 10:42 AM Luke Cwik wrote:
> Using side inputs is fine and is a common pattern. You should take a look
> at "slowly changing side inputs"[1] as there is some example code there.
>
> 1: https://beam.apache.org/documentation/patterns/side-inputs/#slowly-updating-side-input-using-windowing
Re: Need suggestion/help for use case (usage of the side input pattern and sliding window)
Using side inputs is fine and is a common pattern. You should take a look at "slowly changing side inputs"[1] as there is some example code there.

1: https://beam.apache.org/documentation/patterns/side-inputs/#slowly-updating-side-input-using-windowing

On Mon, Jun 1, 2020 at 8:27 PM Mohil Khare wrote:
> Thanks Luke for your reply. I see. I am trying to recall why I added
> allowedLateness as 360 days. Anyway, I will try without that.
>
> But do you think the approach I am using (keeping a running score in a
> sliding window and then using it as a side input to decorate the main log)
> is correct? Or can I achieve the same thing in a better, more optimized way?
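[Editor's note] The linked pattern boils down to periodically rebuilding a lookup map and letting the main stream read the latest snapshot. A Beam-free sketch of that idea follows; the class and method names (`RefreshingScoreMap`, `maybeRefresh`, `decorate`) are illustrative, not Beam API:

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Beam-free sketch of the "slowly updating side input" pattern: a lookup
 * map is rebuilt on a fixed tick interval, and main-stream elements are
 * decorated with whatever snapshot is current when they arrive.
 */
class RefreshingScoreMap {
    private Map<String, Long> snapshot = new HashMap<>();
    private long lastRefreshMillis = Long.MIN_VALUE;
    private final long refreshIntervalMillis;

    RefreshingScoreMap(long refreshIntervalMillis) {
        this.refreshIntervalMillis = refreshIntervalMillis;
    }

    /** Rebuild the snapshot if the refresh interval has elapsed (the "tick"). */
    void maybeRefresh(long nowMillis, Map<String, Long> latestCounts) {
        if (lastRefreshMillis == Long.MIN_VALUE
                || nowMillis - lastRefreshMillis >= refreshIntervalMillis) {
            snapshot = new HashMap<>(latestCounts);
            lastRefreshMillis = nowMillis;
        }
    }

    /** Decorate a main-stream element with the current side-input snapshot. */
    String decorate(String key) {
        return key + ":score=" + snapshot.getOrDefault(key, 0L);
    }
}
```

In actual Beam, the tick comes from a source such as GenerateSequence plus a global-window trigger, and the snapshot is the View.asMap() side input; the sketch only shows the refresh-and-lookup contract.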
Re: Need suggestion/help for use case (usage of the side input pattern and sliding window)
Thanks Luke for your reply. I see. I am trying to recall why I added allowedLateness as 360 days. Anyway, I will try without that.

But do you think the approach I am using (keeping a running score in a sliding window and then using it as a side input to decorate the main log) is correct? Or can I achieve the same thing in a better, more optimized way?

Thanks again
Mohil

On Mon, Jun 1, 2020 at 3:38 PM Luke Cwik wrote:
> Your allowed lateness is 360 days, and since the trigger you have doesn't
> emit speculative results, you'll have to wait until the watermark advances
> to the end-of-window timestamp + 360 days before anything is output from
> the grouping aggregation and becomes available at the side input.
Re: Need suggestion/help for use case (usage of the side input pattern and sliding window)
Your allowed lateness is 360 days, and since the trigger you have doesn't emit speculative results, you'll have to wait until the watermark advances to the end-of-window timestamp + 360 days before anything is output from the grouping aggregation and becomes available at the side input.

On Sat, May 30, 2020 at 12:17 PM Mohil Khare wrote:
> Hello all,
>
> Any suggestions? Where am I going wrong, or is there a better way of
> achieving this so that I can do replay as well?
Re: Need suggestion/help for use case (usage of the side input pattern and sliding window)
Hello all,

Any suggestions? Where am I going wrong, or is there a better way of achieving this so that I can do replay as well?

Thanks
Mohil

On Wed, May 27, 2020 at 11:40 AM Mohil Khare wrote:
> Hi everyone,
> I need a suggestion regarding usage of the side input pattern and sliding
> window, especially while replaying old Kafka logs/offsets.
> [...]
Need suggestion/help for use case (usage of the side input pattern and sliding window)
Hi everyone,
I need a suggestion regarding usage of the side input pattern and sliding window, especially while replaying old Kafka logs/offsets.

FYI: I am running Beam 2.19 on Google Dataflow.

I have a use case where I read a continuous stream of data from Kafka and need to calculate one score (apart from other calculations) per key, based on the number of such requests received for that key in the last one hour.

Roughly, my code looks like the following:

    PCollection<POJO> input = p
        .apply("Read__Logs_From_Kafka", KafkaIO.<String, byte[]>read()
            .withBootstrapServers(String.join(",", bootstrapServerToConnectTo))
            .withTopic("app_access_stats")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(ByteArrayDeserializer.class)
            .withConsumerConfigUpdates(kafkaConsumerProperties)
            .withConsumerFactoryFn(consumerFactoryObj)
            .commitOffsetsInFinalize())
        .apply("Applying_Fixed_Window_Logs",
            Window.<KafkaRecord<String, byte[]>>into(
                    FixedWindows.of(Duration.standardSeconds(10)))
                .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()
                    .withEarlyFirings(AfterPane.elementCountAtLeast(1))))
                .withAllowedLateness(Duration.standardDays(380))
                .discardingFiredPanes())
        .apply("Convert_KafkaRecord_To_PCollection",
            ParDo.of(new ParseKafkaLogs()));

    /*** Class that handles the incoming PCollection<POJO> and calculates the score ***/

    /** Assume input = the incoming PCollection<POJO> created above. */

    PCollectionView<Map<String, Long>> slidingWindowHourlyUserRequestsPerKeyView =
        input.apply("Calculate_Total_UserRequests_Past_1Hr",
                new WindowedNumUserRequestsPerKey())
            .apply(View.asMap());

    /** Calculate a running sum of the number of requests in a sliding window.
        I start a sliding window of duration 1 hr every 1 sec so that I can
        get an accurate result for the past 1 hr. */

    private static class WindowedNumUserRequestsPerKey
            extends PTransform<PCollection<POJO>, PCollection<KV<String, Long>>> {

        @Override
        public PCollection<KV<String, Long>> expand(PCollection<POJO> input) {
            return input
                .apply("Applying_Sliding_Window_1Hr_Every1sec",
                    Window.<POJO>into(SlidingWindows.of(Duration.standardHours(1))
                            .every(Duration.standardSeconds(1)))
                        .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
                        .withAllowedLateness(Duration.standardDays(360))
                        .discardingFiredPanes())
                .apply("Grouping_per_Key", ParDo.of(new GroupByAggregationKey()))
                .apply("Total_Requests_Per_Key",
                    Combine.perKey(new CalculateTotalUserRequestsPerKey()));
        }

        private static class GroupByAggregationKey
                extends DoFn<POJO, KV<String, POJO>> {
            @ProcessElement
            public void processElement(@Element POJO input,
                    OutputReceiver<KV<String, POJO>> out) {
                /* code that emits the required KV */
            }
        }

        private static class CalculateTotalUserRequestsPerKey
                extends Combine.CombineFn<POJO,
                    CalculateTotalUserRequestsPerKey.TotalRequestsAccumulator, Long> {

            private static class TotalRequestsAccumulator implements Serializable {
                private long num_requests_running_sum = 0;

                TotalRequestsAccumulator(long num_requests_running_sum) {
                    this.num_requests_running_sum = num_requests_running_sum;
                }

                @Override
                public boolean equals(Object o) {
                    if (this == o) return true;
                    if (!(o instanceof TotalRequestsAccumulator)) return false;
                    TotalRequestsAccumulator that = (TotalRequestsAccumulator) o;
                    return num_requests_running_sum == that.num_requests_running_sum;
                }

                @Override
                public int hashCode() {
                    return Objects.hash(num_requests_running_sum);
                }
            }

            @Override
            public TotalRequestsAccumulator createAccumulator() {
                return new TotalRequestsAccumulator(0);
            }

            @Override
            public TotalRequestsAccumulator addInput(
                    TotalRequestsAccumulator mutableAccumulator, POJO input) {
                mutableAccumulator.num_requests_running_sum++;
                return mutableAccumulator;
            }

            @Override
            public TotalRequestsAccumulator mergeAccumulators(
                    Iterable<TotalRequestsAccumulator> accumulators) {
                TotalRequestsAccumulator merged = createAccumulator();
                for (TotalRequestsAccumulator accumulator : accumulators) {
                    merged.num_requests_running_sum += accumulator.num_requests_running_sum;
                }
                return merged;
            }

            @Override
            public Long extractOutput(TotalRequestsAccumulator accumulator) {
                return accumulator.num_requests_running_sum;
            }
        }
    }

Now I calculate the score in the incoming POJO by using slidingWindowHourlyUserRequestsPerKeyView as a side input:

    input.apply("Add_Score", ParDo.of(new AddScore())
        .withSideInputs(slidingWindowHourlyUserRequestsPerKeyView));

The above seems to be working fine, though I would welcome suggestions on whether there is a better way of achieving this. Also, I start getting problems when we have to stop the beam for a couple of hours
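[Editor's note] Since each element contributes exactly 1 to the accumulator, the CombineFn above is a per-key counter. Its accumulator contract (createAccumulator / addInput / mergeAccumulators / extractOutput) can be exercised outside Beam with a plain-Java analogue; `TotalRequestsCombiner` and `Acc` are illustrative names, not Beam types:

```java
/**
 * Plain-Java analogue of the counting CombineFn in the post:
 * createAccumulator / addInput / mergeAccumulators / extractOutput,
 * without the Beam types. Every element contributes 1, so the
 * accumulator is a single running count.
 */
class TotalRequestsCombiner {
    static class Acc {
        long count;
        Acc(long c) { count = c; }
    }

    Acc createAccumulator() {
        return new Acc(0);
    }

    Acc addInput(Acc acc, Object input) {
        acc.count++;  // each element counts as one request
        return acc;
    }

    Acc mergeAccumulators(Iterable<Acc> accs) {
        // Combine partial counts produced independently (e.g. on workers).
        Acc merged = createAccumulator();
        for (Acc a : accs) merged.count += a.count;
        return merged;
    }

    long extractOutput(Acc acc) {
        return acc.count;
    }
}
```

Note that Beam ships a built-in Count.perKey() transform that computes exactly this per-key element count, which could replace the custom CombineFn.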