Thank you.

I will have a look at DataStream#global().

Is there any other way to maintain state, for example by using ValueState?
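
For context, here is how I picture what keyed state such as ValueState would give me, modelled in plain Java. This is only a rough sketch and not the Flink API: the class KeyedStateSketch and its methods onPurchase and snapshot are made-up names, the map stands in for the single value Flink would keep per key, and snapshot() stands in for whatever single downstream task ends up collecting the per-customer results.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java model of per-key state. Hypothetical names, NOT the Flink API.
class KeyedStateSketch {
    // Stands in for one ValueState<Long> per customer key.
    private final Map<String, Long> purchasesPerCustomer = new LinkedHashMap<>();

    // Roughly what a keyed process function would do per element:
    // read the value stored for this key, update it, store it again.
    long onPurchase(String customerId, long quantity) {
        long updated = purchasesPerCustomer.getOrDefault(customerId, 0L) + quantity;
        purchasesPerCustomer.put(customerId, updated);
        return updated;
    }

    // Roughly what one collecting task would see once the small
    // per-customer totals have all been sent to it.
    Map<String, Long> snapshot() {
        return new LinkedHashMap<>(purchasesPerCustomer);
    }

    public static void main(String[] args) {
        KeyedStateSketch state = new KeyedStateSketch();
        state.onPurchase("c1", 2);
        state.onPurchase("c2", 1);
        state.onPurchase("c1", 3);
        System.out.println(state.snapshot());
    }
}
```

If I understand correctly, in a real job the per-key value would live in a ValueState inside a keyed process function, and only the small per-customer totals would need to travel to one node.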


On Thu, 6 May 2021 at 1:26 PM, Arvid Heise <ar...@apache.org> wrote:

> If you keyby then all direct functions see only the elements with the same
> key. So that's the expected behavior and the base of Flink's parallel
> processing capabilities.
>
> If you want to generate a window over all customers, you have to use a
> global window. However, that also means that no parallelization can happen,
> so I'd discourage that.
>
> A better way would be to perform as many calculations as possible in the
> process function (for example create a customer with buy information
> record) and then have a DataStream#global() reshuffle to collect all
> aggregated information on one node.
>
> On Thu, May 6, 2021 at 9:20 AM Swagat Mishra <swaga...@gmail.com> wrote:
>
>> Thank you.
>>
>> sourceContext.collectWithTimestamp(c, c.getEventTime());
>>
>> Adding this to the source context worked.
>>
>> However, I am still getting only one customer in the process method. I would
>> expect the iterable to provide all customers in the window. Or do I have to
>> maintain state?
>>
>>
>> changes for reference:
>>
>> I made the following change and also removed any lag that I had introduced for
>> experimentation earlier.
>>
>> So the trigger looks like:
>>
>>
>>     @Override
>>     public TriggerResult onElement(Customer customer, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
>>         if (timeWindow.maxTimestamp() <= triggerContext.getCurrentWatermark()) {
>>             // if the watermark is already past the window fire immediately
>>             return TriggerResult.FIRE;
>>         } else {
>>             //LOGGER.info("Max timestamp for customer: " + customer.getIdentifier() + " is: " + timeWindow.maxTimestamp());
>>             triggerContext.registerEventTimeTimer(customer.getEventTime());
>>             return TriggerResult.FIRE;
>>         }
>>     }
>>
>>     @Override
>>     public TriggerResult onEventTime(long time, TimeWindow timeWindow, TriggerContext triggerContext) {
>>         return time == timeWindow.maxTimestamp() ?
>>                 TriggerResult.FIRE :
>>                 TriggerResult.CONTINUE;
>>     }
>>
>>     @Override
>>     public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
>>         return TriggerResult.CONTINUE;
>>     }
>>
>>     @Override
>>     public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
>>         ctx.deleteEventTimeTimer(window.maxTimestamp());
>>     }
>>
>>     @Override
>>     public boolean canMerge() {
>>         return true;
>>     }
>>
>> and *removed lateness*
>>
>> customerStream
>>         //.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
>>         .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
>>         .keyBy((KeySelector<Customer, String>) Customer::getIdentifier)
>>         .window(EventTimeSessionWindows.withGap(Time.seconds(20)))
>>         //.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
>>         .trigger(new EventTimeTrigger()) //EventTimeTrigger.create())
>>         .process(new CustomAggregateFunction());
>>
>>
>> On Thu, May 6, 2021 at 12:32 PM Arvid Heise <ar...@apache.org> wrote:
>>
>>> Your source is not setting the timestamp with collectWithTimestamp. I'm
>>> assuming that nothing really moves from the event time's perspective.
>>>
>>> On Thu, May 6, 2021 at 8:58 AM Swagat Mishra <swaga...@gmail.com> wrote:
>>>
>>>> Yes, the customer generator is setting the event timestamp correctly, as
>>>> shown below. I debugged and found that the events are treated as late, so
>>>> they are never processed. That is, in the window operator the method
>>>> this.isWindowLate(actualWindow) evaluates to false for the rest of the
>>>> events except the first, hence the events are getting skipped. I am not
>>>> able to figure out where exactly the issue is.
>>>>
>>>> I have removed the evictor because I don't think I need it yet.
>>>>
>>>> stream looks like
>>>>
>>>> customerStream
>>>>         .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
>>>>         .keyBy((KeySelector<Customer, String>) Customer::getIdentifier)
>>>>         .window(EventTimeSessionWindows.withGap(Time.seconds(5))).allowedLateness(Time.seconds(5))
>>>>         .trigger(new EventTimeTrigger())
>>>>         .process(new CustomAggregateFunction());
>>>>
>>>>
>>>> *Customer generator looks like:*
>>>>
>>>> while (isRunning) {
>>>>     Customer c = new Customer(CUSTOMER_KEY[counter % 5], LocalTime.now(), 1000); // that's the event time
>>>>     System.out.println("Writing customer: " + c);
>>>>     sourceContext.collect(c);
>>>>     //sourceContext.emitWatermark(new Watermark(c.getEventTime()));
>>>>     Thread.sleep(1000);
>>>>     counter++;
>>>>     if(counter % 11 == 0) {
>>>>         System.out.println("Sleeping for 10 seconds");
>>>>         Thread.sleep(10000);
>>>>     }
>>>> }
>>>>
>>>>
>>>> Custom Watermark generator has this:
>>>>
>>>> .....
>>>> @Override
>>>> public void onEvent(Customer customer, long l, WatermarkOutput watermarkOutput) {
>>>>     currentMaxTimestamp = Math.max(currentMaxTimestamp, customer.getEventTime());
>>>> }
>>>>
>>>> @Override
>>>> public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
>>>>     watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
>>>>
>>>> }
>>>> .....
>>>>
>>>> trigger looks like:
>>>>
>>>> ------
>>>>
>>>>
>>>>     @Override
>>>>     public TriggerResult onElement(Customer customer, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
>>>>         if (timeWindow.maxTimestamp() <= triggerContext.getCurrentWatermark()) {
>>>>             // if the watermark is already past the window fire immediately
>>>>             return TriggerResult.FIRE;
>>>>         } else {
>>>>             LOGGER.info("Max timestamp for customer: " + customer.getIdentifier() + " is: " + timeWindow.maxTimestamp());
>>>>             triggerContext.registerEventTimeTimer(customer.getLocalTime().plusSeconds(8).toNanoOfDay());
>>>>             return TriggerResult.FIRE;
>>>>         }
>>>>     }
>>>>
>>>>     @Override
>>>>     public TriggerResult onEventTime(long time, TimeWindow timeWindow, TriggerContext triggerContext) {
>>>> //        if (timeWindow.maxTimestamp() > triggerContext.getCurrentWatermark()) {
>>>> //            triggerContext.registerEventTimeTimer(timeWindow.maxTimestamp());
>>>> //            return TriggerResult.CONTINUE;
>>>> //        }
>>>>
>>>>         return time == timeWindow.maxTimestamp() ?
>>>>                 TriggerResult.FIRE :
>>>>                 TriggerResult.CONTINUE;
>>>>     }
>>>>
>>>>
>>>> ....
>>>>
>>>>
>>>> On Thu, May 6, 2021 at 12:02 PM Arvid Heise <ar...@apache.org> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Is your CustomerGenerator setting the event timestamp correctly? Are
>>>>> your evictors evicting too early?
>>>>>
>>>>> You can try to add some debug output into the watermark assigner and
>>>>> see if it's indeed progressing as expected.
>>>>>
>>>>> On Thu, May 6, 2021 at 12:48 AM Swagat Mishra <swaga...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> This seems to be working fine in processing time but doesn't work in
>>>>>> event time. Is there an issue with the way the watermark is defined, or do
>>>>>> we need to set up timers?
>>>>>>
>>>>>> Please advise.
>>>>>>
>>>>>>
>>>>>> WORKING:
>>>>>>
>>>>>> customerStream
>>>>>>         .keyBy((KeySelector<Customer, String>) Customer::getIdentifier)
>>>>>>         .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
>>>>>>         .trigger(new EventTimeTrigger()) //EventTimeTrigger.create())
>>>>>>         .process(new CustomAggregateFunction());
>>>>>>
>>>>>>
>>>>>> NOT WORKING:
>>>>>>
>>>>>> customerStream
>>>>>>         .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new WaterMarkAssigner()))
>>>>>>         .keyBy(Customer::getIdentifier)
>>>>>>         .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>>>>>>         .trigger(EventTimeTrigger.create())
>>>>>>         .evictor(new CustomerEvictor())
>>>>>>         .process(new CustomAggregateFunction())
>>>>>>         .print();
>>>>>>
>>>>>>
>>>>>> On Thu, May 6, 2021 at 1:53 AM Sam <swagat....@gmail.com> wrote:
>>>>>>
>>>>>>> Adding the code for CustomWatermarkGenerator
>>>>>>>
>>>>>>> .....
>>>>>>> @Override
>>>>>>> public void onEvent(Customer customer, long l, WatermarkOutput watermarkOutput) {
>>>>>>>     currentMaxTimestamp = Math.max(currentMaxTimestamp, customer.getEventTime());
>>>>>>> }
>>>>>>>
>>>>>>> @Override
>>>>>>> public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
>>>>>>>     watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
>>>>>>>
>>>>>>> }
>>>>>>> .....
>>>>>>>
>>>>>>>
>>>>>>> On Thu, May 6, 2021 at 1:33 AM Swagat Mishra <swaga...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> A bit of background: I have a stream of customers who have purchased
>>>>>>>> some product, and I am reading these transactions from a Kafka topic. I
>>>>>>>> want to aggregate the number of products each customer has purchased in a
>>>>>>>> particular duration (say 10 seconds) and write the result to a sink.
>>>>>>>>
>>>>>>>> I am using session windows to achieve the above.
>>>>>>>>
>>>>>>>> For test purposes, I have mocked up a customer stream and executed
>>>>>>>> session windows like below.
>>>>>>>>
>>>>>>>> StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>> DataStream<Customer> customerStream = environment.addSource(new CustomerGenerator());
>>>>>>>>
>>>>>>>> customerStream
>>>>>>>>         //.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
>>>>>>>>         .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new WaterMarkAssigner()))
>>>>>>>>         .keyBy(Customer::getIdentifier)
>>>>>>>>         .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>>>>>>>>         .trigger(EventTimeTrigger.create())
>>>>>>>>         .evictor(new CustomerEvictor())
>>>>>>>>         .process(new CustomAggregateFunction())
>>>>>>>>         .print();
>>>>>>>>
>>>>>>>> My watermark assigner looks like:
>>>>>>>>
>>>>>>>> public class WaterMarkAssigner implements WatermarkStrategy<Customer> {
>>>>>>>>     static final Logger logger = LoggerFactory.getLogger(WaterMarkAssigner.class);
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public WatermarkGenerator<Customer> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
>>>>>>>>         return new CustomWatermarkGenerator();
>>>>>>>>     }
>>>>>>>> }
>>>>>>>>
>>>>>>>> I notice that the evictor and aggregation functions are getting
>>>>>>>> called only once, for the first customer in the stream.
>>>>>>>>
>>>>>>>> The data stream is generating customers at 1-second intervals and
>>>>>>>> there are 5 customer keys for which it's generating transactions.
>>>>>>>>
>>>>>>>> Am I doing something wrong with the above?
>>>>>>>>
>>>>>>>> I want to be able to capture the event of each transaction getting
>>>>>>>> added to and removed from the window so that I can perform the
>>>>>>>> aggregation.
>>>>>>>>
>>>>>>>> Please advise.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
