[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352692#comment-16352692
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user dyanarose commented on the issue:

https://github.com/apache/flink/pull/5295
  
I can see it's gone through Travis and is now in master, so closing as 
requested


> Session Window Assigner with Dynamic Gaps
> -
>
> Key: FLINK-8384
> URL: https://issues.apache.org/jira/browse/FLINK-8384
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Dyana Rose
>Assignee: Dyana Rose
>Priority: Minor
> Fix For: 1.5.0
>
>
> *Reason for Improvement*
> Currently both Session Window assigners only allow a static inactivity gap. 
> Given the following scenario, this is too restrictive:
> * Given a stream of IoT events from many device types
> * Assume each device type could have a different inactivity gap
> * Assume each device type gap could change while sessions are in flight
> By allowing dynamic inactivity gaps, the correct gap can be determined in the 
> [assignWindows 
> function|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java#L59]
>  by passing the element currently under consideration, the timestamp, and the 
> context to a user defined function. This eliminates the need to create 
> unwieldy work arounds if you only have static gaps.
> Dynamic Session Window gaps should be available for both Event Time and 
> Processing Time streams.
> (short preliminary discussion: 
> https://lists.apache.org/thread.html/08a011c0039e826343e9be0b1bb4ecfc2e12976bc65f8a43ee88@%3Cdev.flink.apache.org%3E)
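
The idea in the description above can be sketched in plain Java. This is only an illustration under assumptions, not Flink's API: `GapExtractor`, `DeviceEvent`, and `assignWindow` are hypothetical names, and timestamps and gaps are assumed to be milliseconds.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the user-defined function in the description:
// given an element, return its inactivity gap in milliseconds.
interface GapExtractor<T> {
    long extract(T element);
}

final class DynamicGapSketch {
    // Hypothetical event type: a device type plus an event timestamp in ms.
    static final class DeviceEvent {
        final String deviceType;
        final long timestamp;

        DeviceEvent(String deviceType, long timestamp) {
            this.deviceType = deviceType;
            this.timestamp = timestamp;
        }
    }

    // Returns {start, end} of the initial session window for one element.
    // The gap is looked up per element instead of being fixed up front.
    static long[] assignWindow(DeviceEvent e, GapExtractor<DeviceEvent> extractor) {
        long gap = extractor.extract(e);
        if (gap <= 0) {
            throw new IllegalArgumentException("gap must be positive");
        }
        return new long[] {e.timestamp, e.timestamp + gap};
    }

    public static void main(String[] args) {
        // The map could be refreshed while sessions are in flight.
        Map<String, Long> gaps = new HashMap<>();
        gaps.put("thermostat", 60_000L);
        gaps.put("doorbell", 5_000L);
        GapExtractor<DeviceEvent> byType =
            e -> gaps.getOrDefault(e.deviceType, 30_000L);

        long[] w = assignWindow(new DeviceEvent("doorbell", 1_000L), byType);
        System.out.println(w[0] + " .. " + w[1]);
    }
}
```

Because the gap is resolved per element, two device types on the same stream get windows of different lengths without any change to the assigner itself.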



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352693#comment-16352693
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user dyanarose closed the pull request at:

https://github.com/apache/flink/pull/5295




[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352575#comment-16352575
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5295
  
Thanks a lot for working on this and iterating so quickly!  

I merged this but could you please close the PR if it doesn't close 
automatically?




[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-02-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16351836#comment-16351836
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user dyanarose commented on the issue:

https://github.com/apache/flink/pull/5295
  
The change to return Time has been backed out, so extract() returns a long 
again.

PublicEvolving annotations have been added to the new classes and methods.




[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350872#comment-16350872
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user dyanarose commented on the issue:

https://github.com/apache/flink/pull/5295
  
erf, I see what you mean, as well as the creation of all those Time objects.




[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350278#comment-16350278
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5295
  
I get that logic, but the existing `TimestampAssigner` also returns a 
`long` and if we return `Time` we always have to wrap/unwrap that long. What do 
you think?
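
The trade-off raised here can be sketched as follows. This is an illustration only: `LongGapExtractor` and `LongGapSketch` are hypothetical names, and the assumption is that the returned `long` is interpreted as milliseconds. The implementer can still keep the unit explicit by using `java.util.concurrent.TimeUnit`, while the caller adds a primitive to the timestamp with no wrapper object per element.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical extractor shape with a primitive return type.
// Assumption: the returned gap is interpreted as milliseconds.
interface LongGapExtractor<T> {
    long extract(T element);
}

final class LongGapSketch {
    // The unit stays visible at the call site, but the result is a plain long.
    static final LongGapExtractor<String> BY_DEVICE_TYPE = deviceType ->
        "sensor".equals(deviceType)
            ? TimeUnit.MINUTES.toMillis(5)
            : TimeUnit.SECONDS.toMillis(30);

    // The caller adds the primitive directly to the element timestamp,
    // so nothing has to be wrapped or unwrapped.
    static long sessionEnd(long timestampMillis, String deviceType) {
        return timestampMillis + BY_DEVICE_TYPE.extract(deviceType);
    }
}
```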




[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350138#comment-16350138
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user dyanarose commented on the issue:

https://github.com/apache/flink/pull/5295
  
I like long myself, but I think that's only because I'm quite used to 
working in milliseconds. As the existing static Session Windows take a Time as 
the gap, I thought it made sense to have the extract method also produce a Time. 

If it returns a Time, we don't have to worry about an implementer getting 
confused about what time unit they need to be returning, or always having to 
look it up just to check that they're right.




[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350119#comment-16350119
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5295
  
I think the changes are good! Thanks for working on this.  

As a final change before merging, I would annotate the new classes/methods 
as `@PublicEvolving`, would you be ok with that? And I would also like to 
change `SessionWindowTimeGapExtractor.extract()` to return a long instead of 
`Time`. What do you think?




[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348692#comment-16348692
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user dyanarose commented on the issue:

https://github.com/apache/flink/pull/5295
  
The CI failure looks to be a known flaky test: 
FlinkKafkaProducer011ITCase.testScaleDownBeforeFirstCheckpoint




[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16347020#comment-16347020
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user dyanarose commented on the issue:

https://github.com/apache/flink/pull/5295
  
Ah, I hadn't thought to keep both in place. So unless withDynamicGap on the 
Dynamic SessionWindow classes were made package private, you would then be 
able to instantiate them from two different classes. That could feel a bit 
iffy, although someone else might call it a convenience method. 

I'll get the change in for the Trigger cast; that should clean up the PR a 
fair bit.




[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346996#comment-16346996
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5295
  
Yes, that was me.  It was just a quick idea, and it would work because 
both `withGap()` and `withGapExtractor()` are static so the latter could have 
`T` on the method signature and return a `DynamicEventTimeSessionWindows` (or 
some such). I'm not against keeping it in the separate class, though.

I agree that the cast is a bit wonky but we know that it always works 
because the trigger we return never looks at the element.
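
Both points can be sketched together in plain Java. Everything below is a hypothetical stand-in rather than Flink code (`Trigger`, `TimeOnlyTrigger`, `GapFn`, `DynamicSessions`, `StaticSessions` are illustration names): the type parameter lives on the static factory method so the static-gap class stays non-generic, and the unchecked cast is harmless only because the trigger never inspects its element.

```java
// All types here are hypothetical stand-ins, not Flink classes.
interface Trigger<T> {
    boolean shouldFire(T element, long currentTime, long windowEnd);
}

final class TimeOnlyTrigger implements Trigger<Object> {
    static Trigger<Object> create() {
        return new TimeOnlyTrigger();
    }

    @Override
    public boolean shouldFire(Object element, long currentTime, long windowEnd) {
        return currentTime >= windowEnd; // the element is never inspected
    }
}

interface GapFn<T> {
    long extract(T element);
}

final class DynamicSessions<T> {
    final GapFn<T> gapFn;

    DynamicSessions(GapFn<T> gapFn) {
        this.gapFn = gapFn;
    }

    // Unchecked cast from Trigger<Object> to Trigger<T>. After erasure both
    // are the same class, and the trigger ignores the element, so no
    // ClassCastException can surface from it.
    @SuppressWarnings("unchecked")
    Trigger<T> getDefaultTrigger() {
        return (Trigger<T>) TimeOnlyTrigger.create();
    }
}

final class StaticSessions {
    final long gapMillis;

    private StaticSessions(long gapMillis) {
        this.gapMillis = gapMillis;
    }

    static StaticSessions withGap(long gapMillis) {
        return new StaticSessions(gapMillis);
    }

    // T is declared on the static method, so StaticSessions itself
    // does not need a type parameter.
    static <T> DynamicSessions<T> withGapExtractor(GapFn<T> gapFn) {
        return new DynamicSessions<>(gapFn);
    }
}
```

The cast would stop being safe if the shared trigger ever started reading fields of the element, which is the condition the comment above relies on.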




[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346972#comment-16346972
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user dyanarose commented on the issue:

https://github.com/apache/flink/pull/5295
  
I'm not the biggest fan of unchecked casts, but when testing this in our POC 
environment, casting the existing EventTimeTrigger to a typed Trigger is 
working. So if the unchecked cast is acceptable, that would get rid of all 
changes required to Event/ProcessingTimeTrigger.

On your second point (sorry if I misattribute this based on GitHub 
profile name): I believe you had mentioned breaking this out into new classes 
when I first brought this up on the mailing list: 
https://lists.apache.org/thread.html/6ceb094460bca8e9731e9e1dc0bb479f407f769458bff30c412adf78@%3Cdev.flink.apache.org%3E

Now that you see it in action, do you feel it would be better off as an 
addition to the existing session window classes?

The way I see it is, if I put withGapExtractor() on the existing classes, 
without adding type information to them, then the extract() method on 
SessionWindowGapExtractor will need to have the signature of extract(Object 
input) leaving the implementer to have to cast to the input type.
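
The difference between the two signatures can be sketched like this. The interfaces and the `Reading` type are hypothetical illustrations, not the classes from the pull request: without a type parameter the implementer has to cast and a wrong element type only fails at runtime, while the typed variant is checked by the compiler.

```java
// Hypothetical shapes contrasting the two options discussed above.
interface UntypedGapExtractor {
    long extract(Object input); // implementer must cast
}

interface TypedGapExtractor<T> {
    long extract(T input); // element type is known to the compiler
}

final class ExtractorShapes {
    // Hypothetical element type carrying its own gap in milliseconds.
    static final class Reading {
        final long gapMillis;

        Reading(long gapMillis) {
            this.gapMillis = gapMillis;
        }
    }

    // A wrong element type here surfaces only as a ClassCastException at runtime.
    static final UntypedGapExtractor UNTYPED =
        input -> ((Reading) input).gapMillis;

    // A wrong element type here is rejected at compile time.
    static final TypedGapExtractor<Reading> TYPED =
        input -> input.gapMillis;
}
```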

I have to admit it feels strange to me that the Window assigners all drop 
input type information. But that would mean that these new typed assigners 
would be the odd ones out.




[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346635#comment-16346635
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5295
  
I like the functionality of this a lot! However, I don't like that we 
change the signature of existing triggers or that we introduce new triggers 
that duplicate existing code.

As an alternative, could you cast the `EventTimeTrigger` to `Trigger` 
in `getDefaultTrigger()` of your new assigner?

Also an additional idea, instead of putting the API method on 
`DynamicEventTimeSessionWindows` we could think about adding it to 
`EventTimeSessionWindows`. We would then have 
`EventTimeSessionWindows.withGap()` and 
`EventTimeSessionWindows.withGapExtractor()`. What do you think?




[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16339014#comment-16339014
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user dyanarose commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r163796598
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.java ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TypedEventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on the timestamp of the
+ * elements. Windows cannot overlap.
+ *
+ * <p>For example, in order to window into windows with a dynamic time gap:
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> in = ...;
+ * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
+ *   keyed.window(DynamicEventTimeSessionWindows.withDynamicGap({@link SessionWindowTimeGapExtractor }));
+ * } </pre>
+ *
+ * @param <T> The type of the input elements
+ */
+public class DynamicEventTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
+   private static final long serialVersionUID = 1L;
--- End diff --

Three sets of flink-streaming-scala tests needed to be updated to add 
existential type information, due to adding the generic to the EventTimeTrigger 
and ProcessingTimeTrigger classes.




[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16338297#comment-16338297
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user dyanarose commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r163692223
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.java
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import 
org.apache.flink.streaming.api.windowing.triggers.TypedEventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on the timestamp of the
+ * elements. Windows cannot overlap.
+ *
+ * <p>For example, in order to window into windows with a dynamic time gap:
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> in = ...;
+ * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
+ *   keyed.window(DynamicEventTimeSessionWindows.withDynamicGap({@link SessionWindowTimeGapExtractor }));
+ * } </pre>
+ *
+ * @param <T> The type of the input elements
+ */
+public class DynamicEventTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
+   private static final long serialVersionUID = 1L;
--- End diff --

Changing EventTimeTrigger/ProcessingTimeTrigger would be my preference. 
However, I was hesitant to just go in and change the class definition. I also 
don't believe it *should* break existing tests.

I'll give it a go and see.




[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16338257#comment-16338257
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r163684364
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.java
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import 
org.apache.flink.streaming.api.windowing.triggers.TypedEventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on the timestamp of the
+ * elements. Windows cannot overlap.
+ *
+ * <p>For example, in order to window into windows with a dynamic time gap:
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> in = ...;
+ * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
+ *   keyed.window(DynamicEventTimeSessionWindows.withDynamicGap({@link SessionWindowTimeGapExtractor }));
+ * } </pre>
+ *
+ * @param <T> The type of the input elements
+ */
+public class DynamicEventTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
+   private static final long serialVersionUID = 1L;
--- End diff --

Can't we change `EventTimeTrigger` to extend `Trigger<T, TimeWindow>` 
instead of `Trigger<Object, TimeWindow>`? I don't see any reason why this would 
break any existing code or tests, and the class is also `@PublicEvolving`.
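
A minimal sketch of the idea, for illustration only. It does not use the Flink classes: `SketchTrigger`, `SketchEventTimeTrigger`, and `SketchWindow` are hypothetical stand-ins, and the firing rule is a simplification. The point it tries to show is that a generic static factory lets Java call sites infer the element type, so a caller that expects an `Object`-typed trigger and a caller that expects a specifically typed one can both use the same class.

```java
/** Hypothetical stand-in for a time window covering [start, end). */
final class SketchWindow {
    final long start;
    final long end;

    SketchWindow(long start, long end) {
        this.start = start;
        this.end = end;
    }

    /** Largest timestamp that still belongs to this window. */
    long maxTimestamp() {
        return end - 1;
    }
}

/** Hypothetical stand-in for a trigger over elements of type T and windows of type W. */
abstract class SketchTrigger<T, W> {
    abstract boolean shouldFire(T element, long watermark, W window);
}

/** Generic in T instead of being fixed to Object, so no separate "Typed" copy is needed. */
final class SketchEventTimeTrigger<T> extends SketchTrigger<T, SketchWindow> {
    private SketchEventTimeTrigger() {}

    @Override
    boolean shouldFire(T element, long watermark, SketchWindow window) {
        // Simplified rule: fire once the watermark reaches the end of the window.
        return window.maxTimestamp() <= watermark;
    }

    /** The element type is inferred from the assignment context at the call site. */
    static <T> SketchEventTimeTrigger<T> create() {
        return new SketchEventTimeTrigger<>();
    }
}
```

With this shape, `SketchTrigger<Object, SketchWindow> t = SketchEventTimeTrigger.create();` and `SketchTrigger<String, SketchWindow> t = SketchEventTimeTrigger.create();` both compile. Whether callers in other languages need changes is a separate question that this sketch does not answer.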




[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-01-20 Thread Dyana Rose (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16333190#comment-16333190
 ] 

Dyana Rose commented on FLINK-8384:
---

I rebased on top of master and pushed. The build's green now.



[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16332687#comment-16332687
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/5295
  
It's a `NoResourceAvailableException` error. I'm not sure of the cause, but we 
can try re-running the build.




[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16332480#comment-16332480
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user dyanarose commented on the issue:

https://github.com/apache/flink/pull/5295
  
Looks like the build failed on: 
org.apache.flink.test.streaming.runtime.StreamTaskTimerITCase
testOperatorChainedToSource

I can't see why this change would cause that test to fail, given that the PR 
originally passed. However, I also can't find it on the flaky test list.




[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327322#comment-16327322
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user dyanarose commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r161807828
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.java
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import 
org.apache.flink.streaming.api.windowing.triggers.TypedEventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on the timestamp of the
+ * elements. Windows cannot overlap.
+ *
+ * <p>For example, in order to window into windows with a dynamic time gap:
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> in = ...;
+ * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
+ *   keyed.window(DynamicEventTimeSessionWindows.withDynamicGap({@link SessionWindowTimeGapExtractor }));
+ * } </pre>
+ *
+ * @param <T> The type of the input elements
+ */
+public class DynamicEventTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
+   private static final long serialVersionUID = 1L;
--- End diff --

The Typed requirement comes from the desire to allow the 
SessionWindowTimeGapExtractor to accept a correctly typed element.

To do that, the Assigner itself needs to be typed, which means that the 
trigger needs to be typed, and so on.

If the SessionWindowTimeGapExtractor extract method instead took `Object`, 
requiring the implementer to cast it, then the new Typed classes wouldn't be 
necessary.

I don't find that to be the most user-friendly interface though, when the 
type information is available. But, yeah, I'm not happy with having to 
implement these exact copy classes either...
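
To make the trade-off concrete, here is a minimal, self-contained sketch. It does not use the Flink classes: `TypedGapExtractor`, `UntypedGapExtractor`, `DeviceEvent`, and `GapExtractorComparison` are hypothetical names, and the context parameter is left out for brevity.

```java
import java.io.Serializable;

/** Hypothetical event type, used only for this illustration. */
final class DeviceEvent {
    final String deviceType;
    final long gapMillis;

    DeviceEvent(String deviceType, long gapMillis) {
        this.deviceType = deviceType;
        this.gapMillis = gapMillis;
    }
}

/** Typed variant: the implementer receives the element with its real type. */
interface TypedGapExtractor<T> extends Serializable {
    long extract(T element, long timestamp);
}

/** Untyped variant: no type parameter is needed, but the implementer has to cast. */
interface UntypedGapExtractor extends Serializable {
    long extract(Object element, long timestamp);
}

final class GapExtractorComparison {
    // Typed: no cast, and passing the wrong element type is a compile-time error.
    static final TypedGapExtractor<DeviceEvent> TYPED =
        (element, timestamp) -> element.gapMillis;

    // Untyped: the cast is only checked at runtime and throws
    // ClassCastException if another element type arrives.
    static final UntypedGapExtractor UNTYPED =
        (element, timestamp) -> ((DeviceEvent) element).gapMillis;

    private GapExtractorComparison() {}
}
```

Both variants return the same gap for a `DeviceEvent`. The difference is where a mistake surfaces: at compile time for the typed interface, at runtime for the untyped one.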




[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327323#comment-16327323
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user dyanarose commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r161807837
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.java
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import 
org.apache.flink.streaming.api.windowing.triggers.TypedEventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on the timestamp of the
+ * elements. Windows cannot overlap.
+ *
+ * <p>For example, in order to window into windows with a dynamic time gap:
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> in = ...;
+ * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
+ *   keyed.window(DynamicEventTimeSessionWindows.withDynamicGap({@link SessionWindowTimeGapExtractor }));
+ * } </pre>
+ *
+ * @param <T> The type of the input elements
+ */
+public class DynamicEventTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
+   private static final long serialVersionUID = 1L;
+
+   protected SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor;
+
+   protected DynamicEventTimeSessionWindows(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
+       this.sessionWindowTimeGapExtractor = sessionWindowTimeGapExtractor;
+   }
+
+   @Override
+   public Collection<TimeWindow> assignWindows(T element, long timestamp, WindowAssignerContext context) {
+       long sessionTimeout = sessionWindowTimeGapExtractor.extract(element, timestamp, context);
+       if (sessionTimeout <= 0) {
+           throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap");
+       }
+       return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout));
+   }
+
+   @Override
+   public Trigger<T, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
+       return TypedEventTimeTrigger.create();
--- End diff --

as above
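
For readers following the diff, a rough sketch of what the assignment step amounts to: each element gets a window `[timestamp, timestamp + gap)` using the gap extracted for that element, and overlapping windows are later merged into one session. The code is a self-contained illustration, not the Flink implementation. `SessionSketch`, `Window`, `assign`, and `merge` are hypothetical names, and treating windows that merely touch as overlapping is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class SessionSketch {
    /** Hypothetical half-open window [start, end). */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    /** One window per element, sized by that element's own gap. */
    static Window assign(long timestamp, long gapMillis) {
        if (gapMillis <= 0) {
            throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap");
        }
        return new Window(timestamp, timestamp + gapMillis);
    }

    /** Merges windows that overlap or touch into sessions, ordered by start time. */
    static List<Window> merge(List<Window> windows) {
        List<Window> sorted = new ArrayList<>(windows);
        sorted.sort(Comparator.comparingLong(w -> w.start));
        List<Window> merged = new ArrayList<>();
        for (Window w : sorted) {
            int last = merged.size() - 1;
            if (last >= 0 && merged.get(last).end >= w.start) {
                // Extend the current session to cover the new window.
                Window prev = merged.get(last);
                merged.set(last, new Window(prev.start, Math.max(prev.end, w.end)));
            } else {
                merged.add(w);
            }
        }
        return merged;
    }

    private SessionSketch() {}
}
```

For example, elements at timestamps 0, 5, and 30 with gaps of 10, 3, and 5 milliseconds produce the windows `[0, 10)`, `[5, 8)`, and `[30, 35)`, which merge into two sessions.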



[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327325#comment-16327325
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user dyanarose commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r161807869
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicProcessingTimeSessionWindows.java
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import 
org.apache.flink.streaming.api.windowing.triggers.TypedProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on the current processing
+ * time. Windows cannot overlap.
+ *
+ * <p>For example, in order to window into windows with a dynamic time gap:
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> in = ...;
+ * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
+ *   keyed.window(DynamicProcessingTimeSessionWindows.withDynamicGap({@link SessionWindowTimeGapExtractor }));
+ * } </pre>
+ *
+ * @param <T> The type of the input elements
+ */
+public class DynamicProcessingTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
+   private static final long serialVersionUID = 1L;
+
+   protected SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor;
+
+   protected DynamicProcessingTimeSessionWindows(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
+       this.sessionWindowTimeGapExtractor = sessionWindowTimeGapExtractor;
+   }
+
+   @Override
+   public Collection<TimeWindow> assignWindows(T element, long timestamp, WindowAssignerContext context) {
+       long currentProcessingTime = context.getCurrentProcessingTime();
+       long sessionTimeout = sessionWindowTimeGapExtractor.extract(element, timestamp, context);
+       if (sessionTimeout <= 0) {
+           throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap");
+       }
+       return Collections.singletonList(new TimeWindow(currentProcessingTime, currentProcessingTime + sessionTimeout));
+   }
+
+   @Override
+   public Trigger<T, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
+       return TypedProcessingTimeTrigger.create();
--- End diff --

as above



[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327326#comment-16327326
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user dyanarose commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r161807900
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SessionWindowTimeGapExtractor.java
 ---
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import java.io.Serializable;
+
+/**
+ * A {@code SessionWindowTimeGapExtractor} extracts session time gaps for Dynamic Session Window Assigners.
+ *
+ * @param <T> The type of elements that this {@code SessionWindowTimeGapExtractor} can extract session time gaps from.
+ */
+public interface SessionWindowTimeGapExtractor<T> extends Serializable {
+   long extract(T element, long timestamp, WindowAssigner.WindowAssignerContext context);
--- End diff --

Can do.

Do you think it should return a Time instead of a long, to prevent anyone 
from returning seconds/minutes/whatever instead of milliseconds?
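
A small sketch of the unit concern, under stated assumptions: it uses `java.time.Duration` as a stand-in for a unit-carrying return type (it is not Flink's `Time` class), and `MillisGapExtractor`, `DurationGapExtractor`, and `GapUnits` are hypothetical names. The context and timestamp parameters are omitted for brevity.

```java
import java.time.Duration;

/** Returns a bare number; nothing in the signature says it is milliseconds. */
interface MillisGapExtractor<T> {
    long extract(T element);
}

/** Returns a value that carries its unit, so the caller converts explicitly. */
interface DurationGapExtractor<T> {
    Duration extract(T element);
}

final class GapUnits {
    /** Converts the extracted gap to milliseconds after validating it. */
    static <T> long gapMillis(DurationGapExtractor<T> extractor, T element) {
        Duration gap = extractor.extract(element);
        if (gap.isZero() || gap.isNegative()) {
            throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap");
        }
        return gap.toMillis();
    }

    private GapUnits() {}
}
```

With the `long` variant, an implementer who means five minutes and writes `e -> 5` silently gets a 5 ms gap. With the unit-carrying variant, `e -> Duration.ofMinutes(5)` converts to 300000 ms.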




[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327324#comment-16327324
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user dyanarose commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r161807850
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicProcessingTimeSessionWindows.java
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TypedProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on the current processing
+ * time. Windows cannot overlap.
+ *
+ * <p>For example, in order to window into windows with a dynamic time gap:
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> in = ...;
+ * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
+ *   keyed.window(DynamicProcessingTimeSessionWindows.withDynamicGap({@link SessionWindowTimeGapExtractor }));
+ * } </pre>
+ *
+ * @param <T> The type of the input elements
+ */
+public class DynamicProcessingTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
+   private static final long serialVersionUID = 1L;
--- End diff --

as above




[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327307#comment-16327307
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r161799438
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicProcessingTimeSessionWindows.java
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TypedProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on the current processing
+ * time. Windows cannot overlap.
+ *
+ * <p>For example, in order to window into windows with a dynamic time gap:
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> in = ...;
+ * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
+ *   keyed.window(DynamicProcessingTimeSessionWindows.withDynamicGap({@link SessionWindowTimeGapExtractor }));
+ * } </pre>
+ *
+ * @param <T> The type of the input elements
+ */
+public class DynamicProcessingTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
+   private static final long serialVersionUID = 1L;
+
+   protected SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor;
+
+   protected DynamicProcessingTimeSessionWindows(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
+      this.sessionWindowTimeGapExtractor = sessionWindowTimeGapExtractor;
+   }
+
+   @Override
+   public Collection<TimeWindow> assignWindows(T element, long timestamp, WindowAssignerContext context) {
+      long currentProcessingTime = context.getCurrentProcessingTime();
+      long sessionTimeout = sessionWindowTimeGapExtractor.extract(element, timestamp, context);
+      if (sessionTimeout <= 0) {
+         throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap");
+      }
+      return Collections.singletonList(new TimeWindow(currentProcessingTime, currentProcessingTime + sessionTimeout));
+   }
+
+   @Override
+   public Trigger<T, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
+      return TypedProcessingTimeTrigger.create();
--- End diff --

DynamicProcessingTimeSessionWindows.



[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327308#comment-16327308
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r161800866
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SessionWindowTimeGapExtractor.java
 ---
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import java.io.Serializable;
+
+/**
+ * A {@code SessionWindowTimeGapExtractor} extracts session time gaps for Dynamic Session Window Assigners.
+ *
+ * @param <T> The type of elements that this {@code SessionWindowTimeGapExtractor} can extract session time gaps from.
+ */
+public interface SessionWindowTimeGapExtractor<T> extends Serializable {
+   long extract(T element, long timestamp, WindowAssigner.WindowAssignerContext context);
--- End diff --

Can we remove the `timestamp` and 
`WindowAssigner.WindowAssignerContext context` parameters?




[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327310#comment-16327310
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r161798960
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.java
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TypedEventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on the timestamp of the
+ * elements. Windows cannot overlap.
+ *
+ * <p>For example, in order to window into windows with a dynamic time gap:
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> in = ...;
+ * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
+ *   keyed.window(DynamicEventTimeSessionWindows.withDynamicGap({@link SessionWindowTimeGapExtractor }));
+ * } </pre>
+ *
+ * @param <T> The type of the input elements
+ */
+public class DynamicEventTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
+   private static final long serialVersionUID = 1L;
--- End diff --

Can we change `MergingWindowAssigner<T, TimeWindow>` to 
`MergingWindowAssigner<Object, TimeWindow>`? If so, we can reuse the 
`EventTimeTrigger`.




[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327309#comment-16327309
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r161799107
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.java
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TypedEventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on the timestamp of the
+ * elements. Windows cannot overlap.
+ *
+ * <p>For example, in order to window into windows with a dynamic time gap:
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> in = ...;
+ * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
+ *   keyed.window(DynamicEventTimeSessionWindows.withDynamicGap({@link SessionWindowTimeGapExtractor }));
+ * } </pre>
+ *
+ * @param <T> The type of the input elements
+ */
+public class DynamicEventTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
+   private static final long serialVersionUID = 1L;
+
+   protected SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor;
+
+   protected DynamicEventTimeSessionWindows(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
+      this.sessionWindowTimeGapExtractor = sessionWindowTimeGapExtractor;
+   }
+
+   @Override
+   public Collection<TimeWindow> assignWindows(T element, long timestamp, WindowAssignerContext context) {
+      long sessionTimeout = sessionWindowTimeGapExtractor.extract(element, timestamp, context);
+      if (sessionTimeout <= 0) {
+         throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap");
+      }
+      return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout));
+   }
+
+   @Override
+   public Trigger<T, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
+      return TypedEventTimeTrigger.create();
--- End diff --

can we reuse the EventTimeTrigger?



[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-01-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16327306#comment-16327306
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r161799387
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicProcessingTimeSessionWindows.java
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TypedProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on the current processing
+ * time. Windows cannot overlap.
+ *
+ * <p>For example, in order to window into windows with a dynamic time gap:
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> in = ...;
+ * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
+ *   keyed.window(DynamicProcessingTimeSessionWindows.withDynamicGap({@link SessionWindowTimeGapExtractor }));
+ * } </pre>
+ *
+ * @param <T> The type of the input elements
+ */
+public class DynamicProcessingTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
+   private static final long serialVersionUID = 1L;
--- End diff --

Same as `DynamicEventTimeSessionWindows` comments.




[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-01-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16325566#comment-16325566
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

GitHub user dyanarose opened a pull request:

https://github.com/apache/flink/pull/5295

[FLINK-8384] [streaming] Session Window Assigner with Dynamic Gaps

## What is the purpose of the change

This PR adds the ability for the Session Window assigners to have 
dynamic inactivity gaps in addition to the existing static inactivity gaps.

**Behaviour of dynamic gaps within existing sessions:**
- scenario 1 - the new timeout is prior to the old timeout. The old timeout 
(the furthest in the future) is respected.
- scenario 2 - the new timeout is after the old timeout. The new timeout is 
respected.
- scenario 3 - a session is in flight, a new timeout is calculated, however 
no new events arrive for that session after the new timeout is calculated. This 
session will not have its timeout changed
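
The first two scenarios follow from how overlapping session windows merge. The sketch below is a simplified model and not Flink's implementation: each element proposes a window `[timestamp, timestamp + gap)`, and intersecting windows are merged into one spanning the earliest start and the latest end. `SessionMergeSketch` and its members are hypothetical names for this illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Simplified model of session-window merging with per-element gaps. */
final class SessionMergeSketch {
    /** A session window covering [start, end). */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    private final List<Window> sessions = new ArrayList<>();

    /** Adds an element with its own gap and returns the session it ends up in. */
    Window add(long timestamp, long gap) {
        if (gap <= 0) {
            throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap");
        }
        long start = timestamp;
        long end = timestamp + gap;
        // Keep absorbing intersecting sessions until none are left, because a
        // grown window may reach sessions it did not touch before.
        boolean merged = true;
        while (merged) {
            merged = false;
            for (Iterator<Window> it = sessions.iterator(); it.hasNext();) {
                Window w = it.next();
                if (w.start <= end && start <= w.end) {
                    start = Math.min(start, w.start);
                    end = Math.max(end, w.end);
                    it.remove();
                    merged = true;
                }
            }
        }
        Window result = new Window(start, end);
        sessions.add(result);
        return result;
    }
}
```

Under this model, an element at 0 with gap 10 followed by an element at 2 with gap 3 leaves the session at `[0, 10)` (scenario 1), while a second element at 5 with gap 20 extends it to `[0, 25)` (scenario 2). Scenario 3 corresponds to `add` never being called again, so nothing changes.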


## Brief change log

**What's New**
- SessionWindowTimeGapExtractor<T> - Generic interface with one extract 
method that returns the time gap
- DynamicEventTimeSessionWindows<T> - Generic event time session window
- DynamicProcessingTimeSessionWindows<T> - Generic processing time 
session window
- TypedEventTimeTrigger<T> - Generic event time trigger
- TypedProcessingTimeTrigger<T> - Generic processing time trigger
- Tests for all the above

## Verifying this change

This change added tests and can be verified as follows:

 - added tests for the typed triggers that duplicate the existing trigger 
tests to prove parity
 - added unit tests for the dynamic session window assigners that mimic the 
existing static session window assigner tests to prove parity in the static case
 - added tests to the WindowOperatorTest class to prove the behaviour of 
changing inactivity gaps

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no, though the two typed trigger classes are marked 
`@Public(Evolving)`)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (docs && JavaDocs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/SaleCycle/flink dynamic-session-window-gaps

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5295.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5295


commit 399522a3e23a51ce1e860e5e09499ef98a7e340d
Author: Dyana Rose 
Date:   2018-01-10T15:50:00Z

[FLINK-8384] [streaming] Dynamic Gap Session Window Assigner






[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-01-12 Thread Dyana Rose (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323724#comment-16323724
 ] 

Dyana Rose commented on FLINK-8384:
---

For preliminary review and comment while I get the documentation together: 
https://github.com/apache/flink/compare/master...SaleCycle:dynamic-session-window-gaps

*What's new:*
* SessionWindowTimeGapExtractor - Generic Interface with one extract method 
that returns the time gap
* DynamicEventTimeSessionWindows - Generic event time session window
* DynamicProcessingTimeSessionWindows - Generic processing time session 
window
* TypedEventTimeTrigger - Generic event time trigger
* TypedProcessingTimeTrigger - Generic processing time trigger
* Tests for all the above

In order to be able to type the elements passed to the 
SessionWindowTimeGapExtractor, I needed to implement typed versions of both 
the EventTimeTrigger and the ProcessingTimeTrigger (which I've simply named 
TypedEventTimeTrigger and TypedProcessingTimeTrigger). Other than the generic 
type parameter added to the class declaration, these are exactly the same as 
their non-generic counterparts (and so are their corresponding test files). I'm 
not terrifically happy about that, but felt it was the least invasive 
implementation for the existing code.
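
The need for typed triggers comes down to Java generics being invariant. The sketch below uses hypothetical stand-in classes rather than Flink's `Trigger` hierarchy: a trigger fixed to `Object` cannot be returned where a trigger of `T` is expected unless it is either duplicated with a type parameter or passed through an unchecked cast. The cast is shown only as an alternative one might consider, not as what this change does.

```java
/** Hypothetical stand-in for a trigger parameterised by element type. */
abstract class SketchTrigger<T> {
    abstract boolean onElement(T element);
}

/** Models a non-generic trigger whose element type is fixed to Object. */
final class ObjectTrigger extends SketchTrigger<Object> {
    @Override
    boolean onElement(Object element) {
        return element != null;
    }
}

/** Same logic with a type parameter, analogous to the typed trigger duplicates. */
final class TypedTrigger<T> extends SketchTrigger<T> {
    @Override
    boolean onElement(T element) {
        return element != null;
    }
}

final class TriggerTypingSketch {
    /** Option A: a typed duplicate, no cast needed. */
    static <T> SketchTrigger<T> typed() {
        return new TypedTrigger<>();
    }

    // Not allowed, because SketchTrigger<Object> is not a SketchTrigger<String>:
    // SketchTrigger<String> t = new ObjectTrigger();

    /** Option B: reuse the Object-typed trigger through an unchecked cast. */
    @SuppressWarnings("unchecked")
    static <T> SketchTrigger<T> reuseObjectTrigger() {
        // Tolerable only because the trigger never depends on the element's type.
        return (SketchTrigger<T>) (SketchTrigger<?>) new ObjectTrigger();
    }
}
```

Option B avoids the duplicated classes at the cost of a suppressed warning. Whether that trade-off is acceptable is a judgment call for the reviewers.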

*Behaviour of dynamic gaps within existing sessions:*
* scenario 1 - the new timeout is prior to the old timeout. The old timeout 
(the furthest in the future) is respected.
* scenario 2 - the new timeout is after the old timeout. The new timeout is 
respected.
* scenario 3 - a session is in flight, a new timeout is calculated, however no 
new events arrive for that session after the new timeout is calculated. This 
session will not have its timeout changed
