Thanks Aljoscha, Looking forward to the 1.1. release. I managed to solve my problem using this example code:
https://bitbucket.org/snippets/vstoyak/o9Rqp (courtesy of Vladimir Stoyak) I had to create a custom window and window assigner. Hopefully that will help someone else. On Wed, Aug 3, 2016 at 8:35 PM, Aljoscha Krettek <aljos...@apache.org> wrote: > Hi, > a watermark cannot be sent before the element that makes you send that > watermark. A watermark of time T tells the system that no element will > arrive in the future with timestamp T or less, thus you cannot send it > before. It seems that what you are trying to achieve can be solved by using > session windows, which will be part of the upcoming 1.1 release: > https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html#session-windows > > Cheer, > Aljoscha > > On Wed, 3 Aug 2016 at 12:19 Jason Brelloch <jb.bc....@gmail.com> wrote: > >> A little more info. Here is a simplified version of my >> trigger: (windowConfiguration.timespan is the duration of the window) >> >> class CustomTrigger extends Trigger[QualifiedEvent, Window] { >> >> val stateTimeDescr = new >> ValueStateDescriptor[Long]("relevantTimestamp", classOf[Long], 0) >> >> override def onElement(event: QualifiedEvent, timestamp: Long, W: >> Window, ctx: TriggerContext): TriggerResult = { >> >> val relevantTimestamp = ctx.getPartitionedState(stateTimeDescr) >> val windowConfigurationState = >> ctx.getPartitionedState(windowConfigDescr) >> var windowConfiguration = windowConfigurationState.value() >> if(windowConfiguration == null) { >> windowConfigurationState.update(event.alertConfiguration.window.get) >> windowConfiguration = event.alertConfiguration.window.get >> } >> >> if(relevantTimestamp.value() == 0) { >> ctx.registerEventTimeTimer(event.event.created.toEpochMilli + >> windowConfiguration.timespan.toMillis) >> relevantTimestamp.update (event.event.created.toEpochMilli + >> windowConfiguration.timespan.toMillis) >> } >> >> TriggerResult.CONTINUE >> } >> >> override def onEventTime(timestamp: Long, W: Window, ctx: >> TriggerContext): TriggerResult = { >> TriggerResult.FIRE_AND_PURGE >> } >> >> override def onProcessingTime(timestamp: Long, W: Window, ctx: >> TriggerContext): TriggerResult = { >> TriggerResult.CONTINUE >> } >> } >> >> And here is the actual window execution: >> >> val stream = env.fromCollection(inputEvents) >> .assignAscendingTimestamps((e: QualifiedEvent) => { >> e.event.created.toEpochMilli }) >> .keyBy((e: QualifiedEvent) => { >> e.alertConfiguration.alertId.toString }) >> .window(GlobalWindows.create) >> .trigger(ConfigurableTrigger.create) >> .apply(new GrouperFunction).name("Grouper Function") >> >> Oddly enough when I do this with just a basic window function it works >> and I only get the two events I am supposed to: >> >> val stream = env.fromCollection(inputEvents) >> .assignAscendingTimestamps((e: QualifiedEvent) => { >> e.event.created.toEpochMilli }) >> .keyBy((e: QualifiedEvent) => { >> e.alertConfiguration.alertId.toString }) >> .timeWindow(Time.minutes(5)) >> .apply(new GrouperFunction).name("Grouper Function") >> >> >> On Wed, Aug 3, 2016 at 2:29 PM, Jason Brelloch <jb.bc....@gmail.com> >> wrote: >> >>> Hey guys, >>> >>> I am trying to use event time along with a custom window to capture a >>> subset of events. The problem I am running into is that it seems that >>> event that generates the timestamp/watermark arrives in the window before >>> the onEventTime() call is made that closes the window. Example: >>> >>> Window is supposed to capture 5 minutes of events after first event >>> arrives >>> Event 1: timestamp 12:01 - registers event timer for 12:06 >>> Event 2: timestamp 12:03 >>> Event 3: timestamp 12:20 - fires and purges window >>> >>> I get all three events in the window, instead of just the two the are >>> really within the 5 minute window. >>> >>> Is there someway to force the timestamp to arrive in the window before >>> the event that generated it? >>> >>> Thanks! >>> >>> -- >>> *Jason Brelloch* | Product Developer >>> 3405 Piedmont Rd. NE, Suite 325, Atlanta, GA 30305 >>> <http://www.bettercloud.com/> >>> Subscribe to the BetterCloud Monitor >>> <https://www.bettercloud.com/monitor?utm_source=bettercloud_email&utm_medium=email_signature&utm_campaign=monitor_launch> >>> - >>> Get IT delivered to your inbox >>> >> >> >> >> -- >> *Jason Brelloch* | Product Developer >> 3405 Piedmont Rd. NE, Suite 325, Atlanta, GA 30305 >> <http://www.bettercloud.com/> >> Subscribe to the BetterCloud Monitor >> <https://www.bettercloud.com/monitor?utm_source=bettercloud_email&utm_medium=email_signature&utm_campaign=monitor_launch> >> - >> Get IT delivered to your inbox >> > -- *Jason Brelloch* | Product Developer 3405 Piedmont Rd. NE, Suite 325, Atlanta, GA 30305 <http://www.bettercloud.com/> Subscribe to the BetterCloud Monitor <https://www.bettercloud.com/monitor?utm_source=bettercloud_email&utm_medium=email_signature&utm_campaign=monitor_launch> - Get IT delivered to your inbox