Re: Session Windows - not working as expected

2021-05-06 Thread Swagat Mishra
I am able to maintain a list state in a process function and aggregate the
values. How do I get a notification/event to remove a value from the
stored list state?
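One way to get that notification is an event-time timer in a KeyedProcessFunction: append each event to the ListState, register a timer for the event time plus the retention period, and prune expired entries when the timer fires. A minimal sketch for reference (the Purchase POJO, its field names, and the one-hour retention are assumptions, not taken from this thread); the stream would be keyed by the customer id before applying the function:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

// Keyed by customer id; emits the number of purchases currently retained for the key.
public class LastHourCount extends KeyedProcessFunction<String, LastHourCount.Purchase, Long> {

    // Minimal POJO for illustration only; a real job would use its own event type.
    public static class Purchase {
        public String customerId;
        public long eventTime; // epoch millis
    }

    private static final long ONE_HOUR_MS = 60 * 60 * 1000L;

    private transient ListState<Purchase> purchases;

    @Override
    public void open(Configuration parameters) {
        purchases = getRuntimeContext().getListState(
                new ListStateDescriptor<>("purchases", Purchase.class));
    }

    @Override
    public void processElement(Purchase p, Context ctx, Collector<Long> out) throws Exception {
        purchases.add(p);
        // ask to be called back when this purchase drops out of the one-hour range
        ctx.timerService().registerEventTimeTimer(p.eventTime + ONE_HOUR_MS);
        out.collect(currentCount()); // expired entries are pruned in onTimer below
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
        // drop the entries that have expired relative to the firing timer and re-emit the count
        List<Purchase> kept = new ArrayList<>();
        for (Purchase p : purchases.get()) {
            if (p.eventTime + ONE_HOUR_MS > timestamp) {
                kept.add(p);
            }
        }
        purchases.update(kept);
        out.collect((long) kept.size());
    }

    private long currentCount() throws Exception {
        long n = 0;
        for (Purchase ignored : purchases.get()) {
            n++;
        }
        return n;
    }
}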

On Thu, May 6, 2021 at 8:47 PM Swagat Mishra  wrote:

> I meant "Do you recommend the state to be maintained in *ValueState* or an
> external store like Elastic?"
>
> On Thu, May 6, 2021 at 8:46 PM Swagat Mishra  wrote:
>
>> I want to aggregate the user activity, e.g. the number of products the user
>> has purchased in the last 1 hour.
>>
>> So - User A (ID = USER-A) purchases one product at 10:30 AM, another
>> product at 10:45 AM, and another product at 1:30 PM.
>>
>> My API should give 2 products purchased if the API call happens at 11:29
>> AM (10:30, 10:45) and 1 product if the API call happens at 1:45 PM.
>>
>> The API will access data persisted from the flink streaming output.
>>
>> As of now I am doing keyby on (ID = USER-A) .
>>
>> Do I have to maintain my own calculated state within the process window
>> function? Is the process window function shared across all keys, or is there
>> one instance per key? Do you recommend the state to be maintained in Flink
>> state or in Elastic?
>>
>> Also, if I change the processing to processing time instead of event
>> time, the aggregation happens. Any reason why Flink could not provide
>> event-time aggregations like the processing-time aggregation?
>>
>>
>>
>> On Thu, May 6, 2021 at 7:11 PM Arvid Heise  wrote:
>>
>>> I'm not sure what you want to achieve exactly.
>>>
>>> You can always keyby the values by a constant pseudo-key such that all
>>> values will be in the same partition (so instead of using global but with
>>> the same effect). Then you can use a process function to maintain the
>>> state. Just make sure that your data volume is low enough as this part is
>>> not parallelizable by definition.
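A minimal sketch of that suggestion, for reference (the String event type and the counting logic are placeholders): every record gets the same constant key, so a single parallel instance sees the whole stream and can keep the state.

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public final class PseudoKeyExample {

    // Emits a running count of all events, regardless of their real key.
    public static DataStream<Long> countAll(DataStream<String> events) {
        return events
                .keyBy(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) {
                        return "ALL"; // constant pseudo-key: one partition sees every record
                    }
                })
                .process(new KeyedProcessFunction<String, String, Long>() {
                    private long seen; // plain field; use keyed state if fault tolerance matters

                    @Override
                    public void processElement(String value, Context ctx, Collector<Long> out) {
                        out.collect(++seen);
                    }
                });
    }
}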
>>>
>>> On Thu, May 6, 2021 at 10:09 AM Swagat Mishra 
>>> wrote:
>>>
>>>> Thank you.
>>>>
>>>> I will have a look at DataStream#global.
>>>>
>>>> Is there any other way to maintain state, like by using ValueState?
>>>>
>>>>
>>>> On Thu, 6 May 2021 at 1:26 PM, Arvid Heise  wrote:
>>>>
>>>>> If you keyby then all direct functions see only the elements with the
>>>>> same key. So that's the expected behavior and the base of Flink's parallel
>>>>> processing capabilities.
>>>>>
>>>>> If you want to generate a window over all customers, you have to use a
>>>>> global window. However, that also means that no parallelization can 
>>>>> happen,
>>>>> so I'd discourage that.
>>>>>
>>>>> A better way would be to perform as many calculations as possible in
>>>>> the process function (for example create a customer with buy information
>>>>> record) and then have a DataStream#global() reshuffle to collect all
>>>>> aggregated information on one node.
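A sketch of that pattern, for reference (the CustomerCount type and field names are illustrative): pre-aggregate per key first, then funnel the small per-key results through DataStream#global() into a parallelism-1 operator that combines them.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public final class GlobalCollectExample {

    // Per-customer partial result produced by the keyed step (illustrative type).
    public static class CustomerCount {
        public String customerId;
        public long purchases;
    }

    public static DataStream<Long> runningTotal(DataStream<CustomerCount> perCustomer) {
        return perCustomer
                .global() // route all partial results to a single downstream subtask
                .process(new ProcessFunction<CustomerCount, Long>() {
                    private long total; // plain field is fine here: only one subtask exists

                    @Override
                    public void processElement(CustomerCount cc, Context ctx, Collector<Long> out) {
                        total += cc.purchases;
                        out.collect(total);
                    }
                })
                .setParallelism(1);
    }
}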
>>>>>
>>>>> On Thu, May 6, 2021 at 9:20 AM Swagat Mishra 
>>>>> wrote:
>>>>>
>>>>>> Thank you.
>>>>>>
>>>>>> sourceContext.collectWithTimestamp(c, c.getEventTime());
>>>>>>
>>>>>> Adding this to the source context worked.
>>>>>>
>>>>>> However, I am still getting only one customer in the process method. I
>>>>>> would expect the iterable to provide all customers in the window. Or do
>>>>>> I have to maintain state?
>>>>>>
>>>>>>
>>>>>> changes for reference:
>>>>>>
>>>>>> I made the following change, and also removed any lag that I had introduced
>>>>>> for experimentation earlier.
>>>>>>
>>>>>> so trigger looks like:
>>>>>>
>>>>>>
>>>>>> @Override
>>>>>> public TriggerResult onElement(Customer customer, long l, TimeWindow 
>>>>>> timeWindow, TriggerContext triggerContext) throws Exception {
>>>>>> if (timeWindow.maxTimestamp() <= 
>>>>>> triggerContext.getCurrentWatermark()) {
>>>>>> // if the watermark is already past the window fire 
>>>>>> immediately
>>>>>> return TriggerResult.FIRE;
>>>>>>

Re: Session Windows - not working as expected

2021-05-06 Thread Swagat Mishra
I meant "Do you recommend the state to be maintained in* Value** State *or
external store like elastic?"

On Thu, May 6, 2021 at 8:46 PM Swagat Mishra  wrote:

> I want to aggregate the user activity, e.g. the number of products the user
> has purchased in the last 1 hour.
>
> So - User A (ID = USER-A) purchases one product at 10:30 AM, another
> product at 10:45 AM, and another product at 1:30 PM.
>
> My API should give 2 products purchased if the API call happens at 11:29
> AM (10:30, 10:45) and 1 product if the API call happens at 1:45 PM.
>
> The API will access data persisted from the flink streaming output.
>
> As of now I am doing keyby on (ID = USER-A) .
>
> Do I have to maintain my own calculated state within the process window
> function? Is the process window function shared across all keys, or is there
> one instance per key? Do you recommend the state to be maintained in Flink
> state or in Elastic?
>
> Also, if I change the processing to processing time instead of event time,
> the aggregation happens. Any reason why Flink could not provide event-time
> aggregations like the processing-time aggregation?
>
>
>
> On Thu, May 6, 2021 at 7:11 PM Arvid Heise  wrote:
>
>> I'm not sure what you want to achieve exactly.
>>
>> You can always keyby the values by a constant pseudo-key such that all
>> values will be in the same partition (so instead of using global but with
>> the same effect). Then you can use a process function to maintain the
>> state. Just make sure that your data volume is low enough as this part is
>> not parallelizable by definition.
>>
>> On Thu, May 6, 2021 at 10:09 AM Swagat Mishra  wrote:
>>
>>> Thank you.
>>>
>>> I will have a look at DataStream#global.
>>>
>>> Is there any other way to maintain state, like by using ValueState?
>>>
>>>
>>> On Thu, 6 May 2021 at 1:26 PM, Arvid Heise  wrote:
>>>
>>>> If you keyby then all direct functions see only the elements with the
>>>> same key. So that's the expected behavior and the base of Flink's parallel
>>>> processing capabilities.
>>>>
>>>> If you want to generate a window over all customers, you have to use a
>>>> global window. However, that also means that no parallelization can happen,
>>>> so I'd discourage that.
>>>>
>>>> A better way would be to perform as many calculations as possible in
>>>> the process function (for example create a customer with buy information
>>>> record) and then have a DataStream#global() reshuffle to collect all
>>>> aggregated information on one node.
>>>>
>>>> On Thu, May 6, 2021 at 9:20 AM Swagat Mishra 
>>>> wrote:
>>>>
>>>>> Thank you.
>>>>>
>>>>> sourceContext.collectWithTimestamp(c, c.getEventTime());
>>>>>
>>>>> Adding this to the source context worked.
>>>>>
>>>>> However, I am still getting only one customer in the process method. I
>>>>> would expect the iterable to provide all customers in the window. Or do
>>>>> I have to maintain state?
>>>>>
>>>>>
>>>>> changes for reference:
>>>>>
>>>>> I made the following change, and also removed any lag that I had introduced
>>>>> for experimentation earlier.
>>>>>
>>>>> so trigger looks like:
>>>>>
>>>>>
>>>>> @Override
>>>>> public TriggerResult onElement(Customer customer, long l, TimeWindow 
>>>>> timeWindow, TriggerContext triggerContext) throws Exception {
>>>>> if (timeWindow.maxTimestamp() <= 
>>>>> triggerContext.getCurrentWatermark()) {
>>>>> // if the watermark is already past the window fire 
>>>>> immediately
>>>>> return TriggerResult.FIRE;
>>>>> } else {
>>>>> //LOGGER.info("Max timestamp for customer: " + 
>>>>> customer.getIdentifier() + " is: " + timeWindow.maxTimestamp());
>>>>> 
>>>>> triggerContext.registerEventTimeTimer(customer.getEventTime());
>>>>> return TriggerResult.FIRE;
>>>>>
>>>>> }
>>>>> }
>>>>>
>>>>> @Override
>>>>> public TriggerResult onEventTime(long time, TimeWindow timeWindow, 
>>>>>

Re: Session Windows - not working as expected

2021-05-06 Thread Swagat Mishra
I want to aggregate the user activity, e.g. the number of products the user has
purchased in the last 1 hour.

So - User A (ID = USER-A) purchases one product at 10:30 AM, another
product at 10:45 AM, and another product at 1:30 PM.

My API should give 2 products purchased if the API call happens at 11:29 AM
(10:30, 10:45) and 1 product if the API call happens at 1:45 PM.

The API will access data persisted from the flink streaming output.

As of now I am doing keyby on (ID = USER-A) .

Do I have to maintain my own calculated state within the process window
function? Is the process window function shared across all keys, or is there
one instance per key? Do you recommend the state to be maintained in Flink
state or in Elastic?

Also, if I change the processing to processing time instead of event time,
the aggregation happens. Any reason why Flink could not provide event-time
aggregations like the processing-time aggregation?
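One possible shape of the windowed variant, for reference (the Purchase type, field names, the watermark bound, and the 1-hour window sliding every 5 minutes are assumptions): key by the user id and count purchases per window, then persist the result wherever the API reads from.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;

public final class PurchasesLastHour {

    // Illustrative event type.
    public static class Purchase {
        public String userId;
        public long eventTime; // epoch millis
    }

    public static DataStream<Long> countPerUser(DataStream<Purchase> purchases) {
        return purchases
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Purchase>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((p, ts) -> p.eventTime))
                .keyBy(p -> p.userId)
                .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)))
                .aggregate(new AggregateFunction<Purchase, Long, Long>() {
                    @Override public Long createAccumulator() { return 0L; }
                    @Override public Long add(Purchase p, Long acc) { return acc + 1; }
                    @Override public Long getResult(Long acc) { return acc; }
                    @Override public Long merge(Long a, Long b) { return a + b; }
                });
    }
}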



On Thu, May 6, 2021 at 7:11 PM Arvid Heise  wrote:

> I'm not sure what you want to achieve exactly.
>
> You can always keyby the values by a constant pseudo-key such that all
> values will be in the same partition (so instead of using global but with
> the same effect). Then you can use a process function to maintain the
> state. Just make sure that your data volume is low enough as this part is
> not parallelizable by definition.
>
> On Thu, May 6, 2021 at 10:09 AM Swagat Mishra  wrote:
>
>> Thank you.
>>
>> I will have a look at DataStream#global.
>>
>> Is there any other way to maintain state, like by using ValueState?
>>
>>
>> On Thu, 6 May 2021 at 1:26 PM, Arvid Heise  wrote:
>>
>>> If you keyby then all direct functions see only the elements with the
>>> same key. So that's the expected behavior and the base of Flink's parallel
>>> processing capabilities.
>>>
>>> If you want to generate a window over all customers, you have to use a
>>> global window. However, that also means that no parallelization can happen,
>>> so I'd discourage that.
>>>
>>> A better way would be to perform as many calculations as possible in the
>>> process function (for example create a customer with buy information
>>> record) and then have a DataStream#global() reshuffle to collect all
>>> aggregated information on one node.
>>>
>>> On Thu, May 6, 2021 at 9:20 AM Swagat Mishra  wrote:
>>>
>>>> Thank you.
>>>>
>>>> sourceContext.collectWithTimestamp(c, c.getEventTime());
>>>>
>>>> Adding this to the source context worked.
>>>>
>>>> However, I am still getting only one customer in the process method. I
>>>> would expect the iterable to provide all customers in the window. Or do
>>>> I have to maintain state?
>>>>
>>>>
>>>> changes for reference:
>>>>
>>>> I made the following change, and also removed any lag that I had introduced
>>>> for experimentation earlier.
>>>>
>>>> so trigger looks like:
>>>>
>>>>
>>>> @Override
>>>> public TriggerResult onElement(Customer customer, long l, TimeWindow 
>>>> timeWindow, TriggerContext triggerContext) throws Exception {
>>>> if (timeWindow.maxTimestamp() <= 
>>>> triggerContext.getCurrentWatermark()) {
>>>> // if the watermark is already past the window fire immediately
>>>> return TriggerResult.FIRE;
>>>> } else {
>>>> //LOGGER.info("Max timestamp for customer: " + 
>>>> customer.getIdentifier() + " is: " + timeWindow.maxTimestamp());
>>>> 
>>>> triggerContext.registerEventTimeTimer(customer.getEventTime());
>>>> return TriggerResult.FIRE;
>>>>
>>>> }
>>>> }
>>>>
>>>> @Override
>>>> public TriggerResult onEventTime(long time, TimeWindow timeWindow, 
>>>> TriggerContext triggerContext) {
>>>> return time == timeWindow.maxTimestamp() ?
>>>> TriggerResult.FIRE :
>>>> TriggerResult.CONTINUE;
>>>> }
>>>>
>>>> @Override
>>>> public TriggerResult onProcessingTime(long time, TimeWindow window, 
>>>> TriggerContext ctx) throws Exception {
>>>> return TriggerResult.CONTINUE;
>>>> }
>>>>
>>>> @Override
>>>> public void clear(TimeWindow window, TriggerCont

Re: Session Windows - not working as expected

2021-05-06 Thread Swagat Mishra
Thank you.

I will have a look at DataStream#global.

Is there any other way to maintain state, like by using ValueState?


On Thu, 6 May 2021 at 1:26 PM, Arvid Heise  wrote:

> If you keyby then all direct functions see only the elements with the same
> key. So that's the expected behavior and the base of Flink's parallel
> processing capabilities.
>
> If you want to generate a window over all customers, you have to use a
> global window. However, that also means that no parallelization can happen,
> so I'd discourage that.
>
> A better way would be to perform as many calculations as possible in the
> process function (for example create a customer with buy information
> record) and then have a DataStream#global() reshuffle to collect all
> aggregated information on one node.
>
> On Thu, May 6, 2021 at 9:20 AM Swagat Mishra  wrote:
>
>> Thank you.
>>
>> sourceContext.collectWithTimestamp(c, c.getEventTime());
>>
>> Adding this to the source context worked.
>>
>> However, I am still getting only one customer in the process method. I would
>> expect the iterable to provide all customers in the window. Or do I have to
>> maintain state?
>>
>>
>> changes for reference:
>>
>> I made the following change, and also removed any lag that I had introduced
>> for experimentation earlier.
>>
>> so trigger looks like:
>>
>>
>> @Override
>> public TriggerResult onElement(Customer customer, long l, TimeWindow 
>> timeWindow, TriggerContext triggerContext) throws Exception {
>> if (timeWindow.maxTimestamp() <= 
>> triggerContext.getCurrentWatermark()) {
>> // if the watermark is already past the window fire immediately
>> return TriggerResult.FIRE;
>> } else {
>> //LOGGER.info("Max timestamp for customer: " + 
>> customer.getIdentifier() + " is: " + timeWindow.maxTimestamp());
>> triggerContext.registerEventTimeTimer(customer.getEventTime());
>> return TriggerResult.FIRE;
>>
>> }
>> }
>>
>> @Override
>> public TriggerResult onEventTime(long time, TimeWindow timeWindow, 
>> TriggerContext triggerContext) {
>> return time == timeWindow.maxTimestamp() ?
>> TriggerResult.FIRE :
>> TriggerResult.CONTINUE;
>> }
>>
>> @Override
>> public TriggerResult onProcessingTime(long time, TimeWindow window, 
>> TriggerContext ctx) throws Exception {
>> return TriggerResult.CONTINUE;
>> }
>>
>> @Override
>> public void clear(TimeWindow window, TriggerContext ctx) throws 
>> Exception {
>> ctx.deleteEventTimeTimer(window.maxTimestamp());
>> }
>>
>> @Override
>> public boolean canMerge() {
>> return true;
>> }
>>
>> and *removed lateness*
>>
>> customerStream
>>
>> 
>> //.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
>> .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
>> .keyBy((KeySelector) Customer::getIdentifier)
>> .window(EventTimeSessionWindows.withGap(Time.seconds(20)))
>> //.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
>> .trigger(new EventTimeTrigger()) //EventTimeTrigger.create())
>> .process(new CustomAggregateFunction());
>>
>>
>> On Thu, May 6, 2021 at 12:32 PM Arvid Heise  wrote:
>>
>>> Your source is not setting the timestamp with collectWithTimestamp. I'm
>>> assuming that nothing really moves from the event time's perspective.
>>>
>>> On Thu, May 6, 2021 at 8:58 AM Swagat Mishra  wrote:
>>>
>>>> Yes, the customer generator is setting the event timestamp correctly, like I
>>>> see below. I debugged and found that the events are getting late, so they are
>>>> never processed: in the window operator the method this.isWindowLate(actualWindow)
>>>> evaluates to false for all events except the first, hence the events are
>>>> getting skipped. I am not able to figure out where exactly the issue is.
>>>>
>>>> I have removed the evictor because I don't think I need it yet.
>>>>
>>>> stream looks like
>>>>
>>>> customerStream
>>>> .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
>>>> .keyBy((KeySelect

Re: Session Windows - not working as expected

2021-05-06 Thread Swagat Mishra
Thank you.

sourceContext.collectWithTimestamp(c, c.getEventTime());

Adding this to the source context worked.

However, I am still getting only one customer in the process method. I
would expect the iterable to provide all customers in the window. Or
do I have to maintain state?


changes for reference:

I made the following change, and also removed any lag that I had
introduced for experimentation earlier.

so trigger looks like:


@Override
public TriggerResult onElement(Customer customer, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
    if (timeWindow.maxTimestamp() <= triggerContext.getCurrentWatermark()) {
        // if the watermark is already past the window, fire immediately
        return TriggerResult.FIRE;
    } else {
        //LOGGER.info("Max timestamp for customer: " + customer.getIdentifier() + " is: " + timeWindow.maxTimestamp());
        triggerContext.registerEventTimeTimer(customer.getEventTime());
        return TriggerResult.FIRE;
    }
}

@Override
public TriggerResult onEventTime(long time, TimeWindow timeWindow, TriggerContext triggerContext) {
    return time == timeWindow.maxTimestamp() ?
            TriggerResult.FIRE :
            TriggerResult.CONTINUE;
}

@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
    return TriggerResult.CONTINUE;
}

@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
    ctx.deleteEventTimeTimer(window.maxTimestamp());
}

@Override
public boolean canMerge() {
    return true;
}

and *removed lateness*

customerStream


//.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
.assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
.keyBy((KeySelector) Customer::getIdentifier)
.window(EventTimeSessionWindows.withGap(Time.seconds(20)))
//.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
.trigger(new EventTimeTrigger()) //EventTimeTrigger.create())
.process(new CustomAggregateFunction());
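For comparison, a sketch of a trigger that registers its timer at the window's own max timestamp and fires only once the watermark reaches it, instead of firing on every element; this is essentially what the built-in EventTimeTrigger does:

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Fires once per window, when the watermark passes the window's max timestamp.
public class FireAtWindowEndTrigger extends Trigger<Object, TimeWindow> {

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            return TriggerResult.FIRE; // watermark already past the window
        }
        // defer to the end of the window instead of firing on every element
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        // re-register the timer for the merged (session) window
        if (window.maxTimestamp() > ctx.getCurrentWatermark()) {
            ctx.registerEventTimeTimer(window.maxTimestamp());
        }
    }
}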


On Thu, May 6, 2021 at 12:32 PM Arvid Heise  wrote:

> Your source is not setting the timestamp with collectWithTimestamp. I'm
> assuming that nothing really moves from the event time's perspective.
>
> On Thu, May 6, 2021 at 8:58 AM Swagat Mishra  wrote:
>
>> Yes, the customer generator is setting the event timestamp correctly, like I
>> see below. I debugged and found that the events are getting late, so they are
>> never processed: in the window operator the method this.isWindowLate(actualWindow)
>> evaluates to false for all events except the first, hence the events are
>> getting skipped. I am not able to figure out where exactly the issue is.
>>
>> I have removed the evictor because I don't think I need it yet.
>>
>> stream looks like
>>
>> customerStream
>> .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
>> .keyBy((KeySelector) Customer::getIdentifier)
>> 
>> .window(EventTimeSessionWindows.withGap(Time.seconds(5))).allowedLateness(Time.seconds(5))
>> .trigger(new EventTimeTrigger())
>> .process(new CustomAggregateFunction());
>>
>>
>> *Customer generator looks like:*
>>
>> while (isRunning) {
>> Customer c = new Customer(CUSTOMER_KEY[counter % 5], LocalTime.now(), 1000); // that's the event time
>> System.out.println("Writing customer: " + c);
>> sourceContext.collect(c);
>> //sourceContext.emitWatermark(new Watermark(c.getEventTime()));
>> Thread.sleep(1000);
>> counter++;
>> if(counter % 11 == 0) {
>> System.out.println("Sleeping for 10 seconds");
>> Thread.sleep(1);
>> }
>> }
>>
>>
>> Custom Watermark generator has this:
>>
>> .
>> @Override
>> public void onEvent(Customer customer, long l, WatermarkOutput 
>> watermarkOutput) {
>> currentMaxTimestamp = Math.max(currentMaxTimestamp, 
>> customer.getEventTime()  );
>> }
>>
>> @Override
>> public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
>> watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
>>
>> }
>> .
>>
>> trigger looks like:
>>
>> --
>>
>>
>>  @Override
>> public TriggerResult onElement(Customer customer, long l, TimeWindow 
>> timeWindow, TriggerContext triggerContext) throws Exception {
>> if (timeWindow.max

Re: Session Windows - not working as expected

2021-05-06 Thread Swagat Mishra
Yes, the customer generator is setting the event timestamp correctly, like I see
below. I debugged and found that the events are getting late, so they are never
processed: in the window operator the method this.isWindowLate(actualWindow)
evaluates to false for all events except the first, hence the events are getting
skipped. I am not able to figure out where exactly the issue is.

I have removed the evictor because I don't think I need it yet.

stream looks like

customerStream
.assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
.keyBy((KeySelector) Customer::getIdentifier)

.window(EventTimeSessionWindows.withGap(Time.seconds(5))).allowedLateness(Time.seconds(5))
.trigger(new EventTimeTrigger())
.process(new CustomAggregateFunction());


*Customer generator looks like:*

while (isRunning) {
Customer c = new Customer(CUSTOMER_KEY[counter % 5], LocalTime.now(), 1000); // that's the event time
System.out.println("Writing customer: " + c);
sourceContext.collect(c);
//sourceContext.emitWatermark(new Watermark(c.getEventTime()));
Thread.sleep(1000);
counter++;
if(counter % 11 == 0) {
System.out.println("Sleeping for 10 seconds");
Thread.sleep(1);
}
}
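For reference, a variant of that generator which attaches the event timestamp to each record and emits watermarks itself (CustomerV2 is a hypothetical variant of Customer whose event time is an epoch-millisecond long rather than a LocalTime):

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

public class TimestampedCustomerGenerator implements SourceFunction<TimestampedCustomerGenerator.CustomerV2> {

    // Hypothetical event type carrying the event time as epoch millis.
    public static class CustomerV2 {
        public String id;
        public long eventTime; // epoch millis
        public double amount;
    }

    private volatile boolean isRunning = true;
    private static final String[] CUSTOMER_KEY = {"C1", "C2", "C3", "C4", "C5"};

    @Override
    public void run(SourceContext<CustomerV2> ctx) throws Exception {
        int counter = 0;
        while (isRunning) {
            CustomerV2 c = new CustomerV2();
            c.id = CUSTOMER_KEY[counter % 5];
            c.eventTime = System.currentTimeMillis();
            c.amount = 1000;
            // attach the event timestamp to the record and advance event time
            ctx.collectWithTimestamp(c, c.eventTime);
            ctx.emitWatermark(new Watermark(c.eventTime - 1));
            Thread.sleep(1000);
            counter++;
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

Note that a downstream assignTimestampsAndWatermarks, as used in the pipeline above, would generally override watermarks emitted by the source.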


Custom Watermark generator has this:

.
@Override
public void onEvent(Customer customer, long l, WatermarkOutput
watermarkOutput) {
currentMaxTimestamp = Math.max(currentMaxTimestamp,
customer.getEventTime()  );
}

@Override
public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));

}
.

trigger looks like:

--


@Override
public TriggerResult onElement(Customer customer, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
    if (timeWindow.maxTimestamp() <= triggerContext.getCurrentWatermark()) {
        // if the watermark is already past the window, fire immediately
        return TriggerResult.FIRE;
    } else {
        LOGGER.info("Max timestamp for customer: " + customer.getIdentifier() + " is: " + timeWindow.maxTimestamp());
        triggerContext.registerEventTimeTimer(customer.getLocalTime().plusSeconds(8).toNanoOfDay());
        return TriggerResult.FIRE;
    }
}

@Override
public TriggerResult onEventTime(long time, TimeWindow timeWindow, TriggerContext triggerContext) {
    //if (timeWindow.maxTimestamp() > triggerContext.getCurrentWatermark()) {
    //    triggerContext.registerEventTimeTimer(timeWindow.maxTimestamp());
    //    return TriggerResult.CONTINUE;
    //}

    return time == timeWindow.maxTimestamp() ?
            TriggerResult.FIRE :
            TriggerResult.CONTINUE;
}





On Thu, May 6, 2021 at 12:02 PM Arvid Heise  wrote:

> Hi,
>
> Is your CustomerGenerator setting the event timestamp correctly? Are your
> evictors evicting too early?
>
> You can try to add some debug output into the watermark assigner and see
> if it's indeed progressing as expected.
>
> On Thu, May 6, 2021 at 12:48 AM Swagat Mishra  wrote:
>
>> This seems to be working fine in processing time but doesn't work in
>> event time. Is there an issue with the way the water mark is defined or do
>> we need to set up timers?
>>
>> Please advise.
>>
>>
>> WORKING:
>>
>> customerStream
>> .keyBy((KeySelector) Customer::getIdentifier)
>> .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
>> .trigger(new EventTimeTrigger()) //EventTimeTrigger.create())
>> .process(new CustomAggregateFunction());
>>
>>
>> NOT WORKING:
>>
>> customerStream
>> .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new 
>> WaterMarkAssigner()))
>> .keyBy(Customer::getIdentifier)
>> .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>> .trigger(EventTimeTrigger.create())
>> .evictor(new CustomerEvictor())
>> .process(new CustomAggregateFunction())
>> .print();
>>
>>
>> On Thu, May 6, 2021 at 1:53 AM Sam  wrote:
>>
>>> Adding the code for CustomWatermarkGenerator
>>>
>>> .
>>> @Override
>>> public void onEvent(Customer customer, long l, WatermarkOutput 
>>> watermarkOutput) {
>>> currentMaxTimestamp = Math.max(currentMaxTimestamp, 
>>> customer.getEventTime()  );
>>> }
>>>
>>> @Override
>>> public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
>>> watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
>>>
>>> }
>>> .
>>>

Re: Session Windows - not working as expected

2021-05-05 Thread Swagat Mishra
This seems to be working fine in processing time but doesn't work in event
time. Is there an issue with the way the water mark is defined or do we
need to set up timers?

Please advise.


WORKING:

customerStream
.keyBy((KeySelector) Customer::getIdentifier)
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
.trigger(new EventTimeTrigger()) //EventTimeTrigger.create())
.process(new CustomAggregateFunction());


NOT WORKING:

customerStream
.assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new
WaterMarkAssigner()))
.keyBy(Customer::getIdentifier)
.window(EventTimeSessionWindows.withGap(Time.seconds(5)))
.trigger(EventTimeTrigger.create())
.evictor(new CustomerEvictor())
.process(new CustomAggregateFunction())
.print();


On Thu, May 6, 2021 at 1:53 AM Sam  wrote:

> Adding the code for CustomWatermarkGenerator
>
> .
> @Override
> public void onEvent(Customer customer, long l, WatermarkOutput 
> watermarkOutput) {
> currentMaxTimestamp = Math.max(currentMaxTimestamp, 
> customer.getEventTime()  );
> }
>
> @Override
> public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
> watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
>
> }
> .....
>
>
> On Thu, May 6, 2021 at 1:33 AM Swagat Mishra  wrote:
>
>> Hi,
>>
>> Bit of background: I have a stream of customers who have purchased some
>> product, reading these transactions on a Kafka topic. I want to aggregate
>> the number of products the customer has purchased in a particular duration
>> (say 10 seconds) and write to a sink.
>>
>> I am using session windows to achieve the above.
>>
>> For test purposes, I have mocked up a customer stream and executed
>> session windows like below.
>>
>> StreamExecutionEnvironment environment = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> DataStream customerStream = environment.addSource( new 
>> CustomerGenerator() );
>>
>> customerStream
>> 
>> //.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
>> .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new 
>> WaterMarkAssigner()))
>> .keyBy(Customer::getIdentifier)
>> .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>> .trigger(EventTimeTrigger.create())
>> .evictor(new CustomerEvictor())
>> .process(new CustomAggregateFunction())
>> .print();
>>
>> My watermark assigner looks like:
>>
>> public class WaterMarkAssigner implements WatermarkStrategy {
>> static final Logger logger = 
>> LoggerFactory.getLogger(WaterMarkAssigner.class);
>>
>> @Override
>> public WatermarkGenerator 
>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
>> return new CustomWatermarkGenerator();
>> }
>> }
>>
>> I notice that the evictor and aggregation functions are getting called only
>> once, for the first customer in the stream.
>>
>> The data stream is generating customers at 1-second intervals and there are
>> 5 customer keys for which it's generating transactions.
>>
>> Am I doing something wrong with the above?
>>
>> I want to be able to capture the event on each transaction getting added and 
>> removed from the window so that I can perform the aggregation.
>>
>> please advise.
>>
>>
>>
>>
>>


Re: Flink Event specific window

2021-05-05 Thread Swagat Mishra
Hi Arvid,

I sent a separate mail titled - Session Windows - not working as expected (
to the user community )

All other details are here if you need, closing this thread.

Please have a look when you have a few minutes, much appreciated.

Regards,
Swagat

On Thu, May 6, 2021 at 1:50 AM Swagat Mishra  wrote:

> Hi Arvid,
>
> I sent a separate mail titled - Session Windows - not working as expected
>
> closing this thread.
>
> Please have a look when you have a few minutes, much appreciated.
>
> Regards,
> Swagat
>
>
> On Wed, May 5, 2021 at 7:24 PM Swagat Mishra  wrote:
>
>> Hi Arvid,
>>
>> Tried a small POC to reproduce the behaviour; somehow I don't see the
>> process function getting called. Am I doing something wrong?
>>
>> customerStream
>> .keyBy(Customer::getIdentifier)
>> .window(EventTimeSessionWindows.withGap(Time.seconds(8)))
>> .process(new CustomAggregateFunction())
>> .print();
>>
>> the process function looks like below
>>
>> public class CustomAggregateFunction extends ProcessWindowFunction<Customer, CustomerAggregate, String, TimeWindow> {
>>
>>     @Override
>>     public void process(String key, Context context, Iterable<Customer> iterable, Collector<CustomerAggregate> collector) throws Exception {
>>         System.out.println("in aggregation");
>>     }
>> }
>>
>> the customer generator
>>
>> public class CustomerGenerator implements SourceFunction {
>>
>> volatile boolean isRunning = true;
>>
>> private String[] CUSTOMER_KEY = {"C1", "C2", "C3", "C4", "C5"};
>>
>> @Override
>> public void run(SourceContext sourceContext) throws Exception {
>> int counter = 1;
>>
>> while (isRunning) {
>> Customer c = new Customer(CUSTOMER_KEY[counter % 5], 
>> LocalTime.now(), 1000);
>> System.out.println("Writing customer: " + c);
>> sourceContext.collect(c);
>> Thread.sleep(1000);
>> counter++;
>> }
>> }
>>
>> @Override
>> public void cancel() {
>> isRunning = false;
>> }
>> }
>>
>>
>> Customer object
>>
>> public class Customer {
>> private String identifier;
>> private LocalTime eventTime;
>> private double amount;
>>
>> public Customer(String identifier, LocalTime eventTime, double amount) {
>> this.identifier = identifier;
>> this.amount = amount;
>> this.eventTime = eventTime;
>> }
>>
>> public String getIdentifier() {
>> return identifier;
>> }
>>
>> public LocalTime getEventTime() {
>> return eventTime;
>> }
>>
>> public double getAmount() {
>> return amount;
>> }
>>
>> @Override
>> public String toString() {
>> return "Customer{" +
>> "identifier='" + identifier + '\'' +
>> ", eventTime=" + eventTime +
>> ", amount=" + amount +
>> '}';
>> }
>> }
>>
>>
>>
>> On Thu, Apr 29, 2021 at 5:46 PM Arvid Heise  wrote:
>>
>>> Hi Swagat,
>>>
>>> 1. Where the data primarily resides depends on the chosen state backend
>>> [1]. In most cases, it's written to some file with a memory cache. It's
>>> possible to query the state [2] but not with SQL. In fact, it's so basic
>>> that we decided to drop the feature in the future to make room for a more
>>> sophisticated solution based around replicating the state to an external
>>> queryable form but there is nothing specific yet.
>>> 2. It would help if you (re)read the section about state persistence.
>>> [3] Basically, the state is updated on every write access of the process
>>> function. Flink creates a checkpoint of the state periodically and can
>>> recover from these checkpoint. It's also possible to look into these
>>> checkpoint with the state processor API [4].
>>> 3. It's embedded. See above to what happens on failure.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/stateful-stream-processing.html#state-backends
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/queryable_state.html
>>> [3]
>>> https://ci.apa

Re: Flink Event specific window

2021-05-05 Thread Swagat Mishra
Hi Arvid,

I sent a separate mail titled - Session Windows - not working as expected

closing this thread.

Please have a look when you have a few minutes, much appreciated.

Regards,
Swagat


On Wed, May 5, 2021 at 7:24 PM Swagat Mishra  wrote:

> Hi Arvid,
>
> Tried a small POC to reproduce the behaviour; somehow I don't see the process
> function getting called. Am I doing something wrong?
>
> customerStream
> .keyBy(Customer::getIdentifier)
> .window(EventTimeSessionWindows.withGap(Time.seconds(8)))
> .process(new CustomAggregateFunction())
> .print();
>
> the process function looks like below
>
> public class CustomAggregateFunction extends ProcessWindowFunction<Customer, CustomerAggregate, String, TimeWindow> {
>
>     @Override
>     public void process(String key, Context context, Iterable<Customer> iterable, Collector<CustomerAggregate> collector) throws Exception {
>         System.out.println("in aggregation");
>     }
> }
>
> the customer generator
>
> public class CustomerGenerator implements SourceFunction {
>
> volatile boolean isRunning = true;
>
> private String[] CUSTOMER_KEY = {"C1", "C2", "C3", "C4", "C5"};
>
> @Override
> public void run(SourceContext sourceContext) throws Exception {
> int counter = 1;
>
> while (isRunning) {
> Customer c = new Customer(CUSTOMER_KEY[counter % 5], 
> LocalTime.now(), 1000);
> System.out.println("Writing customer: " + c);
> sourceContext.collect(c);
> Thread.sleep(1000);
> counter++;
> }
> }
>
> @Override
> public void cancel() {
> isRunning = false;
> }
> }
>
>
> Customer object
>
> public class Customer {
> private String identifier;
> private LocalTime eventTime;
> private double amount;
>
> public Customer(String identifier, LocalTime eventTime, double amount) {
> this.identifier = identifier;
> this.amount = amount;
> this.eventTime = eventTime;
> }
>
> public String getIdentifier() {
> return identifier;
> }
>
> public LocalTime getEventTime() {
> return eventTime;
> }
>
> public double getAmount() {
> return amount;
> }
>
> @Override
> public String toString() {
> return "Customer{" +
> "identifier='" + identifier + '\'' +
> ", eventTime=" + eventTime +
> ", amount=" + amount +
> '}';
> }
> }
>
>
>
> On Thu, Apr 29, 2021 at 5:46 PM Arvid Heise  wrote:
>
>> Hi Swagat,
>>
>> 1. Where the data primarily resides depends on the chosen state backend
>> [1]. In most cases, it's written to some file with a memory cache. It's
>> possible to query the state [2] but not with SQL. In fact, it's so basic
>> that we decided to drop the feature in the future to make room for a more
>> sophisticated solution based around replicating the state to an external
>> queryable form but there is nothing specific yet.
>> 2. It would help if you (re)read the section about state persistence. [3]
>> Basically, the state is updated on every write access of the process
>> function. Flink creates a checkpoint of the state periodically and can
>> recover from these checkpoint. It's also possible to look into these
>> checkpoint with the state processor API [4].
>> 3. It's embedded. See above to what happens on failure.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/stateful-stream-processing.html#state-backends
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/queryable_state.html
>> [3]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/stateful-stream-processing.html#state-persistence
>> [4]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>>
>> On Mon, Apr 26, 2021 at 10:43 AM Swagat Mishra 
>> wrote:
>>
>>> Hi Arvid,
>>>
>>> On 2 - I was referring to stateful functions as an alternative to
>>> windows, but in this particular use case it doesn't fit exactly, I
>>> think, though a solution can be built around it.
>>>
>>> On the overall approach here what's the right way to use Flink SQL:
>>>
>>> Every event has the transaction time which I am using as event time to 
>>> assign WatermarkStrategy
>>> K

Session Windows - not working as expected

2021-05-05 Thread Swagat Mishra
Hi,

Bit of background: I have a stream of customers who have purchased some
product, reading these transactions on a Kafka topic. I want to aggregate
the number of products the customer has purchased in a particular duration
(say 10 seconds) and write to a sink.

I am using session windows to achieve the above.

For test purposes, I have mocked up a customer stream and executed session
windows like below.

StreamExecutionEnvironment environment =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream customerStream = environment.addSource( new
CustomerGenerator() );

customerStream

//.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
.assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new
WaterMarkAssigner()))
.keyBy(Customer::getIdentifier)
.window(EventTimeSessionWindows.withGap(Time.seconds(5)))
.trigger(EventTimeTrigger.create())
.evictor(new CustomerEvictor())
.process(new CustomAggregateFunction())
.print();

My watermark assigner looks like:

public class WaterMarkAssigner implements WatermarkStrategy {
static final Logger logger =
LoggerFactory.getLogger(WaterMarkAssigner.class);

@Override
public WatermarkGenerator
createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new CustomWatermarkGenerator();
}
}
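For comparison, the same wiring with the built-in bounded-out-of-orderness strategy and an explicit timestamp assigner, as a sketch; it assumes getEventTime() is changed to return epoch milliseconds:

customerStream
        .assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<Customer>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        // tell the strategy where the event time lives
                        .withTimestampAssigner((customer, ts) -> customer.getEventTime()))
        .keyBy(Customer::getIdentifier)
        .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
        .process(new CustomAggregateFunction())
        .print();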

I notice that the evictor and aggregation functions are getting
called only once, for the first customer in the stream.

The data stream is generating customers at 1-second intervals and
there are 5 customer keys for which it's generating transactions.

Am I doing something wrong with the above?

I want to be able to capture the event on each transaction getting
added and removed from the window so that I can perform the
aggregation.

please advise.


Re: Flink Event specific window

2021-04-30 Thread Swagat Mishra
Thanks Arvid.

very helpful.

On Thu, Apr 29, 2021 at 5:46 PM Arvid Heise  wrote:

> Hi Swagat,
>
> 1. Where the data primarily resides depends on the chosen state backend
> [1]. In most cases, it's written to some file with a memory cache. It's
> possible to query the state [2] but not with SQL. In fact, it's so basic
> that we decided to drop the feature in the future to make room for a more
> sophisticated solution based around replicating the state to an external
> queryable form but there is nothing specific yet.
> 2. It would help if you (re)read the section about state persistence. [3]
> Basically, the state is updated on every write access of the process
> function. Flink creates a checkpoint of the state periodically and can
> recover from these checkpoint. It's also possible to look into these
> checkpoint with the state processor API [4].
> 3. It's embedded. See above to what happens on failure.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/stateful-stream-processing.html#state-backends
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/queryable_state.html
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/stateful-stream-processing.html#state-persistence
> [4]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>
> On Mon, Apr 26, 2021 at 10:43 AM Swagat Mishra  wrote:
>
>> Hi Arvid,
>>
>> On 2 - I was referring to stateful functions as an alternative to
>> windows, but in this particular use case, its not fitting in exactly I
>> think, though a solution can be built around it.
>>
>> On the overall approach here what's the right way to use Flink SQL:
>>
>> Every event has the transaction time which I am using as event time to 
>> assign WatermarkStrategy
>> KeyBy - customerId
>> SlidingEventTimeWindows of 1 hr
>> then process all elements using ProcessWindowFunction
>>
>> Extending above..
>>
>> For the session window, taking the above example , reiterated below:
>>
>> Say Customer1 has done 2 transactions one at 11:00am and other one at 11:20 
>> am.
>> Customer2 has done 1 transaction one at 10:00 am
>> Customer3 has done 3 transactions one at 11:20 am,  11:40 am and 11:45 am.
>>
>> 1 hour window:
>> 9:30AM - 10:30 AM : Customer 2
>> 10:30 AM - 11:30 AM : Customer 1, Customer 3
>> 11:30 AM - 12:30 PM : Customer 3
>>
>> Questions - how do we access the state?
>>
>>    1. Will the process window function write to an in-memory SQL table that
>> does not get flushed to a proper backing database, so all the data stays
>> in-memory? If yes, can that be queried?
>>    2. If the process window function writes to a proper backing database, at
>> what point should this happen? Because the API can query the state at any
>> point in time, the data that was flushed might be stale and need
>> recomputation.
>>    3. How do you recommend RocksDB be used as a state backend? Is
>> that the embedded RocksDB, or do you recommend an external implementation?
>> Embedded RocksDB state is lost when the container is restarted, I guess, so
>> we will have to have an external mechanism for restart/crash recovery.
>>
>> Regards,
>> Swagat
>>
>>
>>
>> On Mon, Apr 26, 2021 at 11:29 AM Arvid Heise  wrote:
>>
>>> 1. It always depends on the data volume per user. A million user is not
>>> much if you compare it to the biggest Flink installations (Netflix,
>>> Alibaba, PInterest, ...). However, for a larger scale and scalability, I'd
>>> recommend to use rocksDB state backend. [1]
>>>
>>> 2. Are you referring to statefun? I'd say that for your use case, Flink
>>> is a better fit. Statefun is more suitable when each actor (=user in your
>>> case) acts differently depending on the data like in a state machine. In
>>> your case, your users should be processed in the same way: Even if the
>>> windows are independently opened and closed, every user has only at most
>>> one window open at a given event time. You probably also aggregate all user
>>> states more or less in the same way.
>>>
>>> Or did you refer to processing functions with state? That's certainly
>>> possible to implement but it won't be much faster unless you can exploit
>>> some specific properties of your application. An example is written in [2].
>>> I'd recommend to first use regular, built-in windows and only switch to
>>> custom code if the performance is insufficient. Custom imp

Re: Flink Event specific window

2021-04-26 Thread Swagat Mishra
Hi Arvid,

On 2 - I was referring to stateful functions as an alternative to windows,
but in this particular use case it doesn't fit exactly, I think, though
a solution can be built around it.

On the overall approach here what's the right way to use Flink SQL:

Every event has the transaction time which I am using as event time to
assign WatermarkStrategy
KeyBy - customerId
SlidingEventTimeWindows of 1 hr
then process all elements using ProcessWindowFunction

Extending above..

For the session window, taking the above example , reiterated below:

Say Customer1 has done 2 transactions one at 11:00am and other one at 11:20 am.
Customer2 has done 1 transaction one at 10:00 am
Customer3 has done 3 transactions one at 11:20 am,  11:40 am and 11:45 am.

1 hour window:
9:30AM - 10:30 AM : Customer 2
10:30 AM - 11:30 AM : Customer 1, Customer 3
11:30 AM - 12:30 PM : Customer 3

Questions - how do we access the state?

   1. Will the process window function write to an in-memory SQL table
that does not get flushed to a proper backing database, so all the
data stays in-memory? If yes, can that be queried?
   2. If the process window function writes to a proper backing
database, at what point should this happen? Because the API can query
the state at any point in time, the data that was flushed might be
stale and need recomputation.
   3. How do you recommend RocksDB be used as a state backend?
Is that the embedded RocksDB, or do you recommend an external
implementation? Embedded RocksDB state is lost when the container is
restarted, I guess, so we will have to have an external mechanism for
restart/crash recovery.

Regards,
Swagat



On Mon, Apr 26, 2021 at 11:29 AM Arvid Heise  wrote:

> 1. It always depends on the data volume per user. A million user is not
> much if you compare it to the biggest Flink installations (Netflix,
> Alibaba, PInterest, ...). However, for a larger scale and scalability, I'd
> recommend to use rocksDB state backend. [1]
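A minimal sketch of wiring that up in code for Flink 1.12 (the checkpoint path is a placeholder and the flink-statebackend-rocksdb dependency is assumed to be on the classpath):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public final class RocksDbSetup {
    public static StreamExecutionEnvironment create() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // keep working state in RocksDB and checkpoint it (incrementally) to a durable location
        env.setStateBackend(new RocksDBStateBackend("s3://my-bucket/flink-checkpoints", true));
        env.enableCheckpointing(60_000); // checkpoint every minute
        return env;
    }
}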
>
> 2. Are you referring to statefun? I'd say that for your use case, Flink is
> a better fit. Statefun is more suitable when each actor (=user in your
> case) acts differently depending on the data like in a state machine. In
> your case, your users should be processed in the same way: Even if the
> windows are independently opened and closed, every user has only at most
> one window open at a given event time. You probably also aggregate all user
> states more or less in the same way.
>
> Or did you refer to processing functions with state? That's certainly
> possible to implement but it won't be much faster unless you can exploit
> some specific properties of your application. An example is written in [2].
> I'd recommend to first use regular, built-in windows and only switch to
> custom code if the performance is insufficient. Custom implementations may
> be faster now, but come with a higher maintenance cost and the built-in
> windows may be better optimized in future.
>
> Lastly if your query is of relational nature, I'd recommend to have a look
> at Table API/SQL [3]. Unless you really invest a lot of time, you won't be
> able to write more efficient code than what Table API is generating.
>
> [1] https://flink.apache.org/2021/01/18/rocksdb.html
> [2] https://flink.apache.org/news/2020/07/30/demo-fraud-detection-3.html
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#session-session-windows
>
> On Sun, Apr 25, 2021 at 11:46 PM Swagat Mishra  wrote:
>
>>  1. What if there is a very high number of users, like a million
>> customers - won't the service crash? Is it advisable to hold the data in
>> memory?
>>
>> 2. What if stateful functions are used to calculate the value? How will
>> this approach differ from the one proposed below?
>>
>> Regards,
>> Swagat
>>
>> On Wed, Apr 21, 2021, 11:25 PM Arvid Heise  wrote:
>>
>>> Hi Sunitha,
>>>
>>> the approach you are describing sounds like you want to use a session
>>> window. [1] If you only want to count them if they happen at the same hour
>>> then, you want to use a tumbling window.
>>>
>>> Your datastream approach looks solid.
>>>
>>> For SQL, there is also a session (and tumbling) window [2]. You can see
>>> examples at the bottom of the section.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#session-windows
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#group-windows
>>>
>>> On Tue, Apr 20, 2021 at 11:03 PM s_penakalap...@yahoo.com <
>>> s_penakalap...@yahoo.com> wrote:
>>>
>>>> Hi All,

Re: Flink Event specific window

2021-04-25 Thread Swagat Mishra
 1. What if there is a very high number of users, like a million customers -
won't the service crash? Is it advisable to hold the data in memory?

2. What if stateful functions are used to calculate the value? How will this
approach differ from the one proposed below?

Regards,
Swagat

On Wed, Apr 21, 2021, 11:25 PM Arvid Heise  wrote:

> Hi Sunitha,
>
> the approach you are describing sounds like you want to use a session
> window. [1] If you only want to count them if they happen at the same hour
> then, you want to use a tumbling window.
>
> Your datastream approach looks solid.
>
> For SQL, there is also a session (and tumbling) window [2]. You can see
> examples at the bottom of the section.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#session-windows
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#group-windows
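For reference, a sketch of the SQL form referenced in [2], using the Flink 1.12 group-window syntax; the table name, the column names, and the assumption that txn_time is declared as the table's event-time attribute are all illustrative:

// tableEnv is assumed to be a StreamTableEnvironment with a 'transactions' table registered.
Table perUserSessions = tableEnv.sqlQuery(
        "SELECT customer_id, " +
        "       SESSION_START(txn_time, INTERVAL '1' HOUR) AS session_start, " +
        "       SUM(amount) AS total_amount " +
        "FROM transactions " +
        "GROUP BY SESSION(txn_time, INTERVAL '1' HOUR), customer_id");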
>
> On Tue, Apr 20, 2021 at 11:03 PM s_penakalap...@yahoo.com <
> s_penakalap...@yahoo.com> wrote:
>
>> Hi All,
>>
>> I have one requirement where I need to calculate the total amount of
>> transactions done by each user in the last 1 hour.
>> Say Customer1 has done 2 transactions one at 11:00am and other one at
>> 11:20 am.
>> Customer2 has done 1 transaction one at 10:00 am
>> Customer3 has done 3 transactions one at 11:20 am,  11:40 am and 11:45 am.
>>
>> Whenever a customer does a transaction, we receive an event in the source
>> topic; we consume the data and need to calculate the total amount spent by
>> the customer in the last 1 hour.
>>
>> if I have received customer1 new transaction event at 11:30 am then I
>> need to calculate the sum of 3 events done by customer1 in last 1 hour (i.e
>> 11:00 , 11:20 and 11:30 am - all 3 events fall in last 1 hour window)
>> Now say I receive Customer2 new transaction event at 11:30 am then for
>> this customer I need to consider only one event 11:30 (ignoring the event
>> at  10:00 am  as it does not fall in last 1 hr)
>> Customer3 new transaction is done at 12:40 pm then for this Customer I
>> need to calculate sum of ( 11:40 am , 11:45 am and 12:40pm) as all 3 fall
>> under last 1 hr.
>>
>> Approach I am planning to try:
>> Every event has the transaction time which I am using as event time to
>> assign WatermarkStrategy
>> KeyBy - customerId
>> SlidingEventTimeWindows of 1 hr
>> then process all elements using ProcessWindowFunction
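A sketch of that plan in code, for reference (the Txn type, field names, watermark bound, and the 5-minute slide are assumptions, not requirements):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

public final class SpendLastHour {

    // Illustrative event type.
    public static class Txn {
        public String customerId;
        public long txnTime;   // epoch millis
        public double amount;
    }

    public static DataStream<String> totalPerCustomer(DataStream<Txn> txns) {
        return txns
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Txn>forBoundedOutOfOrderness(Duration.ofMinutes(1))
                                .withTimestampAssigner((t, ts) -> t.txnTime))
                .keyBy(t -> t.customerId)
                .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)))
                .process(new ProcessWindowFunction<Txn, String, String, TimeWindow>() {
                    @Override
                    public void process(String customerId, Context ctx, Iterable<Txn> elements, Collector<String> out) {
                        double total = 0;
                        for (Txn t : elements) {
                            total += t.amount;
                        }
                        out.collect(customerId + " spent " + total + " in window ending " + ctx.window().getEnd());
                    }
                });
    }
}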
>>
>>
>> Kindly suggest the approach I need to follow to achieve the above
>> scenario using Flink Java /Sql. I am using Flink 1.12.0.
>>
>> Regards,
>> Sunitha
>>
>


Re: Approaches for external state for Flink

2021-04-24 Thread Swagat Mishra
Why not use upserts? Wouldn't that solve the issue of duplicates and there
won't be a need to query database too?

On Sat, Apr 24, 2021, 8:12 PM David Anderson  wrote:

> What are the other techniques for bootstrapping rocksdb state?
>
>
> Bootstrapping state involves somehow creating a snapshot (typically a
> savepoint, but a retained checkpoint can be a better choice in some cases)
> containing the necessary state -- meaning that the state has the same
> operator uid and and state descriptor used by the real streaming job.
>
> You can do this by either: (1) running a variant of the live streaming job
> against the data used for bootstrapping and taking a snapshot when the data
> has been fully ingested, or (2) by using the State Processor API [1].
> You'll find a trivial example of the second approach in [2]. Once you have
> a suitable snapshot, you can run your real job against it.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/libs/state_processor_api.html
> [2] https://gist.github.com/alpinegizmo/ff3d2e748287853c88f21259830b29cf
>
> Regards,
> David
>
> On Sat, Apr 24, 2021 at 3:01 PM Omngr 
> wrote:
>
>> Hi David, thank you for your response first!
>>
>> The state size is about 1 TB for now, but it will increase fastly, and
>> also I can not use the TLL for states. It will grow indefinitely.
>> What are the other techniques for bootstrapping rocksdb state?
>>
>> David Anderson , 24 Nis 2021 Cmt, 15:43 tarihinde
>> şunu yazdı:
>>
>>> Oguzhan,
>>>
>>> Note, the state size is very large and I have to feed the state from
 batch flow firstly. Thus I can not use the internal state like rocksdb.
>>>
>>>
>>> How large is "very large"? Using RocksDB, several users have reported
>>> working with jobs using many TBs of state.
>>>
>>> And there are techniques for bootstrapping the state. That doesn't have
>>> to be a showstopper.
>>>
>>> May be any bottleneck in that flow? I think to use asyncMap functions
 for state read/write operations.
>>>
>>>
>>> That's a good reason to reconsider using Flink state.
>>>
>>> Regards,
>>> David
>>>
>>>
>>>
>>> On Fri, Apr 23, 2021 at 12:22 PM Oğuzhan Mangır <
>>> sosyalmedya.oguz...@gmail.com> wrote:
>>>
 I'm trying to design a stream flow that *de-duplicates* events
 and sends them to the Kafka topic.

 Basically, flow looks like that;

 kafka (multiple topics) =>  flink (checking de-duplication and event
 enrichment) => kafka (single topic)

 For de-duplication, I'm thinking of using Cassandra as an external
 state store. The details of my job;

 I have an event payload with *uuid* Field. If the event that has the
 same uuid will come, this event should be discarded. In my case, two kafka
 topics are reading. The first topic has a lot of fields, but other topics
 just have a *uuid* field, thus I have to enrich data using the same
 uuid for the events coming from the second topic.

 Stream1: Messages reading from the first topic. Read state from
 Cassandra using the *uuid*. If a state exists, ignore this event and *do
 not* emit to the Kafka. If state does not exist, save  this event to
 the Cassandra, then emit this event to the Kafka.

 Stream2: Messages reading from the second topic. Read state from
 Cassandra using the *uuid*. If state exists, check a column that
 represents this event came from topic2. If the value of this column is
 false, enrich the event using state and update the Cassandra column as
 true. If true, ignore this event because this event is a duplicate.

 def checkDeDuplication(event): Option[Event] = {
   val state = readFromCassandra(state)
   if (state exist) None //ignore this event
   else {
 saveEventToCassandra(event)
 Some(event)
   }
 }

 def checkDeDuplicationAndEnrichEvent(event): Option[Event] = {
   val state = readFromCassandra(state)
   if (state does not exist) None //ignore this event
   else {
 if (state.flag == true) None // ignore this event
 else {
updateFlagAsTrueInCassandra(event)
Some(event)
 }
   }
 }


 val stream1 = 
 readKafkaTopic1().flatMap(checkDeDuplicationAndSaveToSatate())
 val stream2 = readKafkaTopic2().flatMap(checkDeDuplicationAndEnrichEvent())
 stream1.union(stream2).addSink(kafkaSink)

 1- Is that a good approach?

 2- Is Cassandra the right choice here? Note, the state size is very
 large and I have to feed the state from batch flow firstly. Thus I can not
 use the internal state like rocksdb.

 3- Can I improve this logic?

 4- Could there be any bottleneck in that flow? I am thinking of using asyncMap
 functions for the state read/write operations.
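For comparison, a sketch of the keyed-state alternative discussed in this thread (keeping a seen flag per uuid in Flink ValueState, e.g. on the RocksDB backend, instead of reading and writing Cassandra); the Event type and field names are placeholders:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Keyed by uuid: emits an event only the first time its uuid is seen.
public class DeduplicateByUuid extends KeyedProcessFunction<String, DeduplicateByUuid.Event, DeduplicateByUuid.Event> {

    public static class Event {
        public String uuid;
        public String payload;
    }

    private transient ValueState<Boolean> seen;

    @Override
    public void open(Configuration parameters) {
        seen = getRuntimeContext().getState(new ValueStateDescriptor<>("seen", Boolean.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Event> out) throws Exception {
        if (seen.value() == null) {
            seen.update(true);   // first time this uuid shows up
            out.collect(event);  // forward; duplicates are silently dropped
        }
    }
}

The usage would be events.keyBy(e -> e.uuid).process(new DeduplicateByUuid()); the enrichment path could keep a richer value (e.g. the first event) in the same keyed state instead of a Boolean.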

>>>


Re: Flink - Pod Identity

2021-04-06 Thread Swagat Mishra
I was able to solve the issue by providing a custom version of the presto
jar. I will create a ticket and raise a pull request so that others can
benefit from it. I will share the details here shortly.

Thanks everyone for your help and support, especially Austin, who stood out
for his interest in the issue and for helping to find ways to resolve it.

Regards,
Swagat

On Tue, Apr 6, 2021 at 2:35 AM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> And actually, I've found that the correct version of the AWS SDK *is*
> included in Flink 1.12, which was reported and fixed in FLINK-18676
> (see[1]). Since you said you saw this also occur in 1.12, can you share
> more details about what you saw there?
>
> Best,
> Austin
>
> [1]: https://issues.apache.org/jira/browse/FLINK-18676
>
> On Mon, Apr 5, 2021 at 4:53 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> That looks interesting! I've also found the full list of S3 properties[1]
>> for the version of presto-hive bundled with Flink 1.12 (see [2]), which
>> includes an option for a KMS key (hive.s3.kms-key-id).
>>
>> (also, adding back the user list)
>>
>> [1]:
>> https://prestodb.io/docs/0.187/connector/hive.html#amazon-s3-configuration
>> [2]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/filesystems/s3.html#hadooppresto-s3-file-systems-plugins
>>
>> On Mon, Apr 5, 2021 at 4:21 PM Swagat Mishra  wrote:
>>
>>> Btw, there is also an option to provide a custom credential provider,
>>> what are your thoughts on this?
>>>
>>> presto.s3.credentials-provider
>>>
>>>
>>> On Tue, Apr 6, 2021 at 12:43 AM Austin Cawley-Edwards <
>>> austin.caw...@gmail.com> wrote:
>>>
>>>> I've confirmed that for the bundled + shaded aws dependency, the only
>>>> way to upgrade it is to build a flink-s3-fs-presto jar with the updated
>>>> dependency. Let me know if this is feasible for you, if the KMS key
>>>> solution doesn't work.
>>>>
>>>> Best,
>>>> Austin
>>>>
>>>> On Mon, Apr 5, 2021 at 2:18 PM Austin Cawley-Edwards <
>>>> austin.caw...@gmail.com> wrote:
>>>>
>>>>> Hi Swagat,
>>>>>
>>>>> I don't believe there is an explicit configuration option for the KMS
>>>>> key – please let me know if you're able to make that work!
>>>>>
>>>>> Best,
>>>>> Austin
>>>>>
>>>>> On Mon, Apr 5, 2021 at 1:45 PM Swagat Mishra 
>>>>> wrote:
>>>>>
>>>>>> Hi Austin,
>>>>>>
>>>>>> Let me know what you think on my latest email, if the approach might
>>>>>> work, or if it is already supported and I am not using the configurations
>>>>>> properly.
>>>>>>
>>>>>> Thanks for your interest and support.
>>>>>>
>>>>>> Regards,
>>>>>> Swagat
>>>>>>
>>>>>> On Mon, Apr 5, 2021 at 10:39 PM Austin Cawley-Edwards <
>>>>>> austin.caw...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Swagat,
>>>>>>>
>>>>>>> It looks like Flink 1.6 bundles the 1.11.165 version of the
>>>>>>> aws-java-sdk-core with the Presto implementation (transitively from
>>>>>>> Presto 0.185[1]).
>>>>>>> The minimum supported version for the ServiceAccount authentication
>>>>>>> approach is 1.11.704 (see [2]), which was released on Jan 9th,
>>>>>>> 2020[3], long after Flink 1.6 was released. It looks like even the
>>>>>>> most recent Presto is on a version below that, concretely 1.11.697
>>>>>>> in the master branch[4], so I don't think even upgrading Flink to
>>>>>>> 1.6+ will solve this, though it looks to me like the AWS dependency
>>>>>>> is managed better in more recent Flink versions. I'll have more for
>>>>>>> you on that front tomorrow, after the Easter break.
>>>>>>>
>>>>>>> I think what you would have to do to make this authentication
>>>>>>> approach work for Flink 1.6 is building a custom version of the
>>>>>>> flink-s3-fs-presto jar, replacing the bundled AWS dependency with
>>>>>>> the 1.11.704 version, and then shading it the same way.

Re: Flink - Pod Identity

2021-04-05 Thread Swagat Mishra
Hi Austin,

Thanks for your reply.

At the moment, I have upgraded to the 1.12 version of Flink, but I still
see the same issue. I have taken a look at Presto as well. I am looking to
experiment with settings like S3_KMS_KEY_ID (provided in the link below).
If this doesn't work, I will look at modifying the Presto code to have a
custom version that supports pod identity through a service account.

Yes, I can create a JIRA ticket for you.

https://github.com/prestodb/presto/blob/master/presto-hive/src/main/java/com/facebook/presto/hive/s3/PrestoS3ConfigurationUpdater.java

Regards,
Swagat

On Mon, Apr 5, 2021 at 10:39 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hi Swagat,
>
> It looks like Flink 1.6 bundles the 1.11.165 version of the
> aws-java-sdk-core with the Presto implementation (transitively from Presto
> 0.185[1]).
> The minimum supported version for the ServiceAccount authentication approach
> is 1.11.704 (see [2]) which was released on Jan 9th, 2020[3], long after
> Flink 1.6 was released. It looks like even the most recent Presto is on a
> version below that, concretely 1.11.697 in the master branch[4], so I don't
> think even upgrading Flink to 1.6+ will solve this, though it looks to me
> like the AWS dependency is managed better in more recent Flink versions.
> I'll have more for you on that front tomorrow, after the Easter break.
>
> I think what you would have to do to make this authentication approach
> work for Flink 1.6 is building a custom version of the flink-s3-fs-presto
> jar, replacing the bundled AWS dependency with the 1.11.704 version, and
> then shading it the same way.
>
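As a sketch of the jar-rebuild approach described above (Maven only; the
module layout in the Flink source tree is an assumption), pinning the SDK
version before rebuilding and re-shading flink-s3-fs-presto might look like:

    <!-- in the flink-s3-fs-presto module's pom.xml (sketch) -->
    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.amazonaws</groupId>
          <artifactId>aws-java-sdk-core</artifactId>
          <version>1.11.704</version>
        </dependency>
      </dependencies>
    </dependencyManagement>
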
> In the meantime, would you mind creating a JIRA ticket with this use case?
> That'll give you the best insight into the status of fixing this :)
>
> Let me know if that makes sense,
> Austin
>
> [1]:
> https://github.com/prestodb/presto/blob/1d4ee196df4327568c0982811d8459a44f1792b9/pom.xml#L53
> [2]:
> https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts-minimum-sdk.html
> [3]: https://github.com/aws/aws-sdk-java/releases/tag/1.11.704
> [4]: https://github.com/prestodb/presto/blob/master/pom.xml#L52
>
> On Sun, Apr 4, 2021 at 3:32 AM Swagat Mishra  wrote:
>
>> Austin -
>>
>> In my case the setup is such that services are deployed on Kubernetes
>> with Docker, running on EKS. There is also an Istio service mesh, so all
>> the services communicate and access AWS resources like S3 using the
>> service account. The service account is associated with IAM roles. I have
>> verified that the service account has access to S3 by running a program
>> that connects to S3 to read a file; the AWS client, when packaged into
>> the pod, is also able to access S3. So that means the roles and policies
>> are good.
>>
>> When I am running flink, I am following the same configuration for job
>> manager and task manager as provided here:
>>
>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/standalone/kubernetes.html
>>
>> The exception we are getting is -
>> org.apache.flink.fs.s3presto.shaded.com.amazonaws.SdkClientException:
>> Unable to load credentials from service endpoint.
>>
>> This happens in the EC2CredentialsFetcher class, method fetchCredentials
>> (line 66), when it tries to read the resource, effectively executing
>> curl 169.254.170.2/AWS_CONTAINER_CREDENTIALS_RELATIVE_URI
>>
>> I am not setting the variable AWS_CONTAINER_CREDENTIALS_RELATIVE_URI
>> because it's not the right way to do it for us; we are on EKS. Similarly,
>> the ~/.aws/credentials file approach will also not work for us.
>>
>>
>> At the moment, I haven't tried the Kubernetes service account property
>> you mentioned above. I will try it and let you know how it goes.
>>
>> Question - do I need to provide any parameters while building the Docker
>> image, or any configuration in the Flink config, to tell Flink that for
>> all purposes it should use the service account and not try to get into
>> the EC2CredentialsFetcher class?
>>
>> One more thing - we were trying this on the 1.6 version of Flink and not
>> the 1.12 version.
>>
>> Regards,
>> Swagat
>>
>> On Sun, Apr 4, 2021 at 8:56 AM Sameer Wadkar  wrote:
>>
>>> Kube2Iam needs to modify iptables to proxy calls to the EC2 metadata
>>> endpoint to a daemonset which runs privileged pods; these map a pod's IP
>>> address and its associated service account to make STS calls and return
>>> temporary AWS credentials. Your pod "thinks" the EC2 metadata URL works
>>> locally like in an EC2 instance.
>>>
>>> I have found that mutating webhooks are easier to deploy (when you have
>>> no control over the Kubernetes environment - say you cannot change
>>> iptables or run privileged pods). These can configure the
>>> ~/.aws/credentials file. The webhook can make the STS call for the
>>> service-account-to-role mapping. A sidecar container to which the main
>>> container has no access can even renew credentials, because STS returns
>>> temporary credentials.

Re: Flink - Pod Identity

2021-04-04 Thread Swagat Mishra
Austin -

In my case the setup is such that services are deployed on Kubernetes with
Docker, running on EKS. There is also an Istio service mesh, so all the
services communicate and access AWS resources like S3 using the service
account. The service account is associated with IAM roles. I have verified
that the service account has access to S3 by running a program that
connects to S3 to read a file; the AWS client, when packaged into the pod,
is also able to access S3. So that means the roles and policies are good.
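
For comparison, the standalone check described above can be as small as the
following sketch using the AWS SDK for Java v1 default credentials chain
(bucket and key are placeholders); with a recent enough SDK, that chain
picks up the service account's web-identity (IRSA) credentials automatically:

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

public class S3AccessCheck {
    public static void main(String[] args) {
        // Uses the default credentials and region provider chains.
        AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
        // Placeholder bucket/key, for illustration only.
        String content = s3.getObjectAsString("my-test-bucket", "test-file.txt");
        System.out.println("Read " + content.length() + " characters from S3");
    }
}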

When I am running flink, I am following the same configuration for job
manager and task manager as provided here:

https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/standalone/kubernetes.html

The exception we are getting is -
org.apache.flink.fs.s3presto.shaded.com.amazonaws.SdkClientException:
Unable to load credentials from service endpoint.

This happens in the EC2CredentialsFetcher class, method fetchCredentials
(line 66), when it tries to read the resource, effectively executing
curl 169.254.170.2/AWS_CONTAINER_CREDENTIALS_RELATIVE_URI

I am not setting the variable AWS_CONTAINER_CREDENTIALS_RELATIVE_URI
because it's not the right way to do it for us; we are on EKS. Similarly,
the ~/.aws/credentials file approach will also not work for us.


At the moment, I haven't tried the Kubernetes service account property you
mentioned above. I will try it and let you know how it goes.

Question - do I need to provide any parameters while building the Docker
image, or any configuration in the Flink config, to tell Flink that for all
purposes it should use the service account and not try to get into
the EC2CredentialsFetcher class?

One more thing - we were trying this on the 1.6 version of Flink and not
the 1.12 version.

Regards,
Swagat

On Sun, Apr 4, 2021 at 8:56 AM Sameer Wadkar  wrote:

> Kube2Iam needs to modify iptables to proxy calls to the EC2 metadata
> endpoint to a daemonset which runs privileged pods; these map a pod's IP
> address and its associated service account to make STS calls and return
> temporary AWS credentials. Your pod "thinks" the EC2 metadata URL works
> locally like in an EC2 instance.
>
> I have found that mutating webhooks are easier to deploy (when you have no
> control over the Kubernetes environment - say you cannot change iptables or
> run privileged pods). These can configure the ~/.aws/credentials file. The
> webhook can make the STS call for the service-account-to-role mapping. A
> sidecar container to which the main container has no access can even renew
> credentials, because STS returns temporary credentials.
>
> Sent from my iPhone
>
> On Apr 3, 2021, at 10:29 PM, Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
> 
> If you’re just looking to attach a service account to a pod using the
> native AWS EKS IAM mapping[1], you should be able to attach the service
> account to the pod via the `kubernetes.service-account` configuration
> option[2].
>
> Let me know if that works for you!
>
> Best,
> Austin
>
> [1]:
> https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts.html
> [2]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#kubernetes-service-account
>
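A sketch of that combination on EKS (account id, role and resource names are
placeholders; kubernetes.service-account applies to Flink's native Kubernetes
deployment mode):

    # ServiceAccount annotated for IAM roles for service accounts (IRSA)
    apiVersion: v1
    kind: ServiceAccount
    metadata:
      name: flink-service-account
      annotations:
        eks.amazonaws.com/role-arn: arn:aws:iam::<account-id>:role/<s3-access-role>

    # and, when deploying with the native Kubernetes integration:
    #   ./bin/flink run-application --target kubernetes-application \
    #       -Dkubernetes.service-account=flink-service-account ...
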
> On Sat, Apr 3, 2021 at 10:18 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Can you describe your setup a little bit more? And perhaps how you use
>> this setup to grant access to other non-Flink pods?
>>
>> On Sat, Apr 3, 2021 at 2:29 PM Swagat Mishra  wrote:
>>
>>> Yes, I looked at kube2iam; I haven't experimented with it.
>>>
>>> Given that the service account has access to S3, shouldn't we have a
>>> simpler mechanism to connect to underlying resources based on the service
>>> account authorization?
>>>
>>> On Sat, Apr 3, 2021, 10:10 PM Austin Cawley-Edwards <
>>> austin.caw...@gmail.com> wrote:
>>>
>>>> Hi Swagat,
>>>>
>>>> I’ve used kube2iam[1] for granting AWS access to Flink pods in the past
>>>> with good results. It’s all based on mapping pod annotations to AWS IAM
>>>> roles. Is this something that might work for you?
>>>>
>>>> Best,
>>>> Austin
>>>>
>>>> [1]: https://github.com/jtblin/kube2iam
>>>>
>>>> On Sat, Apr 3, 2021 at 10:40 AM Swagat Mishra 
>>>> wrote:
>>>>
>>>>> No, we are running on AWS. The mechanisms supported by Flink to
>>>>> connect to resources like S3 require us to make changes that will
>>>>> impact all services, something that we don't want to do. So providing
>>>>> the AWS secret key ID and passcode upfront, or IAM roles where it
>>>>> connects by executing curl/HTTP calls to S3, don't work for me.

Re: Flink - Pod Identity

2021-04-03 Thread Swagat Mishra
Yes, I looked at kube2iam; I haven't experimented with it.

Given that the service account has access to S3, shouldn't we have a
simpler mechanism to connect to underlying resources based on the service
account authorization?

On Sat, Apr 3, 2021, 10:10 PM Austin Cawley-Edwards 
wrote:

> Hi Swagat,
>
> I’ve used kube2iam[1] for granting AWS access to Flink pods in the past
> with good results. It’s all based on mapping pod annotations to AWS IAM
> roles. Is this something that might work for you?
>
> Best,
> Austin
>
> [1]: https://github.com/jtblin/kube2iam
>
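For context, kube2iam resolves the role to assume from a pod annotation; a
minimal illustration (the role name is a placeholder) looks like:

    # pod template snippet (sketch)
    metadata:
      annotations:
        iam.amazonaws.com/role: <role-with-s3-access>
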
> On Sat, Apr 3, 2021 at 10:40 AM Swagat Mishra  wrote:
>
>> No, we are running on AWS. The mechanisms supported by Flink to connect
>> to resources like S3 require us to make changes that will impact all
>> services, something that we don't want to do. So providing the AWS secret
>> key ID and passcode upfront, or IAM roles where it connects by executing
>> curl/HTTP calls to S3, don't work for me.
>>
>> I want to be able to connect to S3 using the AWS APIs, and have that
>> connection leveraged by the Presto library; that is what I am looking
>> for.
>>
>> Regards,
>> Swagat
>>
>>
>> On Sat, Apr 3, 2021, 7:37 PM Israel Ekpo  wrote:
>>
>>> Are you running on Azure Kubernetes Service?
>>>
>>> You should be able to do it because the identity can be mapped to the
>>> labels of the pods, not necessarily to Flink.
>>>
>>> On Sat, Apr 3, 2021 at 6:31 AM Swagat Mishra  wrote:
>>>
>>>> Hi,
>>>>
>>>> I think Flink doesn't support pod identity; are there any plans to
>>>> support it in a subsequent release?
>>>>
>>>> Regards,
>>>> Swagat
>>>>
>>>>
>>>>


Re: Flink - Pod Identity

2021-04-03 Thread Swagat Mishra
No, we are running on AWS. The mechanisms supported by Flink to connect to
resources like S3 require us to make changes that will impact all services,
something that we don't want to do. So providing the AWS secret key ID and
passcode upfront, or IAM roles where it connects by executing curl/HTTP
calls to S3, don't work for me.

I want to be able to connect to S3 using the AWS APIs, and have that
connection leveraged by the Presto library; that is what I am looking for.

Regards,
Swagat

On Sat, Apr 3, 2021, 7:37 PM Israel Ekpo  wrote:

> Are you running on Azure Kubernetes Service?
>
> You should be able to do it because the identity can be mapped to the
> labels of the pods, not necessarily to Flink.
>
> On Sat, Apr 3, 2021 at 6:31 AM Swagat Mishra  wrote:
>
>> Hi,
>>
>> I think Flink doesn't support pod identity; are there any plans to
>> support it in a subsequent release?
>>
>> Regards,
>> Swagat
>>
>>
>>


Flink - Pod Identity

2021-04-03 Thread Swagat Mishra
Hi,

I think Flink doesn't support pod identity; are there any plans to support
it in a subsequent release?

Regards,
Swagat