Hi,
A watermark cannot be emitted before the element that causes it. A
watermark of time T tells the system that no element with timestamp T or
earlier will arrive in the future, so the element itself always reaches the
window before the watermark that fires your timer. It seems that what you
are trying to achieve can be solved with session windows, which will be
part of the upcoming 1.1 release:
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html#session-windows
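
To make the ordering concrete, here is a rough plain-Scala model of it. This
is only a sketch and not Flink code: Event, simulate and sessionize are
made-up names, the watermark is assumed to trail each timestamp by 1 ms (as
with ascending timestamps), and the session grouping is a simplified
gap-based model whose exact boundary handling may differ from Flink's.

```scala
// Plain-Scala model only; no Flink dependency. All names are hypothetical.
object WatermarkOrderDemo {
  final case class Event(name: String, timestamp: Long)

  /** Models one global window with a timer at (first timestamp + span).
    * The watermark advances only AFTER the element that causes it has
    * been added. Returns the contents of every fired window. */
  def simulate(events: Seq[Event], spanMillis: Long): List[List[String]] = {
    var buffer = Vector.empty[String]   // elements currently in the window
    var timer: Option[Long] = None      // pending event-time timer
    var fired = List.empty[List[String]]
    for (e <- events) {
      buffer :+= e.name                 // 1. element enters the window
      if (timer.isEmpty) timer = Some(e.timestamp + spanMillis)
      val watermark = e.timestamp - 1   // 2. watermark follows (assumed -1 ms)
      if (timer.exists(_ <= watermark)) {
        fired :+= buffer.toList         // 3. timer fires: FIRE_AND_PURGE
        buffer = Vector.empty
        timer = None
      }
    }
    fired
  }

  /** Simplified gap-based grouping: an event joins the current session
    * if it is less than gapMillis after the previous event. */
  def sessionize(events: Seq[Event], gapMillis: Long): List[List[String]] =
    events.sortBy(_.timestamp).foldLeft(List.empty[List[Event]]) {
      case (current :: done, e)
          if e.timestamp - current.head.timestamp < gapMillis =>
        (e :: current) :: done          // current is kept newest-first
      case (acc, e) =>
        List(e) :: acc                  // gap exceeded: start a new session
    }.map(_.reverse.map(_.name)).reverse

  def main(args: Array[String]): Unit = {
    val minute = 60000L
    val events = Seq(
      Event("e1", 1 * minute),   // 12:01
      Event("e2", 3 * minute),   // 12:03
      Event("e3", 20 * minute))  // 12:20
    println(simulate(events, 5 * minute))   // List(List(e1, e2, e3))
    println(sessionize(events, 5 * minute)) // List(List(e1, e2), List(e3))
  }
}
```

Note that a session closes after a gap of inactivity rather than a fixed
span after the first element, so it matches your use case only if a
gap-based definition is acceptable.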

Cheers,
Aljoscha

On Wed, 3 Aug 2016 at 12:19 Jason Brelloch <jb.bc....@gmail.com> wrote:

> A little more info.  Here is a simplified version of my
> trigger: (windowConfiguration.timespan is the duration of the window)
>
> class CustomTrigger extends Trigger[QualifiedEvent, Window] {
>
>   val stateTimeDescr = new ValueStateDescriptor[Long]("relevantTimestamp",
> classOf[Long], 0)
>
>   override def onElement(event: QualifiedEvent, timestamp: Long, W:
> Window, ctx: TriggerContext): TriggerResult = {
>
>     val relevantTimestamp = ctx.getPartitionedState(stateTimeDescr)
>     val windowConfigurationState =
> ctx.getPartitionedState(windowConfigDescr)
>     var windowConfiguration = windowConfigurationState.value()
>     if(windowConfiguration == null) {
>       windowConfigurationState.update(event.alertConfiguration.window.get)
>       windowConfiguration = event.alertConfiguration.window.get
>     }
>
>     if(relevantTimestamp.value() == 0) {
>       // First element seen: close the window `timespan` after it.
>       val deadline = event.event.created.toEpochMilli +
>         windowConfiguration.timespan.toMillis
>       ctx.registerEventTimeTimer(deadline)
>       relevantTimestamp.update(deadline)
>     }
>
>     TriggerResult.CONTINUE
>   }
>
>   override def onEventTime(timestamp: Long, W: Window, ctx:
> TriggerContext): TriggerResult = {
>     TriggerResult.FIRE_AND_PURGE
>   }
>
>   override def onProcessingTime(timestamp: Long, W: Window, ctx:
> TriggerContext): TriggerResult = {
>     TriggerResult.CONTINUE
>   }
> }
>
> And here is the actual window execution:
>
> val stream = env.fromCollection(inputEvents)
>         .assignAscendingTimestamps((e: QualifiedEvent) => {
> e.event.created.toEpochMilli })
>         .keyBy((e: QualifiedEvent) => {
> e.alertConfiguration.alertId.toString })
>         .window(GlobalWindows.create)
>         .trigger(ConfigurableTrigger.create)
>         .apply(new GrouperFunction).name("Grouper Function")
>
> Oddly enough when I do this with just a basic window function it works and
> I only get the two events I am supposed to:
>
> val stream = env.fromCollection(inputEvents)
>         .assignAscendingTimestamps((e: QualifiedEvent) => {
> e.event.created.toEpochMilli })
>         .keyBy((e: QualifiedEvent) => {
> e.alertConfiguration.alertId.toString })
>         .timeWindow(Time.minutes(5))
>         .apply(new GrouperFunction).name("Grouper Function")
>
>
> On Wed, Aug 3, 2016 at 2:29 PM, Jason Brelloch <jb.bc....@gmail.com>
> wrote:
>
>> Hey guys,
>>
>> I am trying to use event time along with a custom window to capture a
>> subset of events.  The problem I am running into is that the event that
>> generates the timestamp/watermark seems to arrive in the window before
>> the onEventTime() call is made that closes the window.  Example:
>>
>> Window is supposed to capture 5 minutes of events after first event
>> arrives
>> Event 1: timestamp 12:01 - registers event timer for 12:06
>> Event 2: timestamp 12:03
>> Event 3: timestamp 12:20 - fires and purges window
>>
>> I get all three events in the window, instead of just the two that are
>> really within the 5 minute window.
>>
>> Is there some way to force the timestamp to arrive in the window before
>> the event that generated it?
>>
>> Thanks!
>>
>> --
>> *Jason Brelloch* | Product Developer
>> 3405 Piedmont Rd. NE, Suite 325, Atlanta, GA 30305
>> <http://www.bettercloud.com/>
>> Subscribe to the BetterCloud Monitor
>> <https://www.bettercloud.com/monitor?utm_source=bettercloud_email&utm_medium=email_signature&utm_campaign=monitor_launch>
>>  -
>> Get IT delivered to your inbox
>>
