Thanks Aljoscha,

Looking forward to the 1.1 release.  I managed to solve my problem using
this example code:

https://bitbucket.org/snippets/vstoyak/o9Rqp
(courtesy of Vladimir Stoyak)

I had to create a custom window and window assigner.  Hopefully that will
help someone else.
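
For anyone who cannot open the snippet, here is a rough sketch of why the
two approaches behave differently. It is plain Scala with no Flink
dependency, and it is not Vladimir Stoyak's code. Event, triggerStyle and
assignerStyle are hypothetical names made up for this illustration. The
model also assumes that the watermark advances right after every element
and trails the latest timestamp by 1 ms, which is a simplification of how
ascending-timestamp watermarks are emitted.

```scala
// Plain-Scala illustration only; every name here is hypothetical.
object WindowSketch {
  final case class Event(id: String, timestampMillis: Long)

  // Global window + trigger: the element is added to the pane first, and
  // only afterwards can the watermark derived from it fire the timer.
  def triggerStyle(events: Seq[Event], spanMillis: Long): Vector[Vector[Event]] = {
    var fired = Vector.empty[Vector[Event]]
    var pane = Vector.empty[Event]
    var timer: Option[Long] = None
    for (e <- events) {
      pane = pane :+ e                              // element enters the window
      if (timer.isEmpty) timer = Some(e.timestampMillis + spanMillis)
      val watermark = e.timestampMillis - 1         // assumed watermark model
      if (timer.exists(_ <= watermark)) {           // timer fires after the add
        fired = fired :+ pane
        pane = Vector.empty
        timer = None
      }
    }
    fired
  }

  // Assigner-style: the window is chosen from the element's own timestamp,
  // so a late element can never land in a window that has already ended.
  def assignerStyle(events: Seq[Event], spanMillis: Long): Vector[Vector[Event]] = {
    val windows = events.foldLeft(Vector.empty[(Long, Vector[Event])]) { (acc, e) =>
      acc.lastOption match {
        case Some((start, members)) if e.timestampMillis < start + spanMillis =>
          acc.init :+ (start -> (members :+ e))     // still inside current window
        case _ =>
          acc :+ (e.timestampMillis -> Vector(e))   // this element opens a new window
      }
    }
    windows.map(_._2)
  }

  def minutes(m: Long): Long = m * 60L * 1000L

  def main(args: Array[String]): Unit = {
    // Timestamps stand in for 12:01, 12:03 and 12:20 from the example below.
    val events = Vector(
      Event("event1", minutes(1)),
      Event("event2", minutes(3)),
      Event("event3", minutes(20)))
    println(triggerStyle(events, minutes(5)).map(_.map(_.id)))
    println(assignerStyle(events, minutes(5)).map(_.map(_.id)))
  }
}
```

In this model triggerStyle returns one group holding all three events,
which matches what I saw, while assignerStyle puts event 3 into a window
of its own.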

On Wed, Aug 3, 2016 at 8:35 PM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
> a watermark cannot be sent before the element that makes you send that
> watermark. A watermark of time T tells the system that no element will
> arrive in the future with timestamp T or less, thus you cannot send it
> before. It seems that what you are trying to achieve can be solved by using
> session windows, which will be part of the upcoming 1.1 release:
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html#session-windows
>
> Cheers,
> Aljoscha
>
> On Wed, 3 Aug 2016 at 12:19 Jason Brelloch <jb.bc....@gmail.com> wrote:
>
>> A little more info.  Here is a simplified version of my trigger
>> (windowConfiguration.timespan is the duration of the window):
>>
>> class CustomTrigger extends Trigger[QualifiedEvent, Window] {
>>
>>   // Per-key state holding the time at which the window should fire
>>   // (0 means no timer has been registered yet).
>>   val stateTimeDescr =
>>     new ValueStateDescriptor[Long]("relevantTimestamp", classOf[Long], 0)
>>
>>   override def onElement(event: QualifiedEvent, timestamp: Long, W: Window,
>>       ctx: TriggerContext): TriggerResult = {
>>
>>     val relevantTimestamp = ctx.getPartitionedState(stateTimeDescr)
>>     // windowConfigDescr is not shown in this simplified version.
>>     val windowConfigurationState =
>>       ctx.getPartitionedState(windowConfigDescr)
>>     var windowConfiguration = windowConfigurationState.value()
>>     if (windowConfiguration == null) {
>>       windowConfigurationState.update(event.alertConfiguration.window.get)
>>       windowConfiguration = event.alertConfiguration.window.get
>>     }
>>
>>     // The first element for a key registers a timer one timespan after it.
>>     if (relevantTimestamp.value() == 0) {
>>       val fireAt = event.event.created.toEpochMilli +
>>         windowConfiguration.timespan.toMillis
>>       ctx.registerEventTimeTimer(fireAt)
>>       relevantTimestamp.update(fireAt)
>>     }
>>
>>     TriggerResult.CONTINUE
>>   }
>>
>>   // Called once the watermark passes the registered event-time timer.
>>   override def onEventTime(timestamp: Long, W: Window,
>>       ctx: TriggerContext): TriggerResult = {
>>     TriggerResult.FIRE_AND_PURGE
>>   }
>>
>>   override def onProcessingTime(timestamp: Long, W: Window,
>>       ctx: TriggerContext): TriggerResult = {
>>     TriggerResult.CONTINUE
>>   }
>> }
>>
>> And here is the actual window execution:
>>
>> val stream = env.fromCollection(inputEvents)
>>         .assignAscendingTimestamps(
>>           (e: QualifiedEvent) => e.event.created.toEpochMilli)
>>         .keyBy(
>>           (e: QualifiedEvent) => e.alertConfiguration.alertId.toString)
>>         .window(GlobalWindows.create)
>>         .trigger(ConfigurableTrigger.create)
>>         .apply(new GrouperFunction).name("Grouper Function")
>>
>> Oddly enough, when I do this with just a basic time window it works,
>> and I only get the two events I am supposed to:
>>
>> val stream = env.fromCollection(inputEvents)
>>         .assignAscendingTimestamps(
>>           (e: QualifiedEvent) => e.event.created.toEpochMilli)
>>         .keyBy(
>>           (e: QualifiedEvent) => e.alertConfiguration.alertId.toString)
>>         .timeWindow(Time.minutes(5))
>>         .apply(new GrouperFunction).name("Grouper Function")
>>
>>
>> On Wed, Aug 3, 2016 at 2:29 PM, Jason Brelloch <jb.bc....@gmail.com>
>> wrote:
>>
>>> Hey guys,
>>>
>>> I am trying to use event time along with a custom window to capture a
>>> subset of events.  The problem I am running into is that the event that
>>> generates the timestamp/watermark seems to arrive in the window before
>>> the onEventTime() call that closes the window is made.  Example:
>>>
>>> Window is supposed to capture 5 minutes of events after first event
>>> arrives
>>> Event 1: timestamp 12:01 - registers event timer for 12:06
>>> Event 2: timestamp 12:03
>>> Event 3: timestamp 12:20 - fires and purges window
>>>
>>> I get all three events in the window, instead of just the two that are
>>> really within the 5-minute window.
>>>
>>> Is there some way to force the timestamp/watermark to arrive in the
>>> window before the event that generated it?
>>>
>>> Thanks!
>>>
>>> --
>>> *Jason Brelloch* | Product Developer
>>> 3405 Piedmont Rd. NE, Suite 325, Atlanta, GA 30305
>>> <http://www.bettercloud.com/>
>>> Subscribe to the BetterCloud Monitor
>>> <https://www.bettercloud.com/monitor?utm_source=bettercloud_email&utm_medium=email_signature&utm_campaign=monitor_launch>
>>>  -
>>> Get IT delivered to your inbox
>>>
>>
>


-- 
*Jason Brelloch* | Product Developer
3405 Piedmont Rd. NE, Suite 325, Atlanta, GA 30305
<http://www.bettercloud.com/>