[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-15 Thread huawei-flink
Github user huawei-flink closed the pull request at:

https://github.com/apache/flink/pull/3443


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-09 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r105162342
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCalcRule.scala
 ---
@@ -46,6 +52,20 @@ class DataStreamCalcRule
   calc.getProgram,
   description)
   }
+
+  override def matches(call: RelOptRuleCall): Boolean = {
--- End diff --

Very nice. This was a workaround; better to manage it properly.




[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-09 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r105162261
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
 ---
@@ -165,6 +165,11 @@ object FlinkRuleSets {
 
   // merge and push unions rules
   UnionEliminatorRule.INSTANCE,
+  
+  // aggregations over intervals should be enabled to be translated also
+  // in queries with LogicalWindows, not only queries with LogicalCalc
+  ProjectWindowTransposeRule.INSTANCE,
+  ProjectToWindowRule.INSTANCE,
--- End diff --

ok




[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-08 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r105045766
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
 ---
@@ -165,6 +165,11 @@ object FlinkRuleSets {
 
   // merge and push unions rules
   UnionEliminatorRule.INSTANCE,
+  
+  // aggregations over intervals should be enabled to be translated also
+  // in queries with LogicalWindows, not only queries with LogicalCalc
+  ProjectWindowTransposeRule.INSTANCE,
+  ProjectToWindowRule.INSTANCE,
--- End diff --

This rule has been added to the normalization phase. It can be removed here.




[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-08 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r105046076
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCalcRule.scala
 ---
@@ -46,6 +52,20 @@ class DataStreamCalcRule
   calc.getProgram,
   description)
   }
+
+  override def matches(call: RelOptRuleCall): Boolean = {
--- End diff --

This can be removed because the `RexOver` was removed from `Project` (and `Calc`) during normalization.




[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-06 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r104361048
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/ProcTimeRowStreamAggregationSqlITCase.java
 ---
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.java.stream.sql;
+
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.types.Row;
+import org.apache.flink.table.api.scala.stream.utils.StreamITCase;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.table.api.java.stream.utils.StreamTestData;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ProcTimeRowStreamAggregationSqlITCase extends StreamingMultipleProgramsTestBase {
+
+	@Test
+	public void testMaxAggregation() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+		StreamITCase.clear();
+
+		env.setParallelism(1);
+
+		DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds = StreamTestData.get5TupleDataStream(env);
+		Table in = tableEnv.fromDataStream(ds, "a,b,c,d,e");
+		tableEnv.registerTable("MyTable", in);
+
+		String sqlQuery = "SELECT a, MAX(c) OVER (PARTITION BY a ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS maxC FROM MyTable";
+		Table result = tableEnv.sql(sqlQuery);
+
+		DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
+		resultSet.addSink(new StreamITCase.StringSink());
+		env.execute();
+
+		List<String> expected = new ArrayList<>();
+		expected.add("1,0");
+		expected.add("2,1");
+		expected.add("2,2");
+		expected.add("3,3");
+		expected.add("3,4");
+		expected.add("3,5");
+		expected.add("4,6");
+		expected.add("4,7");
+		expected.add("4,8");
+		expected.add("4,9");
+		expected.add("5,10");
+		expected.add("5,11");
+		expected.add("5,12");
+		expected.add("5,13");
+		expected.add("5,14");
+
+		StreamITCase.compareWithList(expected);
+	}
+
+	@Test
+	public void testMinAggregation() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+		StreamITCase.clear();
+
+		env.setParallelism(1);
+
+		DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds = StreamTestData.get5TupleDataStream(env);
+		Table in = tableEnv.fromDataStream(ds, "a,b,c,d,e");
+		tableEnv.registerTable("MyTable", in);
+
+		String sqlQuery = "SELECT a, MIN(c) OVER (PARTITION BY a ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS minC FROM MyTable";
+		Table result = tableEnv.sql(sqlQuery);
+
+		DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
+		resultSet.addSink(new StreamITCase.StringSink());
+		env.execute();
+
+		List<String> expected = new ArrayList<>();
+		expected.add("1,0");
+

[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-03 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r104148648
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/aggs/DoubleSummaryAggregation.java
 ---
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.datastream.aggs;
+
+import static org.apache.flink.api.java.summarize.aggregation.CompensatedSum.ZERO;
+
+import org.apache.flink.api.java.summarize.aggregation.Aggregator;
+import org.apache.flink.api.java.summarize.aggregation.CompensatedSum;
+import org.apache.flink.api.java.summarize.aggregation.NumericSummaryAggregator;
+
+public class DoubleSummaryAggregation extends NumericSummaryAggregator<Double> {
--- End diff --

That is fine. I have pulled the new aggregation functions and updated the PR without the reset method.




[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-02 Thread shaoxuan-wang
Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r104096189
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/aggs/DoubleSummaryAggregation.java
 ---
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.datastream.aggs;
+
+import static org.apache.flink.api.java.summarize.aggregation.CompensatedSum.ZERO;
+
+import org.apache.flink.api.java.summarize.aggregation.Aggregator;
+import org.apache.flink.api.java.summarize.aggregation.CompensatedSum;
+import org.apache.flink.api.java.summarize.aggregation.NumericSummaryAggregator;
+
+public class DoubleSummaryAggregation extends NumericSummaryAggregator<Double> {
--- End diff --

@huawei-flink, I have replied to your question in the UDAGG design doc. AggregateFunction is the base class for UDAGGs. We are very cautious about adding any new method to this interface. As mentioned in the UDAGG design doc, only createAccumulator, getValue, and accumulate are must-have methods for an aggregate. The merge method is optional and only useful for advanced optimization of the runtime execution plan. Retract may also be a must-have if users care about correctness. I do not see why reset is necessary for an aggregate. If it is helpful in your case, you can always add this method in your own (you as the user) user-defined aggregate function. UDAGG is still on the way, but I think it should be available very soon.
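The contract described above (mandatory createAccumulator/accumulate/getValue, optional merge and retract) can be sketched as a simplified model. This is an illustrative stand-in in plain Java, not the actual Flink `AggregateFunction` API; `SimpleAggregateFunction` and `LongSum` are hypothetical names:

```java
// Simplified model of the UDAGG contract discussed above.
// Illustrative only -- NOT the real Flink AggregateFunction interface.
abstract class SimpleAggregateFunction<T, ACC, R> {
    // must-have methods for every aggregate
    public abstract ACC createAccumulator();
    public abstract void accumulate(ACC acc, T value);
    public abstract R getValue(ACC acc);

    // optional: merge is only needed for advanced execution-plan optimization
    public ACC merge(ACC a, ACC b) { throw new UnsupportedOperationException(); }

    // optional: retract "subtracts" a record, needed for correct retraction
    public void retract(ACC acc, T value) { throw new UnsupportedOperationException(); }
}

// Example aggregate built on the model: a running sum over longs.
class LongSum extends SimpleAggregateFunction<Long, long[], Long> {
    @Override public long[] createAccumulator() { return new long[]{0L}; }
    @Override public void accumulate(long[] acc, Long v) { acc[0] += v; }
    @Override public Long getValue(long[] acc) { return acc[0]; }
    @Override public void retract(long[] acc, Long v) { acc[0] -= v; }
}
```

A sum can implement retract cheaply; non-invertible aggregates such as MAX cannot, which is why retract stays optional in this sketch.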




[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-02 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r103926009
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/aggs/DoubleSummaryAggregation.java
 ---
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.datastream.aggs;
+
+import static org.apache.flink.api.java.summarize.aggregation.CompensatedSum.ZERO;
+
+import org.apache.flink.api.java.summarize.aggregation.Aggregator;
+import org.apache.flink.api.java.summarize.aggregation.CompensatedSum;
+import org.apache.flink.api.java.summarize.aggregation.NumericSummaryAggregator;
+
+public class DoubleSummaryAggregation extends NumericSummaryAggregator<Double> {
--- End diff --

There was a discussion on the dev ML about the new aggregation interface (https://lists.apache.org/thread.html/c4e8bbcb32731e5b37ec0a561effb279034199b495fae40b1d708406@%3Cdev.flink.apache.org%3E). It would be great if you could comment there if you think something needs to be changed. Thanks!




[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-02 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r103925068
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/aggs/DoubleSummaryAggregation.java
 ---
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.datastream.aggs;
+
+import static org.apache.flink.api.java.summarize.aggregation.CompensatedSum.ZERO;
+
+import org.apache.flink.api.java.summarize.aggregation.Aggregator;
+import org.apache.flink.api.java.summarize.aggregation.CompensatedSum;
+import org.apache.flink.api.java.summarize.aggregation.NumericSummaryAggregator;
+
+public class DoubleSummaryAggregation extends NumericSummaryAggregator<Double> {
--- End diff --

Indeed, the StreamAggregator interface I defined has reset and evict methods to support aggregation of associative functions. Anyway, no point in discussing it further; I will push my proposal and then you can see. :-)




[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-02 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r103924165
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/aggs/DoubleSummaryAggregation.java
 ---
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.datastream.aggs;
+
+import static org.apache.flink.api.java.summarize.aggregation.CompensatedSum.ZERO;
+
+import org.apache.flink.api.java.summarize.aggregation.Aggregator;
+import org.apache.flink.api.java.summarize.aggregation.CompensatedSum;
+import org.apache.flink.api.java.summarize.aggregation.NumericSummaryAggregator;
+
+public class DoubleSummaryAggregation extends NumericSummaryAggregator<Double> {
--- End diff --

The interface will be extended with a retract method to "subtract" records. @shaoxuan-wang, who is working on the UDAGGs, can comment on that.




[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-02 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r103920777
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/aggs/DoubleSummaryAggregation.java
 ---
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.datastream.aggs;
+
+import static org.apache.flink.api.java.summarize.aggregation.CompensatedSum.ZERO;
+
+import org.apache.flink.api.java.summarize.aggregation.Aggregator;
+import org.apache.flink.api.java.summarize.aggregation.CompensatedSum;
+import org.apache.flink.api.java.summarize.aggregation.NumericSummaryAggregator;
+
+public class DoubleSummaryAggregation extends NumericSummaryAggregator<Double> {
--- End diff --

I see that the AggregateFunction interface has no reset method, only createAccumulator(). This means we have to create a new accumulator for every apply execution. With reset you just set the accumulator back to its starting point, without creating a new object every time. What do you think?
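The object-reuse concern raised here can be illustrated outside Flink. Below is a hypothetical mutable accumulator with a `reset()` method (not part of any Flink interface), showing how one allocation per evaluation could be avoided:

```java
// Hypothetical mutable MAX accumulator with reset(), illustrating the
// allocation-per-apply concern discussed above. Not a Flink API.
class MaxAccumulator {
    private int max = Integer.MIN_VALUE;

    // fold one value into the running maximum
    void add(int v) { if (v > max) max = v; }

    int get() { return max; }

    // reset() restores the starting point so the SAME object can be
    // reused across window evaluations instead of allocating a new one
    void reset() { max = Integer.MIN_VALUE; }
}
```

Without `reset()`, each evaluation would need a fresh `new MaxAccumulator()`; with it, the operator can hold one instance per key and clear it between evaluations.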





[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-02 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r103919690
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/aggs/DoubleSummaryAggregation.java
 ---
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.datastream.aggs;
+
+import static org.apache.flink.api.java.summarize.aggregation.CompensatedSum.ZERO;
+
+import org.apache.flink.api.java.summarize.aggregation.Aggregator;
+import org.apache.flink.api.java.summarize.aggregation.CompensatedSum;
+import org.apache.flink.api.java.summarize.aggregation.NumericSummaryAggregator;
+
+public class DoubleSummaryAggregation extends NumericSummaryAggregator<Double> {
--- End diff --

OK, I will change my code to use the AggregateFunction you pointed out.




[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-02 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r103918476
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/aggs/DoubleSummaryAggregation.java
 ---
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.datastream.aggs;
+
+import static org.apache.flink.api.java.summarize.aggregation.CompensatedSum.ZERO;
+
+import org.apache.flink.api.java.summarize.aggregation.Aggregator;
+import org.apache.flink.api.java.summarize.aggregation.CompensatedSum;
+import org.apache.flink.api.java.summarize.aggregation.NumericSummaryAggregator;
+
+public class DoubleSummaryAggregation extends NumericSummaryAggregator<Double> {
--- End diff --

No, I think we should use the new aggregation functions for the PR. Do you 
think that's possible?




[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-02 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r103914685
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/aggs/DoubleSummaryAggregation.java
 ---
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.datastream.aggs;
+
+import static org.apache.flink.api.java.summarize.aggregation.CompensatedSum.ZERO;
+
+import org.apache.flink.api.java.summarize.aggregation.Aggregator;
+import org.apache.flink.api.java.summarize.aggregation.CompensatedSum;
+import org.apache.flink.api.java.summarize.aggregation.NumericSummaryAggregator;
+
+public class DoubleSummaryAggregation extends NumericSummaryAggregator<Double> {
--- End diff --

Do you think this switch on aggregation could be done in another PR?




[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-02 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r103909104
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/aggs/DoubleSummaryAggregation.java
 ---
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.datastream.aggs;
+
+import static org.apache.flink.api.java.summarize.aggregation.CompensatedSum.ZERO;
+
+import org.apache.flink.api.java.summarize.aggregation.Aggregator;
+import org.apache.flink.api.java.summarize.aggregation.CompensatedSum;
+import org.apache.flink.api.java.summarize.aggregation.NumericSummaryAggregator;
+
+public class DoubleSummaryAggregation extends NumericSummaryAggregator<Double> {
--- End diff --

Actually, we are replacing the current aggregation functions with a new interface that also allows defining user-defined aggregate functions. The functions are already merged (https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala), but not used yet. We'll probably merge a change to activate and use them later today. This interface should be used because otherwise user-defined aggregate functions won't be supported.




[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-02 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r103887096
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/aggs/DoubleSummaryAggregation.java
 ---
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.datastream.aggs;
+
+import static org.apache.flink.api.java.summarize.aggregation.CompensatedSum.ZERO;
+
+import org.apache.flink.api.java.summarize.aggregation.Aggregator;
+import org.apache.flink.api.java.summarize.aggregation.CompensatedSum;
+import org.apache.flink.api.java.summarize.aggregation.NumericSummaryAggregator;
+
+public class DoubleSummaryAggregation extends NumericSummaryAggregator<Double> {
--- End diff --

The aggregate functions in org.apache.flink.table.runtime.aggregate.* assume a 
GroupReduce, whereas I have implemented mine as a FlatMap. Should I switch my 
implementation to a reduce? @fhueske, what do you think?




[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-01 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r103866134
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/aggs/DoubleSummaryAggregation.java
 ---
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.datastream.aggs;
+
+import static org.apache.flink.api.java.summarize.aggregation.CompensatedSum.ZERO;
+
+import org.apache.flink.api.java.summarize.aggregation.Aggregator;
+import org.apache.flink.api.java.summarize.aggregation.CompensatedSum;
+import org.apache.flink.api.java.summarize.aggregation.NumericSummaryAggregator;
+
+public class DoubleSummaryAggregation extends NumericSummaryAggregator<Double> {
--- End diff --

Good point. I searched for a while but could not find anything that fit, so I 
decided to create stream-specific ones. The idea was to have something that could 
eventually be optimized for streaming. However, I will try to reuse the existing ones.




[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-01 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r103866164
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/ProcTimeRowStreamAggregationSqlITCase.java
 ---
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.java.stream.sql;
+
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.types.Row;
+import org.apache.flink.table.api.scala.stream.utils.StreamITCase;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.table.api.java.stream.utils.StreamTestData;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ProcTimeRowStreamAggregationSqlITCase extends StreamingMultipleProgramsTestBase {
+
+   
+   @Test
+   public void testMaxAggregation() throws Exception {
+   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+   StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+   StreamITCase.clear();
+
+   env.setParallelism(1);
+
+   DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds = StreamTestData.get5TupleDataStream(env);
+   Table in = tableEnv.fromDataStream(ds, "a,b,c,d,e");
+   tableEnv.registerTable("MyTable", in);
+
+   String sqlQuery = "SELECT a, MAX(c) OVER (PARTITION BY a ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS maxC FROM MyTable";
+   Table result = tableEnv.sql(sqlQuery);
+
+   DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
+   resultSet.addSink(new StreamITCase.StringSink());
+   env.execute();
+
+   List<String> expected = new ArrayList<>();
+   expected.add("1,0");
+   expected.add("2,1");
+   expected.add("2,2");
+   expected.add("3,3");
+   expected.add("3,4");
+   expected.add("3,5");
+   expected.add("4,6");
+   expected.add("4,7");
+   expected.add("4,8");
+   expected.add("4,9");
+   expected.add("5,10");
+   expected.add("5,11");
+   expected.add("5,12");
+   expected.add("5,14");
+   expected.add("5,14");
+
+   StreamITCase.compareWithList(expected);
+   }
+   
+   @Test
+   public void testMinAggregation() throws Exception {
+   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+   StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+   StreamITCase.clear();
+
+   env.setParallelism(1);
+
+   DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds = StreamTestData.get5TupleDataStream(env);
+   Table in = tableEnv.fromDataStream(ds, "a,b,c,d,e");
+   tableEnv.registerTable("MyTable", in);
+
+   String sqlQuery = "SELECT a, MIN(c) OVER (PARTITION BY a ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS minC FROM MyTable";
+   Table result = tableEnv.sql(sqlQuery);
+
+   DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
+   resultSet.addSink(new StreamITCase.StringSink());
+   env.execute();
+
+   List<String> expected = new ArrayList<>();
+   expected.add("1,0");
+
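
The OVER ... ROWS BETWEEN 2 PRECEDING AND CURRENT ROW semantics these tests exercise can be sketched outside Flink as a per-partition sliding maximum over the last three rows. A plain-Java illustration (not the runtime implementation) of what each emitted "a,maxC" string should contain:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of "MAX(c) OVER (PARTITION BY a ... ROWS BETWEEN 2
// PRECEDING AND CURRENT ROW)": for each incoming (a, c) pair, emit the
// maximum of c over the current row and at most two preceding rows of
// the same partition.
public class SlidingRowsMax {

    public static List<String> apply(int[][] rows) {
        Map<Integer, Deque<Integer>> buffers = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (int[] row : rows) {
            int a = row[0], c = row[1];
            Deque<Integer> buf = buffers.computeIfAbsent(a, k -> new ArrayDeque<>());
            buf.addLast(c);
            if (buf.size() > 3) {          // the window holds at most 3 rows
                buf.removeFirst();
            }
            int max = buf.stream().mapToInt(Integer::intValue).max().getAsInt();
            out.add(a + "," + max);
        }
        return out;
    }

    public static void main(String[] args) {
        int[][] rows = { {1, 0}, {2, 1}, {2, 2}, {3, 3}, {3, 4}, {3, 5} };
        System.out.println(apply(rows)); // [1,0, 2,1, 2,2, 3,3, 3,4, 3,5]
    }
}
```

One result row is emitted per input row, which is why the expected lists above have exactly as many entries as the 5-tuple test data has records.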

[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-01 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r103865652
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/logical/rel/util/WindowAggregateUtil.java
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.logical.rel.util;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.calcite.rel.core.Window.Group;
+import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rex.RexLiteral;
+
+import com.google.common.collect.ImmutableList;
+
+public class WindowAggregateUtil implements Serializable {
+
+   private static final long serialVersionUID = -3916551736243544540L;
+
+   private LogicalWindow windowPointer = null;
+
+   public WindowAggregateUtil() {
+
+   }
+
+   public WindowAggregateUtil(LogicalWindow window) {
+   this.windowPointer = window;
+
+   }
+
+   /**
+    * A utility function that checks whether a window is partitioned or is a
+    * global window.
+    *
+    * @param window
+    *        the window to be checked for partitions
+    * @return true if partition keys are defined, false otherwise.
+    */
+   public boolean isStreamPartitioned(LogicalWindow window) {
+   // if there exists a group bounded by keys, then it is
+   // a partitioned window
+   for (Group group : window.groups) {
+   if (!group.keys.isEmpty()) {
+   return true;
+   }
+   }
+
+   return false;
+   }
+
+   public int[] getKeysAsArray(Group group) {
+   if (group == null) {
+   return null;
+   }
+   return group.keys.toArray();
+   }
+
+   /**
+    * Returns the {@code int} lower bound of a window when it is expressed
+    * as an integer, e.g. ... ROWS BETWEEN {@code value} PRECEDING AND CURRENT ROW.
+    *
+    * @param constants
+    *        the list of constants from which to get the offset value
+    * @return the value of the lower bound if available, -1 otherwise
+    */
+
+   public int getLowerBoundary(ImmutableList<RexLiteral> constants) {
+   return ((Long) constants.get(0).getValue2()).intValue();
--- End diff --

Thank you very much. I was indeed puzzled by this. I will fix it according 
to your suggestion. I even tried the Calcite mailing list, but nothing like 
this came up.




[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-01 Thread hongyuhong
Github user hongyuhong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r103835080
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/DataStreamProcTimeRowAggregate.java
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.datastream;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.Window.Group;
+import org.apache.calcite.rel.core.Window.RexWinAggCall;
+import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.table.api.StreamTableEnvironment;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.calcite.FlinkTypeFactory;
+import org.apache.flink.table.plan.logical.rel.util.WindowAggregateUtil;
+import org.apache.flink.table.plan.nodes.datastream.function.DataStreamProcTimeAggregateRowGlobalWindowFunction;
+import org.apache.flink.table.plan.nodes.datastream.function.DataStreamProcTimeAggregateRowKeyedWindowFunction;
+import org.apache.flink.types.Row;
+
+public class DataStreamProcTimeRowAggregate extends DataStreamRelJava {
+
+   protected LogicalWindow windowRef;
+   protected String description;
+   protected WindowAggregateUtil winUtil;
+
+   public DataStreamProcTimeRowAggregate(RelOptCluster cluster, RelTraitSet traitSet, RelNode input,
+   RelDataType rowType, String description, LogicalWindow window) {
+   super(cluster, traitSet, input);
+   this.windowRef = window;
+   this.rowType = rowType;
+   this.description = description;
+   this.winUtil = new WindowAggregateUtil();
+   }
+
+   @Override
+   protected RelDataType deriveRowType() {
+   return super.deriveRowType();
+   }
+
+   @Override
+   public RelNode copy(RelTraitSet traitSet, java.util.List<RelNode> inputs) {
+
+   if (inputs.size() != 1) {
+   System.err.println(this.getClass().getName() + " : Input size must be one!");
+   }
+
+   return new DataStreamProcTimeRowAggregate(getCluster(), traitSet, inputs.get(0), getRowType(), getDescription(),
+   windowRef);
+   }
+
+   @Override
+   public DataStream<Row> translateToPlan(StreamTableEnvironment tableEnv, Row ignore) {
+
+   TableConfig config = tableEnv.getConfig();
+
+   DataStream<Row> inputDS = ((DataStreamRel) getInput()).translateToPlan(tableEnv);
+   
+   System.out.println(inputDS);
--- End diff --

That should be removed.




[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-01 Thread hongyuhong
Github user hongyuhong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r103835059
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/logical/rel/util/WindowAggregateUtil.java
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.logical.rel.util;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.calcite.rel.core.Window.Group;
+import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rex.RexLiteral;
+
+import com.google.common.collect.ImmutableList;
+
+public class WindowAggregateUtil implements Serializable {
+
+   private static final long serialVersionUID = -3916551736243544540L;
+
+   private LogicalWindow windowPointer = null;
+
+   public WindowAggregateUtil() {
+
+   }
+
+   public WindowAggregateUtil(LogicalWindow window) {
+   this.windowPointer = window;
+
+   }
+
+   /**
+    * A utility function that checks whether a window is partitioned or is a
+    * global window.
+    *
+    * @param window
+    *        the window to be checked for partitions
+    * @return true if partition keys are defined, false otherwise.
+    */
+   public boolean isStreamPartitioned(LogicalWindow window) {
+   // if there exists a group bounded by keys, then it is
+   // a partitioned window
+   for (Group group : window.groups) {
+   if (!group.keys.isEmpty()) {
+   return true;
+   }
+   }
+
+   return false;
+   }
+
+   public int[] getKeysAsArray(Group group) {
+   if (group == null) {
+   return null;
+   }
+   return group.keys.toArray();
+   }
+
+   /**
+    * Returns the {@code int} lower bound of a window when it is expressed
+    * as an integer, e.g. ... ROWS BETWEEN {@code value} PRECEDING AND CURRENT ROW.
+    *
+    * @param constants
+    *        the list of constants from which to get the offset value
+    * @return the value of the lower bound if available, -1 otherwise
+    */
+
+   public int getLowerBoundary(ImmutableList<RexLiteral> constants) {
+   return ((Long) constants.get(0).getValue2()).intValue();
--- End diff --

I think we cannot always get the boundary from constants[0]. For example:
```
select sum(2) over (order by rowtime() range interval '1' hour preceding) 
as u3 FROM  OrderA
```
the boundary will be constants[1]. You can use
constants.get(lowerBound.getOffset.asInstanceOf[RexInputRef].getIndex - inputRowType.getFieldCount)
instead.


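
The index arithmetic suggested above, mapping an input reference that points past the input row's fields into the window's constants list, can be sketched in plain Java. Types are deliberately simplified (the real code works on Calcite's RexInputRef and RelDataType); the names here are illustrative only:

```java
import java.util.Arrays;
import java.util.List;

// Simplified sketch of the suggested constant lookup: in Calcite, a window
// bound's offset is an input reference whose index points *past* the input
// row type's fields; subtracting the input field count yields the position
// in the window's constants list. Types are simplified for illustration.
public class BoundaryLookup {

    static long getLowerBoundary(List<Long> constants, int offsetRefIndex, int inputFieldCount) {
        // e.g. with 5 input fields, a reference index of 6 refers to constants.get(1)
        return constants.get(offsetRefIndex - inputFieldCount);
    }

    public static void main(String[] args) {
        // e.g. the literal 2 from "sum(2)" occupies constants[0], while the
        // INTERVAL '1' HOUR boundary (3600000 ms) sits at constants[1]
        List<Long> constants = Arrays.asList(2L, 3600000L);
        System.out.println(getLowerBoundary(constants, 6, 5)); // prints 3600000
    }
}
```

This is why hard-coding constants.get(0) breaks as soon as the query contains another literal before the boundary, as in the sum(2) example above.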


[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-01 Thread hongyuhong
Github user hongyuhong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r103835088
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/aggs/DoubleSummaryAggregation.java
 ---
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.datastream.aggs;
+
+import static org.apache.flink.api.java.summarize.aggregation.CompensatedSum.ZERO;
+
+import org.apache.flink.api.java.summarize.aggregation.Aggregator;
+import org.apache.flink.api.java.summarize.aggregation.CompensatedSum;
+import org.apache.flink.api.java.summarize.aggregation.NumericSummaryAggregator;
+
+public class DoubleSummaryAggregation extends NumericSummaryAggregator<Double> {
--- End diff --

There is already a series of Scala classes that do the aggregation in 
org.apache.flink.table.runtime.aggregate.*; I wonder if we should create 
another one.




[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-01 Thread hongyuhong
Github user hongyuhong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r103835113
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/ProcTimeRowStreamAggregationSqlITCase.java
 ---
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.java.stream.sql;
+
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.types.Row;
+import org.apache.flink.table.api.scala.stream.utils.StreamITCase;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.table.api.java.stream.utils.StreamTestData;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ProcTimeRowStreamAggregationSqlITCase extends StreamingMultipleProgramsTestBase {
+
+   
+   @Test
+   public void testMaxAggregation() throws Exception {
+   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+   StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+   StreamITCase.clear();
+
+   env.setParallelism(1);
+
+   DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds = StreamTestData.get5TupleDataStream(env);
+   Table in = tableEnv.fromDataStream(ds, "a,b,c,d,e");
+   tableEnv.registerTable("MyTable", in);
+
+   String sqlQuery = "SELECT a, MAX(c) OVER (PARTITION BY a ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS maxC FROM MyTable";
+   Table result = tableEnv.sql(sqlQuery);
+
+   DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
+   resultSet.addSink(new StreamITCase.StringSink());
+   env.execute();
+
+   List<String> expected = new ArrayList<>();
+   expected.add("1,0");
+   expected.add("2,1");
+   expected.add("2,2");
+   expected.add("3,3");
+   expected.add("3,4");
+   expected.add("3,5");
+   expected.add("4,6");
+   expected.add("4,7");
+   expected.add("4,8");
+   expected.add("4,9");
+   expected.add("5,10");
+   expected.add("5,11");
+   expected.add("5,12");
+   expected.add("5,14");
+   expected.add("5,14");
+
+   StreamITCase.compareWithList(expected);
+   }
+   
+   @Test
+   public void testMinAggregation() throws Exception {
+   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+   StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+   StreamITCase.clear();
+
+   env.setParallelism(1);
+
+   DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds = StreamTestData.get5TupleDataStream(env);
+   Table in = tableEnv.fromDataStream(ds, "a,b,c,d,e");
+   tableEnv.registerTable("MyTable", in);
+
+   String sqlQuery = "SELECT a, MIN(c) OVER (PARTITION BY a ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS minC FROM MyTable";
+   Table result = tableEnv.sql(sqlQuery);
+
+   DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
+   resultSet.addSink(new StreamITCase.StringSink());
+   env.execute();
+
+   List<String> expected = new ArrayList<>();
+   expected.add("1,0");
+  

[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-01 Thread huawei-flink
GitHub user huawei-flink opened a pull request:

https://github.com/apache/flink/pull/3443

[FLINK-5653] Add processing time OVER ROWS BETWEEN x PRECEDING aggregation 
to SQL

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [X] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [X] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [X] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/huawei-flink/flink FLINK-5653

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3443.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3443


commit 72ec35a7380a4d73bd092ce14962ab2248139bae
Author: Stefano Bortoli 
Date:   2017-02-01T16:15:58Z

First implementation of ProcTime()

commit e98c28616af1cf67d3ad3277d9cc2ca335604eca
Author: rtudoran 
Date:   2017-02-02T10:30:40Z

Disambiguate for the OVER BY clause, which should not be treated as a
RexOver expression in Logical Project

commit b7e6a673b88b6181c06071cd6c7bda55c25a62b4
Author: Stefano Bortoli 
Date:   2017-02-02T12:07:11Z

Added return to disambiguation method for rexover

commit cda17565d5969f29b16923b631178a2cbf64791b
Author: rtudoran 
Date:   2017-02-02T16:00:20Z

Enable the LogicalWindow operators in query translation

commit 4b3e54281018b83c818f91e09a5321c34bbf297b
Author: rtudoran 
Date:   2017-02-03T14:59:39Z

Added a DataStreamRel version that can be extended in java

commit cc960d699db369cc8dc4e155cc5c5f6c3baf74a4
Author: rtudoran 
Date:   2017-02-03T15:35:18Z

Add skeleton for the implementation of the aggregates over sliding
window with processing time and time boundaries

commit 2390a9d3dc15afba01185c47f61a9ea830ea5acc
Author: Stefano Bortoli 
Date:   2017-02-06T10:33:57Z

committing changes with stub modifications before chekout proctime
branch

commit eaf4e92784dab01b17004390968ca4b1fe7c4bea
Author: Stefano Bortoli 
Date:   2017-02-06T13:17:43Z

ignore aggregation test and implemented simple proctime test

commit 16ccd7f5bf019803ea8b53f09a126ec53a5a6d59
Author: Stefano Bortoli 
Date:   2017-02-06T14:17:03Z

Merge branch 'FLINK-5710' of https://github.com/huawei-flink/flink into 
FLINK-5653

commit 10f7bc5e2086e41ec76cbabdcd069c71a491671a
Author: Stefano Bortoli 
Date:   2017-02-07T09:42:41Z

committing first key selector and utils

commit 31060e46f78729880c03e8cab0f92ff06faec4f0
Author: Stefano Bortoli 
Date:   2017-02-07T11:16:43Z

Changed ProcTime from time to timestamp

commit 69289bad836a5fdace271b28a15ca0e309e50b17
Author: rtudoran 
Date:   2017-02-07T13:13:23Z

Merge branch 'FLINK-5710' of https://github.com/huawei-flink/flink into 
FLINK-5654

commit 3392817045ed166df5f55d22fde34cbd98c775db
Author: rtudoran 
Date:   2017-02-07T13:14:50Z

Merge branch 'FLINK-5653' of https://github.com/huawei-flink/flink into 
FLINK-5654

commit d2ea0076b5e3561585c4eaea84025e50beaacf9a
Author: Stefano Bortoli 
Date:   2017-02-07T09:42:41Z

fixing linelength and other issues

commit f29f564bb7fe7496b9f3d2f45a6b4469af559378
Author: Stefano Bortoli 
Date:   2017-02-07T13:46:30Z

Merge branch 'FLINK-5653' of https://github.com/huawei-flink/flink.git
into FLINK-5653

Conflicts:

flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/logical/rel/DataStreamWindowRowAggregate.java

flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/logical/rel/util/StreamGroupKeySelector.java

commit ea145ecefc2be1bea71e995dbf39585e7fa44012
Author: rtudoran