Re: Need suggestion/help for use case (usage of the side input pattern and sliding window)

2020-06-03 Thread Mohil Khare
Cool. Thanks again for your help/suggestions, Luke.

Regards
Mohil


Re: Need suggestion/help for use case (usage of the side input pattern and sliding window)

2020-06-02 Thread Luke Cwik
Using side inputs is fine and is a common pattern. You should take a look
at "slowly changing side inputs"[1] as there is some example code there.

1:
https://beam.apache.org/documentation/patterns/side-inputs/#slowly-updating-side-input-using-windowing
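
(For reference, a minimal sketch of the "slowly updating side input" pattern from the linked docs. This is illustrative only: the 5-minute refresh interval and the fetchScores() lookup are hypothetical assumptions, not part of Beam or this thread.)

PCollectionView<Map<String, Long>> refreshedView = p
    // Emit a tick periodically; each tick triggers a refresh of the side input.
    .apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(5)))
    .apply(ParDo.of(new DoFn<Long, KV<String, Long>>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            // fetchScores() is a hypothetical external lookup returning Map<String, Long>.
            for (Map.Entry<String, Long> entry : fetchScores().entrySet()) {
                c.output(KV.of(entry.getKey(), entry.getValue()));
            }
        }
    }))
    // Re-window into the global window so each firing replaces the map contents.
    .apply(Window.<KV<String, Long>>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
        .discardingFiredPanes())
    .apply(View.asMap());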


Re: Need suggestion/help for use case (usage of the side input pattern and sliding window)

2020-06-01 Thread Mohil Khare
Thanks Luke for your reply.
I see. I am trying to recall why I added allowedLateness as 360 days.
Anyway, I will try without that.

But do you think the approach I am using, i.e. keeping a running score in
a sliding window and then using it as a side input to decorate the main
log, is correct? Or can I achieve the same thing in a much better and more
optimized way?

Thanks again
Mohil


Re: Need suggestion/help for use case (usage of the side input pattern and sliding window)

2020-06-01 Thread Luke Cwik
Your allowed lateness is 360 days, and since the trigger you have doesn't
emit speculative results, you'll have to wait until the watermark advances
to the end-of-window timestamp + 360 days before something is output from
the grouping aggregation / available at the side input.
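
(For illustration, a minimal sketch of a windowing configuration that does emit speculative results; the one-minute early-firing delay, the zero allowed lateness, and the accumulation mode are illustrative choices, not a recommendation from this thread.)

Window.<POJO>into(SlidingWindows.of(Duration.standardHours(1)).every(Duration.standardSeconds(1)))
    // Early (speculative) firings roughly once a minute while data arrives,
    // plus an on-time firing when the watermark passes the end of the window.
    .triggering(AfterWatermark.pastEndOfWindow()
        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.standardMinutes(1))))
    .withAllowedLateness(Duration.ZERO)
    .accumulatingFiredPanes();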



Re: Need suggestion/help for use case (usage of the side input pattern and sliding window)

2020-05-30 Thread Mohil Khare
Hello all,

Any suggestions? Where am I going wrong, or is there a better way of
achieving this so that I can do replay as well?

Thanks
Mohil


Need suggestion/help for use case (usage of the side input pattern and sliding window)

2020-05-27 Thread Mohil Khare
Hi everyone,
I need a suggestion regarding usage of the side input pattern and a sliding
window, especially while replaying old Kafka logs/offsets.

FYI: I am running Beam 2.19 on Google Dataflow.

I have a use case where I read a continuous stream of data from Kafka and
need to calculate one score (apart from other calculations) per key, which
is based on the number of such requests received for that key in the last
one hour.

Roughly, my code looks like the following:

PCollection<POJO> input = p
    .apply("Read__Logs_From_Kafka", KafkaIO.<String, byte[]>read()
        .withBootstrapServers(String.join(",", bootstrapServerToConnectTo))
        .withTopic("app_access_stats")
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(ByteArrayDeserializer.class)
        .withConsumerConfigUpdates(kafkaConsumerProperties)
        .withConsumerFactoryFn(consumerFactoryObj)
        .commitOffsetsInFinalize())
    .apply("Applying_Fixed_Window_Logs", Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
        .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1))))
        .withAllowedLateness(Duration.standardDays(380))
        .discardingFiredPanes())
    .apply("Convert_KafkaRecord_To_PCollection",
        ParDo.of(new ParseKafkaLogs()));


/*** Class that handles the incoming PCollection<POJO> and calculates the score ***/

/** Assume input = the incoming PCollection<POJO> as created above */

PCollectionView<Map<String, Long>> slidingWindowHourlyUserRequestsPerKeyView
    = input.apply("Calculate_Total_UserRequests_Past_1Hr", new
        WindowedNumUserRequestsPerKey()).apply(View.asMap());

/** Calculate the running sum of the number of requests in a sliding window.
    Starting a sliding window of duration 1 hr every 1 sec so that we
    can get an accurate result for the past 1 hr. **/


private static class WindowedNumUserRequestsPerKey extends
        PTransform<PCollection<POJO>, PCollection<KV<String, Long>>> {

    @Override
    public PCollection<KV<String, Long>> expand(PCollection<POJO> input) {

        return input
            .apply("Applying_Sliding_Window_1Hr_Every1sec",
                Window.<POJO>into(SlidingWindows.of(Duration.standardHours(1)).every(Duration.standardSeconds(1)))
                    .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
                    .withAllowedLateness(Duration.standardDays(360))
                    .discardingFiredPanes())
            .apply("Grouping_per_Key", ParDo.of(new GroupByAggregationKey()))
            .apply("Total_Requests_Per_Key", Combine.perKey(new
                CalculateTotalUserRequestsPerKey()));
    }

    private static class GroupByAggregationKey extends DoFn<POJO, KV<String, POJO>> {
        @ProcessElement
        public void processElement(@Element POJO input,
                OutputReceiver<KV<String, POJO>> out) {
            /** code that emits the required KV */

        }
    }

    private static class CalculateTotalUserRequestsPerKey extends
            Combine.CombineFn<POJO, CalculateTotalUserRequestsPerKey.TotalRequestsAccumulator, Long> {
        private static class TotalRequestsAccumulator implements Serializable {
            private long num_requests_running_sum = 0;

            TotalRequestsAccumulator(long num_requests_running_sum) {
                this.num_requests_running_sum = num_requests_running_sum;
            }

            @Override
            public boolean equals(Object o) {
                if (this == o) return true;
                if (!(o instanceof TotalRequestsAccumulator)) return false;
                TotalRequestsAccumulator that = (TotalRequestsAccumulator) o;
                return num_requests_running_sum == that.num_requests_running_sum;
            }

            @Override
            public int hashCode() {
                return Objects.hash(num_requests_running_sum);
            }
        }

        @Override
        public TotalRequestsAccumulator createAccumulator() {
            return new TotalRequestsAccumulator(0);
        }

        @Override
        public TotalRequestsAccumulator addInput(TotalRequestsAccumulator mutableAccumulator, POJO input) {
            mutableAccumulator.num_requests_running_sum++;
            return mutableAccumulator;
        }

        @Override
        public TotalRequestsAccumulator mergeAccumulators(Iterable<TotalRequestsAccumulator> accumulators) {
            TotalRequestsAccumulator merged = createAccumulator();
            for (TotalRequestsAccumulator accumulator : accumulators) {
                merged.num_requests_running_sum += accumulator.num_requests_running_sum;
            }
            return merged;
        }

        @Override
        public Long extractOutput(TotalRequestsAccumulator accumulator) {
            Long totalUserRequestsPerKey = accumulator.num_requests_running_sum;
            return totalUserRequestsPerKey;
        }
    }
}

Now I calculate the score in the incoming POJO by using
slidingWindowHourlyUserRequestsPerKeyView as a side input.

input.apply("Add_Score", ParDo.of(new AddScore())
.withSideInputs(slidingWindowHourlyUserRequestsPerKeyView));
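
(AddScore is not shown in the thread; a minimal sketch of such a DoFn, passing the view via the constructor and assuming hypothetical getKey(), withScore(), and computeScore() helpers:)

private static class AddScore extends DoFn<POJO, POJO> {
    private final PCollectionView<Map<String, Long>> requestsView;

    AddScore(PCollectionView<Map<String, Long>> requestsView) {
        this.requestsView = requestsView;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        // Look up the past hour's request count for this element's key.
        Map<String, Long> hourlyRequests = c.sideInput(requestsView);
        POJO element = c.element();
        long numRequests = hourlyRequests.getOrDefault(element.getKey(), 0L);
        // computeScore() derives the score from the count; withScore() attaches it.
        c.output(element.withScore(computeScore(numRequests)));
    }
}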


The above seems to be working fine, though I would like a suggestion on
whether there is a better way of achieving this.

Also, I start getting problems when we have to stop the Beam pipeline for a couple
of hours