[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user huawei-flink closed the pull request at: https://github.com/apache/flink/pull/3443 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
Github user huawei-flink commented on a diff in the pull request: https://github.com/apache/flink/pull/3443#discussion_r105162342 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCalcRule.scala --- @@ -46,6 +52,20 @@ class DataStreamCalcRule calc.getProgram, description) } + + override def matches(call: RelOptRuleCall): Boolean = { --- End diff -- Very nice. This was a work-around; it is better to manage it properly.
Github user huawei-flink commented on a diff in the pull request: https://github.com/apache/flink/pull/3443#discussion_r105162261 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala --- @@ -165,6 +165,11 @@ object FlinkRuleSets { // merge and push unions rules UnionEliminatorRule.INSTANCE, + + // aggregations over intervals should be enabled to be translated also in + // queries with LogicalWindows, not only queries with LogicalCalc + ProjectWindowTransposeRule.INSTANCE, + ProjectToWindowRule.INSTANCE, --- End diff -- ok
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3443#discussion_r105045766 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala --- @@ -165,6 +165,11 @@ object FlinkRuleSets { // merge and push unions rules UnionEliminatorRule.INSTANCE, + + // aggregations over intervals should be enabled to be translated also in + // queries with LogicalWindows, not only queries with LogicalCalc + ProjectWindowTransposeRule.INSTANCE, + ProjectToWindowRule.INSTANCE, --- End diff -- This rule has been added to the normalization phase. It can be removed here.
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3443#discussion_r105046076 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCalcRule.scala --- @@ -46,6 +52,20 @@ class DataStreamCalcRule calc.getProgram, description) } + + override def matches(call: RelOptRuleCall): Boolean = { --- End diff -- This can be removed because the `RexOver` was removed from `Project` (and `Calc`) during normalization.
Github user huawei-flink commented on a diff in the pull request: https://github.com/apache/flink/pull/3443#discussion_r104361048

--- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/ProcTimeRowStreamAggregationSqlITCase.java ---
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.java.stream.sql;
+
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.types.Row;
+import org.apache.flink.table.api.scala.stream.utils.StreamITCase;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.table.api.java.stream.utils.StreamTestData;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ProcTimeRowStreamAggregationSqlITCase extends StreamingMultipleProgramsTestBase {
+
+    @Test
+    public void testMaxAggregation() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+        StreamITCase.clear();
+
+        env.setParallelism(1);
+
+        DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds = StreamTestData.get5TupleDataStream(env);
+        Table in = tableEnv.fromDataStream(ds, "a,b,c,d,e");
+        tableEnv.registerTable("MyTable", in);
+
+        String sqlQuery = "SELECT a, MAX(c) OVER (PARTITION BY a ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS maxC FROM MyTable";
+        Table result = tableEnv.sql(sqlQuery);
+
+        DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
+        resultSet.addSink(new StreamITCase.StringSink());
+        env.execute();
+
+        List<String> expected = new ArrayList<>();
+        expected.add("1,0");
+        expected.add("2,1");
+        expected.add("2,2");
+        expected.add("3,3");
+        expected.add("3,4");
+        expected.add("3,5");
+        expected.add("4,6");
+        expected.add("4,7");
+        expected.add("4,8");
+        expected.add("4,9");
+        expected.add("5,10");
+        expected.add("5,11");
+        expected.add("5,12");
+        expected.add("5,14");
+        expected.add("5,14");
+
+        StreamITCase.compareWithList(expected);
+    }
+
+    @Test
+    public void testMinAggregation() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+        StreamITCase.clear();
+
+        env.setParallelism(1);
+
+        DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds = StreamTestData.get5TupleDataStream(env);
+        Table in = tableEnv.fromDataStream(ds, "a,b,c,d,e");
+        tableEnv.registerTable("MyTable", in);
+
+        String sqlQuery = "SELECT a, MIN(c) OVER (PARTITION BY a ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS minC FROM MyTable";
+        Table result = tableEnv.sql(sqlQuery);
+
+        DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
+        resultSet.addSink(new StreamITCase.StringSink());
+        env.execute();
+
+        List<String> expected = new ArrayList<>();
+        expected.add("1,0");
+
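The window clause under test, `ROWS BETWEEN 2 PRECEDING AND CURRENT ROW`, bounds each aggregate to at most three rows per partition in arrival order. A minimal standalone sketch of these semantics in plain Java (independent of Flink; the class and method names are illustrative only):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simulates MAX(c) OVER (PARTITION BY a ORDER BY proctime
//                        ROWS BETWEEN 2 PRECEDING AND CURRENT ROW):
// each arriving row produces one output covering at most the last
// 3 rows (2 preceding + current) of the same partition.
public class OverRowsSemantics {

    public static List<String> slidingMax(int[][] rows) {
        // per-partition buffer of the last 3 values of c
        Map<Integer, Deque<Integer>> buffers = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (int[] row : rows) {
            int a = row[0], c = row[1];
            Deque<Integer> buf = buffers.computeIfAbsent(a, k -> new ArrayDeque<>());
            buf.addLast(c);
            if (buf.size() > 3) {   // evict rows outside the 2-PRECEDING bound
                buf.removeFirst();
            }
            int max = Integer.MIN_VALUE;
            for (int v : buf) {
                max = Math.max(max, v);
            }
            out.add(a + "," + max);
        }
        return out;
    }

    public static void main(String[] args) {
        // (a, c) pairs in arrival order, mirroring the shape of the test data
        int[][] rows = {{1, 0}, {2, 1}, {2, 2}, {3, 3}, {3, 4}, {3, 5}};
        System.out.println(slidingMax(rows)); // prints [1,0, 2,1, 2,2, 3,3, 3,4, 3,5]
    }
}
```

Note that one result is emitted per input row, which is why the expected lists above contain one entry per element of the source stream.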
Github user huawei-flink commented on a diff in the pull request: https://github.com/apache/flink/pull/3443#discussion_r104148648

--- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/aggs/DoubleSummaryAggregation.java ---
@@ -0,0 +1,214 @@
+package org.apache.flink.table.plan.nodes.datastream.aggs;
+
+import static org.apache.flink.api.java.summarize.aggregation.CompensatedSum.ZERO;
+
+import org.apache.flink.api.java.summarize.aggregation.Aggregator;
+import org.apache.flink.api.java.summarize.aggregation.CompensatedSum;
+import org.apache.flink.api.java.summarize.aggregation.NumericSummaryAggregator;
+
+public class DoubleSummaryAggregation extends NumericSummaryAggregator<Double> {
--- End diff --

That is fine. I have pulled the new aggregation functions and updated the PR without the reset method.
Github user shaoxuan-wang commented on a diff in the pull request: https://github.com/apache/flink/pull/3443#discussion_r104096189

--- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/aggs/DoubleSummaryAggregation.java ---
+public class DoubleSummaryAggregation extends NumericSummaryAggregator<Double> {
--- End diff --

@huawei-flink, I have replied to your question in the UDAGG design doc. AggregateFunction is the base class for UDAGGs. We are very cautious about adding any new method to this interface. As mentioned in the UDAGG design doc, only createAccumulator, getValue, and accumulate are must-have methods for an aggregate. The merge method is optional and only useful for advanced optimization of the runtime execution plan. Retract may also be a must-have if users care about correctness. I do not see why reset is necessary for an aggregate. If it is helpful in your case, you can always add that method in your own user-defined aggregate function. UDAGG support is still on the way, but I think it should be available very soon.
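The contract described here (createAccumulator / accumulate / getValue mandatory, merge optional) can be sketched as a standalone max aggregate. This is a hypothetical illustration modeled on the description above, not the actual Flink `AggregateFunction` class:

```java
import java.util.List;

// Hypothetical standalone sketch of the UDAGG contract described in the
// discussion: a mutable accumulator plus the three must-have methods,
// with merge as the optional extra for combining partial results.
public class MaxAggregate {

    // mutable accumulator holding the running maximum
    public static class MaxAcc {
        long max = Long.MIN_VALUE;
    }

    public MaxAcc createAccumulator() {
        return new MaxAcc();
    }

    public void accumulate(MaxAcc acc, long value) {
        if (value > acc.max) {
            acc.max = value;
        }
    }

    public Long getValue(MaxAcc acc) {
        return acc.max;
    }

    // optional: merge partial accumulators, e.g. from parallel instances
    public void merge(MaxAcc acc, List<MaxAcc> others) {
        for (MaxAcc other : others) {
            if (other.max > acc.max) {
                acc.max = other.max;
            }
        }
    }

    public static void main(String[] args) {
        MaxAggregate agg = new MaxAggregate();
        MaxAcc acc = agg.createAccumulator();
        agg.accumulate(acc, 3);
        agg.accumulate(acc, 7);
        System.out.println(agg.getValue(acc)); // prints 7
    }
}
```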
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3443#discussion_r103926009

--- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/aggs/DoubleSummaryAggregation.java ---
+public class DoubleSummaryAggregation extends NumericSummaryAggregator<Double> {
--- End diff --

There was a discussion on the dev ML about the new aggregation interface (https://lists.apache.org/thread.html/c4e8bbcb32731e5b37ec0a561effb279034199b495fae40b1d708406@%3Cdev.flink.apache.org%3E). It would be great if you could comment there if you think something needs to be changed. Thanks!
Github user huawei-flink commented on a diff in the pull request: https://github.com/apache/flink/pull/3443#discussion_r103925068

--- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/aggs/DoubleSummaryAggregation.java ---
+public class DoubleSummaryAggregation extends NumericSummaryAggregator<Double> {
--- End diff --

Indeed, the StreamAggregator interface I defined has a reset and an evict method, to support associative function aggregation. Anyway, there is no point in discussing it here. I will push my proposal and then you'll see :-)
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3443#discussion_r103924165

--- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/aggs/DoubleSummaryAggregation.java ---
+public class DoubleSummaryAggregation extends NumericSummaryAggregator<Double> {
--- End diff --

The interface will be extended with a retract method to "subtract" records. @shaoxuan-wang can comment on that; he is working on the UDAGGs.
Github user huawei-flink commented on a diff in the pull request: https://github.com/apache/flink/pull/3443#discussion_r103920777

--- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/aggs/DoubleSummaryAggregation.java ---
+public class DoubleSummaryAggregation extends NumericSummaryAggregator<Double> {
--- End diff --

I see that the AggregateFunction interface has no reset method other than createAccumulator(); this means we have to create a new accumulator for every apply execution. With reset you just set the value back to the starting point, without creating a new object each time. What do you think?
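The trade-off raised here, allocating a fresh accumulator per evaluation versus resetting one in place, can be sketched as follows. The `reset()` method is the hypothetical addition under discussion, not part of any existing interface, and the accumulator shape is illustrative:

```java
// Hypothetical accumulator sketch: reset() lets the caller reuse one
// instance across window evaluations instead of allocating a new
// accumulator object for each one (as createAccumulator() would).
public class SumAcc {
    long sum;
    long count;

    // reset in place: reuse the same object, no new allocation
    public void reset() {
        sum = 0L;
        count = 0L;
    }

    public void add(long value) {
        sum += value;
        count++;
    }

    public double avg() {
        return count == 0 ? 0.0 : (double) sum / count;
    }

    public static void main(String[] args) {
        SumAcc acc = new SumAcc();
        acc.add(2);
        acc.add(4);
        System.out.println(acc.avg()); // prints 3.0
        acc.reset();                   // reused for the next window
        acc.add(10);
        System.out.println(acc.avg()); // prints 10.0
    }
}
```

Whether the saved allocation matters in practice depends on the JVM's escape analysis and GC behavior, which is presumably why the reviewers treat reset as an optional optimization rather than part of the core contract.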
Github user huawei-flink commented on a diff in the pull request: https://github.com/apache/flink/pull/3443#discussion_r103919690

--- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/aggs/DoubleSummaryAggregation.java ---
+public class DoubleSummaryAggregation extends NumericSummaryAggregator<Double> {
--- End diff --

Ok, so I will change my code to use the AggregateFunction you pointed out.
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3443#discussion_r103918476

--- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/aggs/DoubleSummaryAggregation.java ---
+public class DoubleSummaryAggregation extends NumericSummaryAggregator<Double> {
--- End diff --

No, I think we should use the new aggregation functions for this PR. Do you think that's possible?
Github user huawei-flink commented on a diff in the pull request: https://github.com/apache/flink/pull/3443#discussion_r103914685

--- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/aggs/DoubleSummaryAggregation.java ---
+public class DoubleSummaryAggregation extends NumericSummaryAggregator<Double> {
--- End diff --

Do you think this switch of the aggregations could be done in another PR?
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3443#discussion_r103909104

--- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/aggs/DoubleSummaryAggregation.java ---
+public class DoubleSummaryAggregation extends NumericSummaryAggregator<Double> {
--- End diff --

Actually, we are replacing the current aggregation functions with a new interface that also allows defining user-defined aggregate functions. The functions are already merged (https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala), but not used yet. We'll probably merge a change to activate and use them later today. This interface should be used because otherwise user-defined aggregation functions won't be supported.
Github user huawei-flink commented on a diff in the pull request: https://github.com/apache/flink/pull/3443#discussion_r103887096

--- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/aggs/DoubleSummaryAggregation.java ---
+public class DoubleSummaryAggregation extends NumericSummaryAggregator<Double> {
--- End diff --

The aggregate functions in the org.apache.flink.table.runtime.aggregate package assume a GroupReduce, whereas I have implemented it as a flatMap. Should I switch my implementation to reduce? @fhueske, what do you think?
[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user huawei-flink commented on a diff in the pull request: https://github.com/apache/flink/pull/3443#discussion_r103866134 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/aggs/DoubleSummaryAggregation.java --- @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.plan.nodes.datastream.aggs; + +import static org.apache.flink.api.java.summarize.aggregation.CompensatedSum.ZERO; + +import org.apache.flink.api.java.summarize.aggregation.Aggregator; +import org.apache.flink.api.java.summarize.aggregation.CompensatedSum; +import org.apache.flink.api.java.summarize.aggregation.NumericSummaryAggregator; + +public class DoubleSummaryAggregation extends NumericSummaryAggregator<Double> { --- End diff -- Good point. I searched for a while but could not find a good fit, so I decided to create stream-specific ones. The idea was to have something that could eventually be stream-optimized. However, I will try to reuse the existing ones. ---
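The exchange above weighs reusing Flink's batch summarize aggregators against writing stream-specific ones. As a rough illustration of the incremental pattern a stream-specific aggregator would follow, here is a plain-Java sketch; the interface and class names (`StreamAggregator`, `StreamMaxAggregator`) are hypothetical, not Flink's actual API:

```java
import java.util.Arrays;

// Hypothetical incremental-aggregation contract: fold in one element at a
// time, read the current result, reset when window contents are evicted.
interface StreamAggregator<IN, OUT> {
    void aggregate(IN value);   // fold one element into the running state
    OUT result();               // emit the current aggregate
    void reset();               // clear state when the window is evicted
}

class StreamMaxAggregator implements StreamAggregator<Integer, Integer> {
    private Integer max = null;

    @Override
    public void aggregate(Integer value) {
        max = (max == null || value > max) ? value : max;
    }

    @Override
    public Integer result() { return max; }

    @Override
    public void reset() { max = null; }
}

public class AggregatorSketch {
    public static void main(String[] args) {
        StreamAggregator<Integer, Integer> agg = new StreamMaxAggregator();
        for (int v : Arrays.asList(3, 1, 4, 1, 5)) {
            agg.aggregate(v);
        }
        System.out.println(agg.result()); // prints 5
    }
}
```

The point of the aggregate/result/reset split is that a count-based OVER window only ever folds in one new element and reads the current value per row; no GroupReduce-style pass over the whole group is required, which is what the flatMap-based implementation in the thread exploits.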
[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user huawei-flink commented on a diff in the pull request: https://github.com/apache/flink/pull/3443#discussion_r103866164 --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/ProcTimeRowStreamAggregationSqlITCase.java --- @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.java.stream.sql; + +import org.apache.flink.table.api.java.StreamTableEnvironment; +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.types.Row; +import org.apache.flink.table.api.scala.stream.utils.StreamITCase; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.apache.flink.table.api.java.stream.utils.StreamTestData; +import org.junit.Ignore; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +public class ProcTimeRowStreamAggregationSqlITCase extends StreamingMultipleProgramsTestBase { + + @Test + public void testMaxAggregatation() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); + StreamITCase.clear(); + + env.setParallelism(1); + + DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds = StreamTestData.get5TupleDataStream(env); + Table in = tableEnv.fromDataStream(ds, "a,b,c,d,e"); + tableEnv.registerTable("MyTable", in); + + String sqlQuery = "SELECT a, MAX(c) OVER (PARTITION BY a ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS maxC FROM MyTable"; + Table result = tableEnv.sql(sqlQuery); + + DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class); + resultSet.addSink(new StreamITCase.StringSink()); + env.execute(); + + List<String> expected = new ArrayList<>(); + expected.add("1,0"); + expected.add("2,1"); + expected.add("2,2"); + expected.add("3,3"); + expected.add("3,4"); + expected.add("3,5"); + expected.add("4,6"); + expected.add("4,7"); + expected.add("4,8"); + expected.add("4,9"); + expected.add("5,10"); + expected.add("5,11"); + expected.add("5,12"); + expected.add("5,14"); + expected.add("5,14"); + + StreamITCase.compareWithList(expected); + } + + @Test + public void testMinAggregatation() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); + StreamITCase.clear(); + + env.setParallelism(1); + + DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds = StreamTestData.get5TupleDataStream(env); + Table in = tableEnv.fromDataStream(ds, "a,b,c,d,e"); + tableEnv.registerTable("MyTable", in); + + String sqlQuery = "SELECT a, MIN(c) OVER (PARTITION BY a ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS minC FROM MyTable"; + Table result = tableEnv.sql(sqlQuery); + + DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class); + resultSet.addSink(new StreamITCase.StringSink()); + env.execute(); + + List<String> expected = new ArrayList<>(); + expected.add("1,0"); +
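The query under test computes, per partition key, an aggregate over the current row and the two preceding rows. The same semantics can be re-implemented in plain Java, independent of Flink, to make the expected values easy to check by hand; the class name and the sample data below are illustrative, not taken from the PR:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of
//   MAX(c) OVER (PARTITION BY a ORDER BY procTime()
//                ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
// i.e. per key, the max over the last (preceding + 1) rows seen so far.
public class RowsWindowSketch {
    public static List<String> slidingMax(List<int[]> rows, int preceding) {
        Map<Integer, Deque<Integer>> buffers = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (int[] row : rows) {                  // row = {key a, value c}
            Deque<Integer> buf =
                buffers.computeIfAbsent(row[0], k -> new ArrayDeque<>());
            buf.addLast(row[1]);
            if (buf.size() > preceding + 1) {     // evict rows outside the window
                buf.removeFirst();
            }
            out.add(row[0] + "," + Collections.max(buf));
        }
        return out;
    }

    public static void main(String[] args) {
        // Made-up data: key 5 with values 10..14.
        List<int[]> rows = List.of(new int[]{5, 10}, new int[]{5, 11},
                new int[]{5, 12}, new int[]{5, 13}, new int[]{5, 14});
        // windows per row: [10] [10,11] [10,11,12] [11,12,13] [12,13,14]
        System.out.println(slidingMax(rows, 2));
    }
}
```

Walking the real test data through this logic is a quick way to sanity-check the `expected` lists in the test above.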
[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user huawei-flink commented on a diff in the pull request: https://github.com/apache/flink/pull/3443#discussion_r103865652 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/logical/rel/util/WindowAggregateUtil.java --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.plan.logical.rel.util; + +import java.io.Serializable; +import java.util.List; + +import org.apache.calcite.rel.core.Window.Group; +import org.apache.calcite.rel.logical.LogicalWindow; +import org.apache.calcite.rex.RexLiteral; + +import com.google.common.collect.ImmutableList; + +public class WindowAggregateUtil implements Serializable { + + private static final long serialVersionUID = -3916551736243544540L; + + private LogicalWindow windowPointer = null; + + public WindowAggregateUtil() { + } + + public WindowAggregateUtil(LogicalWindow window) { + this.windowPointer = window; + } + + /** +* A utility function that checks whether a window is partitioned or is a +* global window. +* +* @param window +*the window to be checked for partitions +* @return true if partition keys are defined, false otherwise. +*/ + public boolean isStreamPartitioned(LogicalWindow window) { + // if there is a group bounded by keys, then it is + // a partitioned window + for (Group group : window.groups) { + if (!group.keys.isEmpty()) { + return true; + } + } + + return false; + } + + public int[] getKeysAsArray(Group group) { + if (group == null) { + return null; + } + return group.keys.toArray(); + } + + /** +* This method returns the [[int]] lower bound of a window when expressed +* with an integer, e.g. ... ROWS BETWEEN [[value]] PRECEDING AND CURRENT ROW +* +* @param constants +*the list of constants from which to get the offset value +* @return the value of the lower bound if available, -1 otherwise +*/ + + public int getLowerBoundary(ImmutableList<RexLiteral> constants) { + return ((Long)constants.get(0).getValue2()).intValue(); --- End diff -- Thank you very much. I was indeed puzzled by this. I will fix it according to your suggestion. I even asked on the Calcite mailing list, but nothing like this came up. ---
[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user hongyuhong commented on a diff in the pull request: https://github.com/apache/flink/pull/3443#discussion_r103835080 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/DataStreamProcTimeRowAggregate.java --- @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.plan.nodes.datastream; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.rel.core.Window.Group; +import org.apache.calcite.rel.core.Window.RexWinAggCall; +import org.apache.calcite.rel.logical.LogicalWindow; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexInputRef; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.KeyedStream; +import org.apache.flink.table.api.StreamTableEnvironment; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.calcite.FlinkTypeFactory; +import org.apache.flink.table.plan.logical.rel.util.WindowAggregateUtil; +import org.apache.flink.table.plan.nodes.datastream.function.DataStreamProcTimeAggregateRowGlobalWindowFunction; +import org.apache.flink.table.plan.nodes.datastream.function.DataStreamProcTimeAggregateRowKeyedWindowFunction; +import org.apache.flink.types.Row; + +public class DataStreamProcTimeRowAggregate extends DataStreamRelJava { + + protected LogicalWindow windowRef; + protected String description; + protected WindowAggregateUtil winUtil; + + public DataStreamProcTimeRowAggregate(RelOptCluster cluster, RelTraitSet traitSet, RelNode input, + RelDataType rowType, String description, LogicalWindow window) { + super(cluster, traitSet, input); + this.windowRef = window; + this.rowType = rowType; + this.description = description; + this.winUtil = new WindowAggregateUtil(); + } + + @Override + protected RelDataType deriveRowType() { + return super.deriveRowType(); + } + + @Override + public RelNode copy(RelTraitSet traitSet, java.util.List<RelNode> inputs) { + + if (inputs.size() != 1) { + System.err.println(this.getClass().getName() + " : Input size must be one!"); + } + + return new DataStreamProcTimeRowAggregate(getCluster(), traitSet, inputs.get(0), getRowType(), getDescription(), + windowRef); + } + + @Override + public DataStream<Row> translateToPlan(StreamTableEnvironment tableEnv, Row ignore) { + + TableConfig config = tableEnv.getConfig(); + + DataStream<Row> inputDS = ((DataStreamRel) getInput()).translateToPlan(tableEnv); + + System.out.println(inputDS); --- End diff -- That should be removed. ---
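Judging by the imports above (`KeyedStream`, a keyed and a global window function), `translateToPlan` has to choose between two execution paths depending on whether the OVER clause has PARTITION BY keys. A minimal plain-Java sketch of that dispatch, with a `List<int[]>` standing in for Calcite's window groups and their key sets (the class and method names are hypothetical):

```java
import java.util.List;

public class TranslateSketch {
    // Simplified stand-in for WindowAggregateUtil.isStreamPartitioned:
    // a window is partitioned iff at least one group has non-empty keys.
    static boolean isPartitioned(List<int[]> groupKeys) {
        for (int[] keys : groupKeys) {
            if (keys.length > 0) {
                return true;
            }
        }
        return false;
    }

    static String choosePath(List<int[]> groupKeys) {
        // Partitioned windows can run keyed (and thus in parallel);
        // a global window must funnel through a single window function.
        return isPartitioned(groupKeys)
            ? "keyBy + keyed window function"
            : "global window function";
    }

    public static void main(String[] args) {
        System.out.println(choosePath(List.of(new int[]{0}))); // keyed path
        System.out.println(choosePath(List.of(new int[]{})));  // global path
    }
}
```

The design consequence is worth noting: the global path forces parallelism 1 for the window operator, which is why the tests in this PR set `env.setParallelism(1)` to obtain deterministic output.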
[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user hongyuhong commented on a diff in the pull request: https://github.com/apache/flink/pull/3443#discussion_r103835059 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/logical/rel/util/WindowAggregateUtil.java --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.plan.logical.rel.util; + +import java.io.Serializable; +import java.util.List; + +import org.apache.calcite.rel.core.Window.Group; +import org.apache.calcite.rel.logical.LogicalWindow; +import org.apache.calcite.rex.RexLiteral; + +import com.google.common.collect.ImmutableList; + +public class WindowAggregateUtil implements Serializable { + + private static final long serialVersionUID = -3916551736243544540L; + + private LogicalWindow windowPointer = null; + + public WindowAggregateUtil() { + } + + public WindowAggregateUtil(LogicalWindow window) { + this.windowPointer = window; + } + + /** +* A utility function that checks whether a window is partitioned or is a +* global window. +* +* @param window +*the window to be checked for partitions +* @return true if partition keys are defined, false otherwise. +*/ + public boolean isStreamPartitioned(LogicalWindow window) { + // if there is a group bounded by keys, then it is + // a partitioned window + for (Group group : window.groups) { + if (!group.keys.isEmpty()) { + return true; + } + } + + return false; + } + + public int[] getKeysAsArray(Group group) { + if (group == null) { + return null; + } + return group.keys.toArray(); + } + + /** +* This method returns the [[int]] lower bound of a window when expressed +* with an integer, e.g. ... ROWS BETWEEN [[value]] PRECEDING AND CURRENT ROW +* +* @param constants +*the list of constants from which to get the offset value +* @return the value of the lower bound if available, -1 otherwise +*/ + + public int getLowerBoundary(ImmutableList<RexLiteral> constants) { + return ((Long)constants.get(0).getValue2()).intValue(); --- End diff -- I think we cannot always get the boundary from constants[0]. For example, in ``` select sum(2) over (order by rowtime() range interval '1' hour preceding) as u3 FROM OrderA ``` the boundary will be constants[1]. You can use constants.get(lowerBound.getOffset.asInstanceOf[RexInputRef].getIndex - inputRowType.getFieldCount) instead. ---
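The indexing rule hongyuhong describes can be illustrated with plain stand-ins: Calcite logically appends a window's constant operands after the input row's fields, so an offset `RexInputRef` whose index is `>= fieldCount` refers to `constants[index - fieldCount]`. A hypothetical sketch (a `List<Long>` stands in for `ImmutableList<RexLiteral>`, and the example numbers are made up):

```java
import java.util.List;

public class BoundarySketch {
    // Stand-in for Calcite's layout: a RexInputRef with index >= inputFieldCount
    // points past the row's fields into the window's constants list.
    static long lowerBoundary(List<Long> constants, int offsetIndex, int inputFieldCount) {
        return constants.get(offsetIndex - inputFieldCount);
    }

    public static void main(String[] args) {
        // Hypothetical query over 3 input fields, "SELECT SUM(2) OVER (...
        // ROWS BETWEEN 3 PRECEDING AND CURRENT ROW)": the SUM literal 2 lands
        // in constant slot 0 and the bound 3 in slot 1, which Calcite
        // references as input index 4 (= 3 fields + 1).
        List<Long> constants = List.of(2L /* SUM operand */, 3L /* bound */);
        System.out.println(lowerBoundary(constants, 4, 3)); // prints 3
    }
}
```

Hard-coding `constants.get(0)`, as the original `getLowerBoundary` did, silently reads the wrong literal whenever any other constant (such as an aggregate operand) precedes the bound in the list.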
[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user hongyuhong commented on a diff in the pull request: https://github.com/apache/flink/pull/3443#discussion_r103835088 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/aggs/DoubleSummaryAggregation.java --- @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.plan.nodes.datastream.aggs; + +import static org.apache.flink.api.java.summarize.aggregation.CompensatedSum.ZERO; + +import org.apache.flink.api.java.summarize.aggregation.Aggregator; +import org.apache.flink.api.java.summarize.aggregation.CompensatedSum; +import org.apache.flink.api.java.summarize.aggregation.NumericSummaryAggregator; + +public class DoubleSummaryAggregation extends NumericSummaryAggregator<Double> { --- End diff -- There is already a series of Scala classes that do aggregation in org.apache.flink.table.runtime.aggregate.*; I wonder if we should create another one. ---
[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user hongyuhong commented on a diff in the pull request: https://github.com/apache/flink/pull/3443#discussion_r103835113 --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/ProcTimeRowStreamAggregationSqlITCase.java --- @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.java.stream.sql; + +import org.apache.flink.table.api.java.StreamTableEnvironment; +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.types.Row; +import org.apache.flink.table.api.scala.stream.utils.StreamITCase; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.apache.flink.table.api.java.stream.utils.StreamTestData; +import org.junit.Ignore; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +public class ProcTimeRowStreamAggregationSqlITCase extends StreamingMultipleProgramsTestBase { + + @Test + public void testMaxAggregatation() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); + StreamITCase.clear(); + + env.setParallelism(1); + + DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds = StreamTestData.get5TupleDataStream(env); + Table in = tableEnv.fromDataStream(ds, "a,b,c,d,e"); + tableEnv.registerTable("MyTable", in); + + String sqlQuery = "SELECT a, MAX(c) OVER (PARTITION BY a ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS maxC FROM MyTable"; + Table result = tableEnv.sql(sqlQuery); + + DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class); + resultSet.addSink(new StreamITCase.StringSink()); + env.execute(); + + List<String> expected = new ArrayList<>(); + expected.add("1,0"); + expected.add("2,1"); + expected.add("2,2"); + expected.add("3,3"); + expected.add("3,4"); + expected.add("3,5"); + expected.add("4,6"); + expected.add("4,7"); + expected.add("4,8"); + expected.add("4,9"); + expected.add("5,10"); + expected.add("5,11"); + expected.add("5,12"); + expected.add("5,14"); + expected.add("5,14"); + + StreamITCase.compareWithList(expected); + } + + @Test + public void testMinAggregatation() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); + StreamITCase.clear(); + + env.setParallelism(1); + + DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds = StreamTestData.get5TupleDataStream(env); + Table in = tableEnv.fromDataStream(ds, "a,b,c,d,e"); + tableEnv.registerTable("MyTable", in); + + String sqlQuery = "SELECT a, MIN(c) OVER (PARTITION BY a ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS minC FROM MyTable"; + Table result = tableEnv.sql(sqlQuery); + + DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class); + resultSet.addSink(new StreamITCase.StringSink()); + env.execute(); + + List<String> expected = new ArrayList<>(); + expected.add("1,0"); +
[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
GitHub user huawei-flink opened a pull request: https://github.com/apache/flink/pull/3443 [FLINK-5653] Add processing time OVER ROWS BETWEEN x PRECEDING aggregation to SQL
Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes.
- [X] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id)
- [X] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added
- [X] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/huawei-flink/flink FLINK-5653
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3443.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3443
commit 72ec35a7380a4d73bd092ce14962ab2248139bae Author: Stefano Bortoli Date: 2017-02-01T16:15:58Z First implementation of ProcTime()
commit e98c28616af1cf67d3ad3277d9cc2ca335604eca Author: rtudoran Date: 2017-02-02T10:30:40Z Disambiguate for the OVER BY clause, which should not be treated as a RexOver expression in Logical Project
commit b7e6a673b88b6181c06071cd6c7bda55c25a62b4 Author: Stefano Bortoli Date: 2017-02-02T12:07:11Z Added return to disambiguation method for rexover
commit cda17565d5969f29b16923b631178a2cbf64791b Author: rtudoran Date: 2017-02-02T16:00:20Z Enable the LogicalWindow operators in query translation
commit 4b3e54281018b83c818f91e09a5321c34bbf297b Author: rtudoran Date: 2017-02-03T14:59:39Z Added a DataStreamRel version that can be extended in java
commit cc960d699db369cc8dc4e155cc5c5f6c3baf74a4 Author: rtudoran Date: 2017-02-03T15:35:18Z Add skeleton for the implementation of the aggregates over sliding window with processing time and time boundaries
commit 2390a9d3dc15afba01185c47f61a9ea830ea5acc Author: Stefano Bortoli Date: 2017-02-06T10:33:57Z committing changes with stub modifications before chekout proctime branch
commit eaf4e92784dab01b17004390968ca4b1fe7c4bea Author: Stefano Bortoli Date: 2017-02-06T13:17:43Z ignore aggregation test and implemented simple proctime test
commit 16ccd7f5bf019803ea8b53f09a126ec53a5a6d59 Author: Stefano Bortoli Date: 2017-02-06T14:17:03Z Merge branch 'FLINK-5710' of https://github.com/huawei-flink/flink into FLINK-5653
commit 10f7bc5e2086e41ec76cbabdcd069c71a491671a Author: Stefano Bortoli Date: 2017-02-07T09:42:41Z committing first key selector and utils
commit 31060e46f78729880c03e8cab0f92ff06faec4f0 Author: Stefano Bortoli Date: 2017-02-07T11:16:43Z Changed ProcTime from time to timestamp
commit 69289bad836a5fdace271b28a15ca0e309e50b17 Author: rtudoran Date: 2017-02-07T13:13:23Z Merge branch 'FLINK-5710' of https://github.com/huawei-flink/flink into FLINK-5654
commit 3392817045ed166df5f55d22fde34cbd98c775db Author: rtudoran Date: 2017-02-07T13:14:50Z Merge branch 'FLINK-5653' of https://github.com/huawei-flink/flink into FLINK-5654
commit d2ea0076b5e3561585c4eaea84025e50beaacf9a Author: Stefano Bortoli Date: 2017-02-07T09:42:41Z fixing linelength and other issues
commit f29f564bb7fe7496b9f3d2f45a6b4469af559378 Author: Stefano Bortoli Date: 2017-02-07T13:46:30Z Merge branch 'FLINK-5653' of https://github.com/huawei-flink/flink.git into FLINK-5653 Conflicts: flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/logical/rel/DataStreamWindowRowAggregate.java flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/logical/rel/util/StreamGroupKeySelector.java
commit ea145ecefc2be1bea71e995dbf39585e7fa44012 Author: rtudoran