[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2021-04-27 Thread Flink Jira Bot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17334303#comment-17334303
 ] 

Flink Jira Bot commented on FLINK-5047:
---

This issue was marked "stale-assigned" and has not received an update in 7 
days. It is now automatically unassigned. If you are still working on it, you 
can assign it to yourself again. Please also give an update about the status of 
the work.

> Add sliding group-windows for batch tables
> --
>
> Key: FLINK-5047
> URL: https://issues.apache.org/jira/browse/FLINK-5047
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: Jark Wu
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available, stale-assigned
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Add Slide group-windows for batch tables as described in
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
> There are two ways to implement sliding windows for batch:
> 1. Replicate the output in order to assign keys for overlapping windows. This
> is probably the more straightforward implementation and supports any
> aggregation function, but it blows up the data volume.
> 2. If the aggregation functions are combinable / pre-aggregatable, we can
> also find the largest tumbling window size from which the sliding windows can
> be assembled. This is basically the technique used to express sliding windows
> with plain SQL (GROUP BY + OVER clauses). For a sliding window Slide(10
> minutes, 2 minutes), this would mean first computing aggregates of
> non-overlapping (tumbling) 2-minute windows and then assembling 5 consecutive
> ones into a sliding window (which could be done in a MapPartition with sorted
> input). The implementation could be done as an optimizer rule that splits the
> sliding aggregate into a tumbling aggregate and a SQL WINDOW operator. Maybe
> it makes sense to implement the WINDOW clause first and reuse it for
> sliding windows.
> 3. There is also a third, hybrid solution: pre-aggregate on the
> largest non-overlapping windows (as in 2), then replicate these results and
> process them as in approach 1. The benefits are that a) it is
> based on the implementation that supports non-combinable aggregates (which is
> required in any case) and b) it does not require implementing
> the SQL WINDOW operator. Internally, this can again be implemented as an
> optimizer rule that translates the SlidingWindow into a pre-aggregating
> TumblingWindow and a final SlidingWindow (with replication).
> see FLINK-4692 for more discussion
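To make approaches 1 and 3 concrete, here is a minimal plain-Scala sketch (no Flink APIs; `Event`, `alignDown`, `slideByReplication`, and `slideHybrid` are hypothetical names) that computes a SUM per sliding time window. It assumes windows aligned to multiples of `slide` with half-open intervals `[start, start + size)`, and it uses `gcd(size, slide)` as the largest tumbling pane from which every sliding window can be assembled; for Slide(10, 2) that is 2, i.e. 5 panes per window. It needs Scala 2.13+ for `groupMapReduce`.

```scala
// One input row: event timestamp (abstract time units) and the value to sum.
final case class Event(ts: Long, value: Long)

// Greatest common divisor: the largest pane length dividing both size and slide.
def gcd(a: Long, b: Long): Long = if (b == 0L) a else gcd(b, a % b)

// Start of the aligned interval of length `step` that contains `ts`.
def alignDown(ts: Long, step: Long): Long = ts - Math.floorMod(ts, step)

// Approach 1: replicate every event into each sliding window [start, start + size)
// that contains it, then aggregate per window start.
def slideByReplication(events: Seq[Event], size: Long, slide: Long): Map[Long, Long] = {
  val replicated = for {
    e <- events
    // Aligned window starts in (e.ts - size, e.ts], walked downwards.
    start <- alignDown(e.ts, slide) until (e.ts - size) by -slide
  } yield (start, e.value)
  replicated.groupMapReduce(_._1)(_._2)(_ + _)
}

// Approach 3 (hybrid): pre-aggregate into tumbling panes of length gcd(size, slide),
// then replicate the per-pane partial sums as in approach 1.
def slideHybrid(events: Seq[Event], size: Long, slide: Long): Map[Long, Long] = {
  val pane = gcd(size, slide)
  val paneSums = events.groupMapReduce(e => alignDown(e.ts, pane))(_.value)(_ + _)
  // A pane [p, p + pane) lies entirely inside a window iff the window contains p,
  // so each partial sum can be treated as a single event at timestamp p.
  slideByReplication(paneSums.toSeq.map { case (p, sum) => Event(p, sum) }, size, slide)
}
```

Both functions return the same per-window sums. The hybrid variant, however, replicates one partial aggregate per pane instead of one record per event, which is where the data-volume saving comes from. This only works because SUM is combinable; a non-combinable aggregate would have to stay with approach 1.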



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2021-04-16 Thread Flink Jira Bot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17323694#comment-17323694
 ] 

Flink Jira Bot commented on FLINK-5047:
---

This issue is assigned but has not received an update in 7 days so it has been 
labeled "stale-assigned". If you are still working on the issue, please give an 
update and remove the label. If you are no longer working on the issue, please 
unassign so someone else may work on it. In 7 days the issue will be 
automatically unassigned.



[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947950#comment-15947950
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3589#discussion_r108786995
  
--- Diff: docs/dev/table_api.md ---
@@ -1274,11 +1274,159 @@ A session window is defined by using the `Session` class as follows:
 
  Limitations
 
-Currently the following features are not supported yet:
+The following table summarizes available windows:
 
-- Row-count windows on event-time
-- Non-grouped session windows on batch tables
-- Sliding windows on batch tables
+
+  
+
+  Batch or streaming
+  Table API
+  Time
+  Time or count interval
+  Grouped or ungrouped
+  Supported?
+
+  
+
+  
+
+  Batch
+  Tumble
+  Event-time
+  Count
+  Grouped
+  Yes
+
+
+  Batch
+  Tumble
+  Event-time
+  Count
+  Ungrouped
+  No
+
+
+  Batch
+  Tumble
+  Event-time
+  Time
+  Both
+  Yes
+
+
+  Batch
+  Tumble
+  Processing-time
--- End diff --

I think we should remove the processing-time batch combination. Listing it 
in the table makes the API look incomplete even though this combination does 
not make sense and will never be supported.


> Add sliding group-windows for batch tables
> --
>
> Key: FLINK-5047
> URL: https://issues.apache.org/jira/browse/FLINK-5047
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Timo Walther
>



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947951#comment-15947951
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3589#discussion_r108787775
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideCountWindowAggReduceGroupFunction.scala ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+import java.util.{ArrayList => JArrayList}
+
+import org.apache.flink.api.common.functions.{CombineFunction, RichGroupReduceFunction}
--- End diff --

`CombineFunction` is not used




[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947949#comment-15947949
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3589#discussion_r108794347
  
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala ---
@@ -354,4 +354,94 @@ class DataSetWindowAggregateITCase(
 val results = windowedTable.toDataSet[Row].collect()
 TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
+
+  @Test
+  def testEventTimeSlidingGroupWindowOverCountOverlappingFullPane(): Unit = {
+    // please keep this test in sync with the DataStream processing-time variant
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val table = env
+      .fromCollection(data)
+      .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+
+    val windowedTable = table
+      .window(Slide over 4.rows every 2.rows on 'long as 'w)
+      .groupBy('string, 'w)
+      .select('string, 'int.count)
--- End diff --

`count` only verifies the number of rows in a window, not which rows were
aggregated. `sum` would be more meaningful, IMO.




[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947948#comment-15947948
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3589#discussion_r108791475
  
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala ---
@@ -354,4 +354,94 @@ class DataSetWindowAggregateITCase(
 val results = windowedTable.toDataSet[Row].collect()
 TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
+
+  @Test
+  def testEventTimeSlidingGroupWindowOverCountOverlappingFullPane(): Unit = {
+    // please keep this test in sync with the DataStream processing-time variant
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val table = env
+      .fromCollection(data)
+      .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+
+    val windowedTable = table
+      .window(Slide over 4.rows every 2.rows on 'long as 'w)
+      .groupBy('string, 'w)
+      .select('string, 'int.count)
+
+    val expected =
+      "Hello world,2\n" +
+      "Hello,2"
--- End diff --

shouldn't this be "Hello,4"? There are four rows with `'string = "Hello"` 
in the data set. Shouldn't those be aggregated in a sliding count window of 
size 4?
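Whether "Hello,2" or "Hello,4" is right depends on how a sliding count window emits results. The sketch below assumes the batch result should match DataStream's `KeyedStream.countWindow(size, slide)`, which fires every `slide` elements of a key and evaluates at most the `size` most recent ones (a global window with a count trigger and a count evictor). It is plain Scala, not Flink code; `slidingCountWindows` and the sample values are hypothetical.

```scala
// Hypothetical helper (not a Flink API): emulates a sliding count window that
// fires after every `slide` elements of one key and evaluates only the `size`
// most recent elements at that point.
def slidingCountWindows[T](rows: Seq[T], size: Int, slide: Int): Seq[Seq[T]] = {
  require(size > 0 && slide > 0, "size and slide must be positive")
  // The window fires when the element count reaches slide, 2 * slide, ...
  (slide to rows.length by slide).map { end =>
    // Keep at most `size` elements ending at the firing position.
    rows.slice(math.max(0, end - size), end)
  }
}

// Four hypothetical values for one key, in arrival order.
val hello = Seq(2, 5, 3, 3)
// Window sizes 2 and 4.
val counts = slidingCountWindows(hello, size = 4, slide = 2).map(_.size)
// Window sums 7 and 13.
val sums = slidingCountWindows(hello, size = 4, slide = 2).map(_.sum)
// With slide > size, rows between firings are skipped: one window holding 3, 3.
val sparse = slidingCountWindows(hello, size = 2, slide = 4)
```

Under this assumption, four rows for one key produce windows with 2 and 4 rows, so a count of 4 would appear in the result as well. If the batch implementation deliberately uses different semantics, the expected values would have to be derived from those instead.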




[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947952#comment-15947952
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3589#discussion_r108792441
  
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala ---
@@ -354,4 +354,94 @@ class DataSetWindowAggregateITCase(
 val results = windowedTable.toDataSet[Row].collect()
 TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
+
+  @Test
+  def testEventTimeSlidingGroupWindowOverCountOverlappingFullPane(): Unit = {
+    // please keep this test in sync with the DataStream processing-time variant
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val table = env
+      .fromCollection(data)
+      .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+
+    val windowedTable = table
+      .window(Slide over 4.rows every 2.rows on 'long as 'w)
+      .groupBy('string, 'w)
+      .select('string, 'int.count)
+
+    val expected =
+      "Hello world,2\n" +
+      "Hello,2"
+
+    val results = windowedTable.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testEventTimeSlidingGroupWindowOverCountOverlappingSplitPane(): Unit = {
+    // please keep this test in sync with the DataStream processing-time variant
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val table = env
+      .fromCollection(data)
+      .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+
+    val windowedTable = table
+      .window(Slide over 6.rows every 1.rows on 'long as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count)
+
+    val expected = "Hallo,1\n" +
+      "Hello world,1\n" +
+      "Hello world,2\n" +
+      "Hello,1\n" +
+      "Hello,2\n" +
+      "Hello,3\n" +
+      "Hi,1"
+
+    val results = windowedTable.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testEventTimeSlidingGroupWindowOverCountNonOverlappingFullPane(): Unit = {
+    // please keep this test in sync with the DataStream processing-time variant
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val table = env
+      .fromCollection(data)
+      .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+
+    val windowedTable = table
+      .window(Slide over 2.rows every 4.rows on 'long as 'w)
+      .groupBy('string, 'w)
+      .select('string, 'int.count)
+
+    val results = windowedTable.toDataSet[Row].collect()
+    Assert.assertEquals(0, results.length)
--- End diff --

I think we should choose a data set that actually returns a result that can 
be checked. An empty result could be caused by many things.



[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15934819#comment-15934819
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/3589

[FLINK-5047] [table] Add count sliding group-windows for batch tables

This PR adds the missing sliding group-windows for row intervals. I also
added a table to the documentation to make it clearer what is supported.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-5047_count

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3589.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3589


commit d7b03e9e2985ad4177ef31a0e8c16fbdc2696fb7
Author: twalthr 
Date:   2017-03-21T16:07:06Z

[FLINK-5047] [table] Add count sliding group-windows for batch tables






[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-08 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901480#comment-15901480
 ] 

Timo Walther commented on FLINK-5047:
-

Fixed for time-based windows in 1.3.0: 31a57c5a89d6d22ccb629c2adfe4ffb87441e6dd



[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901477#comment-15901477
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3364


> Add sliding group-windows for batch tables
> --
>
> Key: FLINK-5047
> URL: https://issues.apache.org/jira/browse/FLINK-5047
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Timo Walther
>
> Add Slide group-windows for batch tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
> There are two ways to implement sliding windows for batch:
> 1. Replicate the output in order to assign keys for overlapping windows. This
> is probably the more straightforward implementation and supports any
> aggregation function, but it blows up the data volume.
> 2. If the aggregation functions are combinable / pre-aggregatable, we can
> also find the largest tumbling window size from which the sliding windows can
> be assembled. This is basically the technique used to express sliding windows
> with plain SQL (GROUP BY + OVER clauses). For a sliding window Slide(10
> minutes, 2 minutes), this would mean first computing aggregates of
> non-overlapping (tumbling) 2-minute windows and then assembling 5 consecutive
> ones into a sliding window (this could be done in a MapPartition with sorted
> input). The implementation could be done as an optimizer rule that splits the
> sliding aggregate into a tumbling aggregate and a SQL WINDOW operator. Maybe
> it makes sense to implement the WINDOW clause first and reuse it for
> sliding windows.
> 3. There is also a third, hybrid solution: doing the pre-aggregation on the
> largest non-overlapping windows (as in 2), replicating these results, and
> processing them as in approach 1. The benefits of this are that a) it is
> based on the implementation that supports non-combinable aggregates (which is
> required in any case) and b) it does not require the implementation of
> the SQL WINDOW operator. Internally, this can again be implemented as an
> optimizer rule that translates the SlidingWindow into a pre-aggregating
> TumblingWindow and a final SlidingWindow (with replication).
> See FLINK-4692 for more discussion.
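A hedged sketch of approach 3 under similar assumptions (hypothetical names, `(timestamp, value)` pairs, a pane size that divides both size and slide). AVG is carried as a mergeable `(sum, count)` partial: panes are pre-aggregated first, each pane result is then replicated into every sliding window that contains it, and the copies are merged per window start:

```scala
// Hypothetical sketch of the hybrid approach; not Flink's implementation.
// Assumes paneSize divides both size and slide.
object HybridSlidingAvg {
  // Mergeable partial aggregate for AVG.
  final case class Partial(sum: Long, count: Long) {
    def merge(o: Partial): Partial = Partial(sum + o.sum, count + o.count)
  }

  def slidingAvg(events: Seq[(Long, Long)], size: Long, slide: Long, paneSize: Long): Map[Long, Double] = {
    // 1) Tumbling pre-aggregation (the combinable part), keyed by pane start.
    val panes: Map[Long, Partial] = events
      .groupBy { case (ts, _) => ts - java.lang.Math.floorMod(ts, paneSize) }
      .map { case (p, evs) => p -> evs.map { case (_, v) => Partial(v, 1L) }.reduce(_ merge _) }

    // 2) Replicate each pane result once per sliding window that contains the pane.
    val replicated: Iterable[(Long, Partial)] = panes.toSeq.flatMap { case (p, partial) =>
      val latest = p - java.lang.Math.floorMod(p, slide)
      Iterator.iterate(latest)(_ - slide).takeWhile(_ > p - size).map(start => start -> partial)
    }

    // 3) Group by window start, merge the partials, and finalize the average.
    replicated.groupBy(_._1).map { case (start, parts) =>
      val total = parts.map(_._2).reduce(_ merge _)
      start -> total.sum.toDouble / total.count
    }
  }
}
```

Compared with replicating raw records (approach 1), only one partial per pane is copied, roughly size / slide times, so the blow-up is bounded by the number of panes rather than the number of input rows.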





[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901463#comment-15901463
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3364#discussion_r104948259
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
 ---
@@ -53,7 +53,12 @@ class IncrementalAggregateAllWindowFunction[W <: Window](
 
 if (iterator.hasNext) {
   val record = iterator.next()
-  out.collect(record)
+  var i = 0
+  while (i < record.getArity) {
+output.setField(i, record.getField(0))
--- End diff --

Good point!
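The comment this reply answers is not part of this excerpt. As quoted, the loop writes `record.getField(0)` into every position, and the excerpt ends at the commented line, before any increment of `i`. A hedged sketch of a field-by-field copy, using `Array[Any]` as a stand-in for Flink's `Row` (with a real `Row`, the body would use `getArity`, `getField(i)` and `setField(i, ...)`); `FieldCopy` and `copyFields` are hypothetical names:

```scala
// Hypothetical stand-in for org.apache.flink.types.Row: a plain Array[Any].
object FieldCopy {
  // Copies every field of `record` into `output` at the same position.
  def copyFields(record: Array[Any], output: Array[Any]): Unit = {
    require(output.length >= record.length, "output row is too small")
    var i = 0
    while (i < record.length) {
      output(i) = record(i) // read field i, not field 0
      i += 1                // advance, otherwise the loop never terminates
    }
  }
}
```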



[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901344#comment-15901344
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3364#discussion_r104925507
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -186,6 +200,130 @@ object AggregateUtil {
   }
 
   /**
+    * Create a [[org.apache.flink.api.common.functions.GroupReduceFunction]] that prepares for
+    * partial aggregates of sliding windows (time and count-windows).
+    * It requires a prepared input (with intermediate aggregate fields and aligned rowtime for
+    * pre-tumbling in case of time-windows), pre-aggregates (pre-tumbles) rows, aligns the
+    * window-start, and replicates or omits records for different panes of a sliding window.
+    *
+    * The output of the function contains the grouping keys, the intermediate aggregate values of
+    * all aggregate functions and the aligned window start. Window start need not be a timestamp,
+    * but can also be a count value for count-windows.
+    *
+    * The output is stored in Row in the following format:
+    *
+    * {{{
+    *                    avg(x) aggOffsetInRow = 2   count(z) aggOffsetInRow = 5
+    *                                |                           |
+    *                                v                           v
+    *        +---------+---------+--------+--------+--------+--------+-------------+
+    *        |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 | windowStart |
+    *        +---------+---------+--------+--------+--------+--------+-------------+
+    *                                                  ^                    ^
+    *                                                  |                    |
+    *                                    sum(y) aggOffsetInRow = 4   window start for pane mapping
+    * }}}
+    *
+    * NOTE: this function is only used for sliding windows with partial aggregates on batch tables.
+    */
+  def createDataSetSlideWindowPrepareGroupReduceFunction(
+      window: LogicalWindow,
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      groupings: Array[Int],
+      inputType: RelDataType,
+      isParserCaseSensitive: Boolean)
+    : RichGroupReduceFunction[Row, Row] = {
+
+    val aggregates = transformToAggregateFunctions(
+      namedAggregates.map(_.getKey),
+      inputType,
+      needRetraction = false)._2
+
+    val returnType: RowTypeInfo = createDataSetAggregateBufferDataType(
+      groupings,
+      aggregates,
+      inputType,
+      Some(Array(BasicTypeInfo.LONG_TYPE_INFO)))
+
+    window match {
+      case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
+        // sliding time-window
+        // for partial aggregations
+        new DataSetSlideTimeWindowAggReduceCombineFunction(
+          aggregates,
+          groupings.length,
+          returnType.getArity - 1,
+          asLong(size),
+          asLong(slide),
+          returnType)
+
+      case _ =>
+        throw new UnsupportedOperationException(s"$window is currently not supported on batch.")
+    }
+  }
+
+  /**
+    * Create a [[org.apache.flink.api.common.functions.FlatMapFunction]] that prepares for
+    * non-incremental aggregates of sliding windows (time-windows).
+    *
+    * It requires a prepared input (with intermediate aggregate fields), aligns the
+    * window-start, and replicates or omits records for different panes of a sliding window.
+    *
+    * The output of the function contains the grouping keys, the intermediate aggregate values of
+    * all aggregate functions and the aligned window start.
+    *
+    * The output is stored in Row in the following format:
+    *
+    * {{{
+    *                    avg(x) aggOffsetInRow = 2   count(z) aggOffsetInRow = 5
+    *                                |                           |
+    *                                v                           v
+    *        +---------+---------+--------+--------+--------+--------+-------------+
+    *        |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 | windowStart |
+    *        +---------+---------+--------+--------+--------+--------+-------------+
+    *                                                  ^                    ^
+    *                                                  |                    |
+    *                                    sum(y)

[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901343#comment-15901343
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3364#discussion_r104919295
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
 ---
@@ -312,6 +320,108 @@ class DataSetWindowAggregate(
 }
   }
 
+  private def createEventTimeSlidingWindowDataSet(
+      inputDS: DataSet[Row],
+      isTimeWindow: Boolean,
+      size: Long,
+      slide: Long,
+      isParserCaseSensitive: Boolean)
+    : DataSet[Row] = {
+
+    // create MapFunction for initializing the aggregations
+    // it aligns the rowtime for pre-tumbling in case of a time-window for partial aggregates
+    val mapFunction = createDataSetWindowPrepareMapFunction(
+      window,
+      namedAggregates,
+      grouping,
+      inputType,
+      isParserCaseSensitive)
+
+    val mappedDataSet = inputDS
+      .map(mapFunction)
+      .name(prepareOperatorName)
+
+    val mapReturnType = mappedDataSet.getType
+
+    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
+    val groupingKeys = grouping.indices.toArray
+
+    // do partial aggregation if possible
+    val isPartial = doAllSupportPartialMerge(
+      namedAggregates.map(_.getKey),
+      inputType,
+      grouping.length)
+
+    // only pre-tumble if it is worth it
+    val littleTumblingSize = determineLargestTumblingSize(size, slide) <= 1
--- End diff --

Rename to `isLittleTumblingSize`?
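`determineLargestTumblingSize` is not shown in this hunk. The issue description asks for the largest tumbling size from which the sliding windows can be assembled; assuming that means the greatest common divisor of size and slide, a sketch of such a helper and of the renamed flag could look like this (`TumblingSize`, `gcd` and `largestTumblingSize` are hypothetical; the real helper in the PR may differ):

```scala
// Hypothetical sketch; the helper in the actual PR may be implemented differently.
object TumblingSize {
  @annotation.tailrec
  def gcd(a: Long, b: Long): Long = if (b == 0L) a else gcd(b, a % b)

  // Largest pane size from which every sliding window (size, slide) can be assembled.
  def largestTumblingSize(size: Long, slide: Long): Long = gcd(size, slide)

  // Mirrors the flag under review: pre-tumbling is not worth it for panes of size <= 1.
  def isLittleTumblingSize(size: Long, slide: Long): Boolean =
    largestTumblingSize(size, slide) <= 1
}
```

For Slide(10 minutes, 2 minutes) in milliseconds this yields 2-minute panes, whereas a size of 10 with a slide of 3 (in the same unit) degenerates to panes of size 1, where pre-tumbling saves nothing.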



[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901342#comment-15901342
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3364#discussion_r104926612
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceCombineFunction.scala
 ---
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.CombineFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.types.Row
+
+/**
+  * Wraps the aggregate logic inside of
+  * [[org.apache.flink.api.java.operators.GroupReduceOperator]] and
+  * [[org.apache.flink.api.java.operators.GroupCombineOperator]].
+  *
+  * It is used for sliding on batch for time-windows.
+  *
+  * @param aggregates aggregate functions
+  * @param groupingKeysLength number of grouping keys
+  * @param timeFieldPos position of aligned time field
+  * @param windowSize window size of the sliding window
+  * @param windowSlide window slide of the sliding window
+  * @param returnType return type of this function
+  */
+class DataSetSlideTimeWindowAggReduceCombineFunction(
--- End diff --

Can be merged with `DataSetSlideTimeWindowAggReduceGroupFunction`, because
both can only be used if the input is combinable.
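Both functions only apply when all aggregates support partial merging, so one class can provide both phases. A hedged sketch of that idea outside of Flink, with hypothetical names and a `(sum, count)` partial for AVG standing in for the accumulators. In the DataSet API the equivalent would presumably be a group-reduce function that also implements a combine interface (the quoted file already imports `CombineFunction`):

```scala
// Hypothetical sketch, not Flink's API: one class serves as both the combiner and the
// final reducer, because merging partial aggregates is associative.
object MergedReduceCombine {
  final case class Partial(key: String, paneStart: Long, sum: Long, count: Long)

  final class SlidePaneAggFunction {
    // Shared merge logic; assumes every record in `group` has the same key and paneStart.
    private def mergeAll(group: Iterable[Partial]): Partial =
      group.reduce((a, b) => a.copy(sum = a.sum + b.sum, count = a.count + b.count))

    // Combine phase: emits a partial result that is still mergeable.
    def combine(group: Iterable[Partial]): Partial = mergeAll(group)

    // Reduce phase: the same merge, followed by finalizing the aggregate (here AVG).
    def reduce(group: Iterable[Partial]): (String, Long, Double) = {
      val m = mergeAll(group)
      (m.key, m.paneStart, m.sum.toDouble / m.count)
    }
  }
}
```

Keeping the merge in one private method is what makes a separate combine class redundant: the combiner and the reducer differ only in whether the result is finalized.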



[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901337#comment-15901337
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3364#discussion_r104925837
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -186,6 +200,130 @@ object AggregateUtil {
   }
 
   /**
+    * Create a [[org.apache.flink.api.common.functions.GroupReduceFunction]] that prepares for
+    * partial aggregates of sliding windows (time and count-windows).
+    * It requires a prepared input (with intermediate aggregate fields and aligned rowtime for
+    * pre-tumbling in case of time-windows), pre-aggregates (pre-tumbles) rows, aligns the
+    * window-start, and replicates or omits records for different panes of a sliding window.
+    *
+    * The output of the function contains the grouping keys, the intermediate aggregate values of
+    * all aggregate functions and the aligned window start. Window start need not be a timestamp,
+    * but can also be a count value for count-windows.
+    *
+    * The output is stored in Row in the following format:
+    *
+    * {{{
+    *                    avg(x) aggOffsetInRow = 2   count(z) aggOffsetInRow = 5
+    *                                |                           |
+    *                                v                           v
+    *        +---------+---------+--------+--------+--------+--------+-------------+
+    *        |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 | windowStart |
+    *        +---------+---------+--------+--------+--------+--------+-------------+
+    *                                                  ^                    ^
+    *                                                  |                    |
+    *                                    sum(y) aggOffsetInRow = 4   window start for pane mapping
+    * }}}
+    *
+    * NOTE: this function is only used for sliding windows with partial aggregates on batch tables.
+    */
+  def createDataSetSlideWindowPrepareGroupReduceFunction(
+      window: LogicalWindow,
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      groupings: Array[Int],
+      inputType: RelDataType,
+      isParserCaseSensitive: Boolean)
+    : RichGroupReduceFunction[Row, Row] = {
+
+    val aggregates = transformToAggregateFunctions(
+      namedAggregates.map(_.getKey),
+      inputType,
+      needRetraction = false)._2
+
+    val returnType: RowTypeInfo = createDataSetAggregateBufferDataType(
+      groupings,
+      aggregates,
+      inputType,
+      Some(Array(BasicTypeInfo.LONG_TYPE_INFO)))
+
+    window match {
+      case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
+        // sliding time-window
+        // for partial aggregations
+        new DataSetSlideTimeWindowAggReduceCombineFunction(
+          aggregates,
+          groupings.length,
+          returnType.getArity - 1,
+          asLong(size),
+          asLong(slide),
+          returnType)
+
+      case _ =>
+        throw new UnsupportedOperationException(s"$window is currently not supported on batch.")
+    }
+  }
+
+  /**
+    * Create a [[org.apache.flink.api.common.functions.FlatMapFunction]] that prepares for
+    * non-incremental aggregates of sliding windows (time-windows).
+    *
+    * It requires a prepared input (with intermediate aggregate fields), aligns the
+    * window-start, and replicates or omits records for different panes of a sliding window.
+    *
+    * The output of the function contains the grouping keys, the intermediate aggregate values of
+    * all aggregate functions and the aligned window start.
+    *
+    * The output is stored in Row in the following format:
+    *
+    * {{{
+    *                    avg(x) aggOffsetInRow = 2   count(z) aggOffsetInRow = 5
+    *                                |                           |
+    *                                v                           v
+    *        +---------+---------+--------+--------+--------+--------+-------------+
+    *        |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 | windowStart |
+    *        +---------+---------+--------+--------+--------+--------+-------------+
+    *                                                  ^                    ^
+    *                                                  |                    |
+    *                                    sum(y)

[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901345#comment-15901345
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3364#discussion_r104922073
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggFlatMapFunction.scala
 ---
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+
+/**
+  * It is used for sliding windows on batch for time-windows. It takes a prepared input row,
+  * aligns the window start, and replicates or omits records for different panes of a sliding
+  * window. It is used for non-partial aggregations.
+  *
+  * @param aggregatesLength number of aggregate functions
+  * @param groupingKeysLength number of grouping keys
+  * @param windowSize window size of the sliding window
+  * @param windowSlide window slide of the sliding window
+  * @param returnType return type of this function
+  */
+class DataSetSlideTimeWindowAggFlatMapFunction(
+    private val aggregatesLength: Int,
+    private val groupingKeysLength: Int,
+    private val timeFieldPos: Int,
+    private val windowSize: Long,
+    private val windowSlide: Long,
+    @transient private val returnType: TypeInformation[Row])
+  extends RichFlatMapFunction[Row, Row]
+  with ResultTypeQueryable[Row] {
+
+  Preconditions.checkNotNull(aggregatesLength)
+
+  private var intermediateRow: Row = _
+  // add one field to store window start
+  private val intermediateRowArity: Int = groupingKeysLength + aggregatesLength + 1
--- End diff --

can be removed



[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901346#comment-15901346
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3364#discussion_r104921975
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggFlatMapFunction.scala
 ---
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+
+/**
+  * It is used for sliding windows on batch for time-windows. It takes a prepared input row,
+  * aligns the window start, and replicates or omits records for different panes of a sliding
+  * window. It is used for non-partial aggregations.
+  *
+  * @param aggregatesLength number of aggregate functions
+  * @param groupingKeysLength number of grouping keys
+  * @param windowSize window size of the sliding window
+  * @param windowSlide window slide of the sliding window
+  * @param returnType return type of this function
+  */
+class DataSetSlideTimeWindowAggFlatMapFunction(
+    private val aggregatesLength: Int,
+    private val groupingKeysLength: Int,
+    private val timeFieldPos: Int,
+    private val windowSize: Long,
+    private val windowSlide: Long,
+    @transient private val returnType: TypeInformation[Row])
+  extends RichFlatMapFunction[Row, Row]
+  with ResultTypeQueryable[Row] {
+
+  Preconditions.checkNotNull(aggregatesLength)
+
+  private var intermediateRow: Row = _
--- End diff --

can be removed because we are replicating the input record.
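A hedged sketch of what replicating the input record could look like for `DataSetSlideTimeWindowAggFlatMapFunction`, with `Array[Any]` standing in for `Row` and a plain callback standing in for Flink's `Collector`. It assumes the aligned window start is written back into the time field at `timeFieldPos`; the actual patch may place it elsewhere. The window-start loop is similar to the one used by Flink's streaming `SlidingEventTimeWindows` assigner. Each emitted row is a copy, so no separate intermediate row is needed (`SlideReplication` and `replicate` are hypothetical names):

```scala
// Hypothetical stand-in: rows are Array[Any]; `out` plays the role of a Flink Collector.
object SlideReplication {
  // For a row whose field `timeFieldPos` holds a Long rowtime, emit one copy per sliding
  // window [start, start + size) that contains the rowtime, with the time field
  // overwritten by that window's start. The input row itself is left unchanged.
  def replicate(row: Array[Any], timeFieldPos: Int, size: Long, slide: Long)(out: Array[Any] => Unit): Unit = {
    val ts = row(timeFieldPos).asInstanceOf[Long]
    var start = ts - java.lang.Math.floorMod(ts, slide) // latest window start <= ts
    while (start > ts - size) {
      val copy = row.clone()
      copy(timeFieldPos) = start
      out(copy)
      start -= slide
    }
  }
}
```

With size 10 and slide 4, a row with rowtime 7 is emitted twice, for the windows starting at 4 and 0.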



[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901339#comment-15901339
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3364#discussion_r104927085
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala
 ---
@@ -112,12 +112,10 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction(
 }
 
 // get final aggregate value and set to output.
-aggregateMapping.foreach {
-  case (after, previous) => {
+aggregateMapping.foreach { case (after, previous) =>
--- End diff --

Revert this change? It will be fixed with #3489.


> Add sliding group-windows for batch tables
> --
>
> Key: FLINK-5047
> URL: https://issues.apache.org/jira/browse/FLINK-5047
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Timo Walther
>
> Add Slide group-windows for batch tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
> There are two ways to implement sliding windows for batch:
> 1. Replicate the output in order to assign keys for overlapping windows. This 
> is probably the more straightforward implementation and supports any 
> aggregation function, but it blows up the data volume.
> 2. If the aggregation functions are combinable / pre-aggregatable, we can 
> also find the largest tumbling window size from which the sliding windows can 
> be assembled. This is basically the technique used to express sliding windows 
> with plain SQL (GROUP BY + OVER clauses). For a sliding window Slide(10 
> minutes, 2 minutes), this would mean first computing aggregates of 
> non-overlapping (tumbling) 2-minute windows and then assembling 5 consecutive 
> ones into a sliding window (could be done in a MapPartition with sorted 
> input). The implementation could be done as an optimizer rule to split the 
> sliding aggregate into a tumbling aggregate and a SQL WINDOW operator. Maybe 
> it makes sense to implement the WINDOW clause first and reuse this for 
> sliding windows.
> 3. There is also a third, hybrid solution: doing the pre-aggregation on the 
> largest non-overlapping windows (as in 2), replicating these results, and 
> processing them as in approach 1. The benefits are that it a) is 
> based on the implementation that supports non-combinable aggregates (which is 
> required in any case) and b) does not require the implementation of 
> the SQL WINDOW operator. Internally, this can again be implemented as an 
> optimizer rule that translates the SlidingWindow into a pre-aggregating 
> TumblingWindow and a final SlidingWindow (with replication).
> See FLINK-4692 for more discussion.

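A minimal sketch of the hybrid approach (3) for a SUM aggregate, written as plain Scala over in-memory collections rather than as Flink DataSet operators. It assumes Long event timestamps, windows aligned to multiples of the slide (no offset), and a pane length of gcd(size, slide) as "the largest tumbling window size from which the sliding windows can be assembled"; the object and method names are hypothetical. Step 1 is the pre-tumbling of approach 2, step 2 the replication of approach 1.

```scala
// Hypothetical sketch of approach (3): pre-tumble into panes, then replicate
// each pane result into every sliding window that fully contains the pane.
object HybridSlidingSum {

  @annotation.tailrec
  def gcd(a: Long, b: Long): Long = if (b == 0) a else gcd(b, a % b)

  /** Start of the aligned interval of the given length that contains `ts`. */
  def alignedStart(ts: Long, length: Long): Long = ts - Math.floorMod(ts, length)

  /** SUM(value) per sliding window [start, start + size) advancing by `slide`. */
  def slidingSums(records: Seq[(Long, Long)], size: Long, slide: Long): Map[Long, Long] = {
    // Largest pane length from which both size and slide can be assembled.
    val pane = gcd(size, slide)

    // Step 1: pre-aggregate (tumble) records into non-overlapping panes.
    val paneSums: Map[Long, Long] =
      records
        .groupBy { case (ts, _) => alignedStart(ts, pane) }
        .map { case (paneStart, rs) => paneStart -> rs.map(_._2).sum }

    // Step 2: replicate each pane result to all windows w with
    // w <= paneStart and paneStart + pane <= w + size (w a multiple of slide).
    // An empty range means the pane falls into a gap and is omitted.
    val replicated: Seq[(Long, Long)] = for {
      (paneStart, paneSum) <- paneSums.toSeq
      lastStart = alignedStart(paneStart, slide)
      windowStart <- lastStart to (paneStart + pane - size) by -slide
    } yield windowStart -> paneSum

    // Step 3: final aggregation per window start.
    replicated.groupBy(_._1).map { case (w, parts) => w -> parts.map(_._2).sum }
  }
}
```

With size = 10 and slide = 2 (the Slide(10 minutes, 2 minutes) example in plain units), each 2-unit pane is replicated into (10 - 2) / 2 + 1 = 5 windows, matching the "5 consecutive" panes per window described above.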


--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901340#comment-15901340
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3364#discussion_r104925916
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -186,6 +200,130 @@ object AggregateUtil {
   }
 
   /**
+* Create a [[org.apache.flink.api.common.functions.GroupReduceFunction]] that prepares for
+* partial aggregates of sliding windows (time and count-windows).
+* It requires a prepared input (with intermediate aggregate fields and aligned rowtime for
+* pre-tumbling in case of time-windows), pre-aggregates (pre-tumbles) rows, aligns the
+* window-start, and replicates or omits records for different panes of a sliding window.
+*
+* The output of the function contains the grouping keys, the intermediate aggregate values of
+* all aggregate functions and the aligned window start. Window start need not be a timestamp,
+* but can also be a count value for count-windows.
+*
+* The output is stored in Row by the following format:
+*
+* {{{
+*                    avg(x) aggOffsetInRow = 2     count(z) aggOffsetInRow = 5
+*                           |                          |
+*                           v                          v
+*   +---------+---------+--------+--------+--------+--------+-------------+
+*   |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 | windowStart |
+*   +---------+---------+--------+--------+--------+--------+-------------+
+*                                             ^                    ^
+*                                             |                    |
+*                                sum(y) aggOffsetInRow = 4   window start for pane mapping
+* }}}
+*
+* NOTE: this function is only used for sliding windows with partial aggregates on batch tables.
+*/
+  def createDataSetSlideWindowPrepareGroupReduceFunction(
+  window: LogicalWindow,
+  namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+  groupings: Array[Int],
+  inputType: RelDataType,
+  isParserCaseSensitive: Boolean)
+: RichGroupReduceFunction[Row, Row] = {
+
+val aggregates = transformToAggregateFunctions(
+  namedAggregates.map(_.getKey),
+  inputType,
+  needRetraction = false)._2
+
+val returnType: RowTypeInfo = createDataSetAggregateBufferDataType(
+  groupings,
+  aggregates,
+  inputType,
+  Some(Array(BasicTypeInfo.LONG_TYPE_INFO)))
+
+window match {
+  case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
+// sliding time-window
+// for partial aggregations
+new DataSetSlideTimeWindowAggReduceCombineFunction(
+  aggregates,
+  groupings.length,
+  returnType.getArity - 1,
+  asLong(size),
+  asLong(slide),
+  returnType)
+
+  case _ =>
+throw new UnsupportedOperationException(s"$window is currently not supported on batch.")
+}
+  }
+
+  /**
+* Create a [[org.apache.flink.api.common.functions.FlatMapFunction]] that prepares for
+* non-incremental aggregates of sliding windows (time-windows).
+*
+* It requires a prepared input (with intermediate aggregate fields), aligns the
+* window-start, and replicates or omits records for different panes of a sliding window.
+*
+* The output of the function contains the grouping keys, the intermediate aggregate values of
+* all aggregate functions and the aligned window start.
+*
+* The output is stored in Row by the following format:
+*
+* {{{
+*                    avg(x) aggOffsetInRow = 2     count(z) aggOffsetInRow = 5
+*                           |                          |
+*                           v                          v
+*   +---------+---------+--------+--------+--------+--------+-------------+
+*   |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 | windowStart |
+*   +---------+---------+--------+--------+--------+--------+-------------+
+*                                             ^                    ^
+*                                             |                    |
+*                                sum(y) 

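The row layout in the Scaladoc diagrams above follows from the number of grouping keys and the number of intermediate fields per aggregate (avg(x) contributes a sum and a count; sum(y) and count(z) one field each), plus one trailing window-start field. A hypothetical helper that derives aggOffsetInRow and the window-start position might look like this; it mirrors the `groupingKeysLength + aggregates.map(_.intermediateDataType.length).sum + 1` arity used elsewhere in the PR, but it is a sketch, not code from the PR.

```scala
// Hypothetical helper: derives the partial-row layout shown in the Scaladoc.
object PartialRowLayout {

  final case class Layout(aggOffsets: Seq[Int], windowStartPos: Int, arity: Int)

  /**
   * @param groupingKeys            number of leading grouping-key fields
   * @param intermediateFieldCounts intermediate field count of each aggregate, in order
   */
  def layout(groupingKeys: Int, intermediateFieldCounts: Seq[Int]): Layout = {
    // Each aggregate's fields start right after the previous aggregate's fields.
    val offsets = intermediateFieldCounts.scanLeft(groupingKeys)(_ + _).init
    // One extra trailing field holds the aligned window start.
    val arity = groupingKeys + intermediateFieldCounts.sum + 1
    Layout(offsets, arity - 1, arity)
  }
}
```

For the diagram's two grouping keys and aggregates with 2, 1, and 1 intermediate fields, this yields offsets 2, 4, and 5 and puts windowStart at index 6 of a 7-field row.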
[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901341#comment-15901341
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3364#discussion_r104927549
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
 ---
@@ -53,7 +53,12 @@ class IncrementalAggregateAllWindowFunction[W <: Window](
 
 if (iterator.hasNext) {
   val record = iterator.next()
-  out.collect(record)
+  var i = 0
+  while (i < record.getArity) {
+output.setField(i, record.getField(0))
--- End diff --

`record.getField(0)` -> `record.getField(i)`?

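With `getField(0)`, the loop writes the first field into every position of `output`. A minimal sketch of the corrected loop, using a hypothetical `SimpleRow` stand-in that only mimics the `getArity`, `getField`, and `setField` methods of `org.apache.flink.types.Row` so that the example runs without Flink:

```scala
// Hypothetical stand-in for org.apache.flink.types.Row (only the methods used here).
final class SimpleRow(arity: Int) {
  private val fields = new Array[Any](arity)
  def getArity: Int = arity
  def getField(pos: Int): Any = fields(pos)
  def setField(pos: Int, value: Any): Unit = fields(pos) = value
}

object RowCopy {
  /** Copies every field of `record` into the same position of `output`. */
  def copyFields(record: SimpleRow, output: SimpleRow): Unit = {
    var i = 0
    while (i < record.getArity) {
      // Use the loop index; getField(0) would copy the first field everywhere.
      output.setField(i, record.getField(i))
      i += 1
    }
  }
}
```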


[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901338#comment-15901338
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3364#discussion_r104925166
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggFlatMapFunction.scala
 ---
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+
+/**
+  * It is used for sliding windows on batch for time-windows. It takes a prepared input row,
+  * aligns the window start, and replicates or omits records for different panes of a sliding
+  * window. It is used for non-partial aggregations.
+  *
+  * @param aggregatesLength number of aggregate functions
+  * @param groupingKeysLength number of grouping keys
+  * @param windowSize window size of the sliding window
+  * @param windowSlide window slide of the sliding window
+  * @param returnType return type of this function
+  */
+class DataSetSlideTimeWindowAggFlatMapFunction(
+private val aggregatesLength: Int,
--- End diff --

`aggregatesLength` and `groupingKeysLength` can be removed

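The per-record logic described in the Scaladoc (align the window start, then replicate or omit the record for each pane) needs only the timestamp, window size, and slide, which is consistent with dropping `aggregatesLength` and `groupingKeysLength`. A minimal sketch, assuming windows aligned to multiples of the slide with no offset; `SlideWindowAssigner` is a hypothetical name and this is not the PR's implementation:

```scala
// Hypothetical sketch of sliding-window assignment for a single record.
object SlideWindowAssigner {
  /**
   * Start timestamps of all windows [start, start + size) that contain `ts`,
   * with window starts aligned to multiples of `slide`.
   * Empty if `ts` falls into a gap between windows (size < slide).
   */
  def windowStarts(ts: Long, size: Long, slide: Long): Seq[Long] = {
    // Latest window start that is <= ts.
    val lastStart = ts - Math.floorMod(ts, slide)
    // Walk backwards by `slide` while the window still covers ts.
    Iterator.iterate(lastStart)(_ - slide).takeWhile(start => start > ts - size).toList
  }
}
```

A record at timestamp 7 with size 10 and slide 2 is replicated into the windows starting at 6, 4, 2, 0, and -2; with size 2 and slide 4, the same record falls into a gap and is omitted.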


[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901347#comment-15901347
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3364#discussion_r104922022
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggFlatMapFunction.scala
 ---
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+
+/**
+  * It is used for sliding windows on batch for time-windows. It takes a prepared input row,
+  * aligns the window start, and replicates or omits records for different panes of a sliding
+  * window. It is used for non-partial aggregations.
+  *
+  * @param aggregatesLength number of aggregate functions
+  * @param groupingKeysLength number of grouping keys
+  * @param windowSize window size of the sliding window
+  * @param windowSlide window slide of the sliding window
+  * @param returnType return type of this function
+  */
+class DataSetSlideTimeWindowAggFlatMapFunction(
+private val aggregatesLength: Int,
+private val groupingKeysLength: Int,
+private val timeFieldPos: Int,
+private val windowSize: Long,
+private val windowSlide: Long,
+@transient private val returnType: TypeInformation[Row])
+  extends RichFlatMapFunction[Row, Row]
+  with ResultTypeQueryable[Row] {
+
+  Preconditions.checkNotNull(aggregatesLength)
+
+  private var intermediateRow: Row = _
+  // add one field to store window start
+  private val intermediateRowArity: Int = groupingKeysLength + aggregatesLength + 1
+
+  override def open(config: Configuration) {
--- End diff --

`open()` can be removed



[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901200#comment-15901200
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/3364
  
Thanks for the feedback @fhueske. I updated the PR.



[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15899803#comment-15899803
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3364#discussion_r104726234
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala
 ---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.CombineFunction
+import org.apache.flink.types.Row
+
+/**
+  * Wraps the aggregate logic inside of
+  * [[org.apache.flink.api.java.operators.GroupReduceOperator]] and
+  * [[org.apache.flink.api.java.operators.GroupCombineOperator]].
+  *
+  * It is used for sliding windows on batch for both time and count-windows.
+  *
+  * @param aggregates aggregate functions.
+  * @param groupKeysMapping index mapping of group keys between intermediate aggregate Row
+  * and output Row.
+  * @param aggregateMapping index mapping between aggregate function list and aggregated value
+  * index in output Row.
+  * @param intermediateRowArity intermediate row field count
+  * @param finalRowArity output row field count
+  * @param finalRowWindowStartPos relative window-start position to last field of output row
+  * @param finalRowWindowEndPos relative window-end position to last field of output row
+  * @param windowSize size of the window, used to determine window-end for output row
+  */
+class DataSetSlideWindowAggReduceCombineFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+groupKeysMapping: Array[(Int, Int)],
+aggregateMapping: Array[(Int, Int)],
+intermediateRowArity: Int,
+finalRowArity: Int,
+finalRowWindowStartPos: Option[Int],
+finalRowWindowEndPos: Option[Int],
+windowSize: Long)
+  extends DataSetSlideWindowAggReduceGroupFunction(
+aggregates,
+groupKeysMapping,
+aggregateMapping,
+intermediateRowArity,
+finalRowArity,
+finalRowWindowStartPos,
+finalRowWindowEndPos,
+windowSize)
+  with CombineFunction[Row, Row] {
+
+  override def combine(records: Iterable[Row]): Row = {
+// initiate intermediate aggregate value
+aggregates.foreach(_.initiate(aggregateBuffer))
+
+val iterator = records.iterator()
+while (iterator.hasNext) {
+  val record = iterator.next()
+  aggregates.foreach(_.merge(record, aggregateBuffer))
+
+  // check if this record is the last record
+  if (!iterator.hasNext) {
+// set group keys to aggregateBuffer
+for (i <- groupKeysMapping.indices) {
+  aggregateBuffer.setField(i, record.getField(i))
+}
+
+aggregateBuffer.setField(windowStartFieldPos, record.getField(windowStartFieldPos))
+
+return aggregateBuffer
+  }
+}
+
+// this code path should never be reached as we return before the loop finishes
+throw new IllegalArgumentException("Group is empty. This should never happen.")
--- End diff --

OK, but how about we make that more clear with a comment like:

`// This will never happen because the iterator is never null but we have to satisfy the compiler.`

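A sketch of the suggested pattern: the loop returns on the last record, so the trailing `throw` exists only because the method needs a result on every path. `LastKeyCombine` is hypothetical and sums Long values over (key, value) tuples instead of merging Flink aggregate buffers.

```scala
// Hypothetical sketch of a combine() that returns from inside the loop.
object LastKeyCombine {
  /** Sums the values of one group, taking the key from the last record. */
  def combine(records: java.lang.Iterable[(String, Long)]): (String, Long) = {
    var acc = 0L
    val iterator = records.iterator()
    while (iterator.hasNext) {
      val (key, value) = iterator.next()
      acc += value
      // The last record carries the group key into the combined result.
      if (!iterator.hasNext) {
        return (key, acc)
      }
    }
    // This will never happen because a group is never empty,
    // but we have to satisfy the compiler.
    throw new IllegalArgumentException("Group is empty. This should never happen.")
  }
}
```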


[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15899795#comment-15899795
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3364#discussion_r104725017
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideCountWindowAggReduceGroupFunction.scala
 ---
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+import java.sql.Timestamp
+
+import org.apache.calcite.runtime.SqlFunctions
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+  * It is used for sliding windows on batch for count-windows. It takes a prepared input row,
+  * pre-aggregates (pre-tumbles) rows, aligns the window start, and replicates or omits records
+  * for different panes of a sliding window.
+  *
+  * @param aggregates aggregate functions
+  * @param groupingKeysLength number of grouping keys
+  * @param preTumblingSize number of records to be aggregated (tumbled) before emission
+  * @param windowSize window size of the sliding window
+  * @param windowSlide window slide of the sliding window
+  * @param returnType return type of this function
+  */
+class DataSetSlideCountWindowAggReduceGroupFunction(
+private val aggregates: Array[Aggregate[_]],
+private val groupingKeysLength: Int,
+private val preTumblingSize: Long,
+private val windowSize: Long,
+private val windowSlide: Long,
+@transient private val returnType: TypeInformation[Row])
+  extends RichGroupReduceFunction[Row, Row]
+  with ResultTypeQueryable[Row] {
+
+  private var output: Row = _
+  private var outWindowStartIndex: Int = _
+
+  override def open(config: Configuration) {
+Preconditions.checkNotNull(aggregates)
+// add one field to store window start count
+val partialRowLength = groupingKeysLength +
+  aggregates.map(_.intermediateDataType.length).sum + 1
+output = new Row(partialRowLength)
+outWindowStartIndex = partialRowLength - 1
+  }
+
+  override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
+var count: Long = 0
+
+val iterator = records.iterator()
+
+while (iterator.hasNext) {
+  val record = iterator.next()
+  // reset aggregates after completed tumbling
+  if (count % preTumblingSize == 0) {
+// initiate intermediate aggregate value.
+aggregates.foreach(_.initiate(output))
+  }
+
+  // merge intermediate aggregate value to buffer.
+  aggregates.foreach(_.merge(record, output))
+
+  count += 1
+
+  // trigger tumbling evaluation
+  if (count % preTumblingSize == 0) {
--- End diff --

But shouldn't batch be the reference for streaming?

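The pre-tumbling in `reduce()` resets the partial aggregate whenever `count % preTumblingSize == 0` and emits once the next multiple is reached. A sketch of that pattern for a SUM over Long values, tagging each partial result with the count index of its first record as the window start. `CountPreTumble` is hypothetical, and because the diff is truncated, this sketch simply drops a trailing incomplete tumble, which may not match what the PR does.

```scala
// Hypothetical sketch of count-based pre-tumbling within one group.
object CountPreTumble {
  /** Emits (window-start count, partial sum) for every complete tumble. */
  def preTumble(values: Iterator[Long], preTumblingSize: Long): Seq[(Long, Long)] = {
    val out = Seq.newBuilder[(Long, Long)]
    var count = 0L
    var acc = 0L
    while (values.hasNext) {
      // Reset the partial aggregate at the start of each tumble.
      if (count % preTumblingSize == 0) acc = 0L
      acc += values.next()
      count += 1
      // Tumble complete: emit it with the count of its first record.
      if (count % preTumblingSize == 0) out += ((count - preTumblingSize, acc))
    }
    // Assumption: an incomplete trailing tumble is not emitted.
    out.result()
  }
}
```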


[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15899782#comment-15899782
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3364#discussion_r104722693
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala
 ---
@@ -360,6 +360,19 @@ class GroupWindowTest extends TableTestBase {
   .window(Session withGap 7.milli as 'w) // require on a time attribute
   .groupBy('string, 'w)
   .select('string, 'int.count)
+
+val expected = unaryNode(
--- End diff --

It seems I hadn't seen the exception. Thanks.



[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15899780#comment-15899780
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3364#discussion_r104722597
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala
 ---
@@ -146,42 +146,9 @@ class AggregationsITCase extends StreamingMultipleProgramsTestBase {
   "Hi,1,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
-
-  @Test
-  def testEventTimeSlidingWindow(): Unit = {
--- End diff --

Yes.


> Add sliding group-windows for batch tables
> --
>
> Key: FLINK-5047
> URL: https://issues.apache.org/jira/browse/FLINK-5047
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Timo Walther
>
> Add Slide group-windows for batch tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].


[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15899776#comment-15899776
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3364#discussion_r104721437
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala
 ---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.CombineFunction
+import org.apache.flink.types.Row
+
+/**
+  * Wraps the aggregate logic inside of
+  * [[org.apache.flink.api.java.operators.GroupReduceOperator]] and
+  * [[org.apache.flink.api.java.operators.GroupCombineOperator]].
+  *
+  * It is used for sliding on batch for both time and count-windows.
+  *
+  * @param aggregates aggregate functions.
+  * @param groupKeysMapping index mapping of group keys between intermediate aggregate Row
+  * and output Row.
+  * @param aggregateMapping index mapping between aggregate function list and aggregated value
+  * index in output Row.
+  * @param intermediateRowArity intermediate row field count
+  * @param finalRowArity output row field count
+  * @param finalRowWindowStartPos relative window-start position to last field of output row
+  * @param finalRowWindowEndPos relative window-end position to last field of output row
+  * @param windowSize size of the window, used to determine window-end for output row
+  */
+class DataSetSlideWindowAggReduceCombineFunction(
+    aggregates: Array[Aggregate[_ <: Any]],
+    groupKeysMapping: Array[(Int, Int)],
+    aggregateMapping: Array[(Int, Int)],
+    intermediateRowArity: Int,
+    finalRowArity: Int,
+    finalRowWindowStartPos: Option[Int],
+    finalRowWindowEndPos: Option[Int],
+    windowSize: Long)
+  extends DataSetSlideWindowAggReduceGroupFunction(
+    aggregates,
+    groupKeysMapping,
+    aggregateMapping,
+    intermediateRowArity,
+    finalRowArity,
+    finalRowWindowStartPos,
+    finalRowWindowEndPos,
+    windowSize)
+  with CombineFunction[Row, Row] {
+
+  override def combine(records: Iterable[Row]): Row = {
+    // initiate intermediate aggregate value
+    aggregates.foreach(_.initiate(aggregateBuffer))
+
+    val iterator = records.iterator()
+    while (iterator.hasNext) {
+      val record = iterator.next()
+      aggregates.foreach(_.merge(record, aggregateBuffer))
+
+      // check if this record is the last record
+      if (!iterator.hasNext) {
+        // set group keys to aggregateBuffer
+        for (i <- groupKeysMapping.indices) {
+          aggregateBuffer.setField(i, record.getField(i))
+        }
+
+        aggregateBuffer.setField(windowStartFieldPos, record.getField(windowStartFieldPos))
+
+        return aggregateBuffer
+      }
+    }
+
+    // this code path should never be reached as we return before the loop finishes
+    throw new IllegalArgumentException("Group is empty. This should never happen.")
--- End diff --

I cannot remove it. The compiler complains otherwise.
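The compiler behaviour can be reproduced without Flink: a `while` loop is an expression of type `Unit`, so a method declared to return a value cannot end with it, whereas a `throw` expression has type `Nothing`, which conforms to every type. The sketch below copies the shape of `combine`, with a hypothetical `(String, Long)` pair standing in for `Row`.

```scala
// Hypothetical stand-in for `combine`: sums the values of a non-empty group and
// returns (key of the last record, sum).
object CombineShape {

  def combineLast(records: java.lang.Iterable[(String, Long)]): (String, Long) = {
    var sum = 0L
    val iterator = records.iterator()
    while (iterator.hasNext) {
      val record = iterator.next()
      sum += record._2
      // return from inside the loop once the last record has been merged
      if (!iterator.hasNext) {
        return (record._1, sum)
      }
    }
    // Required: without it the body ends with the Unit-typed `while` and does not
    // type-check. `throw` has type Nothing, which conforms to (String, Long).
    throw new IllegalArgumentException("Group is empty. This should never happen.")
  }
}
```

Deleting the final `throw` makes the loop the last expression of the method body, and scalac reports a type mismatch (found `Unit`). Folding over the iterator would avoid the early `return`, at the cost of diverging from the loop structure used in the PR.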



[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15899770#comment-15899770
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3364#discussion_r104720345
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggFlatMapFunction.scala
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+
+/**
+  * It is used for sliding windows on batch for time-windows. It takes a prepared input row,
+  * aligns the window start, and replicates or omits records for different panes of a sliding
+  * window. It is used for non-incremental aggregations.
+  *
+  * @param aggregates aggregate functions
+  * @param groupingKeysLength number of grouping keys
+  * @param windowSize window size of the sliding window
+  * @param windowSlide window slide of the sliding window
+  * @param returnType return type of this function
+  */
+class DataSetSlideTimeWindowAggFlatMapFunction(
+    private val aggregates: Array[Aggregate[_]],
+    private val groupingKeysLength: Int,
+    private val timeFieldPos: Int,
+    private val windowSize: Long,
+    private val windowSlide: Long,
+    @transient private val returnType: TypeInformation[Row])
+  extends RichFlatMapFunction[Row, Row]
+  with ResultTypeQueryable[Row] {
+
+  private var aggregateBuffer: Row = _
+  private var outWindowStartIndex: Int = _
+
+  override def open(config: Configuration) {
+    Preconditions.checkNotNull(aggregates)
--- End diff --

An empty `Row` can be serialized because all fields are null and it 
implements `Serializable`. However, once we have a non-serializable value in 
the row, it will fail. 
But I think you are right, let's keep the `Row` initialization in `open()`.



[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15899753#comment-15899753
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3364#discussion_r104717287
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideCountWindowAggReduceGroupFunction.scala
 ---
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+import java.sql.Timestamp
+
+import org.apache.calcite.runtime.SqlFunctions
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+  * It is used for sliding windows on batch for count-windows. It takes a prepared input row,
+  * pre-aggregates (pre-tumbles) rows, aligns the window start, and replicates or omits records
+  * for different panes of a sliding window.
+  *
+  * @param aggregates aggregate functions
+  * @param groupingKeysLength number of grouping keys
+  * @param preTumblingSize number of records to be aggregated (tumbled) before emission
+  * @param windowSize window size of the sliding window
+  * @param windowSlide window slide of the sliding window
+  * @param returnType return type of this function
+  */
+class DataSetSlideCountWindowAggReduceGroupFunction(
+    private val aggregates: Array[Aggregate[_]],
+    private val groupingKeysLength: Int,
+    private val preTumblingSize: Long,
+    private val windowSize: Long,
+    private val windowSlide: Long,
+    @transient private val returnType: TypeInformation[Row])
+  extends RichGroupReduceFunction[Row, Row]
+  with ResultTypeQueryable[Row] {
+
+  private var output: Row = _
+  private var outWindowStartIndex: Int = _
+
+  override def open(config: Configuration) {
+    Preconditions.checkNotNull(aggregates)
+    // add one field to store window start count
+    val partialRowLength = groupingKeysLength +
+      aggregates.map(_.intermediateDataType.length).sum + 1
+    output = new Row(partialRowLength)
+    outWindowStartIndex = partialRowLength - 1
+  }
+
+  override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
+    var count: Long = 0
+
+    val iterator = records.iterator()
+
+    while (iterator.hasNext) {
+      val record = iterator.next()
+      // reset aggregates after completed tumbling
+      if (count % preTumblingSize == 0) {
+        // initiate intermediate aggregate value.
+        aggregates.foreach(_.initiate(output))
+      }
+
+      // merge intermediate aggregate value to buffer.
+      aggregates.foreach(_.merge(record, output))
+
+      count += 1
+
+      // trigger tumbling evaluation
+      if (count % preTumblingSize == 0) {
--- End diff --

I don't think so. It returns the same result as the DataStream variant.
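The pre-tumbling loop in this diff can be sketched with a plain `Long` sum in place of the intermediate aggregate `Row`: the buffer is reset whenever `count % preTumblingSize == 0` before a merge and emitted whenever that condition holds after the increment. The excerpt ends at the emission check, so how the real function treats a trailing, incomplete pane is not visible here; this hypothetical sketch simply drops it.

```scala
// Hypothetical sketch of the count-based pre-tumbling loop shown above,
// with a Long sum standing in for the intermediate aggregate Row.
object CountPreTumbling {

  def paneSums(records: Seq[Long], preTumblingSize: Long): Seq[Long] = {
    require(preTumblingSize > 0, "preTumblingSize must be positive")
    val out = Seq.newBuilder[Long]
    var count = 0L
    var buffer = 0L
    for (record <- records) {
      // reset the buffer after a completed pane
      if (count % preTumblingSize == 0) buffer = 0L
      // merge the record into the current pane
      buffer += record
      count += 1
      // pane complete: emit it
      if (count % preTumblingSize == 0) out += buffer
    }
    // assumption of this sketch: a trailing incomplete pane is not emitted
    out.result()
  }
}
```

With `preTumblingSize = 2`, the records 1, 2, 3, 4, 5 form the panes [1, 2] and [3, 4]; the last record stays in an incomplete pane that this sketch does not emit.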



[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15899731#comment-15899731
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3364#discussion_r104714888
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggFlatMapFunction.scala
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+
+/**
+  * It is used for sliding windows on batch for time-windows. It takes a prepared input row,
+  * aligns the window start, and replicates or omits records for different panes of a sliding
+  * window. It is used for non-incremental aggregations.
+  *
+  * @param aggregates aggregate functions
+  * @param groupingKeysLength number of grouping keys
+  * @param windowSize window size of the sliding window
+  * @param windowSlide window slide of the sliding window
+  * @param returnType return type of this function
+  */
+class DataSetSlideTimeWindowAggFlatMapFunction(
+    private val aggregates: Array[Aggregate[_]],
+    private val groupingKeysLength: Int,
+    private val timeFieldPos: Int,
+    private val windowSize: Long,
+    private val windowSlide: Long,
+    @transient private val returnType: TypeInformation[Row])
+  extends RichFlatMapFunction[Row, Row]
+  with ResultTypeQueryable[Row] {
+
+  private var aggregateBuffer: Row = _
+  private var outWindowStartIndex: Int = _
+
+  override def open(config: Configuration) {
+    Preconditions.checkNotNull(aggregates)
+    // add one field to store window start
+    val partialRowLength = groupingKeysLength +
+      aggregates.map(_.intermediateDataType.length).sum + 1
+    aggregateBuffer = new Row(partialRowLength)
+    outWindowStartIndex = partialRowLength - 1
+  }
+
+  override def flatMap(record: Row, out: Collector[Row]): Unit = {
+    val windowStart = record.getField(timeFieldPos).asInstanceOf[Long]
+
+    // adopted from SlidingEventTimeWindows.assignWindows
+    var start: Long = TimeWindow.getWindowStartWithOffset(windowStart, 0, windowSlide)
+
+    // skip preparing output if it is not necessary
+    if (start > windowStart - windowSize) {
+
+      // prepare output
+      for (i <- aggregates.indices) {
--- End diff --

You are right. Will simplify that.
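The window-start alignment in `flatMap` can be illustrated with a small re-implementation (not a call into Flink). It assumes the arithmetic of `TimeWindow.getWindowStartWithOffset` with offset 0 and the loop structure of `SlidingEventTimeWindows.assignWindows`. It returns every window start a record belongs to, so a record is replicated once per start; an empty result corresponds to a record that falls into a gap when the slide is larger than the size and is omitted.

```scala
// Illustrative re-implementation; the object and method names are hypothetical.
object SlidingAssignSketch {

  // Assumed to mirror TimeWindow.getWindowStartWithOffset (non-negative timestamps).
  def windowStartWithOffset(timestamp: Long, offset: Long, windowSize: Long): Long =
    timestamp - (timestamp - offset + windowSize) % windowSize

  // Starts of all windows [start, start + size) that contain `timestamp`, latest first.
  def windowStarts(timestamp: Long, size: Long, slide: Long): List[Long] = {
    val lastStart = windowStartWithOffset(timestamp, 0L, slide)
    Iterator.iterate(lastStart)(_ - slide).takeWhile(_ > timestamp - size).toList
  }
}
```

For a record at time 7 with size 10 and slide 2, the last aligned start is 6 and the record belongs to the five windows starting at 6, 4, 2, 0 and -2. With size 2 and slide 5, a record at time 3 belongs to no window.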



[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15899734#comment-15899734
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3364#discussion_r104715450
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideCountWindowAggReduceGroupFunction.scala
 ---
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+import java.sql.Timestamp
+
+import org.apache.calcite.runtime.SqlFunctions
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+  * It is used for sliding windows on batch for count-windows. It takes a prepared input row,
+  * pre-aggregates (pre-tumbles) rows, aligns the window start, and replicates or omits records
+  * for different panes of a sliding window.
+  *
+  * @param aggregates aggregate functions
+  * @param groupingKeysLength number of grouping keys
+  * @param preTumblingSize number of records to be aggregated (tumbled) before emission
+  * @param windowSize window size of the sliding window
+  * @param windowSlide window slide of the sliding window
+  * @param returnType return type of this function
+  */
+class DataSetSlideCountWindowAggReduceGroupFunction(
--- End diff --

I will create an issue for this.



[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15899717#comment-15899717
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3364#discussion_r104713604
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggFlatMapFunction.scala
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+
+/**
+  * It is used for sliding windows on batch for time-windows. It takes a prepared input row,
+  * aligns the window start, and replicates or omits records for different panes of a sliding
+  * window. It is used for non-incremental aggregations.
+  *
+  * @param aggregates aggregate functions
+  * @param groupingKeysLength number of grouping keys
+  * @param windowSize window size of the sliding window
+  * @param windowSlide window slide of the sliding window
+  * @param returnType return type of this function
+  */
+class DataSetSlideTimeWindowAggFlatMapFunction(
+    private val aggregates: Array[Aggregate[_]],
+    private val groupingKeysLength: Int,
+    private val timeFieldPos: Int,
+    private val windowSize: Long,
+    private val windowSlide: Long,
+    @transient private val returnType: TypeInformation[Row])
+  extends RichFlatMapFunction[Row, Row]
+  with ResultTypeQueryable[Row] {
+
+  private var aggregateBuffer: Row = _
+  private var outWindowStartIndex: Int = _
+
+  override def open(config: Configuration) {
+    Preconditions.checkNotNull(aggregates)
--- End diff --

Neither `Row` nor the accumulators are always serializable. We are safer if we 
init them in `open()`.
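The safety argument can be illustrated without Flink by serializing a function object the way a job ships it to workers. In this hypothetical sketch, `SketchFunction` stands in for a rich function and `open()` for its `open(Configuration)` hook: the buffer holds non-serializable values and is `@transient`, so it is never shipped and is recreated in `open()` after deserialization.

```scala
import java.io._

// Hypothetical stand-in for a rich function: only `arity` is shipped.
class SketchFunction(val arity: Int) extends Serializable {
  // java.lang.Object is not Serializable, so shipping this buffer would fail
  // if it were not @transient and open() had already been called.
  @transient private var buffer: Array[AnyRef] = null

  // Mirrors RichFunction.open: builds per-instance state on the worker.
  def open(): Unit = {
    buffer = Array.fill[AnyRef](arity)(new Object)
  }

  def isOpen: Boolean = buffer != null
}

object ShipAndOpen {
  // Serializes and deserializes `f`, roughly what happens when a function is shipped.
  def roundTrip[T <: Serializable](f: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(f) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

If the `@transient` annotation is removed and `open()` has been called before shipping, `writeObject` fails with a `NotSerializableException` for `java.lang.Object`, which is the failure mode described for a `Row` holding a non-serializable value.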



[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15899671#comment-15899671
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3364#discussion_r104706852
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -153,6 +170,156 @@ object AggregateUtil {
   }
 
   /**
+    * Create a [[org.apache.flink.api.common.functions.GroupReduceFunction]] that prepares for
+    * incremental aggregates of sliding windows (time and count-windows).
+    * It requires a prepared input (with intermediate aggregate fields and aligned rowtime for
+    * pre-tumbling in case of time-windows), pre-aggregates (pre-tumbles) rows, aligns the
+    * window-start, and replicates or omits records for different panes of a sliding window.
+    *
+    * The output of the function contains the grouping keys, the intermediate aggregate values of
+    * all aggregate functions and the aligned window start. Window start need not be a timestamp,
+    * but can also be a count value for count-windows.
+    *
+    * The output is stored in Row by the following format:
+    *
+    * {{{
+    *               avg(x) aggOffsetInRow = 2   count(z) aggOffsetInRow = 5
+    *                           |                          |
+    *                           v                          v
+    *  +---------+---------+--------+--------+--------+--------+-------------+
+    *  |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 | windowStart |
+    *  +---------+---------+--------+--------+--------+--------+-------------+
+    *                                             ^                   ^
+    *                                             |                   |
+    *                                 sum(y) aggOffsetInRow = 4  window start for pane mapping
+    * }}}
+    *
+    * NOTE: this function is only used for sliding windows on batch tables.
+    */
+  def createDataSetSlideWindowPrepareGroupReduceFunction(
+      window: LogicalWindow,
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      groupings: Array[Int],
+      inputType: RelDataType,
+      isParserCaseSensitive: Boolean)
+    : RichGroupReduceFunction[Row, Row] = {
+
+    val aggregates = transformToAggregateFunctions(
+      namedAggregates.map(_.getKey),
+      inputType,
+      groupings.length)._2
+
+    val returnType: RowTypeInfo = createAggregateBufferDataType(
+      groupings,
+      aggregates,
+      inputType,
+      Some(Array(BasicTypeInfo.LONG_TYPE_INFO)))
+
+    window match {
+      case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
+        // sliding time-window
+        if (aggregates.forall(_.supportPartial)) {
+          // for incremental aggregations
+          new DataSetSlideTimeWindowAggReduceCombineFunction(
+            aggregates,
+            groupings.length,
+            returnType.getArity - 1,
+            asLong(size),
+            asLong(slide),
+            returnType)
+        } else {
+          // for non-incremental aggregations
--- End diff --

You are right. I will remove this case distinction.
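For reference, the field positions in the row diagram of `createDataSetSlideWindowPrepareGroupReduceFunction` follow from a running sum over the intermediate field counts. The sketch below is hypothetical (it is not Flink's `createAggregateBufferDataType`) and assumes each aggregate is described only by how many intermediate fields it needs: two for `avg(x)` (sum and count) and one each for `sum(y)` and `count(z)`, as the diagram shows.

```scala
object RowLayoutSketch {
  /** Hypothetical description of the intermediate row layout. */
  final case class Layout(aggOffsets: Seq[Int], windowStartPos: Int, arity: Int)

  /**
   * Grouping keys come first, then the intermediate fields of each aggregate in
   * declaration order, then one trailing field for the aligned window start.
   *
   * @param groupingKeys            number of grouping key fields
   * @param intermediateFieldCounts intermediate field count per aggregate function
   */
  def layout(groupingKeys: Int, intermediateFieldCounts: Seq[Int]): Layout = {
    // Running sum: element i is the first field of aggregate i; the last element is
    // the first free position, which holds the window start.
    val positions = intermediateFieldCounts.scanLeft(groupingKeys)(_ + _)
    val windowStartPos = positions.last
    Layout(positions.init, windowStartPos, windowStartPos + 1)
  }
}
```

With two grouping keys and field counts `Seq(2, 1, 1)` this yields the offsets 2, 4 and 5 and puts the window start at index 6, i.e. `arity - 1`, which matches the `returnType.getArity - 1` time-field position passed in the diff.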


> Add sliding group-windows for batch tables
> --
>
> Key: FLINK-5047
> URL: https://issues.apache.org/jira/browse/FLINK-5047
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Timo Walther
>
> Add Slide group-windows for batch tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
> There are two ways to implement sliding windows for batch:
> 1. replicate the output in order to assign keys for overlapping windows. This 
> is probably the more straight-forward implementation and supports any 
> aggregation function but blows up the data volume.
> 2. if the aggregation functions are combinable / pre-aggregatable, we can 
> also find the largest tumbling window size from which the sliding windows can 
> be assembled. This is basically the technique used to express sliding windows 
> with plain SQL (GROUP BY + OVER clauses). For a sliding window Slide(10 
> minutes, 2 minutes) this would mean to first compute aggregates of 
> non-overlapping (tumbling) 2 minute windows and assembling consecutively 5 of 
> these into a sliding window (could be done in a 

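Approach 2 can be illustrated outside Flink with plain Scala collections. This is a minimal sketch, not the FLINK-5047 implementation: it assumes a single group, SUM as the (combinable) aggregate, integer timestamps, and a pane size of gcd(size, slide), the largest tumbling window from which every sliding window can be assembled. For Slide(10 minutes, 2 minutes) that gives 2-minute panes and 5 panes per window, as in the example above.

```scala
object PaneAssemblySketch {
  /** Pane size: the largest tumbling window that evenly divides both size and slide. */
  @annotation.tailrec
  def gcd(a: Long, b: Long): Long = if (b == 0) a else gcd(b, a % b)

  /**
   * SUM over sliding windows [s, s + size) with s a multiple of `slide`, computed by
   * (1) summing tumbling panes of gcd(size, slide) and (2) adding up the
   * size / pane consecutive panes of each window. Input is (timestamp, value) pairs
   * of a single group; output is (window start, sum), sorted by window start.
   */
  def slidingSums(records: Seq[(Long, Long)], size: Long, slide: Long): List[(Long, Long)] = {
    val pane = gcd(size, slide)
    // Step 1: tumbling pre-aggregation; floorMod also aligns negative timestamps to the floor.
    val paneSums: Map[Long, Long] = records
      .groupBy { case (ts, _) => ts - Math.floorMod(ts, pane) }
      .map { case (paneStart, rs) => paneStart -> rs.map(_._2).sum }
    if (paneSums.isEmpty) Nil
    else {
      val panesPerWindow = size / pane
      val lastPane = paneSums.keys.max
      // Smallest multiple of `slide` greater than (first pane - size): first window touching data.
      val lo = paneSums.keys.min - size + 1
      val firstStart = lo + Math.floorMod(-lo, slide)
      // Step 2: assemble each window from its consecutive panes, skipping empty windows.
      Iterator
        .iterate(firstStart)(_ + slide)
        .takeWhile(_ <= lastPane)
        .flatMap { s =>
          val present = (0L until panesPerWindow).map(k => s + k * pane).filter(paneSums.contains)
          if (present.isEmpty) None else Some(s -> present.map(paneSums).sum)
        }
        .toList
    }
  }
}
```

Scanning panes sorted by time, as suggested for a MapPartition with sorted input, would avoid the map lookups; the lookup version only keeps the sketch short.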
[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15898959#comment-15898959
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/3364
  
@fhueske thanks for your comments. I will create a time window PR.


> Add sliding group-windows for batch tables
> --
>
> Key: FLINK-5047
> URL: https://issues.apache.org/jira/browse/FLINK-5047
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Timo Walther
>
> Add Slide group-windows for batch tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
> There are two ways to implement sliding windows for batch:
> 1. replicate the output in order to assign keys for overlapping windows. This 
> is probably the more straight-forward implementation and supports any 
> aggregation function but blows up the data volume.
> 2. if the aggregation functions are combinable / pre-aggregatable, we can 
> also find the largest tumbling window size from which the sliding windows can 
> be assembled. This is basically the technique used to express sliding windows 
> with plain SQL (GROUP BY + OVER clauses). For a sliding window Slide(10 
> minutes, 2 minutes) this would mean to first compute aggregates of 
> non-overlapping (tumbling) 2 minute windows and assembling consecutively 5 of 
> these into a sliding window (could be done in a MapPartition with sorted 
> input). The implementation could be done as an optimizer rule to split the 
> sliding aggregate into a tumbling aggregate and a SQL WINDOW operator. Maybe 
> it makes sense to implement the WINDOW clause first and reuse this for 
> sliding windows.
> 3. There is also a third, hybrid solution: Doing the pre-aggregation on the 
> largest non-overlapping windows (as in 2) and replicating these results and 
> processing those as in the 1) approach. The benefits of this is that it a) is 
> based on the implementation that supports non-combinable aggregates (which is 
> required in any case) and b) that it does not require the implementation of 
> the SQL WINDOW operator. Internally, this can be implemented again as an 
> optimizer rule that translates the SlidingWindow into a pre-aggregating 
> TublingWindow and a final SlidingWindow (with replication).
> see FLINK-4692 for more discussion
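The hybrid approach 3 can be sketched the same way: pre-aggregate on the largest non-overlapping windows (panes), replicate each pane's partial result to every sliding window that contains the pane, then merge the partials per window. This is a hypothetical, Flink-free illustration using AVG with a (sum, count) partial; the pane size gcd(size, slide) and the helper names are assumptions, not the code of the pull request.

```scala
object HybridSlideSketch {
  /** Hypothetical partial aggregate for AVG. */
  final case class Partial(sum: Long, count: Long) {
    def merge(o: Partial): Partial = Partial(sum + o.sum, count + o.count)
  }

  @annotation.tailrec
  private def gcd(a: Long, b: Long): Long = if (b == 0) a else gcd(b, a % b)

  /** Starts (multiples of `slide`) of all windows [s, s + size) that contain `ts`. */
  def windowStarts(ts: Long, size: Long, slide: Long): Seq[Long] = {
    val last = ts - Math.floorMod(ts, slide)
    Iterator.iterate(last)(_ - slide).takeWhile(_ > ts - size).toList
  }

  /** AVG per sliding window: tumble into panes, replicate pane partials, merge per window. */
  def slidingAvg(records: Seq[(Long, Long)], size: Long, slide: Long): Map[Long, Double] = {
    val pane = gcd(size, slide)
    // 1) Pre-aggregate on panes (tumbling windows of size `pane`).
    val panes: Map[Long, Partial] = records
      .groupBy { case (ts, _) => ts - Math.floorMod(ts, pane) }
      .map { case (p, rs) => p -> Partial(rs.map(_._2).sum, rs.size.toLong) }
    // 2) Replicate. Because `pane` divides size and slide, a pane lies inside a window
    //    exactly when the pane's start does.
    val replicated = panes.toSeq.flatMap { case (p, partial) =>
      windowStarts(p, size, slide).map(_ -> partial)
    }
    // 3) Final aggregation per window start.
    replicated.groupBy(_._1).map { case (s, ps) =>
      val total = ps.map(_._2).reduce(_ merge _)
      s -> total.sum.toDouble / total.count
    }
  }
}
```

Compared with plain replication (approach 1), each pane rather than each input record is replicated, which keeps the blow-up bounded by the number of panes.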



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15898181#comment-15898181
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3364#discussion_r104523761
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceCombineFunction.scala
 ---
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.CombineFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.types.Row
+
+/**
+  * Wraps the aggregate logic inside of
+  * [[org.apache.flink.api.java.operators.GroupReduceOperator]] and
+  * [[org.apache.flink.api.java.operators.GroupCombineOperator]].
+  *
+  * It is used for sliding time-windows on batch tables.
+  *
+  * @param aggregates aggregate functions
+  * @param groupingKeysLength number of grouping keys
+  * @param timeFieldPos position of aligned time field
+  * @param windowSize window size of the sliding window
+  * @param windowSlide window slide of the sliding window
+  * @param returnType return type of this function
+  */
+class DataSetSlideTimeWindowAggReduceCombineFunction(
+aggregates: Array[Aggregate[_]],
+groupingKeysLength: Int,
+timeFieldPos: Int,
+windowSize: Long,
+windowSlide: Long,
+returnType: TypeInformation[Row])
+  extends DataSetSlideTimeWindowAggReduceGroupFunction(
+aggregates,
+groupingKeysLength,
+timeFieldPos,
+windowSize,
+windowSlide,
+returnType)
+  with CombineFunction[Row, Row] {
+
+  override def combine(records: Iterable[Row]): Row = {
+// initiate intermediate aggregate value
+aggregates.foreach(_.initiate(aggregateBuffer))
+
+val iterator = records.iterator()
+while (iterator.hasNext) {
+  val record = iterator.next()
+
+  // merge intermediate aggregate value to buffer
+  aggregates.foreach(_.merge(record, aggregateBuffer))
+
+  // check if this record is the last record
+  if (!iterator.hasNext) {
+
+// set group keys value to buffer
+for (i <- 0 until groupingKeysLength) {
+  aggregateBuffer.setField(i, record.getField(i))
+}
+
+aggregateBuffer.setField(timeFieldPos, record.getField(timeFieldPos))
+
+return aggregateBuffer
+  }
+}
+
+// this code path should never be reached as we return before the loop finishes
+throw new IllegalArgumentException("Group is empty. This should never happen.")
--- End diff --

Can be removed. It's the responsibility of the DataSet API to call the user 
functions correctly. And even if it did not, this function would still behave 
correctly.
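Without the trailing `throw`, the combine loop can simply merge into an accumulator and return it after the loop. The sketch below is a simplified, Flink-free illustration with a hypothetical `Partial` record in place of `Row` and a (sum, count) pair in place of the `Aggregate` buffer. Following the review comment, it does not guard against an empty group (the iterator's `next()` would throw `NoSuchElementException`). It assumes all records of a combine group share the grouping key and the aligned time, so those fields can be taken from any record rather than specifically the last one.

```scala
object CombineSketch {
  /** Hypothetical intermediate record: grouping key, aligned time and a partial (sum, count). */
  final case class Partial(key: String, alignedTime: Long, sum: Long, count: Long)

  /** Merges the partial aggregates of one combine group into a single partial record. */
  def combine(records: java.lang.Iterable[Partial]): Partial = {
    val it = records.iterator()
    // Key and aligned time are taken from the first record; they are equal within a group.
    var acc = it.next()
    while (it.hasNext) {
      val r = it.next()
      acc = acc.copy(sum = acc.sum + r.sum, count = acc.count + r.count)
    }
    acc
  }
}
```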



[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15898187#comment-15898187
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3364#discussion_r104524635
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+  * It wraps the aggregate logic inside of
+  * [[org.apache.flink.api.java.operators.GroupReduceOperator]].
+  *
+  * It is used for sliding time- and count-windows on batch tables.
+  *
+  * @param aggregates aggregate functions.
+  * @param groupKeysMapping index mapping of group keys between intermediate aggregate Row
+  * and output Row.
+  * @param aggregateMapping index mapping between aggregate function list and aggregated value
+  * index in output Row.
+  * @param intermediateRowArity intermediate row field count
+  * @param finalRowArity output row field count
+  * @param finalRowWindowStartPos relative window-start position to last field of output row
+  * @param finalRowWindowEndPos relative window-end position to last field of output row
+  * @param windowSize size of the window, used to determine window-end for output row
+  */
+class DataSetSlideWindowAggReduceGroupFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+groupKeysMapping: Array[(Int, Int)],
+aggregateMapping: Array[(Int, Int)],
+intermediateRowArity: Int,
+finalRowArity: Int,
+finalRowWindowStartPos: Option[Int],
+finalRowWindowEndPos: Option[Int],
+windowSize: Long)
+  extends RichGroupReduceFunction[Row, Row] {
+
+  protected var aggregateBuffer: Row = _
+  protected var windowStartFieldPos: Int = _
+
+  private var collector: TimeWindowPropertyCollector = _
+  private var output: Row = _
+
+  override def open(config: Configuration) {
+Preconditions.checkNotNull(aggregates)
--- End diff --

Except for the initialization of `TimeWindowPropertyCollector`, everything 
can be moved into the constructor.
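A Flink-free sketch of why constructor initialization is enough for such fields: the function object is serialized when the job is shipped, so values computed in the constructor travel with it, while transient runtime-only state has to be recreated in `open()`. `SketchFunction` and `RuntimeHelper` are hypothetical stand-ins; treating `TimeWindowPropertyCollector` as such runtime-only state is an assumption based on the review comment.

```scala
import java.io._

object ConstructorInitSketch {
  /** Hypothetical stand-in for a runtime helper that cannot be serialized. */
  final class RuntimeHelper { def name: String = "ready" }

  /**
   * Hypothetical stand-in for a rich function. After deserialization only `open()`
   * runs on the worker, not the constructor.
   */
  class SketchFunction(groupKeys: Int, intermediateFieldCounts: Array[Int]) extends Serializable {
    // Derived once in the constructor; plain Ints are serialized with the instance.
    val intermediateArity: Int = groupKeys + intermediateFieldCounts.sum + 1
    val windowStartPos: Int = intermediateArity - 1

    // Runtime-only state: transient, so it is null after deserialization and set in open().
    @transient private var helper: RuntimeHelper = _

    def open(): Unit = helper = new RuntimeHelper
    def helperName: Option[String] = Option(helper).map(_.name)
  }

  /** Serializes and deserializes an object, similar to shipping a function to a worker. */
  def roundTrip[T](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray)) {
      // Resolve classes with this code's class loader (robust when run from a script runner).
      override protected def resolveClass(desc: ObjectStreamClass): Class[_] =
        Class.forName(desc.getName, false, ConstructorInitSketch.getClass.getClassLoader)
    }
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

After a round trip the constructor-derived positions are intact, whereas the helper is missing until `open()` is called.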



[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15898174#comment-15898174
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3364#discussion_r104521151
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideCountWindowAggReduceGroupFunction.scala
 ---
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+import java.sql.Timestamp
+
+import org.apache.calcite.runtime.SqlFunctions
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+  * It is used for sliding count-windows on batch tables. It takes a prepared input row,
+  * pre-aggregates (pre-tumbles) rows, aligns the window start, and replicates or omits records
+  * for different panes of a sliding window.
+  *
+  * @param aggregates aggregate functions
+  * @param groupingKeysLength number of grouping keys
+  * @param preTumblingSize number of records to be aggregated (tumbled) before emission
+  * @param windowSize window size of the sliding window
+  * @param windowSlide window slide of the sliding window
+  * @param returnType return type of this function
+  */
+class DataSetSlideCountWindowAggReduceGroupFunction(
+private val aggregates: Array[Aggregate[_]],
+private val groupingKeysLength: Int,
+private val preTumblingSize: Long,
+private val windowSize: Long,
+private val windowSlide: Long,
+@transient private val returnType: TypeInformation[Row])
+  extends RichGroupReduceFunction[Row, Row]
+  with ResultTypeQueryable[Row] {
+
+  private var output: Row = _
+  private var outWindowStartIndex: Int = _
+
+  override def open(config: Configuration) {
+Preconditions.checkNotNull(aggregates)
--- End diff --

I think we can move everything into the constructor and remove the `open()` 
method.
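The pre-tumbling step for count-windows described in the scaladoc above can be illustrated on a single group's values in input order. This is a hypothetical sketch with SUM as the aggregate, not the code under review: each pane of `preTumblingSize` records is tagged with the ordinal of its first record, i.e. a count value that takes the place of a timestamp window start.

```scala
object CountPreTumbleSketch {
  /**
   * Sums consecutive runs of `preTumblingSize` records (in input order) and tags each
   * partial result with the ordinal of its first record.
   */
  def preTumble(values: Seq[Long], preTumblingSize: Int): List[(Long, Long)] =
    values
      .grouped(preTumblingSize)
      .zipWithIndex
      .map { case (pane, i) => (i.toLong * preTumblingSize, pane.sum) }
      .toList
}
```

For a window over 4 rows every 2 rows with `preTumblingSize = 2`, a window starting at ordinal `s` would then be assembled from the panes tagged `s` and `s + 2`. Choosing the pre-tumbling size as gcd(size, slide) is an assumption here, by analogy with the time-window case.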



[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15898176#comment-15898176
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3364#discussion_r104511932
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggFlatMapFunction.scala
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+
+/**
+  * It is used for sliding time-windows on batch tables. It takes a prepared input row,
+  * aligns the window start, and replicates or omits records for different panes of a sliding
+  * window. It is used for non-incremental aggregations.
+  *
+  * @param aggregates aggregate functions
+  * @param groupingKeysLength number of grouping keys
+  * @param timeFieldPos position of aligned time field
+  * @param windowSize window size of the sliding window
+  * @param windowSlide window slide of the sliding window
+  * @param returnType return type of this function
+  */
+class DataSetSlideTimeWindowAggFlatMapFunction(
+private val aggregates: Array[Aggregate[_]],
+private val groupingKeysLength: Int,
+private val timeFieldPos: Int,
+private val windowSize: Long,
+private val windowSlide: Long,
+@transient private val returnType: TypeInformation[Row])
+  extends RichFlatMapFunction[Row, Row]
+  with ResultTypeQueryable[Row] {
+
+  private var aggregateBuffer: Row = _
+  private var outWindowStartIndex: Int = _
+
+  override def open(config: Configuration) {
+Preconditions.checkNotNull(aggregates)
+// add one field to store window start
+val partialRowLength = groupingKeysLength +
+  aggregates.map(_.intermediateDataType.length).sum + 1
+aggregateBuffer = new Row(partialRowLength)
+outWindowStartIndex = partialRowLength - 1
+  }
+
+  override def flatMap(record: Row, out: Collector[Row]): Unit = {
+val windowStart = record.getField(timeFieldPos).asInstanceOf[Long]
+
+// adopted from SlidingEventTimeWindows.assignWindows
+var start: Long = TimeWindow.getWindowStartWithOffset(windowStart, 0, windowSlide)
+
+// skip preparing output if it is not necessary
+if (start > windowStart - windowSize) {
+
+  // prepare output
+  for (i <- aggregates.indices) {
--- End diff --

Isn't this just copying data from the record to the aggregateBuffer? 
Doesn't the input record have the same schema as the output record? Isn't 
it sufficient to emit the input record multiple times with an adapted 
`outWindowStartIndex`?



[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15898178#comment-15898178
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3364#discussion_r10452
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala
 ---
@@ -146,42 +146,9 @@ class AggregationsITCase extends StreamingMultipleProgramsTestBase {
   "Hi,1,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
-
-  @Test
-  def testEventTimeSlidingWindow(): Unit = {
--- End diff --

Is this test subsumed by `DataStreamAggregateITCase`?







[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15898177#comment-15898177
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3364#discussion_r104514985
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideCountWindowAggReduceGroupFunction.scala
 ---
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+import java.sql.Timestamp
+
+import org.apache.calcite.runtime.SqlFunctions
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+  * It is used for sliding count-windows on batch tables. It takes a prepared input row,
+  * pre-aggregates (pre-tumbles) rows, aligns the window start, and replicates or omits records
+  * for different panes of a sliding window.
+  *
+  * @param aggregates aggregate functions
+  * @param groupingKeysLength number of grouping keys
+  * @param preTumblingSize number of records to be aggregated (tumbled) before emission
+  * @param windowSize window size of the sliding window
+  * @param windowSlide window slide of the sliding window
+  * @param returnType return type of this function
+  */
+class DataSetSlideCountWindowAggReduceGroupFunction(
--- End diff --

I think it makes sense to add forward field annotations for the internal 
operators.
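One way to derive such an annotation for the grouping keys is sketched below, assuming they occupy positions `0` to `groupingKeysLength - 1` in both the input and output rows of these functions. It only builds the field-expression string. That Row fields are addressed as `f0`, `f1`, ... and separated by `;`, and that the string would be handed to the DataSet operator's `withForwardedFields(...)`, are assumptions to check against the Flink semantic-annotation documentation.

```scala
object ForwardedFieldsSketch {
  /**
   * Builds a forwarded-fields expression such as "f0;f1" for grouping keys that keep
   * their positions between input and output rows.
   */
  def groupKeyForwardedFields(groupingKeysLength: Int): String =
    (0 until groupingKeysLength).map(i => s"f$i").mkString(";")
}
```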



[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15898183#comment-15898183
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3364#discussion_r104527336
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
 ---
@@ -169,4 +168,228 @@ class DataSetWindowAggregateITCase(configMode: TableConfigMode)
   .toDataSet[Row]
   }
 
+  @Test(expected = classOf[UnsupportedOperationException])
+  def testAllEventTimeSlidingGroupWindowOverCount(): Unit = {
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+val table = env.fromCollection(data).toTable(tEnv, 'long, 'int, 'string)
+
+// Count sliding non-grouping windows on event-time are currently not supported
+table
+  .window(Slide over 2.rows every 2.rows on 'long as 'w)
+  .groupBy('w)
+  .select('int.count)
+  .toDataSet[Row]
+  }
+
+  @Test
+  def testAllEventTimeSlidingGroupWindowOverTime(): Unit = {
+// please keep this test in sync with the DataStream variant
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+
+val table = env.fromCollection(data).toTable(tEnv, 'long, 'int, 'string)
+
+val windowedTable = table
+  .window(Slide over 5.milli every 2.milli on 'long as 'w)
+  .groupBy('w)
+  .select('int.count, 'w.start, 'w.end)
+
+val expected =
+  "1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.013\n" +
+  "1,1970-01-01 00:00:00.012,1970-01-01 00:00:00.017\n" +
+  "1,1970-01-01 00:00:00.014,1970-01-01 00:00:00.019\n" +
+  "1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.021\n" +
+  "2,1969-12-31 23:59:59.998,1970-01-01 00:00:00.003\n" +
+  "2,1970-01-01 00:00:00.006,1970-01-01 00:00:00.011\n" +
+  "3,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009\n" +
+  "4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005\n" +
+  "4,1970-01-01 00:00:00.002,1970-01-01 00:00:00.007"
+
+val results = windowedTable.toDataSet[Row].collect()
+TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testEventTimeSlidingGroupWindowOverTimeOverlappingFullPane(): Unit = {
+// please keep this test in sync with the DataStream variant
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+
+val table = env.fromCollection(data).toTable(tEnv, 'long, 'int, 'string)
+
+val windowedTable = table
+  .window(Slide over 10.milli every 5.milli on 'long as 'w)
+  .groupBy('string, 'w)
+  .select('string, 'int.count, 'w.start, 'w.end)
+
+val expected =
+  "Hallo,1,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005\n" +
+  "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01\n" +
+  "Hello world,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01\n" +
+  "Hello world,1,1970-01-01 00:00:00.005,1970-01-01 00:00:00.015\n" +
+  "Hello world,1,1970-01-01 00:00:00.01,1970-01-01 00:00:00.02\n" +
+  "Hello world,1,1970-01-01 00:00:00.015,1970-01-01 00:00:00.025\n" +
+  "Hello,1,1970-01-01 00:00:00.005,1970-01-01 00:00:00.015\n" +
+  "Hello,2,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005\n" +
+  "Hello,3,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01\n" +
+  "Hi,1,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005\n" +
+  "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01"
+
+val results = windowedTable.toDataSet[Row].collect()
+TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testEventTimeSlidingGroupWindowOverTimeOverlappingSplitPane(): Unit = {
+// please keep this test in sync with the DataStream variant
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+
+val table = env.fromCollection(data).toTable(tEnv, 'long, 'int, 'string)
+
+val windowedTable = table
+  .window(Slide over 5.milli every 4.milli on 'long as 'w)
+  .groupBy('string, 'w)
+  .select('string, 'int.count, 'w.start, 'w.end)
+
+val expected =
+  "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005\n" +
+  "Hello world,1,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009\n" +
+  "Hello world,1,1970-01-01 00:00:00.008,1970-01-01 

[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15898180#comment-15898180
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3364#discussion_r104522343
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
 ---
@@ -280,6 +285,138 @@ class DataSetWindowAggregate(
 }
   }
 
+  private def createEventTimeSlidingWindowDataSet(
+  inputDS: DataSet[Row],
+  isTimeWindow: Boolean,
+  isParserCaseSensitive: Boolean)
+: DataSet[Row] = {
+
+// create MapFunction for initializing the aggregations
+// it aligns the rowtime for pre-tumbling in case of a time-window for incremental aggregates
+val mapFunction = createDataSetWindowPrepareMapFunction(
+  window,
+  namedAggregates,
+  grouping,
+  inputType,
+  isParserCaseSensitive)
+
+val mappedDataSet = inputDS
+  .map(mapFunction)
+  .name(prepareOperatorName)
+
+val mapReturnType = mappedDataSet.getType
+
+val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
+val groupingKeys = grouping.indices.toArray
+
+// do incremental aggregation if possible
+val isIncremental = doAllSupportPartialAggregation(
+  namedAggregates.map(_.getKey),
+  inputType,
+  grouping.length)
+
+val preparedDataSet = if (isTimeWindow) {
+  // time window
+
+  if (isIncremental) {
+// incremental aggregates
+
+val groupingKeysAndAlignedRowtime = groupingKeys :+ mapReturnType.getArity - 1
+
+// create GroupReduceFunction
+// for pre-tumbling and replicating/omitting the content for each pane
+val prepareReduceFunction = createDataSetSlideWindowPrepareGroupReduceFunction(
+  window,
+  namedAggregates,
+  grouping,
+  inputType,
+  isParserCaseSensitive)
+
+mappedDataSet.asInstanceOf[DataSet[Row]]
+  .groupBy(groupingKeysAndAlignedRowtime: _*)
+  .reduceGroup(prepareReduceFunction) // pre-tumbles and replicates/omits
+  .name(prepareOperatorName)
+  } else {
+// non-incremental aggregates
+
+// create FlatMapFunction
+// for replicating/omitting the content for each pane
+val prepareFlatMapFunction = createDataSetSlideWindowPrepareFlatMapFunction(
+  window,
+  namedAggregates,
+  grouping,
+  inputType,
+  isParserCaseSensitive)
+
+mappedDataSet
+  .flatMap(prepareFlatMapFunction) // replicates/omits
+  }
+} else {
+  // count window
+
+  // grouped window
+  if (groupingKeys.length > 0) {
+
+if (isIncremental) {
+  // incremental aggregates
+
+  // create GroupReduceFunction
+  // for pre-tumbling and replicating/omitting the content for each pane
+  val prepareReduceFunction = createDataSetSlideWindowPrepareGroupReduceFunction(
+window,
+namedAggregates,
+grouping,
+inputType,
+isParserCaseSensitive)
+
+  mappedDataSet.asInstanceOf[DataSet[Row]]
+.groupBy(groupingKeys: _*)
+// sort on time field, it's the last element in the row
+.sortGroup(mapReturnType.getArity - 1, Order.ASCENDING)
+.reduceGroup(prepareReduceFunction) // pre-tumbles and replicates/omits
--- End diff --

Only do this if the tumble size is > 1?
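The question can be made concrete with a small sketch. It assumes, as the issue description suggests, that the pre-tumbling (pane) size is the largest size from which both the window size and the slide can be assembled, i.e. their greatest common divisor. For count-windows, a pane of one row holds a single record, so the sorted GroupReduce would only copy rows. `PreTumbling`, `paneSize` and `preTumblingUseful` are hypothetical names, not part of the PR.

```scala
object PreTumbling {
  // Greatest common divisor, used here as the pane size (an assumption, see above).
  @annotation.tailrec
  def gcd(a: Long, b: Long): Long = if (b == 0) a else gcd(b, a % b)

  /** Largest pane size from which sliding windows of `size` every `slide` can be assembled. */
  def paneSize(size: Long, slide: Long): Long = {
    require(size > 0 && slide > 0, "size and slide must be positive")
    gcd(size, slide)
  }

  /** For count-windows, a one-row pane aggregates nothing, so pre-tumbling it saves no work. */
  def preTumblingUseful(sizeRows: Long, slideRows: Long): Boolean =
    paneSize(sizeRows, slideRows) > 1
}
```

For example, `Slide over 10.rows every 2.rows` would give panes of 2 rows (5 panes per window), whereas `Slide over 5.rows every 2.rows` would give panes of a single row, where the extra grouping step could be skipped.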



[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15898186#comment-15898186
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3364#discussion_r104507420
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -153,6 +170,156 @@ object AggregateUtil {
   }
 
   /**
+* Create a [[org.apache.flink.api.common.functions.GroupReduceFunction]] that prepares for
+* incremental aggregates of sliding windows (time and count-windows).
+* It requires a prepared input (with intermediate aggregate fields and aligned rowtime for
+* pre-tumbling in case of time-windows), pre-aggregates (pre-tumbles) rows, aligns the
+* window-start, and replicates or omits records for different panes of a sliding window.
+*
+* The output of the function contains the grouping keys, the intermediate aggregate values of
+* all aggregate functions and the aligned window start. The window start need not be a
+* timestamp; for count-windows it is a count value.
+*
+* The output is stored in a Row with the following format:
+*
+* {{{
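+*                 avg(x) aggOffsetInRow = 2     count(z) aggOffsetInRow = 5
+*                           |                          |
+*                           v                          v
+*   +---------+---------+--------+--------+--------+--------+-------------+
+*   |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 | windowStart |
+*   +---------+---------+--------+--------+--------+--------+-------------+
+*                                             ^                    ^
+*                                             |                    |
+*                               sum(y) aggOffsetInRow = 4   window start for pane mapping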
+* }}}
+*
+* NOTE: this function is only used for sliding windows on batch tables.
+*/
+  def createDataSetSlideWindowPrepareGroupReduceFunction(
+  window: LogicalWindow,
+  namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+  groupings: Array[Int],
+  inputType: RelDataType,
+  isParserCaseSensitive: Boolean)
+: RichGroupReduceFunction[Row, Row] = {
+
+val aggregates = transformToAggregateFunctions(
+  namedAggregates.map(_.getKey),
+  inputType,
+  groupings.length)._2
+
+val returnType: RowTypeInfo = createAggregateBufferDataType(
+  groupings,
+  aggregates,
+  inputType,
+  Some(Array(BasicTypeInfo.LONG_TYPE_INFO)))
+
+window match {
+  case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
+// sliding time-window
+if (aggregates.forall(_.supportPartial)) {
+  // for incremental aggregations
+  new DataSetSlideTimeWindowAggReduceCombineFunction(
+aggregates,
+groupings.length,
+returnType.getArity - 1,
+asLong(size),
+asLong(slide),
+returnType)
+} else {
+  // for non-incremental aggregations
--- End diff --

Do we need this case? Without partial aggregation, we cannot use tumbling 
windows to compute sliding windows. So this method is only called if all 
aggregates support partial aggregation, right? 
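The row layout in the Scaladoc of this function can be reproduced with a short calculation. This sketch assumes that each aggregate's intermediate fields follow the grouping keys consecutively and that the window start occupies the last field, which matches the diagram and the `returnType.getArity - 1` argument; `RowLayout.layout` is a hypothetical helper, not a Flink API.

```scala
object RowLayout {
  /**
   * Returns the offset of each aggregate's first intermediate field and the
   * index of the trailing window-start field.
   *
   * @param numGroupKeys            number of grouping-key fields at the front of the row
   * @param intermediateFieldCounts intermediate field count of each aggregate, in order
   */
  def layout(numGroupKeys: Int, intermediateFieldCounts: Seq[Int]): (Seq[Int], Int) = {
    // running sums: every aggregate starts where the previous one ends
    val starts = intermediateFieldCounts.scanLeft(numGroupKeys)(_ + _)
    // the final running sum is the first free field, used for the window start
    (starts.init, starts.last)
  }
}
```

With two grouping keys, `avg(x)` (sum and count), `sum(y)` and `count(z)`, `layout(2, Seq(2, 1, 1))` gives offsets 2, 4 and 5 and places the window start at index 6, i.e. arity 7 minus one.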



[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15898185#comment-15898185
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3364#discussion_r104518276
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala
 ---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.CombineFunction
+import org.apache.flink.types.Row
+
+/**
+  * Wraps the aggregate logic inside of
+  * [[org.apache.flink.api.java.operators.GroupReduceOperator]] and
+  * [[org.apache.flink.api.java.operators.GroupCombineOperator]].
+  *
+  * It is used for sliding windows on batch tables, for both time- and count-windows.
+  *
+  * @param aggregates aggregate functions.
+  * @param groupKeysMapping index mapping of group keys between intermediate aggregate Row
+  * and output Row.
+  * @param aggregateMapping index mapping between aggregate function list and aggregated value
+  * index in output Row.
+  * @param intermediateRowArity intermediate row field count
+  * @param finalRowArity output row field count
+  * @param finalRowWindowStartPos relative window-start position to last field of output row
+  * @param finalRowWindowEndPos relative window-end position to last field of output row
+  * @param windowSize size of the window, used to determine window-end for output row
+  */
+class DataSetSlideWindowAggReduceCombineFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+groupKeysMapping: Array[(Int, Int)],
+aggregateMapping: Array[(Int, Int)],
+intermediateRowArity: Int,
+finalRowArity: Int,
+finalRowWindowStartPos: Option[Int],
+finalRowWindowEndPos: Option[Int],
+windowSize: Long)
+  extends DataSetSlideWindowAggReduceGroupFunction(
+aggregates,
+groupKeysMapping,
+aggregateMapping,
+intermediateRowArity,
+finalRowArity,
+finalRowWindowStartPos,
+finalRowWindowEndPos,
+windowSize)
+  with CombineFunction[Row, Row] {
+
+  override def combine(records: Iterable[Row]): Row = {
+// initiate intermediate aggregate value
+aggregates.foreach(_.initiate(aggregateBuffer))
+
+val iterator = records.iterator()
+while (iterator.hasNext) {
+  val record = iterator.next()
+  aggregates.foreach(_.merge(record, aggregateBuffer))
+
+  // check if this record is the last record
+  if (!iterator.hasNext) {
+// set group keys to aggregateBuffer
+for (i <- groupKeysMapping.indices) {
+  aggregateBuffer.setField(i, record.getField(i))
+}
+
+aggregateBuffer.setField(windowStartFieldPos, record.getField(windowStartFieldPos))
+
+return aggregateBuffer
+  }
+}
+
+// this code path should never be reached as we return before the loop finishes
+throw new IllegalArgumentException("Group is empty. This should never happen.")
--- End diff --

Can be removed. Combine is only called if there is at least one record. 
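The remark relies on the combiner only ever receiving non-empty groups. Under that invariant, the merge loop can be written as a plain reduction without the unreachable `throw`. This is a framework-independent sketch: `Partial` is a hypothetical stand-in for the intermediate `Row` (grouping key, window start and the sum/count partials of an `avg`), not the PR's actual data type.

```scala
object PaneCombine {
  // Hypothetical intermediate record: grouping key, aligned window start and avg partials.
  final case class Partial(key: String, windowStart: Long, sum: Long, count: Long)

  // Records in one group share key and window start, so those fields are taken from `b`.
  def merge(a: Partial, b: Partial): Partial =
    b.copy(sum = a.sum + b.sum, count = a.count + b.count)

  // Assumes a non-empty group, as guaranteed for combine calls; `reduce` would throw
  // UnsupportedOperationException on an empty input rather than return a bogus row.
  def combine(records: Iterable[Partial]): Partial = records.reduce(merge)
}
```

Keeping the group key and window start from the last merged record mirrors what the loop in the diff does when it reaches the final element.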



[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15898184#comment-15898184
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3364#discussion_r104522529
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggFlatMapFunction.scala
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+
+/**
+  * It is used for sliding windows on batch for time-windows. It takes a prepared input row,
+  * aligns the window start, and replicates or omits records for different panes of a sliding
+  * window. It is used for non-incremental aggregations.
+  *
+  * @param aggregates aggregate functions
+  * @param groupingKeysLength number of grouping keys
+  * @param windowSize window size of the sliding window
+  * @param windowSlide window slide of the sliding window
+  * @param returnType return type of this function
+  */
+class DataSetSlideTimeWindowAggFlatMapFunction(
+private val aggregates: Array[Aggregate[_]],
+private val groupingKeysLength: Int,
+private val timeFieldPos: Int,
+private val windowSize: Long,
+private val windowSlide: Long,
+@transient private val returnType: TypeInformation[Row])
+  extends RichFlatMapFunction[Row, Row]
+  with ResultTypeQueryable[Row] {
+
+  private var aggregateBuffer: Row = _
+  private var outWindowStartIndex: Int = _
+
+  override def open(config: Configuration) {
+Preconditions.checkNotNull(aggregates)
--- End diff --

Move everything to the constructor and remove `open()`?
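What "replicates or omits records for different panes" means for the non-incremental flat-map path can be sketched in plain Scala. The sketch assumes that window starts are aligned to multiples of the slide (relative to epoch 0) and that a window covers `[start, start + size)`; under that assumption it also reproduces the window starting at -2 ms (`1969-12-31 23:59:59.998`) in the expected output of `testAllEventTimeSlidingGroupWindowOverTime`. `Rec`, `Tagged` and `replicate` are hypothetical names; the excerpt does not show whether the PR computes window starts the same way.

```scala
object PaneReplication {
  // Hypothetical input record, and the same record tagged with one window start.
  final case class Rec(key: String, ts: Long, value: Int)
  final case class Tagged(key: String, windowStart: Long, value: Int)

  /**
   * Emits one copy of `r` per sliding window [start, start + size) that contains r.ts.
   * Starts are aligned to multiples of `slide`. If slide > size, a record that falls
   * into a gap between windows yields no copy, i.e. it is omitted.
   */
  def replicate(r: Rec, size: Long, slide: Long): List[Tagged] = {
    require(size > 0 && slide > 0, "size and slide must be positive")
    // latest aligned start at or before ts; floorMod also handles negative timestamps
    val lastStart = r.ts - Math.floorMod(r.ts, slide)
    Iterator.iterate(lastStart)(_ - slide)
      .takeWhile(start => start > r.ts - size) // window still covers ts
      .map(start => Tagged(r.key, start, r.value))
      .toList
      .reverse // ascending window starts
  }
}
```

With `Slide over 5.milli every 2.milli`, a row with timestamp 1 ms would be replicated into the windows starting at -2 ms and 0 ms, which is how a window from `1969-12-31 23:59:59.998` to `1970-01-01 00:00:00.003` can appear in the result.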



[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15898188#comment-15898188
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3364#discussion_r104521729
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideCountWindowAggReduceGroupFunction.scala
 ---
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+import java.sql.Timestamp
+
+import org.apache.calcite.runtime.SqlFunctions
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+  * It is used for sliding windows on batch for count-windows. It takes a prepared input row,
+  * pre-aggregates (pre-tumbles) rows, aligns the window start, and replicates or omits records
+  * for different panes of a sliding window.
+  *
+  * @param aggregates aggregate functions
+  * @param groupingKeysLength number of grouping keys
+  * @param preTumblingSize number of records to be aggregated (tumbled) before emission
+  * @param windowSize window size of the sliding window
+  * @param windowSlide window slide of the sliding window
+  * @param returnType return type of this function
+  */
+class DataSetSlideCountWindowAggReduceGroupFunction(
+private val aggregates: Array[Aggregate[_]],
+private val groupingKeysLength: Int,
+private val preTumblingSize: Long,
+private val windowSize: Long,
+private val windowSlide: Long,
+@transient private val returnType: TypeInformation[Row])
+  extends RichGroupReduceFunction[Row, Row]
+  with ResultTypeQueryable[Row] {
+
+  private var output: Row = _
+  private var outWindowStartIndex: Int = _
+
+  override def open(config: Configuration) {
+Preconditions.checkNotNull(aggregates)
+// add one field to store window start count
+val partialRowLength = groupingKeysLength +
+  aggregates.map(_.intermediateDataType.length).sum + 1
+output = new Row(partialRowLength)
+outWindowStartIndex = partialRowLength - 1
+  }
+
+  override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
+var count: Long = 0
+
+val iterator = records.iterator()
+
+while (iterator.hasNext) {
+  val record = iterator.next()
+  // reset aggregates after completed tumbling
+  if (count % preTumblingSize == 0) {
+// initiate intermediate aggregate value.
+aggregates.foreach(_.initiate(output))
+  }
+
+  // merge intermediate aggregate value to buffer.
+  aggregates.foreach(_.merge(record, output))
+
+  count += 1
+
+  // trigger tumbling evaluation
+  if (count % preTumblingSize == 0) {
--- End diff --

Do we also have to emit "incomplete" windows after the `while (iterator.hasNext)` loop?
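The question about "incomplete" panes can be illustrated with a framework-independent sketch of count-based pre-tumbling: consecutive rows of a sorted group are summed into panes of `paneSize` rows, and each pane start is recorded as a row count. Whether the trailing partial pane must be emitted depends on the intended semantics for count-windows that are not full, so it is a flag here. `CountPreTumbling` and `preTumble` are hypothetical names, and a sum stands in for arbitrary partial aggregates.

```scala
object CountPreTumbling {
  /**
   * Sums consecutive values into panes of `paneSize` rows.
   * Returns (paneStartCount, paneSum) pairs; the last pane may hold fewer rows
   * and is only emitted when `emitIncomplete` is true.
   */
  def preTumble(values: Seq[Long], paneSize: Int, emitIncomplete: Boolean): List[(Long, Long)] = {
    require(paneSize > 0, "paneSize must be positive")
    values.grouped(paneSize).zipWithIndex.collect {
      case (pane, i) if pane.size == paneSize || emitIncomplete =>
        (i.toLong * paneSize, pane.sum)
    }.toList
  }
}
```

With `paneSize = 2` and five rows, the fifth row is either dropped or emitted as a one-row pane starting at count 4; this is the situation a check after the loop would have to handle.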



[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15898175#comment-15898175
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3364#discussion_r104525345
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala
 ---
@@ -360,6 +360,19 @@ class GroupWindowTest extends TableTestBase {
   .window(Session withGap 7.milli as 'w) // require on a time attribute
   .groupBy('string, 'w)
   .select('string, 'int.count)
+
+val expected = unaryNode(
--- End diff --

Why do you add the expected result to a test which is expected to fail?


> Add sliding group-windows for batch tables
> --
>
> Key: FLINK-5047
> URL: https://issues.apache.org/jira/browse/FLINK-5047
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Timo Walther
>
> Add Slide group-windows for batch tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
> There are two ways to implement sliding windows for batch:
> 1. replicate the output in order to assign keys for overlapping windows. This 
> is probably the more straight-forward implementation and supports any 
> aggregation function but blows up the data volume.
> 2. if the aggregation functions are combinable / pre-aggregatable, we can 
> also find the largest tumbling window size from which the sliding windows can 
> be assembled. This is basically the technique used to express sliding windows 
> with plain SQL (GROUP BY + OVER clauses). For a sliding window Slide(10 
> minutes, 2 minutes) this would mean to first compute aggregates of 
> non-overlapping (tumbling) 2 minute windows and assembling consecutively 5 of 
> these into a sliding window (could be done in a MapPartition with sorted 
> input). The implementation could be done as an optimizer rule to split the 
> sliding aggregate into a tumbling aggregate and a SQL WINDOW operator. Maybe 
> it makes sense to implement the WINDOW clause first and reuse this for 
> sliding windows.
> 3. There is also a third, hybrid solution: Doing the pre-aggregation on the 
> largest non-overlapping windows (as in 2) and replicating these results and 
> processing those as in the 1) approach. The benefits of this are that it a) is
> based on the implementation that supports non-combinable aggregates (which is 
> required in any case) and b) that it does not require the implementation of 
> the SQL WINDOW operator. Internally, this can be implemented again as an 
> optimizer rule that translates the SlidingWindow into a pre-aggregating 
> TumblingWindow and a final SlidingWindow (with replication).
> see FLINK-4692 for more discussion
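The hybrid approach 3 can be sketched in plain Scala, without Flink APIs. The sketch assumes timestamps in milliseconds, window starts aligned to multiples of the slide, a pane size equal to gcd(size, slide), and `sum` as the combinable aggregate. It pre-aggregates each tumbling pane, replicates each pane partial into every sliding window it belongs to, merges per window, and is checked against approach 1, which replicates raw rows. `SlidingViaPanes` and its methods are hypothetical names.

```scala
object SlidingViaPanes {
  @annotation.tailrec
  def gcd(a: Long, b: Long): Long = if (b == 0) a else gcd(b, a % b)

  // Starts of all windows [start, start + size) covering ts, aligned to multiples of slide.
  def windowStarts(ts: Long, size: Long, slide: Long): List[Long] = {
    val lastStart = ts - Math.floorMod(ts, slide)
    Iterator.iterate(lastStart)(_ - slide).takeWhile(_ > ts - size).toList
  }

  // Approach 1: replicate every (ts, value) row into each window, then aggregate.
  def naive(rows: Seq[(Long, Long)], size: Long, slide: Long): Map[Long, Long] =
    rows.flatMap { case (ts, v) => windowStarts(ts, size, slide).map(_ -> v) }
      .groupBy(_._1).map { case (start, vs) => start -> vs.map(_._2).sum }

  // Approach 3: pre-aggregate tumbling panes, replicate pane partials, merge per window.
  def viaPanes(rows: Seq[(Long, Long)], size: Long, slide: Long): Map[Long, Long] = {
    require(size > 0 && slide > 0, "size and slide must be positive")
    val pane = gcd(size, slide)
    val paneSums = rows.groupBy { case (ts, _) => ts - Math.floorMod(ts, pane) }
      .map { case (paneStart, vs) => paneStart -> vs.map(_._2).sum }
    // window bounds are multiples of the pane size, so a pane lies fully inside or
    // fully outside each window and its start decides membership
    paneSums.toSeq
      .flatMap { case (paneStart, s) => windowStarts(paneStart, size, slide).map(_ -> s) }
      .groupBy(_._1).map { case (start, vs) => start -> vs.map(_._2).sum }
  }
}
```

For Slide(10 minutes, 2 minutes) from the description, the pane size is 2 minutes, and each pane partial rather than each raw row is replicated into 5 windows.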



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15898182#comment-15898182
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3364#discussion_r104515406
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
 ---
@@ -280,6 +285,138 @@ class DataSetWindowAggregate(
 }
   }
 
+  private def createEventTimeSlidingWindowDataSet(
+  inputDS: DataSet[Row],
+  isTimeWindow: Boolean,
+  isParserCaseSensitive: Boolean)
+: DataSet[Row] = {
+
+// create MapFunction for initializing the aggregations
+// it aligns the rowtime for pre-tumbling in case of a time-window for incremental aggregates
+val mapFunction = createDataSetWindowPrepareMapFunction(
+  window,
+  namedAggregates,
+  grouping,
+  inputType,
+  isParserCaseSensitive)
+
+val mappedDataSet = inputDS
+  .map(mapFunction)
+  .name(prepareOperatorName)
+
+val mapReturnType = mappedDataSet.getType
+
+val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
+val groupingKeys = grouping.indices.toArray
+
+// do incremental aggregation if possible
+val isIncremental = doAllSupportPartialAggregation(
+  namedAggregates.map(_.getKey),
+  inputType,
+  grouping.length)
+
+val preparedDataSet = if (isTimeWindow) {
+  // time window
+
+  if (isIncremental) {
+// incremental aggregates
+
+val groupingKeysAndAlignedRowtime = groupingKeys :+ mapReturnType.getArity - 1
+
+// create GroupReduceFunction
+// for pre-tumbling and replicating/omitting the content for each pane
+val prepareReduceFunction = createDataSetSlideWindowPrepareGroupReduceFunction(
+  window,
+  namedAggregates,
+  grouping,
+  inputType,
+  isParserCaseSensitive)
+
+mappedDataSet.asInstanceOf[DataSet[Row]]
+  .groupBy(groupingKeysAndAlignedRowtime: _*)
+  .reduceGroup(prepareReduceFunction) // pre-tumbles and replicates/omits
+  .name(prepareOperatorName)
+  } else {
+// non-incremental aggregates
+
+// create FlatMapFunction
+// for replicating/omitting the content for each pane
+val prepareFlatMapFunction = createDataSetSlideWindowPrepareFlatMapFunction(
+  window,
+  namedAggregates,
+  grouping,
+  inputType,
+  isParserCaseSensitive)
+
+mappedDataSet
+  .flatMap(prepareFlatMapFunction) // replicates/omits
+  }
+} else {
+  // count window
--- End diff --

Can we break this PR into time and count windows? There are so many cases 
to consider...



[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15898179#comment-15898179
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3364#discussion_r104523932
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala
 ---
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+  * It is used for sliding windows on batch for time-windows. It takes a prepared input row (with
+  * aligned rowtime for pre-tumbling), pre-aggregates (pre-tumbles) rows, aligns the window start,
+  * and replicates or omits records for different panes of a sliding window.
+  *
+  * This function is similar to [[DataSetTumbleCountWindowAggReduceGroupFunction]], however,
+  * it does no final aggregate evaluation. It also includes the logic of
+  * [[DataSetSlideTimeWindowAggFlatMapFunction]].
+  *
+  * @param aggregates aggregate functions
+  * @param groupingKeysLength number of grouping keys
+  * @param timeFieldPos position of aligned time field
+  * @param windowSize window size of the sliding window
+  * @param windowSlide window slide of the sliding window
+  * @param returnType return type of this function
+  */
+class DataSetSlideTimeWindowAggReduceGroupFunction(
+    private val aggregates: Array[Aggregate[_]],
+    private val groupingKeysLength: Int,
+    private val timeFieldPos: Int,
+    private val windowSize: Long,
+    private val windowSlide: Long,
+    @transient private val returnType: TypeInformation[Row])
+  extends RichGroupReduceFunction[Row, Row]
+  with ResultTypeQueryable[Row] {
+
+  protected var aggregateBuffer: Row = _
+  private var outWindowStartIndex: Int = _
+
+  override def open(config: Configuration) {
+    Preconditions.checkNotNull(aggregates)
--- End diff --

Move checks and initialization to constructor and remove `open()`?
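A minimal sketch of what the suggested change could look like, written without Flink dependencies: `SlideTimeWindowParams` is a hypothetical class (not from the PR), `Array[String]` stands in for `Array[Aggregate[_]]`, and Scala's `require` replaces `Preconditions.checkNotNull`. Because the checks sit in the constructor body, a bad configuration would fail when the function object is created while the program is assembled, not in `open()` on each parallel task.

```scala
// Hypothetical stand-in for the reviewed function's constructor parameters;
// Array[String] replaces Array[Aggregate[_]] to keep the sketch Flink-free.
class SlideTimeWindowParams(
    val aggregates: Array[String],
    val groupingKeysLength: Int,
    val timeFieldPos: Int,
    val windowSize: Long,
    val windowSlide: Long) extends Serializable {

  // Validation in the constructor body runs once, when the object is created,
  // instead of every time open() is called on a parallel task instance.
  require(aggregates != null, "aggregates must not be null")
  require(groupingKeysLength >= 0, "groupingKeysLength must be non-negative")
  require(timeFieldPos >= 0, "timeFieldPos must be non-negative")
  require(windowSize > 0, "windowSize must be positive")
  require(windowSlide > 0, "windowSlide must be positive")
}
```

One caveat: fields assigned in the constructor are serialized together with the function object, so state that is `@transient` or not meant to be shipped may still belong in `open()`.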



[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2017-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15874713#comment-15874713
 ] 

ASF GitHub Bot commented on FLINK-5047:
---

GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/3364

[FLINK-5047] [table] Add sliding group-windows for batch tables

This PR implements sliding group-windows. It covers the following cases:

- Grouped Slide count-windows with incremental aggregates
- Grouped and All Slide time-windows with incremental and non-incremental 
aggregates

All windows support the overlapping and non-overlapping case. Slide windows 
are pre-tumbled if possible. This PR also fixes some bugs. 

If the general design is ok, I will also implement the missing 
non-incremental aggregates for count-windows and add a bit of documentation.
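A sketch of how the pre-tumbling granularity could be derived, under the assumption that windows are aligned to timestamp 0, i.e. `[k * slide, k * slide + size)`. Every window start is then a multiple of the slide and every window end a multiple of gcd(size, slide), so that gcd is the largest tumbling "pane" size from which all sliding windows can be assembled. `PaneSizing` and its methods are hypothetical names, not code from the PR.

```scala
import scala.annotation.tailrec

// Hypothetical helper: windows are assumed to be aligned to timestamp 0,
// i.e. [k * slide, k * slide + size) for every integer k.
object PaneSizing {

  // Euclid's algorithm; both arguments are assumed to be positive.
  @tailrec
  def gcd(a: Long, b: Long): Long = if (b == 0) a else gcd(b, a % b)

  // Largest tumbling window ("pane") size such that every window start
  // (a multiple of slide) and every window end (start + size) is a pane border.
  def paneSize(size: Long, slide: Long): Long = gcd(size, slide)

  // Number of consecutive panes that make up one sliding window.
  def panesPerWindow(size: Long, slide: Long): Long = size / paneSize(size, slide)
}
```

For Slide(10 minutes, 2 minutes) this gives 2-minute panes and 5 panes per window, matching the issue description. For Slide(10, 4) the panes are 2 units wide: a window spans 5 panes and consecutive windows start 2 panes apart. When size and slide share only a small divisor, the panes become so fine that pre-tumbling presumably saves little.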

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-5047

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3364.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3364


commit adcedd91e76e73457740816f691ccf64f2e2e38b
Author: twalthr 
Date:   2017-01-18T15:56:02Z

[FLINK-5047] [table] Add sliding group-windows for batch tables




> Add sliding group-windows for batch tables
> --
>
> Key: FLINK-5047
> URL: https://issues.apache.org/jira/browse/FLINK-5047
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Timo Walther
>
> Add Slide group-windows for batch tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
> There are two ways to implement sliding windows for batch:
> 1. replicate the output in order to assign keys for overlapping windows. This 
> is probably the more straightforward implementation and supports any 
> aggregation function but blows up the data volume.
> 2. if the aggregation functions are combinable / pre-aggregatable, we can 
> also find the largest tumbling window size from which the sliding windows can 
> be assembled. This is basically the technique used to express sliding windows 
> with plain SQL (GROUP BY + OVER clauses). For a sliding window Slide(10 
> minutes, 2 minutes) this would mean first computing aggregates of 
> non-overlapping (tumbling) 2-minute windows and assembling 5 consecutive ones 
> into a sliding window (could be done in a MapPartition with sorted 
> input). The implementation could be done as an optimizer rule to split the 
> sliding aggregate into a tumbling aggregate and a SQL WINDOW operator. Maybe 
> it makes sense to implement the WINDOW clause first and reuse this for 
> sliding windows.
> 3. There is also a third, hybrid solution: doing the pre-aggregation on the 
> largest non-overlapping windows (as in 2), replicating these results, and 
> processing those as in approach 1. The benefit of this is that it a) is 
> based on the implementation that supports non-combinable aggregates (which is 
> required in any case) and b) does not require the implementation of 
> the SQL WINDOW operator. Internally, this can again be implemented as an 
> optimizer rule that translates the SlidingWindow into a pre-aggregating 
> TumblingWindow and a final SlidingWindow (with replication).
> See FLINK-4692 for more discussion.
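A minimal sketch of the hybrid approach 3, assuming a SUM aggregate, windows aligned to timestamp 0, and plain Scala collections instead of DataSet operators; `HybridSlidingSum`, `windowStarts`, and `slidingSums` are hypothetical names, not code from the PR. Records are first pre-aggregated into tumbling panes of size gcd(size, slide), and only those partial sums are replicated to the windows that contain them.

```scala
import scala.annotation.tailrec

// Hypothetical sketch of approach 3 for a SUM aggregate. Windows are assumed
// to be aligned to timestamp 0: [k * slide, k * slide + size).
object HybridSlidingSum {

  @tailrec
  def gcd(a: Long, b: Long): Long = if (b == 0) a else gcd(b, a % b)

  // Starts of all windows [start, start + size) that contain t. If slide > size,
  // t may fall into a gap and the result is empty (the record is omitted).
  def windowStarts(t: Long, size: Long, slide: Long): List[Long] = {
    val lastStart = t - Math.floorMod(t, slide)
    Iterator.iterate(lastStart)(_ - slide).takeWhile(_ > t - size).toList
  }

  // records: (key, timestamp, value); result: (key, window start) -> sum
  def slidingSums(
      records: Seq[(String, Long, Long)],
      size: Long,
      slide: Long): Map[(String, Long), Long] = {
    val pane = gcd(size, slide)

    // Step 1: tumbling pre-aggregation, one partial sum per (key, pane start).
    val paneSums: Map[(String, Long), Long] =
      records
        .groupBy { case (key, t, _) => (key, t - Math.floorMod(t, pane)) }
        .map { case (keyAndPane, rs) => keyAndPane -> rs.map(_._3).sum }

    // Step 2: replicate each partial sum to every window containing its pane.
    // Pane borders align with window borders, so a window that contains the
    // pane start contains the whole pane.
    val replicated: Seq[((String, Long), Long)] =
      paneSums.toSeq.flatMap { case ((key, paneStart), partial) =>
        windowStarts(paneStart, size, slide).map(ws => (key, ws) -> partial)
      }

    // Step 3: final aggregation merges the partial sums per (key, window).
    replicated
      .groupBy(_._1)
      .map { case (keyAndWindow, parts) => keyAndWindow -> parts.map(_._2).sum }
  }
}
```

For `Seq(("a", 0L, 1L), ("a", 1L, 2L), ("a", 3L, 4L), ("a", 5L, 8L))` with size 4 and slide 2, this yields `Map(("a", -2L) -> 3L, ("a", 0L) -> 7L, ("a", 2L) -> 12L, ("a", 4L) -> 8L)`. The window starting at -2 exists because, with alignment to 0, the earliest records also fall into a window that begins before them. The final merge step relies on SUM being combinable.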



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2016-11-10 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15653975#comment-15653975
 ] 

Jark Wu commented on FLINK-5047:


Makes sense. I prefer the third approach too. 



[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2016-11-10 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15653968#comment-15653968
 ] 

Fabian Hueske commented on FLINK-5047:
--

Right, the third approach only works for combinable aggregates and reduces 
the amount of replicated data because only pre-aggregates are replicated. I'd 
prefer it over approach 2 because it is easier to implement: it extends 
approach 1, whereas approach 2 would require an implementation of the SQL 
WINDOW operator.
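To make the data-volume argument concrete, here is a hypothetical back-of-the-envelope count for a single key, again assuming windows aligned to timestamp 0; the names are illustrative only. `windowCount` counts the window starts in (t - size, t]; approach 1 applies it to every record, approach 3 only to every non-empty pane.

```scala
import scala.annotation.tailrec

// Hypothetical comparison for a single grouping key, assuming windows
// aligned to timestamp 0: [k * slide, k * slide + size).
object ReplicationVolume {

  @tailrec
  def gcd(a: Long, b: Long): Long = if (b == 0) a else gcd(b, a % b)

  // Number of window starts (multiples of slide) in the interval (t - size, t].
  def windowCount(t: Long, size: Long, slide: Long): Long =
    Math.floorDiv(t, slide) - Math.floorDiv(t - size, slide)

  // Approach 1: every raw record is emitted once per window that contains it.
  def rowsApproach1(times: Seq[Long], size: Long, slide: Long): Long =
    times.map(t => windowCount(t, size, slide)).sum

  // Approach 3: only one partial aggregate per non-empty pane is replicated.
  def rowsApproach3(times: Seq[Long], size: Long, slide: Long): Long = {
    val pane = gcd(size, slide)
    times
      .map(t => t - Math.floorMod(t, pane))
      .distinct
      .map(p => windowCount(p, size, slide))
      .sum
  }
}
```

With one record per second for an hour and Slide(10 min, 2 min) expressed in seconds (`size = 600`, `slide = 120`), `rowsApproach1` returns 18000 and `rowsApproach3` returns 150: each item is replicated 5 times in both cases, but approach 3 replicates 30 pane aggregates instead of 3,600 records. The gain shrinks when panes are sparse or when gcd(size, slide) is small.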



[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2016-11-10 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15653869#comment-15653869
 ] 

Jark Wu commented on FLINK-5047:


Hi [~fhueske], I agree. I think the first approach can easily be supported once 
FLINK-4692 is resolved. 

Regarding 
{quote}
it a) is based on the implementation that supports non-combinable aggregates 
(which is required in any case) 
{quote}

If I understand correctly, the third approach doesn't support non-combinable 
aggregates such as median, right? It's only an optimization based on 
pre-aggregation, which is better than approach 2, right?
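A small, hypothetical illustration of the distinction being asked about: a SUM over a window can be rebuilt from per-pane partial sums, while a MEDIAN generally cannot be rebuilt from per-pane medians. The pane contents are made-up sample values.

```scala
// Hypothetical illustration: SUM can be merged from pane-level partial
// results, MEDIAN cannot.
object Combinability {

  // Median of a non-empty sequence; averages the two middle values for even sizes.
  def median(xs: Seq[Double]): Double = {
    require(xs.nonEmpty, "median of an empty sequence is undefined")
    val s = xs.toArray
    java.util.Arrays.sort(s) // ascending in-place sort of the copy
    val n = s.length
    if (n % 2 == 1) s(n / 2) else (s(n / 2 - 1) + s(n / 2)) / 2.0
  }

  // Two panes that together form one sliding window.
  val paneA: Seq[Double] = Seq(1.0, 2.0, 3.0)
  val paneB: Seq[Double] = Seq(10.0, 20.0)
  val window: Seq[Double] = paneA ++ paneB

  // Merging partial sums gives the same result as summing the whole window.
  val sumFromPanes: Double = paneA.sum + paneB.sum
  val sumOfWindow: Double = window.sum

  // Combining partial medians does not reproduce the window median.
  val medianFromPanes: Double = median(Seq(median(paneA), median(paneB)))
  val medianOfWindow: Double = median(window)
}
```

Here `sumFromPanes` and `sumOfWindow` are both 36.0, but `medianFromPanes` is 8.5 while `medianOfWindow` is 3.0. Some aggregates can be made combinable with a richer partial state (AVG as a (sum, count) pair, for example), but an exact median needs all values of the window.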



[jira] [Commented] (FLINK-5047) Add sliding group-windows for batch tables

2016-11-10 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15653582#comment-15653582
 ] 

Fabian Hueske commented on FLINK-5047:
--

I think we should address this issue by first implementing the generic first 
approach. After that, we can add an optimization for combinable aggregation 
functions (the third approach).

What do you think, [~jark]?
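A minimal sketch of the generic first approach, assuming windows aligned to timestamp 0 and using plain Scala collections in place of DataSet operators; `ReplicatingSlidingWindows` and its methods are hypothetical. Every raw record is copied to each window that contains it, so the final step sees all values of a window and can apply any aggregate, including a median.

```scala
// Hypothetical sketch of approach 1: raw records are replicated, so any
// aggregate function (even a non-combinable one) can be applied per window.
// Windows are assumed to be aligned to timestamp 0: [k * slide, k * slide + size).
object ReplicatingSlidingWindows {

  // Starts of all windows that contain t (empty if t falls into a gap).
  def windowStarts(t: Long, size: Long, slide: Long): List[Long] = {
    val lastStart = t - Math.floorMod(t, slide)
    Iterator.iterate(lastStart)(_ - slide).takeWhile(_ > t - size).toList
  }

  // records: (timestamp, value); result: window start -> aggregate over the window
  def aggregate(
      records: Seq[(Long, Double)],
      size: Long,
      slide: Long)(agg: Seq[Double] => Double): Map[Long, Double] =
    records
      .flatMap { case (t, v) => windowStarts(t, size, slide).map(ws => ws -> v) }
      .groupBy(_._1)
      .map { case (ws, pairs) => ws -> agg(pairs.map(_._2)) }

  // Non-combinable example aggregate; xs is assumed to be non-empty.
  def median(xs: Seq[Double]): Double = {
    val s = xs.toArray
    java.util.Arrays.sort(s)
    val n = s.length
    if (n % 2 == 1) s(n / 2) else (s(n / 2 - 1) + s(n / 2)) / 2.0
  }
}
```

For `Seq((0L, 1.0), (1L, 2.0), (3L, 10.0), (5L, 20.0))` with size 4 and slide 2, passing the median yields `Map(-2L -> 1.5, 0L -> 2.0, 2L -> 15.0, 4L -> 20.0)`. The price is the data blow-up described in the issue: when the slide divides the size, each record is emitted size / slide times.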
