Hi guys, I want to sessionize this stream: 1,1,1,2,2,2,2,2,3,3,3,3,3,3,3,0,3,3,3,5, ... into these sessions:
    1,1,1   2,2,2,2,2   3,3,3,3,3,3,3   0   3,3,3   5

I've written a custom Trigger that detects when the stream elements change from 1 to 2 (2 to 3, 3 to 0, and so on) and then fires. But this doesn't solve the problem: the trigger fires while I'm processing the first of the 2's, so the window is [1,1,1,2], whereas I need the trigger to fire on the last of the 1's.

Here is the pseudocode of the onElement function in my custom trigger class:

    override def onElement(element: Session, timestamp: Long, window: W, ctx: TriggerContext): TriggerResult = {
      if (prevState == element.value) {
        prevState = element.value
        TriggerResult.CONTINUE
      } else {
        prevState = element.value
        TriggerResult.FIRE
      }
    }

How can I solve this problem?

--
Milād Khājavi
http://blog.khajavi.ir
Having the source means you can do it yourself.
I tried to change the world, but I couldn’t find the source code.
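P.S. To make the expected behaviour concrete, here is a minimal plain-Scala sketch of the grouping I'm after. It does not use Flink at all; RunSessionizer, push and flush are names made up for this illustration, not part of any Flink API. The point it shows is that a change in value can only be seen when the next element arrives, so that element has to start the new session instead of being included in the one that is emitted.

```scala
// Hypothetical helper for illustration only; not a Flink class.
final class RunSessionizer[A] {
  // Elements of the session that is currently open.
  private var buffer: Vector[A] = Vector.empty

  // Feed one element. Returns the finished session when the value changes,
  // otherwise None. The new element is kept back as the start of the next session.
  def push(x: A): Option[Vector[A]] =
    if (buffer.isEmpty || buffer.last == x) {
      buffer = buffer :+ x
      None
    } else {
      val finished = buffer
      buffer = Vector(x)
      Some(finished)
    }

  // Emit whatever is still buffered, e.g. at the end of a bounded input.
  def flush(): Option[Vector[A]] =
    if (buffer.isEmpty) None
    else {
      val finished = buffer
      buffer = Vector.empty
      Some(finished)
    }
}

object RunSessionizerDemo {
  def main(args: Array[String]): Unit = {
    val stream = List(1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 0, 3, 3, 3, 5)
    val s = new RunSessionizer[Int]
    val sessions = stream.flatMap(x => s.push(x).toList) ++ s.flush().toList
    // Prints one session per line: 1,1,1 / 2,2,2,2,2 / 3,3,3,3,3,3,3 / 0 / 3,3,3 / 5
    sessions.foreach(session => println(session.mkString(",")))
  }
}
```

In this sketch the last session (here the single 5) is only emitted by flush, because on an unbounded stream there is no following element to signal that it has ended.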