Re: Session windows - Evaluation of addition of element + window expires/gets discarded after some set time of inactivity

2016-10-31 Thread Aljoscha Krettek
Hi Anchit,
the timers don't necessarily have to be cleaned up. So you should be good
to go.

Cheers,
Aljoscha

On Fri, 28 Oct 2016 at 23:33 Anchit Jatana 
wrote:

> Hi Aljoscha,
>
> I am using the custom trigger with GlobalWindows window assigner. Do I
> still
> need to override clear method and delete the ProcessingTimeTimer using-
> triggerContext.deleteProcessingTimeTimer(prevTime)?
>
> Regards,
> Anchit
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Session-windows-Evaluation-of-addition-of-element-window-expires-gets-discarded-after-some-set-time-y-tp9665p9774.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: Session windows - Evaluation of addition of element + window expires/gets discarded after some set time of inactivity

2016-10-28 Thread Anchit Jatana
Hi Aljoscha,

I am using the custom trigger with GlobalWindows window assigner. Do I still
need to override clear method and delete the ProcessingTimeTimer using-
triggerContext.deleteProcessingTimeTimer(prevTime)?

Regards,
Anchit



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Session-windows-Evaluation-of-addition-of-element-window-expires-gets-discarded-after-some-set-time-y-tp9665p9774.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Session windows - Evaluation of addition of element + window expires/gets discarded after some set time of inactivity

2016-10-25 Thread Aljoscha Krettek
Hi Bart,
are you using your custom Trigger together with a merging session window
assigner?

You might want to consider overriding the clear() method in your trigger to
clean up the state that you use. If you don't you might run into memory
leaks because the state is never cleaned up.

Cheers,
Aljoscha

On Sat, 22 Oct 2016 at 07:06 Anchit Jatana 
wrote:

> Hi Bart,
>
> Thank you so much for sharing the approach. Looks like this solved my
> problem. Here's what I have as an implementation for my use-case:
>
> package org.apache.flink.quickstart
>
> import org.apache.flink.api.common.state.{ ReducingState,
> ReducingStateDescriptor, ValueState, ValueStateDescriptor }
> import
> org.apache.flink.streaming.api.scala.function.util.ScalaReduceFunction
> import org.apache.flink.streaming.api.windowing.time.Time
> import
> org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
> import org.apache.flink.streaming.api.windowing.triggers.{ Trigger,
> TriggerResult }
> import org.apache.flink.streaming.api.windowing.windows.Window
> import org.slf4j.LoggerFactory
>
> class sessionTrigger[E](val sessionPauseHours: Long) extends Trigger[E,
> Window] {
>
>   val timeState = new ValueStateDescriptor[Option[Long]]("sessionTimer",
> classOf[Option[Long]], None)
>
>   override def onElement(t: E, l: Long, w: Window, triggerContext:
> TriggerContext): TriggerResult = {
>
> // remove old timer
> val time_state: ValueState[Option[Long]] =
> triggerContext.getPartitionedState(timeState)
> val time_set = time_state.value()
> if (time_set.isDefined) {
>   triggerContext.deleteProcessingTimeTimer(time_set.get)
> }
> // set new time and continue
> val new_time = triggerContext.getCurrentProcessingTime +
> Time.seconds(sessionPauseHours).toMilliseconds()
> time_state.update(Some(new_time))
> triggerContext.registerProcessingTimeTimer(new_time)
> TriggerResult.FIRE
>   }
>
>   override def onProcessingTime(l: Long, w: Window, triggerContext:
> TriggerContext): TriggerResult = {
> TriggerResult.PURGE
>   }
>
>   override def onEventTime(l: Long, w: Window, triggerContext:
> TriggerContext): TriggerResult = {
> TriggerResult.CONTINUE
>   }
> }
>
> Regards,
> Anchit
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Session-windows-Evaluation-of-addition-of-element-window-expires-gets-discarded-after-some-set-time-y-tp9665p9676.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


RE: Session windows - Evaluation of addition of element + window expires/gets discarded after some set time of inactivity

2016-10-21 Thread Anchit Jatana
Hi Bart,

Thank you so much for sharing the approach. Looks like this solved my
problem. Here's what I have as an implementation for my use-case:

package org.apache.flink.quickstart

import org.apache.flink.api.common.state.{ ReducingState,
ReducingStateDescriptor, ValueState, ValueStateDescriptor }
import
org.apache.flink.streaming.api.scala.function.util.ScalaReduceFunction
import org.apache.flink.streaming.api.windowing.time.Time
import
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
import org.apache.flink.streaming.api.windowing.triggers.{ Trigger,
TriggerResult }
import org.apache.flink.streaming.api.windowing.windows.Window
import org.slf4j.LoggerFactory

class sessionTrigger[E](val sessionPauseHours: Long) extends Trigger[E,
Window] {

  val timeState = new ValueStateDescriptor[Option[Long]]("sessionTimer",
classOf[Option[Long]], None)

  override def onElement(t: E, l: Long, w: Window, triggerContext:
TriggerContext): TriggerResult = {

// remove old timer
val time_state: ValueState[Option[Long]] =
triggerContext.getPartitionedState(timeState)
val time_set = time_state.value()
if (time_set.isDefined) {
  triggerContext.deleteProcessingTimeTimer(time_set.get)
}
// set new time and continue
val new_time = triggerContext.getCurrentProcessingTime +
Time.seconds(sessionPauseHours).toMilliseconds()
time_state.update(Some(new_time))
triggerContext.registerProcessingTimeTimer(new_time)
TriggerResult.FIRE
  }

  override def onProcessingTime(l: Long, w: Window, triggerContext:
TriggerContext): TriggerResult = {
TriggerResult.PURGE
  }

  override def onEventTime(l: Long, w: Window, triggerContext:
TriggerContext): TriggerResult = {
TriggerResult.CONTINUE
  }
}

Regards,
Anchit



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Session-windows-Evaluation-of-addition-of-element-window-expires-gets-discarded-after-some-set-time-y-tp9665p9676.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.