[GitHub] flink pull request #5295: [FLINK-8384] [streaming] Session Window Assigner w...

2018-02-05 Thread dyanarose
Github user dyanarose closed the pull request at:

https://github.com/apache/flink/pull/5295


---


[GitHub] flink pull request #5295: [FLINK-8384] [streaming] Session Window Assigner w...

2018-01-25 Thread dyanarose
Github user dyanarose commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r163796598
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.java
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import 
org.apache.flink.streaming.api.windowing.triggers.TypedEventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on 
the timestamp of the
+ * elements. Windows cannot overlap.
+ *
+ * For example, in order to window into windows with a dynamic time gap:
+ *  {@code
+ * DataStream> in = ...;
+ * KeyedStream> keyed = in.keyBy(...);
+ * WindowedStream, String, TimeWindows> windowed =
+ *   keyed.window(DynamicEventTimeSessionWindows.withDynamicGap({@link 
SessionWindowTimeGapExtractor }));
+ * } 
+ *
+ * @param  The type of the input elements
+ */
+public class DynamicEventTimeSessionWindows extends 
MergingWindowAssigner {
+   private static final long serialVersionUID = 1L;
--- End diff --

three sets of flink-streaming-scala tests needed to be updated to add 
existential type information due to adding the generic to the EventTimeTrigger 
and ProcessingTimeTrigger classes


---


[GitHub] flink pull request #5295: [FLINK-8384] [streaming] Session Window Assigner w...

2018-01-24 Thread dyanarose
Github user dyanarose commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r163692223
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.java
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import 
org.apache.flink.streaming.api.windowing.triggers.TypedEventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on 
the timestamp of the
+ * elements. Windows cannot overlap.
+ *
+ * For example, in order to window into windows with a dynamic time gap:
+ *  {@code
+ * DataStream> in = ...;
+ * KeyedStream> keyed = in.keyBy(...);
+ * WindowedStream, String, TimeWindows> windowed =
+ *   keyed.window(DynamicEventTimeSessionWindows.withDynamicGap({@link 
SessionWindowTimeGapExtractor }));
+ * } 
+ *
+ * @param  The type of the input elements
+ */
+public class DynamicEventTimeSessionWindows extends 
MergingWindowAssigner {
+   private static final long serialVersionUID = 1L;
--- End diff --

changing EventTimeTrigger/ProcessingTimeTrigger would be my preference. 
However I was hesitant to just go in and change the class definition. I also 
don't believe it *should* break existing tests

I'll give it a go and see


---


[GitHub] flink pull request #5295: [FLINK-8384] [streaming] Session Window Assigner w...

2018-01-24 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r163684364
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.java
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import 
org.apache.flink.streaming.api.windowing.triggers.TypedEventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on 
the timestamp of the
+ * elements. Windows cannot overlap.
+ *
+ * For example, in order to window into windows with a dynamic time gap:
+ *  {@code
+ * DataStream> in = ...;
+ * KeyedStream> keyed = in.keyBy(...);
+ * WindowedStream, String, TimeWindows> windowed =
+ *   keyed.window(DynamicEventTimeSessionWindows.withDynamicGap({@link 
SessionWindowTimeGapExtractor }));
+ * } 
+ *
+ * @param  The type of the input elements
+ */
+public class DynamicEventTimeSessionWindows extends 
MergingWindowAssigner {
+   private static final long serialVersionUID = 1L;
--- End diff --

Cant we change `EventTimeTrigger` to extends `Trigger` 
instead of `Trigger`? Don't see any reason why this would 
fail any existing code/test and it's also `@PublicEvolving`


---


[GitHub] flink pull request #5295: [FLINK-8384] [streaming] Session Window Assigner w...

2018-01-16 Thread dyanarose
Github user dyanarose commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r161807900
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SessionWindowTimeGapExtractor.java
 ---
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import java.io.Serializable;
+
+/**
+ * A {@code SessionWindowTimeGapExtractor} extracts session time gaps for 
Dynamic Session Window Assigners.
+ *
+ * @param  The type of elements that this {@code 
SessionWindowTimeGapExtractor} can extract session time gaps from.
+ */
+public interface SessionWindowTimeGapExtractor extends Serializable {
+   long extract(T element, long timestamp, 
WindowAssigner.WindowAssignerContext context);
--- End diff --

can do.

Do you think it should return a Time instead of a long? To prevent anyone 
trying to return seconds/minutes/whatever instead of milliseconds?


---


[GitHub] flink pull request #5295: [FLINK-8384] [streaming] Session Window Assigner w...

2018-01-16 Thread dyanarose
Github user dyanarose commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r161807869
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicProcessingTimeSessionWindows.java
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import 
org.apache.flink.streaming.api.windowing.triggers.TypedProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on 
the current processing
+ * time. Windows cannot overlap.
+ *
+ * For example, in order to window into windows with a dynamic time gap:
+ *  {@code
+ * DataStream> in = ...;
+ * KeyedStream> keyed = in.keyBy(...);
+ * WindowedStream, String, TimeWindows> windowed =
+ *   
keyed.window(DynamicProcessingTimeSessionWindows.withDynamicGap({@link 
SessionWindowTimeGapExtractor }));
+ * } 
+ *
+ * @param  The type of the input elements
+ */
+public class DynamicProcessingTimeSessionWindows extends 
MergingWindowAssigner {
+   private static final long serialVersionUID = 1L;
+
+   protected SessionWindowTimeGapExtractor 
sessionWindowTimeGapExtractor;
+
+   protected 
DynamicProcessingTimeSessionWindows(SessionWindowTimeGapExtractor 
sessionWindowTimeGapExtractor) {
+   this.sessionWindowTimeGapExtractor = 
sessionWindowTimeGapExtractor;
+   }
+
+   @Override
+   public Collection assignWindows(T element, long timestamp, 
WindowAssignerContext context) {
+   long currentProcessingTime = context.getCurrentProcessingTime();
+   long sessionTimeout = 
sessionWindowTimeGapExtractor.extract(element, timestamp, context);
+   if (sessionTimeout <= 0) {
+   throw new IllegalArgumentException("Dynamic session 
time gap must satisfy 0 < gap");
+   }
+   return Collections.singletonList(new 
TimeWindow(currentProcessingTime, currentProcessingTime + sessionTimeout));
+   }
+
+   @Override
+   public Trigger 
getDefaultTrigger(StreamExecutionEnvironment env) {
+   return TypedProcessingTimeTrigger.create();
--- End diff --

as above


---


[GitHub] flink pull request #5295: [FLINK-8384] [streaming] Session Window Assigner w...

2018-01-16 Thread dyanarose
Github user dyanarose commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r161807837
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.java
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import 
org.apache.flink.streaming.api.windowing.triggers.TypedEventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on 
the timestamp of the
+ * elements. Windows cannot overlap.
+ *
+ * For example, in order to window into windows with a dynamic time gap:
+ *  {@code
+ * DataStream> in = ...;
+ * KeyedStream> keyed = in.keyBy(...);
+ * WindowedStream, String, TimeWindows> windowed =
+ *   keyed.window(DynamicEventTimeSessionWindows.withDynamicGap({@link 
SessionWindowTimeGapExtractor }));
+ * } 
+ *
+ * @param  The type of the input elements
+ */
+public class DynamicEventTimeSessionWindows extends 
MergingWindowAssigner {
+   private static final long serialVersionUID = 1L;
+
+   protected SessionWindowTimeGapExtractor 
sessionWindowTimeGapExtractor;
+
+   protected 
DynamicEventTimeSessionWindows(SessionWindowTimeGapExtractor 
sessionWindowTimeGapExtractor) {
+   this.sessionWindowTimeGapExtractor = 
sessionWindowTimeGapExtractor;
+   }
+
+   @Override
+   public Collection assignWindows(T element, long timestamp, 
WindowAssignerContext context) {
+   long sessionTimeout = 
sessionWindowTimeGapExtractor.extract(element, timestamp, context);
+   if (sessionTimeout <= 0) {
+   throw new IllegalArgumentException("Dynamic session 
time gap must satisfy 0 < gap");
+   }
+   return Collections.singletonList(new TimeWindow(timestamp, 
timestamp + sessionTimeout));
+   }
+
+   @Override
+   public Trigger 
getDefaultTrigger(StreamExecutionEnvironment env) {
+   return TypedEventTimeTrigger.create();
--- End diff --

as above


---


[GitHub] flink pull request #5295: [FLINK-8384] [streaming] Session Window Assigner w...

2018-01-16 Thread dyanarose
Github user dyanarose commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r161807850
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicProcessingTimeSessionWindows.java
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import 
org.apache.flink.streaming.api.windowing.triggers.TypedProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on 
the current processing
+ * time. Windows cannot overlap.
+ *
+ * For example, in order to window into windows with a dynamic time gap:
+ *  {@code
+ * DataStream> in = ...;
+ * KeyedStream> keyed = in.keyBy(...);
+ * WindowedStream, String, TimeWindows> windowed =
+ *   
keyed.window(DynamicProcessingTimeSessionWindows.withDynamicGap({@link 
SessionWindowTimeGapExtractor }));
+ * } 
+ *
+ * @param  The type of the input elements
+ */
+public class DynamicProcessingTimeSessionWindows extends 
MergingWindowAssigner {
+   private static final long serialVersionUID = 1L;
--- End diff --

as above


---


[GitHub] flink pull request #5295: [FLINK-8384] [streaming] Session Window Assigner w...

2018-01-16 Thread dyanarose
Github user dyanarose commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r161807828
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.java
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import 
org.apache.flink.streaming.api.windowing.triggers.TypedEventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on 
the timestamp of the
+ * elements. Windows cannot overlap.
+ *
+ * For example, in order to window into windows with a dynamic time gap:
+ *  {@code
+ * DataStream> in = ...;
+ * KeyedStream> keyed = in.keyBy(...);
+ * WindowedStream, String, TimeWindows> windowed =
+ *   keyed.window(DynamicEventTimeSessionWindows.withDynamicGap({@link 
SessionWindowTimeGapExtractor }));
+ * } 
+ *
+ * @param  The type of the input elements
+ */
+public class DynamicEventTimeSessionWindows extends 
MergingWindowAssigner {
+   private static final long serialVersionUID = 1L;
--- End diff --

The Typed requirement comes from the desire to allow the 
SessionWindowTimeGapExtractor to accept a correctly typed element.

To do that the Assigner itself needs to be typed, which means that the 
trigger needs to be typed and so on.

If the SessionWindowTimeGapExtractor extract method instead took `object`, 
requiring that the implementer cast it, then the new Typed classes wouldn't be 
necessary.

I don't find that to be the most user friendly interface though, when the 
type information is available. But, yeah, I'm not happy with having to 
implement these exact copy classes either...


---


[GitHub] flink pull request #5295: [FLINK-8384] [streaming] Session Window Assigner w...

2018-01-16 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r161798960
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.java
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import 
org.apache.flink.streaming.api.windowing.triggers.TypedEventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on 
the timestamp of the
+ * elements. Windows cannot overlap.
+ *
+ * For example, in order to window into windows with a dynamic time gap:
+ *  {@code
+ * DataStream> in = ...;
+ * KeyedStream> keyed = in.keyBy(...);
+ * WindowedStream, String, TimeWindows> windowed =
+ *   keyed.window(DynamicEventTimeSessionWindows.withDynamicGap({@link 
SessionWindowTimeGapExtractor }));
+ * } 
+ *
+ * @param  The type of the input elements
+ */
+public class DynamicEventTimeSessionWindows extends 
MergingWindowAssigner {
+   private static final long serialVersionUID = 1L;
--- End diff --

Can we change `MergingWindowAssigner ` to 
`MergingWindowAssigner ` ? if so we can reuse the 
`EventTimeTrigger`.


---


[GitHub] flink pull request #5295: [FLINK-8384] [streaming] Session Window Assigner w...

2018-01-16 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r161800866
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SessionWindowTimeGapExtractor.java
 ---
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import java.io.Serializable;
+
+/**
+ * A {@code SessionWindowTimeGapExtractor} extracts session time gaps for 
Dynamic Session Window Assigners.
+ *
+ * @param  The type of elements that this {@code 
SessionWindowTimeGapExtractor} can extract session time gaps from.
+ */
+public interface SessionWindowTimeGapExtractor extends Serializable {
+   long extract(T element, long timestamp, 
WindowAssigner.WindowAssignerContext context);
--- End diff --

Can we remove the parameter of `timestamp` and  
`WindowAssigner.WindowAssignerContext context`?


---


[GitHub] flink pull request #5295: [FLINK-8384] [streaming] Session Window Assigner w...

2018-01-16 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r161799107
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.java
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import 
org.apache.flink.streaming.api.windowing.triggers.TypedEventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on 
the timestamp of the
+ * elements. Windows cannot overlap.
+ *
+ * For example, in order to window into windows with a dynamic time gap:
+ *  {@code
+ * DataStream> in = ...;
+ * KeyedStream> keyed = in.keyBy(...);
+ * WindowedStream, String, TimeWindows> windowed =
+ *   keyed.window(DynamicEventTimeSessionWindows.withDynamicGap({@link 
SessionWindowTimeGapExtractor }));
+ * } 
+ *
+ * @param  The type of the input elements
+ */
+public class DynamicEventTimeSessionWindows extends 
MergingWindowAssigner {
+   private static final long serialVersionUID = 1L;
+
+   protected SessionWindowTimeGapExtractor 
sessionWindowTimeGapExtractor;
+
+   protected 
DynamicEventTimeSessionWindows(SessionWindowTimeGapExtractor 
sessionWindowTimeGapExtractor) {
+   this.sessionWindowTimeGapExtractor = 
sessionWindowTimeGapExtractor;
+   }
+
+   @Override
+   public Collection assignWindows(T element, long timestamp, 
WindowAssignerContext context) {
+   long sessionTimeout = 
sessionWindowTimeGapExtractor.extract(element, timestamp, context);
+   if (sessionTimeout <= 0) {
+   throw new IllegalArgumentException("Dynamic session 
time gap must satisfy 0 < gap");
+   }
+   return Collections.singletonList(new TimeWindow(timestamp, 
timestamp + sessionTimeout));
+   }
+
+   @Override
+   public Trigger 
getDefaultTrigger(StreamExecutionEnvironment env) {
+   return TypedEventTimeTrigger.create();
--- End diff --

can we reuse the EventTimeTrigger?


---


[GitHub] flink pull request #5295: [FLINK-8384] [streaming] Session Window Assigner w...

2018-01-16 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r161799438
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicProcessingTimeSessionWindows.java
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import 
org.apache.flink.streaming.api.windowing.triggers.TypedProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on 
the current processing
+ * time. Windows cannot overlap.
+ *
+ * For example, in order to window into windows with a dynamic time gap:
+ *  {@code
+ * DataStream> in = ...;
+ * KeyedStream> keyed = in.keyBy(...);
+ * WindowedStream, String, TimeWindows> windowed =
+ *   
keyed.window(DynamicProcessingTimeSessionWindows.withDynamicGap({@link 
SessionWindowTimeGapExtractor }));
+ * } 
+ *
+ * @param  The type of the input elements
+ */
+public class DynamicProcessingTimeSessionWindows extends 
MergingWindowAssigner {
+   private static final long serialVersionUID = 1L;
+
+   protected SessionWindowTimeGapExtractor 
sessionWindowTimeGapExtractor;
+
+   protected 
DynamicProcessingTimeSessionWindows(SessionWindowTimeGapExtractor 
sessionWindowTimeGapExtractor) {
+   this.sessionWindowTimeGapExtractor = 
sessionWindowTimeGapExtractor;
+   }
+
+   @Override
+   public Collection assignWindows(T element, long timestamp, 
WindowAssignerContext context) {
+   long currentProcessingTime = context.getCurrentProcessingTime();
+   long sessionTimeout = 
sessionWindowTimeGapExtractor.extract(element, timestamp, context);
+   if (sessionTimeout <= 0) {
+   throw new IllegalArgumentException("Dynamic session 
time gap must satisfy 0 < gap");
+   }
+   return Collections.singletonList(new 
TimeWindow(currentProcessingTime, currentProcessingTime + sessionTimeout));
+   }
+
+   @Override
+   public Trigger 
getDefaultTrigger(StreamExecutionEnvironment env) {
+   return TypedProcessingTimeTrigger.create();
--- End diff --

DynamicProcessingTimeSessionWindows.


---


[GitHub] flink pull request #5295: [FLINK-8384] [streaming] Session Window Assigner w...

2018-01-16 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5295#discussion_r161799387
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicProcessingTimeSessionWindows.java
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import 
org.apache.flink.streaming.api.windowing.triggers.TypedProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on 
the current processing
+ * time. Windows cannot overlap.
+ *
+ * For example, in order to window into windows with a dynamic time gap:
+ *  {@code
+ * DataStream> in = ...;
+ * KeyedStream> keyed = in.keyBy(...);
+ * WindowedStream, String, TimeWindows> windowed =
+ *   
keyed.window(DynamicProcessingTimeSessionWindows.withDynamicGap({@link 
SessionWindowTimeGapExtractor }));
+ * } 
+ *
+ * @param  The type of the input elements
+ */
+public class DynamicProcessingTimeSessionWindows extends 
MergingWindowAssigner {
+   private static final long serialVersionUID = 1L;
--- End diff --

Same as `DynamicEventTimeSessionWindows` comments.


---


[GitHub] flink pull request #5295: [FLINK-8384] [streaming] Session Window Assigner w...

2018-01-14 Thread dyanarose
GitHub user dyanarose opened a pull request:

https://github.com/apache/flink/pull/5295

[FLINK-8384] [streaming] Session Window Assigner with Dynamic Gaps

## What is the purpose of the change

This PR adds the ability for the Session Window assigners to to have 
dynamic inactivity gaps in addition to the existing static inactivity gaps.

**Behaviour of dynamic gaps within existing sessions:**
- scenario 1 - the new timeout is prior to the old timeout. The old timeout 
(the furthest in the future) is respected.
- scenario 2 - the new timeout is after the old timeout. The new timeout is 
respected.
- scenario 3 - a session is in flight, a new timeout is calculated, however 
no new events arrive for that session after the new timeout is calculated. This 
session will not have its timeout changed


## Brief change log

**What's New**
-  SessionWindowTimeGapExtractor\ - Generic Interface with one extract 
method that returns the time gap
- DynamicEventTimeSessionWindows\ - Generic event time session window
- DynamicProcessingTimeSessionWindows\ - Generic processing time 
session window
- TypedEventTimeTrigger\ - Generic event time trigger
- TypedProcessingTimeTrigger\ - Generic processing time trigger
- Tests for all the above

## Verifying this change

This change added tests and can be verified as follows:

 - added tests for the typed triggers that duplicate the existing trigger 
tests to prove parity
 - added unit tests for the dynamic session window assigners that mimic the 
existing static session window assigner tests to prove parity in the static case
 - added tests to the WindowOperatorTest class to prove the behaviour of 
changing inactivity gaps

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no, though the two typed trigger classes are marked 
`@Public(Evolving)`)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (docs && JavaDocs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/SaleCycle/flink dynamic-session-window-gaps

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5295.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5295


commit 399522a3e23a51ce1e860e5e09499ef98a7e340d
Author: Dyana Rose 
Date:   2018-01-10T15:50:00Z

[FLINK-8384] [streaming] Dynamic Gap Session Window Assigner




---