I made some progress based on [1]. My code is here [2]. Basically, I call "ctx.timerService().registerProcessingTimeTimer(timeoutTime);" inside the processElement method to trigger the onTimer method. When onTimer fires, I clear the state with "hllStateTwitter.clear();".

However, I still have a question. I set the timeout to 5000 milliseconds, but the onTimer method is triggered at slightly different intervals (the output below shows 4992, 4923, and 4909). Why is this happening?
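To make the pattern concrete, here is a minimal sketch in plain Java with no Flink dependency. It is a model under stated assumptions, not the job linked in [2]: the class TimerWindowSketch and its methods are hypothetical names, a simple counter stands in for the HLL state, time is simulated instead of coming from Flink's timer service, and it assumes the timer is registered only when none is pending, so later elements do not move it.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-JDK model of "register a timer in processElement, emit and clear in onTimer".
 * NOT Flink code: every name here is hypothetical and time is simulated.
 */
final class TimerWindowSketch {
    private final long timeoutMs;
    private Long pendingTimer = null;   // at most one pending timer (single key)
    private long count = 0L;            // stand-in for the HLL state
    private final List<String> emitted = new ArrayList<>();

    TimerWindowSketch(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /** Mirrors processElement: update state, register a timer if none is pending. */
    void processElement(long now) {
        advanceTo(now);                 // fire a due timer before handling the element
        count++;
        if (pendingTimer == null) {
            pendingTimer = now + timeoutMs;
        }
    }

    /** Moves simulated time forward and fires the pending timer if it is due. */
    void advanceTo(long now) {
        if (pendingTimer != null && pendingTimer <= now) {
            long timestamp = pendingTimer;
            pendingTimer = null;
            onTimer(timestamp);
        }
    }

    /** Mirrors onTimer: emit the result for the window, then clear the state. */
    private void onTimer(long timestamp) {
        emitted.add(timestamp + ":" + count);
        count = 0L;                     // analogous to calling clear() on the state
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        TimerWindowSketch w = new TimerWindowSketch(5000);
        w.processElement(1000);   // registers a timer for 6000
        w.processElement(3000);   // timer already pending, not re-registered
        w.processElement(7000);   // timer 6000 fires first, then a new timer for 12000
        w.advanceTo(12000);       // timer 12000 fires
        System.out.println(w.emitted()); // prints [6000:2, 12000:1]
    }
}
```

In this model the timer timestamp is fixed by the first element of each window, so elements that arrive later are counted but do not postpone the firing. The log from the actual job, which is what the question is about, follows.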
process: 1560850703025 - 1560850708025
onTimer: 1560850708025 - 1560850713017 = 4992
3> estimate cardinality: 544
process: 1560850709019 - 1560850714019
onTimer: 1560850714019 - 1560850718942 = 4923
3> estimate cardinality: 485
process: 1560850714027 - 1560850719027
onTimer: 1560850719027 - 1560850723936 = 4909
3> estimate cardinality: 438
process: 1560850719035 - 1560850724035

[1] https://stackoverflow.com/a/53646529/2096986
[2] https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordHLLKeyedProcessWindowTwitter.java

*--*
*-- Felipe Gutierrez*
*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com*

On Tue, Jun 18, 2019 at 11:15 AM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:

> I am sorry, I wanted to point to this reference,
> https://stackoverflow.com/a/47071833/2096986, which implements a window on
> a ProcessFunction in Flink.
>
> On Tue, Jun 18, 2019 at 9:22 AM Piotr Nowojski <pi...@ververica.com> wrote:
>
>> Hi,
>>
>> Isn’t your problem that the source is constantly emitting data and
>> bumping your timers? Keep in mind that the code you are basing this on
>> has the following characteristic:
>>
>> > In the following example a KeyedProcessFunction maintains counts per
>> > key, and emits a key/count pair whenever a *minute passes without an
>> > update for that key*
>>
>> Piotrek
>>
>> On 17 Jun 2019, at 15:51, Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
>>
>> Hi,
>>
>> I used this example of a KeyedProcessFunction from the Flink website [1]
>> and implemented my own KeyedProcessFunction to do some approximate
>> counting [2]. This worked very well. Then I switched the data source to
>> consume strings from Twitter [3]. The data source is consuming the
>> strings, because I can see them when I debug. However, the time comparison
>> in the onTimer() method never matches, and I never get the results of
>> the window processing. I don't know the exact reason why this is
>> happening. I guess it is because my state is too heavy. But still,
>> shouldn't the time be correct at some point, so that the evaluation of my
>> window finishes?
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#example
>> [2] https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordHLLKeyedProcessWindowSocket.java
>> [3] https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordHLLKeyedProcessWindowTwitter.java
>>
>> Kind Regards,
>> Felipe