Thank you.

I will have a look at it.

Is there any other way to maintain state, for example by using ValueState?

On Thu, 6 May 2021 at 1:26 PM, Arvid Heise <> wrote:

> If you use keyBy, then all directly following functions only see the
> elements with the same key. So that's the expected behavior and the basis of
> Flink's parallel processing capabilities.
> If you want to generate a window over all customers, you have to use a
> global window. However, that also means that no parallelization can happen,
> so I'd discourage that.
> A better way would be to perform as many calculations as possible in the
> process function (for example, create a record combining a customer with
> their purchase information) and then have a DataStream#global() reshuffle to collect all
> aggregated information on one node.
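The keyBy point above can be illustrated without Flink. The sketch below is a plain-Java model, not Flink code: a keyed stage in which each aggregation only ever sees one customer's purchases (keyed state such as ValueState is scoped to the current key in the same way, so on its own it does not give a view across customers), followed by one global stage that receives every per-customer summary, standing in for the step after DataStream#global(). The Purchase type and all method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (no Flink dependency): keyed pre-aggregation, then one global step.
final class KeyedThenGlobal {

    // Hypothetical purchase record: customer identifier and number of products bought.
    static final class Purchase {
        final String customer;
        final int products;

        Purchase(String customer, int products) {
            this.customer = customer;
            this.products = products;
        }
    }

    // Keyed stage: events are partitioned by customer and each partition is
    // aggregated on its own, the way a function after keyBy only sees one key.
    static Map<String, Integer> perCustomerTotals(List<Purchase> events) {
        Map<String, List<Purchase>> partitions = new LinkedHashMap<>();
        for (Purchase p : events) {
            partitions.computeIfAbsent(p.customer, k -> new ArrayList<>()).add(p);
        }
        Map<String, Integer> totals = new LinkedHashMap<>();
        for (Map.Entry<String, List<Purchase>> partition : partitions.entrySet()) {
            int sum = 0;
            for (Purchase p : partition.getValue()) {
                sum += p.products; // only elements of this one key are visible here
            }
            totals.put(partition.getKey(), sum);
        }
        return totals;
    }

    // Global stage: a single place that receives every customer's summary,
    // standing in for the operator after DataStream#global().
    static String globalReport(Map<String, Integer> totals) {
        StringBuilder report = new StringBuilder();
        for (Map.Entry<String, Integer> entry : totals.entrySet()) {
            if (report.length() > 0) {
                report.append(", ");
            }
            report.append(entry.getKey()).append('=').append(entry.getValue());
        }
        return report.toString();
    }
}
```

The design choice mirrors the advice: the expensive per-customer work stays parallel, and only small summaries reach the single global step.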
> On Thu, May 6, 2021 at 9:20 AM Swagat Mishra <> wrote:
>> Thank you.
>> sourceContext.collectWithTimestamp(c, c.getEventTime());
>> Adding this to the source context worked.
>> However, I am still getting only one customer in the process method. I would
>> expect the iterable to provide all customers in the window. Or do I have to
>> maintain state?
>> Changes for reference:
>> I made the following change and also removed any lag that I had introduced
>> for experimentation earlier.
>> So the trigger looks like:
>>     @Override
>>     public TriggerResult onElement(Customer customer, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
>>         if (timeWindow.maxTimestamp() <= triggerContext.getCurrentWatermark()) {
>>             // if the watermark is already past the window fire immediately
>>             return TriggerResult.FIRE;
>>         } else {
>>             //"Max timestamp for customer: " + customer.getIdentifier() + " is: " + timeWindow.maxTimestamp());
>>             triggerContext.registerEventTimeTimer(customer.getEventTime());
>>             return TriggerResult.FIRE;
>>         }
>>     }
>>     @Override
>>     public TriggerResult onEventTime(long time, TimeWindow timeWindow, TriggerContext triggerContext) {
>>         return time == timeWindow.maxTimestamp() ?
>>                 TriggerResult.FIRE :
>>                 TriggerResult.CONTINUE;
>>     }
>>     @Override
>>     public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
>>         return TriggerResult.CONTINUE;
>>     }
>>     @Override
>>     public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
>>         ctx.deleteEventTimeTimer(window.maxTimestamp());
>>     }
>>     @Override
>>     public boolean canMerge() {
>>         return true;
>>     }
>> And *removed the lateness*:
>> customerStream
>> //.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
>>         .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
>>         .keyBy((KeySelector<Customer, String>) Customer::getIdentifier)
>>         .window(EventTimeSessionWindows.withGap(Time.seconds(20)))
>>         //.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
>>         .trigger(new EventTimeTrigger()) //EventTimeTrigger.create())
>>         .process(new CustomAggregateFunction());
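A minimal plain-Java model (no Flink dependency; class and method names are hypothetical) of one likely reason this trigger evaluates the window so often: onElement returns FIRE in both branches, whereas Flink's built-in EventTimeTrigger registers a timer at window.maxTimestamp(), returns CONTINUE, and fires once the watermark reaches that timer. FIRE does not purge the window, so each firing sees the contents accumulated so far; after keyBy those contents all belong to a single customer, which is consistent with the "only one customer" observation.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (no Flink dependency) of two trigger policies for ONE window.
// fireOnEveryElement = true  -> like the custom trigger above: onElement returns FIRE.
// fireOnEveryElement = false -> like Flink's built-in EventTimeTrigger: onElement
//   registers a timer at the window's max timestamp and returns CONTINUE; the
//   window fires once the watermark reaches that timestamp.
// Assumption: the watermark stays below the window's max timestamp while elements arrive.
final class TriggerModel {

    enum Result { CONTINUE, FIRE }

    // Returns the window contents handed to the process function at each firing.
    static List<List<String>> firings(List<String> elements, boolean fireOnEveryElement,
                                      long windowMaxTimestamp, long finalWatermark) {
        List<List<String>> seen = new ArrayList<>();
        List<String> contents = new ArrayList<>();
        for (String element : elements) {
            contents.add(element);
            Result onElement = fireOnEveryElement ? Result.FIRE : Result.CONTINUE;
            if (onElement == Result.FIRE) {
                // FIRE evaluates the window without purging, so contents keep growing.
                seen.add(new ArrayList<>(contents));
            }
        }
        // Built-in policy: the timer at windowMaxTimestamp fires when the watermark reaches it.
        if (!fireOnEveryElement && finalWatermark >= windowMaxTimestamp) {
            seen.add(new ArrayList<>(contents));
        }
        return seen;
    }
}
```

Separately, assuming the record timestamp equals getEventTime() (as set via collectWithTimestamp), the timer registered at customer.getEventTime() lies before the session window's max timestamp (t + gap - 1 for a window opened at t), so onEventTime, which only fires when time == window.maxTimestamp(), returns CONTINUE for it.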
>> On Thu, May 6, 2021 at 12:32 PM Arvid Heise <> wrote:
>>> Your source is not setting the timestamp with collectWithTimestamp. I'm
>>> assuming that nothing really moves from the event time's perspective.
>>> On Thu, May 6, 2021 at 8:58 AM Swagat Mishra <> wrote:
>>>> Yes, the customer generator is setting the event timestamp correctly, as you
>>>> can see below. I debugged and found that the events are arriving late, so
>>>> they are never processed; i.e., in the window operator the method
>>>> this.isWindowLate(actualWindow) evaluates to false for the rest of the events
>>>> except the first, hence the events are getting skipped. I am not able to
>>>> figure out where exactly the issue is.
>>>> I have removed the evictor because I don't think I need it yet.
>>>> The stream looks like:
>>>> customerStream
>>>>         .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
>>>>         .keyBy((KeySelector<Customer, String>) Customer::getIdentifier)
>>>>         .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>>>>         .allowedLateness(Time.seconds(5))
>>>>         .trigger(new EventTimeTrigger())
>>>>         .process(new CustomAggregateFunction());
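The isWindowLate check mentioned above can be modelled in a few lines. This is a plain-Java sketch of the rule, assuming the usual definition for event-time windows (a window is late once window.maxTimestamp() + allowedLateness <= current watermark); it deliberately ignores merging with an already open session, and the names are hypothetical.

```java
// Plain-Java model (no Flink dependency) of the lateness rule for event-time windows:
// a window counts as late once maxTimestamp + allowedLateness <= current watermark.
// A session window opened by an element at timestamp t with gap g is [t, t + g),
// so its maxTimestamp is t + g - 1. Merging with an existing session is ignored.
final class LatenessModel {

    static long cleanupTime(long maxTimestamp, long allowedLateness) {
        long cleanup = maxTimestamp + allowedLateness;
        // Saturate instead of overflowing for timestamps close to Long.MAX_VALUE.
        return cleanup >= maxTimestamp ? cleanup : Long.MAX_VALUE;
    }

    static boolean isSessionWindowLate(long elementTimestamp, long gapMillis,
                                       long allowedLatenessMillis, long currentWatermark) {
        long maxTimestamp = elementTimestamp + gapMillis - 1;
        return cleanupTime(maxTimestamp, allowedLatenessMillis) <= currentWatermark;
    }
}
```

With the watermark generator in this thread, which emits the largest getEventTime() seen so far, an in-order element whose record timestamp equals its getEventTime() cannot be late under this rule, because its window's max timestamp is at least gap - 1 ms past the watermark. Late windows therefore suggest that the record timestamps the window uses differ from getEventTime(), which is what the collectWithTimestamp suggestion addresses.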
>>>> *Customer generator looks like:*
>>>> while (isRunning) {
>>>>     Customer c = new Customer(CUSTOMER_KEY[counter % 5], ..., 1000); // that's the event time
>>>>     System.out.println("Writing customer: " + c);
>>>>     sourceContext.collect(c);
>>>>     //sourceContext.emitWatermark(new Watermark(c.getEventTime()));
>>>>     Thread.sleep(1000);
>>>>     counter++;
>>>>     if(counter % 11 == 0) {
>>>>         System.out.println("Sleeping for 10 seconds");
>>>>         Thread.sleep(10000);
>>>>     }
>>>> }
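To see how this generator's timing interacts with the session gap, here is a plain-Java sketch (hypothetical names, no Flink) that computes each record's nominal emission time from the loop (1 s per record, plus 10 s after every 11th record), assumes that this is also the record's event time, and merges [t, t + gap) windows for one key the way Flink's TimeWindow.intersects treats windows that overlap or touch.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (no Flink dependency) of one key's session windows for the
// generator above. Assumption: each record's event time equals its nominal
// emission time, ignoring the time spent printing and emitting.
final class GeneratorSessions {

    // Nominal emission time (ms) of record number c: 1 s per record, plus a
    // 10 s pause after every 11th record, as in the while loop above.
    static long emissionTime(int c) {
        return c * 1_000L + 10_000L * (c / 11);
    }

    // Merges windows [t, t + gap) that overlap or touch; timestamps must be sorted.
    static List<long[]> sessions(List<Long> sortedTimestamps, long gap) {
        List<long[]> result = new ArrayList<>();
        for (long t : sortedTimestamps) {
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && last[1] >= t) {
                last[1] = Math.max(last[1], t + gap); // extend the open session
            } else {
                result.add(new long[] {t, t + gap}); // start a new session
            }
        }
        return result;
    }

    // Sessions for one of the 5 customer keys (CUSTOMER_KEY[c % 5]) over records 0..n-1.
    static List<long[]> sessionsForKey(int keyIndex, int n, long gap) {
        List<Long> times = new ArrayList<>();
        for (int c = 0; c < n; c++) {
            if (c % 5 == keyIndex) {
                times.add(emissionTime(c));
            }
        }
        return sessions(times, gap);
    }
}
```

With a 5 s gap, one key's records are nominally exactly 5 s apart, so they only merge because touching windows count as intersecting; any real delay on top of the 1 s sleep would split them into separate sessions. With the 20 s gap used later in the thread, the same records all fall into one session.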
>>>> The custom watermark generator has this:
>>>> .....
>>>> @Override
>>>> public void onEvent(Customer customer, long l, WatermarkOutput watermarkOutput) {
>>>>     currentMaxTimestamp = Math.max(currentMaxTimestamp, customer.getEventTime());
>>>> }
>>>> @Override
>>>> public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
>>>>     watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
>>>> }
>>>> .....
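For comparison, a plain-Java model (no Flink types; emitted watermarks go into a list instead of a WatermarkOutput) of this generator next to a bounded-out-of-orderness variant that follows the max - bound - 1 formula used by Flink's BoundedOutOfOrdernessWatermarks. The initial value of currentMaxTimestamp is elided in the excerpt, so Long.MIN_VALUE is an assumption here; class names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class WatermarkModel {

    // Same logic as onEvent/onPeriodicEmit above: the watermark is the largest
    // event time seen so far.
    static final class MaxTimestampGenerator {
        long currentMaxTimestamp = Long.MIN_VALUE; // assumed initial value (elided in the mail)
        final List<Long> emitted = new ArrayList<>();

        void onEvent(long eventTime) {
            currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTime);
        }

        void onPeriodicEmit() {
            emitted.add(currentMaxTimestamp);
        }
    }

    // Bounded out-of-orderness variant: watermark = max - bound - 1, with the
    // starting value chosen so the first emission does not underflow.
    static final class BoundedGenerator {
        final long boundMillis;
        long maxTimestamp;
        final List<Long> emitted = new ArrayList<>();

        BoundedGenerator(long boundMillis) {
            this.boundMillis = boundMillis;
            this.maxTimestamp = Long.MIN_VALUE + boundMillis + 1;
        }

        void onEvent(long eventTime) {
            maxTimestamp = Math.max(maxTimestamp, eventTime);
        }

        void onPeriodicEmit() {
            emitted.add(maxTimestamp - boundMillis - 1);
        }
    }
}
```

With the max-timestamp variant the watermark sits exactly on the newest timestamp, so any element that arrives with a smaller timestamp is already behind the watermark; the bounded variant, matching the commented-out forBoundedOutOfOrderness(Duration.ofSeconds(7)) line, leaves 7 s of slack.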
>>>> The trigger looks like:
>>>> ------
>>>>     @Override
>>>>     public TriggerResult onElement(Customer customer, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
>>>>         if (timeWindow.maxTimestamp() <= triggerContext.getCurrentWatermark()) {
>>>>             // if the watermark is already past the window fire immediately
>>>>             return TriggerResult.FIRE;
>>>>         } else {
>>>>             // "Max timestamp for customer: " + customer.getIdentifier() + " is: " + timeWindow.maxTimestamp());
>>>>             triggerContext.registerEventTimeTimer(customer.getLocalTime().plusSeconds(8).toNanoOfDay());
>>>>             return TriggerResult.FIRE;
>>>>         }
>>>>     }
>>>>     @Override
>>>>     public TriggerResult onEventTime(long time, TimeWindow timeWindow, TriggerContext triggerContext) {
>>>> //        if (timeWindow.maxTimestamp() > triggerContext.getCurrentWatermark()) {
>>>> //            triggerContext.registerEventTimeTimer(timeWindow.maxTimestamp());
>>>> //            return TriggerResult.CONTINUE;
>>>> //        }
>>>>         return time == timeWindow.maxTimestamp() ?
>>>>                 TriggerResult.FIRE :
>>>>                 TriggerResult.CONTINUE;
>>>>     }
>>>> ....
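One detail in this trigger worth checking: assuming customer.getLocalTime() returns a java.time.LocalTime, toNanoOfDay() yields nanoseconds since midnight (and LocalTime arithmetic wraps around midnight), whereas event-time timers are compared against the watermark, which is in the unit of the record timestamps (normally epoch milliseconds). The plain-Java snippet below, with hypothetical names, only shows the two scales side by side.

```java
import java.time.LocalTime;

// Plain-Java illustration of the value passed to registerEventTimeTimer above,
// next to a timestamp on the scale the watermark normally uses.
final class TimerUnits {

    // What the trigger computes: nanoseconds since local midnight, 8 s later.
    // LocalTime.plusSeconds wraps around midnight.
    static long nanoOfDayPlus8s(LocalTime localTime) {
        return localTime.plusSeconds(8).toNanoOfDay();
    }

    // A timestamp in epoch milliseconds, 8 s after an event time in epoch milliseconds.
    static long epochMillisPlus8s(long eventTimeMillis) {
        return eventTimeMillis + 8_000L;
    }
}
```

In addition, the onEventTime shown here only returns FIRE when time == timeWindow.maxTimestamp(), so a timer at any other timestamp results in CONTINUE anyway.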
>>>> On Thu, May 6, 2021 at 12:02 PM Arvid Heise <> wrote:
>>>>> Hi,
>>>>> Is your CustomerGenerator setting the event timestamp correctly? Are
>>>>> your evictors evicting too early?
>>>>> You can try to add some debug output into the watermark assigner and
>>>>> see if it's indeed progressing as expected.
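Following the debug-output suggestion, a hypothetical helper in plain Java: pass it the value that onPeriodicEmit is about to emit, and it prints the value and records whether the watermark moved forward since the previous emission. It is a sketch for local debugging, not part of any Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical debugging helper: prints each watermark and records whether it
// advanced compared with the largest value emitted before it.
final class WatermarkProgressLogger {
    private long last = Long.MIN_VALUE;
    private final List<Boolean> advanced = new ArrayList<>();

    long emit(long watermark) {
        boolean moved = watermark > last;
        advanced.add(moved);
        System.out.println("emitting watermark " + watermark + (moved ? "" : " (no progress)"));
        last = Math.max(last, watermark);
        return watermark; // returned unchanged so it can be passed straight on
    }

    List<Boolean> history() {
        return advanced;
    }
}
```

Inside the generator above this could be wired as watermarkOutput.emitWatermark(new Watermark(logger.emit(currentMaxTimestamp))); where logger is a hypothetical field of the generator.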
>>>>> On Thu, May 6, 2021 at 12:48 AM Swagat Mishra <>
>>>>> wrote:
>>>>>> This seems to be working fine in processing time but doesn't work in
>>>>>> event time. Is there an issue with the way the watermark is defined, or
>>>>>> do we need to set up timers?
>>>>>> Please advise.
>>>>>> WORKING:
>>>>>> customerStream
>>>>>>         .keyBy((KeySelector<Customer, String>) Customer::getIdentifier)
>>>>>>         .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
>>>>>>         .trigger(new EventTimeTrigger()) //EventTimeTrigger.create())
>>>>>>         .process(new CustomAggregateFunction());
>>>>>> customerStream
>>>>>>         .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new WaterMarkAssigner()))
>>>>>>         .keyBy(Customer::getIdentifier)
>>>>>>         .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>>>>>>         .trigger(EventTimeTrigger.create())
>>>>>>         .evictor(new CustomerEvictor())
>>>>>>         .process(new CustomAggregateFunction())
>>>>>>         .print();
>>>>>> On Thu, May 6, 2021 at 1:53 AM Sam <> wrote:
>>>>>>> Adding the code for CustomWatermarkGenerator
>>>>>>> .....
>>>>>>> @Override
>>>>>>> public void onEvent(Customer customer, long l, WatermarkOutput watermarkOutput) {
>>>>>>>     currentMaxTimestamp = Math.max(currentMaxTimestamp, customer.getEventTime());
>>>>>>> }
>>>>>>> @Override
>>>>>>> public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
>>>>>>>     watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
>>>>>>> }
>>>>>>> .....
>>>>>>> On Thu, May 6, 2021 at 1:33 AM Swagat Mishra <>
>>>>>>> wrote:
>>>>>>>> Hi,
>>>>>>>> A bit of background: I have a stream of customers who have purchased
>>>>>>>> some product, and I am reading these transactions from a Kafka topic. I
>>>>>>>> want to aggregate the number of products each customer has purchased in a
>>>>>>>> particular duration (say, 10 seconds) and write the result to a sink.
>>>>>>>> I am using session windows to achieve the above.
>>>>>>>> For test purposes, I have mocked up a customer stream and executed
>>>>>>>> session windows as below.
>>>>>>>> StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>> DataStream<Customer> customerStream = environment.addSource(new CustomerGenerator());
>>>>>>>> customerStream
>>>>>>>> //.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
>>>>>>>>         .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new WaterMarkAssigner()))
>>>>>>>>         .keyBy(Customer::getIdentifier)
>>>>>>>>         .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>>>>>>>>         .trigger(EventTimeTrigger.create())
>>>>>>>>         .evictor(new CustomerEvictor())
>>>>>>>>         .process(new CustomAggregateFunction())
>>>>>>>>         .print();
>>>>>>>> My watermark assigner looks like:
>>>>>>>> public class WaterMarkAssigner implements WatermarkStrategy<Customer> {
>>>>>>>>     static final Logger logger = LoggerFactory.getLogger(WaterMarkAssigner.class);
>>>>>>>>     @Override
>>>>>>>>     public WatermarkGenerator<Customer> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
>>>>>>>>         return new CustomWatermarkGenerator();
>>>>>>>>     }
>>>>>>>> }
>>>>>>>> I notice that the evictor and aggregation functions are getting called
>>>>>>>> only once, for the first customer in the stream.
>>>>>>>> The data stream is generating customers at 1-second intervals and there
>>>>>>>> are 5 customer keys for which it's generating transactions.
>>>>>>>> Am I doing something wrong with the above?
>>>>>>>> I want to be able to capture an event each time a transaction is added to
>>>>>>>> or removed from the window, so that I can perform the aggregation.
>>>>>>>> Please advise.
