[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps
[ https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352692#comment-16352692 ]

ASF GitHub Bot commented on FLINK-8384:
---------------------------------------

Github user dyanarose commented on the issue:

    https://github.com/apache/flink/pull/5295

    I can see it's gone through Travis and is now in master, so closing as requested

> Session Window Assigner with Dynamic Gaps
> -----------------------------------------
>
>                 Key: FLINK-8384
>                 URL: https://issues.apache.org/jira/browse/FLINK-8384
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming
>            Reporter: Dyana Rose
>            Assignee: Dyana Rose
>            Priority: Minor
>             Fix For: 1.5.0
>
>
> *Reason for Improvement*
> Currently both Session Window assigners only allow a static inactivity gap.
> Given the following scenario, this is too restrictive:
> * Given a stream of IoT events from many device types
> * Assume each device type could have a different inactivity gap
> * Assume each device type gap could change while sessions are in flight
> By allowing dynamic inactivity gaps, the correct gap can be determined in the
> [assignWindows
> function|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java#L59]
> by passing the element currently under consideration, the timestamp, and the
> context to a user defined function. This eliminates the need to create
> unwieldy work arounds if you only have static gaps.
> Dynamic Session Window gaps should be available for both Event Time and
> Processing Time streams.
> (short preliminary discussion:
> https://lists.apache.org/thread.html/08a011c0039e826343e9be0b1bb4ecfc2e12976bc65f8a43ee88@%3Cdev.flink.apache.org%3E)

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
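The scenario in the issue description (a different inactivity gap per device type) can be illustrated with a small stand-alone sketch. It is not Flink code: `DynamicGapSketch`, `Event`, `Session` and `sessionize` are hypothetical names invented here, and the sketch assumes that windows which overlap or touch are merged into one session. Each element gets a candidate window `[timestamp, timestamp + gap(element))`, and the gap comes from a user-supplied function rather than a constant.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.ToLongFunction;

// Hypothetical stand-alone sketch; none of these types are Flink classes.
final class DynamicGapSketch {

    /** Hypothetical IoT event: a device type and an event timestamp in milliseconds. */
    static final class Event {
        final String deviceType;
        final long timestamp;

        Event(String deviceType, long timestamp) {
            this.deviceType = deviceType;
            this.timestamp = timestamp;
        }
    }

    /** A session covering [start, end) in milliseconds. */
    static final class Session {
        final long start;
        final long end;

        Session(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    /**
     * Assigns every event the window [timestamp, timestamp + gap(event)) and then
     * merges windows that overlap or touch (an assumption of this sketch).
     */
    static List<Session> sessionize(List<Event> events, ToLongFunction<Event> gapMillis) {
        List<Session> candidates = new ArrayList<>();
        for (Event e : events) {
            candidates.add(new Session(e.timestamp, e.timestamp + gapMillis.applyAsLong(e)));
        }
        candidates.sort(Comparator.comparingLong((Session s) -> s.start));

        List<Session> merged = new ArrayList<>();
        for (Session s : candidates) {
            Session last = merged.isEmpty() ? null : merged.get(merged.size() - 1);
            if (last != null && s.start <= last.end) {
                // Extend the current session instead of opening a new one.
                merged.set(merged.size() - 1, new Session(last.start, Math.max(last.end, s.end)));
            } else {
                merged.add(s);
            }
        }
        return merged;
    }
}
```

With four events (two "thermostat" events at 0 ms and 5,000 ms, two "camera" events at 30,000 ms and 32,000 ms), a per-type gap of 10 s for thermostats and 1 s for cameras yields three sessions, whereas a static 10 s gap for every element yields two, because the camera events are then merged.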
[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps
[ https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352693#comment-16352693 ]

ASF GitHub Bot commented on FLINK-8384:
---------------------------------------

Github user dyanarose closed the pull request at:

    https://github.com/apache/flink/pull/5295
[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps
[ https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352575#comment-16352575 ]

ASF GitHub Bot commented on FLINK-8384:
---------------------------------------

Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/5295

    Thanks a lot for working on this and iterating so quickly! 👍

    I merged this but could you please close the PR if it doesn't close automatically?
[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps
[ https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16351836#comment-16351836 ]

ASF GitHub Bot commented on FLINK-8384:
---------------------------------------

Github user dyanarose commented on the issue:

    https://github.com/apache/flink/pull/5295

    The change to return Time has been backed out, so extract returns a long again.

    PublicEvolving annotations have been added to the new classes and methods.
[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps
[ https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16350872#comment-16350872 ]

ASF GitHub Bot commented on FLINK-8384:
---------------------------------------

Github user dyanarose commented on the issue:

    https://github.com/apache/flink/pull/5295

    erf, I see what you mean, as well as the creation of all those Time objects.
[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps
[ https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16350278#comment-16350278 ]

ASF GitHub Bot commented on FLINK-8384:
---------------------------------------

Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/5295

    I get that logic, but the existing `TimestampAssigner` also returns a `long` and if we return `Time` we always have to wrap/unwrap that long. What do you think?
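The trade-off discussed in this exchange can be sketched in isolation. The two interfaces below are hypothetical and are not the Flink API; `java.time.Duration` stands in for Flink's `Time` purely so that the example compiles without Flink on the classpath. The wrapped variant makes the unit explicit for the implementer, but the assigner has to unwrap it (and an object is created) for every element, while the `long` variant relies on the implementer knowing that the unit is milliseconds.

```java
import java.time.Duration;

// Hypothetical stand-alone sketch; these are not Flink interfaces.
final class GapReturnTypeSketch {

    /** Extractor returning the gap as raw milliseconds (the `long` variant). */
    interface MillisGapExtractor<T> {
        long extract(T element);
    }

    /** Extractor returning a wrapper object; Duration stands in for Flink's Time. */
    interface WrappedGapExtractor<T> {
        Duration extract(T element);
    }

    /** With a long, the window end is a plain addition. */
    static <T> long windowEndFromMillis(long timestamp, T element, MillisGapExtractor<T> extractor) {
        return timestamp + extractor.extract(element);
    }

    /** With a wrapper, every element costs one allocation and one unwrap call. */
    static <T> long windowEndFromWrapped(long timestamp, T element, WrappedGapExtractor<T> extractor) {
        return timestamp + extractor.extract(element).toMillis();
    }
}
```

Both variants compute the same window end for the same gap; they differ only in where the unit is documented and in the per-element wrapping cost.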
[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps
[ https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16350138#comment-16350138 ]

ASF GitHub Bot commented on FLINK-8384:
---------------------------------------

Github user dyanarose commented on the issue:

    https://github.com/apache/flink/pull/5295

    I like long myself, but I think that's only because I'm quite used to working in milliseconds.

    As the existing static Session Windows take Time as the gap, I think it made sense to have the extract method also produce a Time.

    If it returns a Time, we don't have to worry about an implementer getting confused about what time unit they need to be returning, or always having to look it up just to check that they're right.
[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps
[ https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16350119#comment-16350119 ]

ASF GitHub Bot commented on FLINK-8384:
---------------------------------------

Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/5295

    I think the changes are good! Thanks for working on this. 👍

    As a final change before merging, I would annotate the new classes/methods as `@PublicEvolving`, would you be ok with that? And I would also like to change `SessionWindowTimeGapExtractor.extract()` to return a long instead of `Time`. What do you think?
[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps
[ https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348692#comment-16348692 ]

ASF GitHub Bot commented on FLINK-8384:
---------------------------------------

Github user dyanarose commented on the issue:

    https://github.com/apache/flink/pull/5295

    The CI failure looks to be a known flaky test: FlinkKafkaProducer011ITCase.testScaleDownBeforeFirstCheckpoint
[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps
[ https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16347020#comment-16347020 ]

ASF GitHub Bot commented on FLINK-8384:
---------------------------------------

Github user dyanarose commented on the issue:

    https://github.com/apache/flink/pull/5295

    Ah, I hadn't thought to keep both in place.

    So unless the Dynamic SessionWindow classes had withDynamicGap made package private, you would then be able to instantiate them from two different classes. That could feel a bit iffy, though someone else might call it a convenience method.

    I'll get the change in for the Trigger cast; that should clean up the PR a fair bit.
[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps
[ https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346996#comment-16346996 ]

ASF GitHub Bot commented on FLINK-8384:
---------------------------------------

Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/5295

    Yes, that was me. 😅 It was just a quick idea, and it would work because both `withGap()` and `withGapExtractor()` are static so the latter could have `T` on the method signature and return a `DynamicEventTimeSessionWindows` (or some such). I'm not against keeping it in the separate class, though.

    I agree that the cast is a bit wonky but we know that it always works because the trigger we return never looks at the element.
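The two ideas in this comment (a static generic factory method next to `withGap()`, and an unchecked trigger cast that is safe because the trigger ignores the element) can be sketched without Flink. Every type below is a hypothetical stand-in defined only for this example; the names merely echo those used in the thread and the signatures are assumptions, not the merged API.

```java
// Hypothetical stand-alone sketch; all types here are simplified stand-ins, not Flink classes.
final class FactorySketch {

    /** Stand-in for a trigger that is typed on the element it could inspect. */
    interface Trigger<T> {
        boolean onElement(T element, long timestamp, long windowEnd);
    }

    /** Stand-in for an element-agnostic trigger: it only compares times. */
    static class EventTimeTrigger implements Trigger<Object> {
        @Override
        public boolean onElement(Object element, long timestamp, long windowEnd) {
            return timestamp >= windowEnd; // the element is never looked at
        }
    }

    /** Stand-in for a user-defined gap function returning milliseconds. */
    interface GapExtractor<T> {
        long extract(T element);
    }

    /** Typed assigner: the element type T is needed to call the extractor. */
    static final class DynamicSessionWindows<T> {
        private final GapExtractor<T> extractor;

        private DynamicSessionWindows(GapExtractor<T> extractor) {
            this.extractor = extractor;
        }

        long windowEnd(T element, long timestamp) {
            return timestamp + extractor.extract(element);
        }

        @SuppressWarnings("unchecked")
        Trigger<T> getDefaultTrigger() {
            // Unchecked cast: acceptable here only because EventTimeTrigger
            // never reads the element, so no ClassCastException can occur.
            Trigger<Object> elementAgnostic = new EventTimeTrigger();
            return (Trigger<T>) elementAgnostic;
        }
    }

    /** Untyped assigner with a static gap plus a generic static factory. */
    static final class SessionWindows {
        private final long gapMillis;

        private SessionWindows(long gapMillis) {
            this.gapMillis = gapMillis;
        }

        long windowEnd(long timestamp) {
            return timestamp + gapMillis;
        }

        static SessionWindows withGap(long gapMillis) {
            return new SessionWindows(gapMillis);
        }

        // T lives on the static method, so SessionWindows itself stays untyped.
        static <T> DynamicSessionWindows<T> withGapExtractor(GapExtractor<T> extractor) {
            return new DynamicSessionWindows<>(extractor);
        }
    }
}
```

A caller would write something like `FactorySketch.SessionWindows.withGapExtractor((String s) -> s.length() * 1000L)` and get back a typed assigner, while existing `withGap(...)` callers are unaffected.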
[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps
[ https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346972#comment-16346972 ]

ASF GitHub Bot commented on FLINK-8384:
---------------------------------------

Github user dyanarose commented on the issue:

    https://github.com/apache/flink/pull/5295

    I'm not the biggest fan of unchecked casts, but testing in our POC environment shows that casting the existing EventTimeTrigger to a typed Trigger works. So if the unchecked cast is acceptable, that would get rid of all changes required to Event/ProcessingTimeTrigger.

    On your second point (sorry if I mis-attribute this based on GitHub profile name): I believe you had mentioned breaking this out into new classes when I first brought this up on the mailing list:
    https://lists.apache.org/thread.html/6ceb094460bca8e9731e9e1dc0bb479f407f769458bff30c412adf78@%3Cdev.flink.apache.org%3E

    Now that you see it in action, do you feel it would be better off as an addition to the existing session window classes?

    The way I see it, if I put withGapExtractor() on the existing classes without adding type information to them, then the extract() method on SessionWindowGapExtractor will need to have the signature extract(Object input), leaving the implementer to cast to the input type.

    I have to admit it feels strange to me that the Window assigners all drop input type information. But that would mean that these new typed assigners would be the odd ones out.
[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps
[ https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346635#comment-16346635 ]

ASF GitHub Bot commented on FLINK-8384:
---------------------------------------

Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/5295

    I like the functionality of this a lot! However, I don't like that we change the signature of existing triggers or that we introduce new triggers that duplicate existing code. As an alternative, could you cast the `EventTimeTrigger` to `Trigger` in `getDefaultTrigger()` of your new assigner?

    Also an additional idea, instead of putting the API method on `DynamicEventTimeSessionWindows` we could think about adding it to `EventTimeSessionWindows`. We would then have `EventTimeSessionWindows.withGap()` and `EventTimeSessionWindows.withGapExtractor()`. What do you think?
[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps
[ https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339014#comment-16339014 ]

ASF GitHub Bot commented on FLINK-8384:
---------------------------------------

Github user dyanarose commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5295#discussion_r163796598

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.java ---
    @@ -0,0 +1,101 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.windowing.assigners;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    +import org.apache.flink.streaming.api.windowing.triggers.TypedEventTimeTrigger;
    +import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +
    +/**
    + * A {@link WindowAssigner} that windows elements into sessions based on the timestamp of the
    + * elements. Windows cannot overlap.
    + *
    + * For example, in order to window into windows with a dynamic time gap:
    + * {@code
    + * DataStream<Tuple2<String, Integer>> in = ...;
    + * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
    + * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
    + *   keyed.window(DynamicEventTimeSessionWindows.withDynamicGap({@link SessionWindowTimeGapExtractor }));
    + * }
    + *
    + * @param <T> The type of the input elements
    + */
    +public class DynamicEventTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
    +	private static final long serialVersionUID = 1L;
    --- End diff --

    Three sets of flink-streaming-scala tests needed to be updated to add existential type information, due to adding the generic to the EventTimeTrigger and ProcessingTimeTrigger classes.
[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps
[ https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16338297#comment-16338297 ]

ASF GitHub Bot commented on FLINK-8384:
---
Github user dyanarose commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5295#discussion_r163692223

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.java ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TypedEventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on the timestamp of the
+ * elements. Windows cannot overlap.
+ *
+ * For example, in order to window into windows with a dynamic time gap:
+ * {@code
+ * DataStream<Tuple2<String, Integer>> in = ...;
+ * KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
+ *   keyed.window(DynamicEventTimeSessionWindows.withDynamicGap({@link SessionWindowTimeGapExtractor }));
+ * }
+ *
+ * @param <T> The type of the input elements
+ */
+public class DynamicEventTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
+    private static final long serialVersionUID = 1L;
--- End diff --

Changing `EventTimeTrigger`/`ProcessingTimeTrigger` would be my preference. However, I was hesitant to just go in and change the class definition. I also don't believe it *should* break existing tests. I'll give it a go and see.
[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps
[ https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16338257#comment-16338257 ]

ASF GitHub Bot commented on FLINK-8384:
---
Github user walterddr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5295#discussion_r163684364

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.java ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TypedEventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on the timestamp of the
+ * elements. Windows cannot overlap.
+ *
+ * For example, in order to window into windows with a dynamic time gap:
+ * {@code
+ * DataStream<Tuple2<String, Integer>> in = ...;
+ * KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
+ *   keyed.window(DynamicEventTimeSessionWindows.withDynamicGap({@link SessionWindowTimeGapExtractor }));
+ * }
+ *
+ * @param <T> The type of the input elements
+ */
+public class DynamicEventTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
+    private static final long serialVersionUID = 1L;
--- End diff --

Can't we change `EventTimeTrigger` to extend `Trigger<T, TimeWindow>` instead of `Trigger<Object, TimeWindow>`? I don't see any reason why this would fail any existing code or tests, and it's also `@PublicEvolving`.
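The question above rests on the fact that an event-time trigger decides when to fire from the window end and the watermark, without inspecting the element. The following is a minimal, self-contained sketch of that idea and not Flink code: `SketchTrigger`, `ElementAgnosticTrigger`, and `TriggerCastSketch` are hypothetical names, and the unchecked cast shown is only one possible way to reuse a single element-agnostic trigger for any element type.

```java
// Hypothetical stand-in for a trigger abstraction; this is not Flink's Trigger class.
abstract class SketchTrigger<T> {
    // Returns true when the window should fire for this element.
    abstract boolean shouldFire(T element, long windowMaxTimestamp, long currentWatermark);
}

// An element-agnostic trigger: the decision depends only on time, never on the element.
final class ElementAgnosticTrigger extends SketchTrigger<Object> {
    @Override
    boolean shouldFire(Object element, long windowMaxTimestamp, long currentWatermark) {
        return windowMaxTimestamp <= currentWatermark;
    }
}

final class TriggerCastSketch {
    // The trigger never reads the element, so viewing it as SketchTrigger<T> is safe
    // in practice, although the compiler can only accept it as an unchecked cast.
    @SuppressWarnings("unchecked")
    static <T> SketchTrigger<T> defaultTrigger() {
        return (SketchTrigger<T>) (SketchTrigger<?>) new ElementAgnosticTrigger();
    }
}
```

With this shape, a typed assigner could hand out `TriggerCastSketch.<MyEvent>defaultTrigger()` without needing a typed copy of the trigger class; making the trigger class itself generic, as the comment suggests, would be the alternative that avoids the cast.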
[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps
[ https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16333190#comment-16333190 ]

Dyana Rose commented on FLINK-8384:
---
I rebased on top of master and pushed. The build is green now.
[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps
[ https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16332687#comment-16332687 ]

ASF GitHub Bot commented on FLINK-8384:
---
Github user sunjincheng121 commented on the issue:

    https://github.com/apache/flink/pull/5295

It's a `NoResourceAvailableException` error. I'm not sure of the cause, but we can try to rebuild it.
[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps
[ https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16332480#comment-16332480 ]

ASF GitHub Bot commented on FLINK-8384:
---
Github user dyanarose commented on the issue:

    https://github.com/apache/flink/pull/5295

It looks like the build failed on:

    org.apache.flink.test.streaming.runtime.StreamTaskTimerITCase testOperatorChainedToSource

I can't see why this change would cause that test to fail after the PR passed originally. However, I can't see it on the flaky test list either.
[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps
[ https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16327322#comment-16327322 ]

ASF GitHub Bot commented on FLINK-8384:
---
Github user dyanarose commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5295#discussion_r161807828

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.java ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TypedEventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on the timestamp of the
+ * elements. Windows cannot overlap.
+ *
+ * For example, in order to window into windows with a dynamic time gap:
+ * {@code
+ * DataStream<Tuple2<String, Integer>> in = ...;
+ * KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
+ *   keyed.window(DynamicEventTimeSessionWindows.withDynamicGap({@link SessionWindowTimeGapExtractor }));
+ * }
+ *
+ * @param <T> The type of the input elements
+ */
+public class DynamicEventTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
+    private static final long serialVersionUID = 1L;
--- End diff --

The Typed requirement comes from the desire to allow the `SessionWindowTimeGapExtractor` to accept a correctly typed element. To do that, the assigner itself needs to be typed, which means that the trigger needs to be typed, and so on.

If the `SessionWindowTimeGapExtractor` `extract` method instead took `Object`, requiring the implementer to cast it, then the new Typed classes wouldn't be necessary. I don't find that to be the most user-friendly interface though, when the type information is available.

But, yeah, I'm not happy with having to implement these exact copy classes either...
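The trade-off described in the comment above, a typed extractor versus one that takes `Object`, can be sketched as follows. This is an illustration only: `DeviceEvent`, `TypedGapExtractor`, `UntypedGapExtractor`, and the gap values are hypothetical, and the interfaces are simplified to a single `extract` argument rather than mirroring the signature in the diff.

```java
import java.io.Serializable;

// Hypothetical element type, used only for this illustration.
final class DeviceEvent {
    final String deviceType;

    DeviceEvent(String deviceType) {
        this.deviceType = deviceType;
    }
}

// Typed variant: the implementer receives the element with its real type.
interface TypedGapExtractor<T> extends Serializable {
    long extract(T element);
}

// Untyped variant: no type parameter is needed anywhere, but the implementer must cast.
interface UntypedGapExtractor extends Serializable {
    long extract(Object element);
}

final class GapExtractorSketch {
    // Gap in milliseconds chosen per device type; the values are made up.
    static final TypedGapExtractor<DeviceEvent> TYPED =
        event -> "thermostat".equals(event.deviceType) ? 60_000L : 5_000L;

    static final UntypedGapExtractor UNTYPED = element -> {
        // Throws ClassCastException at runtime if another element type is passed in.
        DeviceEvent event = (DeviceEvent) element;
        return "thermostat".equals(event.deviceType) ? 60_000L : 5_000L;
    };
}
```

Both variants return the same gaps for a `DeviceEvent`; the difference is that a mismatch between stream type and extractor is caught by the compiler in the typed variant and only at runtime in the untyped one.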
[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps
[ https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16327323#comment-16327323 ]

ASF GitHub Bot commented on FLINK-8384:
---
Github user dyanarose commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5295#discussion_r161807837

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.java ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TypedEventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on the timestamp of the
+ * elements. Windows cannot overlap.
+ *
+ * For example, in order to window into windows with a dynamic time gap:
+ * {@code
+ * DataStream<Tuple2<String, Integer>> in = ...;
+ * KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
+ *   keyed.window(DynamicEventTimeSessionWindows.withDynamicGap({@link SessionWindowTimeGapExtractor }));
+ * }
+ *
+ * @param <T> The type of the input elements
+ */
+public class DynamicEventTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
+    private static final long serialVersionUID = 1L;
+
+    protected SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor;
+
+    protected DynamicEventTimeSessionWindows(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
+        this.sessionWindowTimeGapExtractor = sessionWindowTimeGapExtractor;
+    }
+
+    @Override
+    public Collection<TimeWindow> assignWindows(T element, long timestamp, WindowAssignerContext context) {
+        long sessionTimeout = sessionWindowTimeGapExtractor.extract(element, timestamp, context);
+        if (sessionTimeout <= 0) {
+            throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap");
+        }
+        return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout));
+    }
+
+    @Override
+    public Trigger<T, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
+        return TypedEventTimeTrigger.create();
--- End diff --

As above.
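The `assignWindows` method in the diff above gives every element its own window `[timestamp, timestamp + gap)` and relies on window merging to form sessions. The following self-contained sketch shows how per-element gaps interact with merging. It is not Flink code: `SessionMergeSketch` and its `Window` class are hypothetical stand-ins, and the rule that overlapping or touching windows merge is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class SessionMergeSketch {
    // A window [start, end); a simplified stand-in for a time window type.
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    // Mirrors the shape of assignWindows in the diff: one window per element,
    // whose length is the gap chosen for that element.
    static Window assign(long timestamp, long gap) {
        if (gap <= 0) {
            throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap");
        }
        return new Window(timestamp, timestamp + gap);
    }

    // Merges windows that overlap or touch into sessions (assumed merge rule).
    static List<Window> merge(List<Window> windows) {
        List<Window> sorted = new ArrayList<>(windows);
        sorted.sort(Comparator.comparingLong((Window w) -> w.start));
        List<Window> sessions = new ArrayList<>();
        for (Window w : sorted) {
            int last = sessions.size() - 1;
            if (last >= 0 && sessions.get(last).end >= w.start) {
                Window previous = sessions.remove(last);
                sessions.add(new Window(previous.start, Math.max(previous.end, w.end)));
            } else {
                sessions.add(w);
            }
        }
        return sessions;
    }
}
```

For example, an element at time 0 with gap 10 and an element at time 9 with gap 20 end up in one session covering `[0, 29)`, because the second element arrives before the first element's gap has elapsed and then extends the session by its own, longer gap.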
[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps
[ https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16327325#comment-16327325 ]

ASF GitHub Bot commented on FLINK-8384:
---
Github user dyanarose commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5295#discussion_r161807869

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicProcessingTimeSessionWindows.java ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TypedProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on the current processing
+ * time. Windows cannot overlap.
+ *
+ * For example, in order to window into windows with a dynamic time gap:
+ * {@code
+ * DataStream<Tuple2<String, Integer>> in = ...;
+ * KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
+ *   keyed.window(DynamicProcessingTimeSessionWindows.withDynamicGap({@link SessionWindowTimeGapExtractor }));
+ * }
+ *
+ * @param <T> The type of the input elements
+ */
+public class DynamicProcessingTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
+    private static final long serialVersionUID = 1L;
+
+    protected SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor;
+
+    protected DynamicProcessingTimeSessionWindows(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
+        this.sessionWindowTimeGapExtractor = sessionWindowTimeGapExtractor;
+    }
+
+    @Override
+    public Collection<TimeWindow> assignWindows(T element, long timestamp, WindowAssignerContext context) {
+        long currentProcessingTime = context.getCurrentProcessingTime();
+        long sessionTimeout = sessionWindowTimeGapExtractor.extract(element, timestamp, context);
+        if (sessionTimeout <= 0) {
+            throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap");
+        }
+        return Collections.singletonList(new TimeWindow(currentProcessingTime, currentProcessingTime + sessionTimeout));
+    }
+
+    @Override
+    public Trigger<T, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
+        return TypedProcessingTimeTrigger.create();
--- End diff --

As above.
[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps
[ https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16327326#comment-16327326 ]

ASF GitHub Bot commented on FLINK-8384:
---
Github user dyanarose commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5295#discussion_r161807900

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SessionWindowTimeGapExtractor.java ---
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import java.io.Serializable;
+
+/**
+ * A {@code SessionWindowTimeGapExtractor} extracts session time gaps for Dynamic Session Window Assigners.
+ *
+ * @param <T> The type of elements that this {@code SessionWindowTimeGapExtractor} can extract session time gaps from.
+ */
+public interface SessionWindowTimeGapExtractor<T> extends Serializable {
+    long extract(T element, long timestamp, WindowAssigner.WindowAssignerContext context);
--- End diff --

Can do. Do you think it should return a `Time` instead of a `long`, to prevent anyone from returning seconds, minutes, or some other unit instead of milliseconds?
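The concern raised above, that a bare `long` does not say which unit it carries, can be illustrated with a small sketch. To stay self-contained it uses `java.time.Duration` as a stand-in for a unit-carrying type rather than Flink's `Time`; `DurationGapExtractor`, `UnitSafeGapSketch`, and the gap values are hypothetical.

```java
import java.time.Duration;

// Hypothetical extractor variant that returns a unit-carrying value instead of a long.
interface DurationGapExtractor<T> {
    Duration extract(T element);
}

final class UnitSafeGapSketch {
    // Example extractor: the unit is explicit where the gap is defined.
    static final DurationGapExtractor<String> BY_DEVICE_TYPE =
        deviceType -> "thermostat".equals(deviceType)
            ? Duration.ofMinutes(2)
            : Duration.ofSeconds(30);

    // Converts the extracted gap to milliseconds in exactly one place,
    // so implementers never have to remember which unit is expected.
    static <T> long gapMillis(DurationGapExtractor<T> extractor, T element) {
        Duration gap = extractor.extract(element);
        if (gap.isZero() || gap.isNegative()) {
            throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap");
        }
        return gap.toMillis();
    }
}
```

The cost of this design is an extra object per element on a hot path, which is one reason an API might still prefer a documented `long` in milliseconds.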
[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps
[ https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16327324#comment-16327324 ]

ASF GitHub Bot commented on FLINK-8384:
---
Github user dyanarose commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5295#discussion_r161807850

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicProcessingTimeSessionWindows.java ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TypedProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on the current processing
+ * time. Windows cannot overlap.
+ *
+ * For example, in order to window into windows with a dynamic time gap:
+ * {@code
+ * DataStream<Tuple2<String, Integer>> in = ...;
+ * KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
+ *   keyed.window(DynamicProcessingTimeSessionWindows.withDynamicGap({@link SessionWindowTimeGapExtractor }));
+ * }
+ *
+ * @param <T> The type of the input elements
+ */
+public class DynamicProcessingTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
+    private static final long serialVersionUID = 1L;
--- End diff --

As above.
[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps
[ https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16327307#comment-16327307 ]

ASF GitHub Bot commented on FLINK-8384:
---------------------------------------

Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5295#discussion_r161799438

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicProcessingTimeSessionWindows.java ---
    @@ -0,0 +1,102 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.windowing.assigners;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    +import org.apache.flink.streaming.api.windowing.triggers.TypedProcessingTimeTrigger;
    +import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +
    +/**
    + * A {@link WindowAssigner} that windows elements into sessions based on the current processing
    + * time. Windows cannot overlap.
    + *
    + * <p>For example, in order to window into windows with a dynamic time gap:
    + * <pre> {@code
    + * DataStream<Tuple2<String, Integer>> in = ...;
    + * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
    + * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
    + *   keyed.window(DynamicProcessingTimeSessionWindows.withDynamicGap({@link SessionWindowTimeGapExtractor }));
    + * } </pre>
    + *
    + * @param <T> The type of the input elements
    + */
    +public class DynamicProcessingTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
    +    private static final long serialVersionUID = 1L;
    +
    +    protected SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor;
    +
    +    protected DynamicProcessingTimeSessionWindows(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
    +        this.sessionWindowTimeGapExtractor = sessionWindowTimeGapExtractor;
    +    }
    +
    +    @Override
    +    public Collection<TimeWindow> assignWindows(T element, long timestamp, WindowAssignerContext context) {
    +        long currentProcessingTime = context.getCurrentProcessingTime();
    +        long sessionTimeout = sessionWindowTimeGapExtractor.extract(element, timestamp, context);
    +        if (sessionTimeout <= 0) {
    +            throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap");
    +        }
    +        return Collections.singletonList(new TimeWindow(currentProcessingTime, currentProcessingTime + sessionTimeout));
    +    }
    +
    +    @Override
    +    public Trigger<T, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
    +        return TypedProcessingTimeTrigger.create();
    --- End diff --

    DynamicProcessingTimeSessionWindows.
[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps
[ https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16327308#comment-16327308 ]

ASF GitHub Bot commented on FLINK-8384:
---------------------------------------

Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5295#discussion_r161800866

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SessionWindowTimeGapExtractor.java ---
    @@ -0,0 +1,30 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.windowing.assigners;
    +
    +import java.io.Serializable;
    +
    +/**
    + * A {@code SessionWindowTimeGapExtractor} extracts session time gaps for Dynamic Session Window Assigners.
    + *
    + * @param <T> The type of elements that this {@code SessionWindowTimeGapExtractor} can extract session time gaps from.
    + */
    +public interface SessionWindowTimeGapExtractor<T> extends Serializable {
    +    long extract(T element, long timestamp, WindowAssigner.WindowAssignerContext context);
    --- End diff --

    Can we remove the `timestamp` and `WindowAssigner.WindowAssignerContext context` parameters?
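The question above is about the shape of the extractor callback. As a minimal sketch of what an element-only variant might look like, the gap can be derived from the element alone. The interface name `ElementGapExtractor`, the `DeviceEvent` type, and the gap values are all hypothetical and are not part of the pull request.

```java
import java.io.Serializable;
import java.util.Map;

final class ElementOnlyGapSketch {

    /** Hypothetical element-only variant of the extractor; not the interface from the PR. */
    @FunctionalInterface
    interface ElementGapExtractor<T> extends Serializable {
        long extract(T element);
    }

    /** Hypothetical IoT event that carries only its device type. */
    static final class DeviceEvent {
        final String deviceType;

        DeviceEvent(String deviceType) {
            this.deviceType = deviceType;
        }
    }

    // Illustrative per-device-type inactivity gaps in milliseconds (made-up values).
    static final Map<String, Long> GAPS_MS = Map.of("thermostat", 60_000L, "doorbell", 5_000L);

    // Fallback gap for device types without an explicit entry (also made up).
    static final long DEFAULT_GAP_MS = 30_000L;

    // The gap depends on the element only; no timestamp or assigner context is needed.
    static final ElementGapExtractor<DeviceEvent> BY_DEVICE_TYPE =
        event -> GAPS_MS.getOrDefault(event.deviceType, DEFAULT_GAP_MS);

    public static void main(String[] args) {
        System.out.println(BY_DEVICE_TYPE.extract(new DeviceEvent("doorbell")));
        System.out.println(BY_DEVICE_TYPE.extract(new DeviceEvent("unknown-device")));
    }
}
```

Dropping `timestamp` and the context keeps the callback simpler, but an extractor of this shape could no longer base the gap on the event timestamp or on the current processing time.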
[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps
[ https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16327310#comment-16327310 ]

ASF GitHub Bot commented on FLINK-8384:
---------------------------------------

Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5295#discussion_r161798960

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.java ---
    @@ -0,0 +1,101 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.windowing.assigners;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    +import org.apache.flink.streaming.api.windowing.triggers.TypedEventTimeTrigger;
    +import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +
    +/**
    + * A {@link WindowAssigner} that windows elements into sessions based on the timestamp of the
    + * elements. Windows cannot overlap.
    + *
    + * <p>For example, in order to window into windows with a dynamic time gap:
    + * <pre> {@code
    + * DataStream<Tuple2<String, Integer>> in = ...;
    + * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
    + * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
    + *   keyed.window(DynamicEventTimeSessionWindows.withDynamicGap({@link SessionWindowTimeGapExtractor }));
    + * } </pre>
    + *
    + * @param <T> The type of the input elements
    + */
    +public class DynamicEventTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
    +    private static final long serialVersionUID = 1L;
    --- End diff --

    Can we change `MergingWindowAssigner<T, TimeWindow>` to `MergingWindowAssigner<Object, TimeWindow>`? If so, we can reuse the `EventTimeTrigger`.
[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps
[ https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16327306#comment-16327306 ]

ASF GitHub Bot commented on FLINK-8384:
---------------------------------------

Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5295#discussion_r161799387

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicProcessingTimeSessionWindows.java ---
    @@ -0,0 +1,102 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.windowing.assigners;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    +import org.apache.flink.streaming.api.windowing.triggers.TypedProcessingTimeTrigger;
    +import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +
    +/**
    + * A {@link WindowAssigner} that windows elements into sessions based on the current processing
    + * time. Windows cannot overlap.
    + *
    + * <p>For example, in order to window into windows with a dynamic time gap:
    + * <pre> {@code
    + * DataStream<Tuple2<String, Integer>> in = ...;
    + * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
    + * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
    + *   keyed.window(DynamicProcessingTimeSessionWindows.withDynamicGap({@link SessionWindowTimeGapExtractor }));
    + * } </pre>
    + *
    + * @param <T> The type of the input elements
    + */
    +public class DynamicProcessingTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
    +    private static final long serialVersionUID = 1L;
    --- End diff --

    Same as `DynamicEventTimeSessionWindows` comments.
[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps
[ https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16327309#comment-16327309 ]

ASF GitHub Bot commented on FLINK-8384:
---------------------------------------

Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5295#discussion_r161799107

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.java ---
    @@ -0,0 +1,101 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.windowing.assigners;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    +import org.apache.flink.streaming.api.windowing.triggers.TypedEventTimeTrigger;
    +import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +
    +/**
    + * A {@link WindowAssigner} that windows elements into sessions based on the timestamp of the
    + * elements. Windows cannot overlap.
    + *
    + * <p>For example, in order to window into windows with a dynamic time gap:
    + * <pre> {@code
    + * DataStream<Tuple2<String, Integer>> in = ...;
    + * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
    + * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
    + *   keyed.window(DynamicEventTimeSessionWindows.withDynamicGap({@link SessionWindowTimeGapExtractor }));
    + * } </pre>
    + *
    + * @param <T> The type of the input elements
    + */
    +public class DynamicEventTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
    +    private static final long serialVersionUID = 1L;
    +
    +    protected SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor;
    +
    +    protected DynamicEventTimeSessionWindows(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
    +        this.sessionWindowTimeGapExtractor = sessionWindowTimeGapExtractor;
    +    }
    +
    +    @Override
    +    public Collection<TimeWindow> assignWindows(T element, long timestamp, WindowAssignerContext context) {
    +        long sessionTimeout = sessionWindowTimeGapExtractor.extract(element, timestamp, context);
    +        if (sessionTimeout <= 0) {
    +            throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap");
    +        }
    +        return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout));
    +    }
    +
    +    @Override
    +    public Trigger<T, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
    +        return TypedEventTimeTrigger.create();
    --- End diff --

    Can we reuse the `EventTimeTrigger`?
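The `assignWindows` logic in the diff above can be exercised outside Flink with a small stand-in. This is only a sketch: `Window` and `GapExtractor` are hypothetical simplifications (the PR's extractor also receives a `WindowAssignerContext`), and only the gap check and the `[timestamp, timestamp + gap)` computation are taken from the quoted code.

```java
final class DynamicAssignSketch {

    /** Stand-in for a time window covering [start, end); not Flink's TimeWindow class. */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    /** Hypothetical two-argument extractor; the PR's interface also receives a context. */
    interface GapExtractor<T> {
        long extract(T element, long timestamp);
    }

    /** Assigns one window per element, sized by the gap the extractor returns. */
    static <T> Window assign(T element, long timestamp, GapExtractor<T> extractor) {
        long gap = extractor.extract(element, timestamp);
        if (gap <= 0) {
            // Same guard and message as in the quoted diff.
            throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap");
        }
        return new Window(timestamp, timestamp + gap);
    }

    public static void main(String[] args) {
        Window w = assign("sensor-a", 1_000L, (element, ts) -> 500L);
        System.out.println(w.start + " .. " + w.end);
        try {
            assign("sensor-b", 1_000L, (element, ts) -> 0L);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```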
[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps
[ https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16325566#comment-16325566 ]

ASF GitHub Bot commented on FLINK-8384:
---------------------------------------

GitHub user dyanarose opened a pull request:

    https://github.com/apache/flink/pull/5295

    [FLINK-8384] [streaming] Session Window Assigner with Dynamic Gaps

    ## What is the purpose of the change

    This PR adds the ability for the Session Window assigners to have dynamic inactivity gaps in addition to the existing static inactivity gaps.

    **Behaviour of dynamic gaps within existing sessions:**
    - scenario 1 - the new timeout is prior to the old timeout. The old timeout (the furthest in the future) is respected.
    - scenario 2 - the new timeout is after the old timeout. The new timeout is respected.
    - scenario 3 - a session is in flight, a new timeout is calculated, however no new events arrive for that session after the new timeout is calculated. This session will not have its timeout changed.

    ## Brief change log

    **What's New**
    - SessionWindowTimeGapExtractor\<T\> - Generic Interface with one extract method that returns the time gap
    - DynamicEventTimeSessionWindows\<T\> - Generic event time session window
    - DynamicProcessingTimeSessionWindows\<T\> - Generic processing time session window
    - TypedEventTimeTrigger\<T\> - Generic event time trigger
    - TypedProcessingTimeTrigger\<T\> - Generic processing time trigger
    - Tests for all the above

    ## Verifying this change

    This change added tests and can be verified as follows:
    - added tests for the typed triggers that duplicate the existing trigger tests to prove parity
    - added unit tests for the dynamic session window assigners that mimic the existing static session window assigner tests to prove parity in the static case
    - added tests to the WindowOperatorTest class to prove the behaviour of changing inactivity gaps

    ## Does this pull request potentially affect one of the following parts:

    - Dependencies (does it add or upgrade a dependency): (no)
    - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no, though the two typed trigger classes are marked `@Public(Evolving)`)
    - The serializers: (no)
    - The runtime per-record code paths (performance sensitive): (no)
    - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
    - The S3 file system connector: (no)

    ## Documentation

    - Does this pull request introduce a new feature? (yes)
    - If yes, how is the feature documented? (docs && JavaDocs)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/SaleCycle/flink dynamic-session-window-gaps

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5295.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5295

commit 399522a3e23a51ce1e860e5e09499ef98a7e340d
Author: Dyana Rose
Date:   2018-01-10T15:50:00Z

    [FLINK-8384] [streaming] Dynamic Gap Session Window Assigner
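The three scenarios in the pull request description are consistent with session windows being merged into the cover of all overlapping windows, so a session ends at the latest end among its members. A minimal sketch under that assumption, using a hypothetical `Window` stand-in rather than Flink's `TimeWindow` and made-up timestamps:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class SessionMergeSketch {

    /** Stand-in for a session window covering [start, end); not Flink's TimeWindow class. */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        boolean intersects(Window other) {
            return start <= other.end && end >= other.start;
        }

        Window cover(Window other) {
            return new Window(Math.min(start, other.start), Math.max(end, other.end));
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /** Merges overlapping windows; each merged session ends at the latest end among its members. */
    static List<Window> merge(List<Window> windows) {
        List<Window> sorted = new ArrayList<>(windows);
        sorted.sort(Comparator.comparingLong(w -> w.start));
        List<Window> merged = new ArrayList<>();
        for (Window w : sorted) {
            int last = merged.size() - 1;
            if (last >= 0 && merged.get(last).intersects(w)) {
                merged.set(last, merged.get(last).cover(w));
            } else {
                merged.add(w);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Window inFlight = new Window(0, 10_000);
        // Scenario 1: a later event yields a shorter gap; the old, later timeout is kept.
        System.out.println(merge(List.of(inFlight, new Window(2_000, 5_000))));
        // Scenario 2: a later event yields a longer gap; the session is extended.
        System.out.println(merge(List.of(inFlight, new Window(2_000, 22_000))));
        // Scenario 3: no new event arrives, so there is nothing to merge and the session is unchanged.
        System.out.println(merge(List.of(inFlight)));
    }
}
```

Under this model a gap change only takes effect when a new element arrives, which matches scenario 3: recalculating a gap without a new event never produces a window to merge.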
[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps
[ https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16323724#comment-16323724 ]

Dyana Rose commented on FLINK-8384:
-----------------------------------

For preliminary review and comment while I get the documentation together: https://github.com/apache/flink/compare/master...SaleCycle:dynamic-session-window-gaps

*What's new:*
* SessionWindowTimeGapExtractor - Generic Interface with one extract method that returns the time gap
* DynamicEventTimeSessionWindows - Generic event time session window
* DynamicProcessingTimeSessionWindows - Generic processing time session window
* TypedEventTimeTrigger - Generic event time trigger
* TypedProcessingTimeTrigger - Generic processing time trigger
* Tests for all the above

In order to be able to type the elements passed to the SessionWindowTimeGapExtractor, I needed to implement typed versions of both the EventTimeTrigger and the ProcessingTimeTrigger (which I've simply named TypedEventTimeTrigger and TypedProcessingTimeTrigger). Other than the generic added to the class declaration, these are both exactly the same as their non-generic counterparts (and so are their corresponding test files).

I'm not terrifically happy about that, but felt it was the least invasive implementation for the existing code.

*Behaviour of dynamic gaps within existing sessions:*
* scenario 1 - the new timeout is prior to the old timeout. The old timeout (the furthest in the future) is respected.
* scenario 2 - the new timeout is after the old timeout. The new timeout is respected.
* scenario 3 - a session is in flight, a new timeout is calculated, however no new events arrive for that session after the new timeout is calculated. This session will not have its timeout changed.
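The need for typed trigger copies follows from Java generics being invariant: a trigger declared over `Object` is not, as far as the compiler is concerned, a trigger over `T`. The sketch below uses hypothetical stand-in types (`Trigger`, `ObjectTrigger`, `Assigner`), not Flink's classes, to show the mismatch and one possible alternative to duplicating the class, an unchecked cast.

```java
final class TriggerGenericsSketch {

    /** Hypothetical stand-in for a trigger parameterised by element type. */
    interface Trigger<T> {
        boolean onElement(T element, long timestamp);
    }

    /** Untyped trigger, analogous in spirit to a trigger declared over Object. */
    static final class ObjectTrigger implements Trigger<Object> {
        @Override
        public boolean onElement(Object element, long timestamp) {
            return timestamp >= 0; // never looks at the element
        }
    }

    /** Hypothetical assigner that must hand out a trigger typed to its own element type. */
    abstract static class Assigner<T> {
        abstract Trigger<T> defaultTrigger();
    }

    static final class TypedAssigner<T> extends Assigner<T> {
        @Override
        @SuppressWarnings("unchecked")
        Trigger<T> defaultTrigger() {
            // `return new ObjectTrigger();` would not compile: Trigger<Object> is not a Trigger<T>.
            // The cast below compiles with an unchecked warning. It is harmless in this sketch
            // because a trigger that accepts any Object can accept any T.
            return (Trigger<T>) (Trigger<?>) new ObjectTrigger();
        }
    }

    public static void main(String[] args) {
        Trigger<String> trigger = new TypedAssigner<String>().defaultTrigger();
        System.out.println(trigger.onElement("event", 42L));
    }
}
```

Whether a cast of this kind or separate typed classes is preferable is a design choice; the sketch only illustrates why the untyped trigger cannot be returned directly.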