[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16333421#comment-16333421
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5241


> Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1) 
> --
>
> Key: FLINK-8325
> URL: https://issues.apache.org/jira/browse/FLINK-8325
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Affects Versions: 1.5.0
> Reporter: sunjincheng
> Assignee: sunjincheng
> Priority: Major
>
> COUNT(1) with a group window always outputs 0.
> e.g.
> DATA:
> {code}
> val data = List(
> (1L, 1, "Hi"),
> (2L, 2, "Hello"),
> (4L, 2, "Hello"),
> (8L, 3, "Hello world"),
> (16L, 3, "Hello world"))
> {code}
> SQL:
> {code}
> SELECT b, COUNT(1) FROM MyTable GROUP BY Hop(proctime, interval '0.001' 
> SECOND, interval '0.002' SECOND),b
> {code}
> OUTPUT:
> {code}
> 1,0,1, 
> 1,0,1, 
> 2,0,1,
> 2,0,1, 
> 2,0,2, 
> 3,0,1,
> 3,0,1
> {code}
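
For reference, here is a minimal sketch of the per-key counts one would expect from `COUNT(1)` if the hop window is ignored. It assumes that `b` is the second tuple field, which the report does not state, and `ExpectedCountsSketch` and `countPerKey` are illustrative names only. With a processing-time window the real output depends on timing, so this only shows that no count should be 0.

```scala
object ExpectedCountsSketch {
  // Sample data copied from the issue description.
  val data: List[(Long, Int, String)] = List(
    (1L, 1, "Hi"),
    (2L, 2, "Hello"),
    (4L, 2, "Hello"),
    (8L, 3, "Hello world"),
    (16L, 3, "Hello world"))

  // COUNT(1) per key, assuming `b` is the second field and ignoring the window.
  def countPerKey(rows: List[(Long, Int, String)]): Map[Int, Long] =
    rows.groupBy(_._2).map { case (b, group) => b -> group.size.toLong }
}
```

Under these assumptions, `ExpectedCountsSketch.countPerKey(ExpectedCountsSketch.data)` yields `Map(1 -> 1, 2 -> 2, 3 -> 2)`.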



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1646#comment-1646
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5241
  
Thanks for the update and rebasing @sunjincheng121.
The PR looks good. Will run final tests and merge it.

Best, Fabian




[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16333199#comment-16333199
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/5241
  
Thanks @fhueske, I have rebased the code. I would appreciate it if you could 
review the changes.

Best, Jincheng




[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16332842#comment-16332842
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5241
  
Please rebase the PR on master once #5320 is merged.
I'll have a look at the changes after the rebase.

Thank you, Fabian




[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16332422#comment-16332422
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5241#discussion_r16264
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1130,259 +1137,263 @@ object AggregateUtil {
 // create aggregate function instances by function type and aggregate 
field data type.
 aggregateCalls.zipWithIndex.foreach { case (aggregateCall, index) =>
   val argList: util.List[Integer] = aggregateCall.getArgList
-  if (argList.isEmpty) {
-if 
(aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
-  aggFieldIndexes(index) = Array[Int](0)
-} else {
-  throw new TableException("Aggregate fields should not be empty.")
+
+  if (aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
+aggregates(index) = new CountAggFunction
+if(argList.isEmpty) {
--- End diff --

+space




[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16332421#comment-16332421
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5241#discussion_r162644529
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetEmptyProcessMapPartition.scala
 ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang
+
+import org.apache.flink.api.common.functions.RichMapPartitionFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.codegen.{Compiler, 
GeneratedAggregationsFunction}
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Set default value of all aggregates for non-grouped aggregates on 
empty dataset.
+  */
+class DataSetEmptyProcessMapPartition(genAggregations: 
GeneratedAggregationsFunction)
--- End diff --

Can be removed if we extend `DataSetFinalAggFunction` to also implement 
`MapPartitionFunction`




[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16332419#comment-16332419
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5241#discussion_r162643053
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
 ---
@@ -164,7 +164,6 @@ object FlinkRuleSets {
 // translate to Flink DataSet nodes
 DataSetWindowAggregateRule.INSTANCE,
 DataSetAggregateRule.INSTANCE,
-DataSetAggregateWithNullValuesRule.INSTANCE,
--- End diff --

Please move the changes to remove this rule into a separate PR.




[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16332423#comment-16332423
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5241#discussion_r162649590
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1130,259 +1137,263 @@ object AggregateUtil {
 // create aggregate function instances by function type and aggregate 
field data type.
 aggregateCalls.zipWithIndex.foreach { case (aggregateCall, index) =>
   val argList: util.List[Integer] = aggregateCall.getArgList
-  if (argList.isEmpty) {
-if 
(aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
-  aggFieldIndexes(index) = Array[Int](0)
-} else {
-  throw new TableException("Aggregate fields should not be empty.")
+
+  if (aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
+aggregates(index) = new CountAggFunction
+if(argList.isEmpty) {
+  aggFieldIndexes(index) = Array[Int](-1)
+}else{
--- End diff --

+spaces




[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16332424#comment-16332424
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5241#discussion_r162645683
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
 ---
@@ -157,6 +158,7 @@ class DataSetAggregate(
   } else {
 inputDS
   .reduceGroup(finalAgg)
+  .mapPartition(emptyProcessMapPartition.get)
--- End diff --

We should implement the pre-aggregated non-grouped agg also with 
`mapPartition` then, IMO.




[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16332420#comment-16332420
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5241#discussion_r162643699
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -829,7 +833,8 @@ object AggregateUtil {
   outputType: RelDataType,
   groupings: Array[Int]): (Option[DataSetPreAggFunction],
 Option[TypeInformation[Row]],
-RichGroupReduceFunction[Row, Row]) = {
+RichGroupReduceFunction[Row, Row],
+Option[MapPartitionFunction[Row, Row]]) = {
--- End diff --

Please remove this parameter and extend `DataSetFinalAggFunction` to also 
implement `RichMapPartitionFunction` such that it can be used as both (similar 
to `DataSetPreAggFunction`).




[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16332417#comment-16332417
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5241#discussion_r162642637
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
 ---
@@ -157,6 +158,7 @@ class DataSetAggregate(
   } else {
 inputDS
   .reduceGroup(finalAgg)
+  .mapPartition(emptyProcessMapPartition.get)
--- End diff --

I thought about this again. 

I think we should extend `DataSetFinalAggFunction` to also implement 
`MapPartitionFunction`, similar to `DataSetPreAggFunction`. The 
`GroupReduceFunction.reduce()` method and the 
`MapPartitionFunction.mapPartition()` method can share the same code. For the 
grouped aggregation we use `reduceGroup(function)`, and for the non-grouped 
aggregation we use `mapPartition(function).setParallelism(1)`. Setting the 
parallelism is important here.

That way we avoid an additional function, and it's a cleaner design.
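
The design suggested in this comment could look roughly like the sketch below. `GroupReduce` and `MapPartition` are simplified stand-ins for Flink's `GroupReduceFunction` and `MapPartitionFunction` (the real interfaces emit through a `Collector`), and `FinalCountAgg` is a hypothetical example, not the actual `DataSetFinalAggFunction`. The point is only that one class can serve both operators by delegating to a single shared method.

```scala
import scala.collection.mutable.ListBuffer

// Simplified stand-ins for Flink's GroupReduceFunction and
// MapPartitionFunction (assumption: the real ones emit via a Collector).
trait GroupReduce[I, O] {
  def reduce(values: Iterable[I], out: O => Unit): Unit
}

trait MapPartition[I, O] {
  def mapPartition(values: Iterable[I], out: O => Unit): Unit
}

// Hypothetical final aggregate that counts rows. Both entry points
// delegate to one shared method, so the logic exists only once.
class FinalCountAgg[T] extends GroupReduce[T, Long] with MapPartition[T, Long] {

  private def aggregate(values: Iterable[T], out: Long => Unit): Unit = {
    var count = 0L
    values.foreach(_ => count += 1)
    out(count) // a result is emitted even for empty input
  }

  override def reduce(values: Iterable[T], out: Long => Unit): Unit =
    aggregate(values, out)

  override def mapPartition(values: Iterable[T], out: Long => Unit): Unit =
    aggregate(values, out)
}

object FinalCountAggDemo {
  // Grouped path: what reduceGroup(function) would invoke per group.
  def viaReduce[T](group: Iterable[T]): List[Long] = {
    val results = ListBuffer.empty[Long]
    new FinalCountAgg[T].reduce(group, r => { results += r; () })
    results.toList
  }

  // Non-grouped path: what mapPartition(function) would invoke per partition.
  def viaMapPartition[T](partition: Iterable[T]): List[Long] = {
    val results = ListBuffer.empty[Long]
    new FinalCountAgg[T].mapPartition(partition, r => { results += r; () })
    results.toList
  }
}
```

In this sketch, calling the `mapPartition` path on an empty input still emits a single default result (a count of 0). That is presumably why the non-grouped case is meant to run with parallelism 1, so that exactly one such result is produced.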




[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16332418#comment-16332418
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5241#discussion_r162639253
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CountAggFunction.scala
 ---
@@ -33,7 +33,20 @@ class CountAccumulator extends JTuple1[Long] {
 /**
   * built-in count aggregate function
   */
-class CountAggFunction extends AggregateFunction[JLong, CountAccumulator] {
+class CountAggFunction
+  extends AggregateFunction[JLong, CountAccumulator] {
+
+  // process argument is optimized by Calcite.
+  // For instance count(42) or count(*) which will optimized to count().
--- End diff --

`For instance count(42) or count(*) which will optimized to count().` -> 
`For instance count(42) or count(*) will be optimized to count().`
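
To illustrate what the code comment under review describes, here is a minimal sketch of a count aggregate that accepts both call shapes: a zero-argument `accumulate` for `count()` (what `count(42)` or `count(*)` becomes after optimization) and a one-argument `accumulate` for a real column. `CountAcc`, `SketchCountAgg` and the demo helpers are hypothetical names and do not reproduce the actual `CountAggFunction`.

```scala
// Hypothetical names; a sketch, not Flink's CountAggFunction.
final class CountAcc { var count: Long = 0L }

class SketchCountAgg {
  def createAccumulator(): CountAcc = new CountAcc

  // count(): used when the argument was a constant or `*`; every row counts.
  def accumulate(acc: CountAcc): Unit = acc.count += 1

  // count(value): used for a real column; NULL values are skipped.
  def accumulate(acc: CountAcc, value: Any): Unit =
    if (value != null) acc.count += 1

  def getValue(acc: CountAcc): Long = acc.count
}

object SketchCountAggDemo {
  private val agg = new SketchCountAgg

  // Drives the zero-argument variant once per row.
  def countStar(rows: Seq[Any]): Long = {
    val acc = agg.createAccumulator()
    rows.foreach(_ => agg.accumulate(acc))
    agg.getValue(acc)
  }

  // Drives the one-argument variant with each column value.
  def countColumn(rows: Seq[Any]): Long = {
    val acc = agg.createAccumulator()
    rows.foreach(v => agg.accumulate(acc, v))
    agg.getValue(acc)
  }
}
```

For the input `Seq("Hi", null, "Hello")`, `countStar` returns 3 and `countColumn` returns 2 in this sketch.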




[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326164#comment-16326164
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/5241
  
@fhueske Thanks for your review and your suggestion about FLINK-8355. I have 
fixed the FLINK-8355 issue in this PR. I would appreciate it if you could 
review the changes again.

Best, Jincheng




[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319938#comment-16319938
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5241#discussion_r160622372
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala
 ---
@@ -278,7 +278,14 @@ class SetOperatorsITCase(
 TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  /**
+* This test will checks IN for NULLs based on whether COUNT (*) and 
COUNT (a) are equal. Due to
+* 
[[org.apache.flink.table.plan.rules.dataSet.DataSetAggregateWithNullValuesRule]]
 will
+* union a NULL row in to input DataSet for non-groupBy agg. That 
caused COUNT (*) and COUNT(a)
+* are not equal. So this test case ignored before FLINK-8355 be fixed.
--- End diff --

No worries. I think if we fix FLINK-8355 first, the test cases should work 
correctly. I've added a comment to FLINK-8355 with an idea how to fix it.




[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16318688#comment-16318688
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5241#discussion_r160454995
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala
 ---
@@ -278,7 +278,14 @@ class SetOperatorsITCase(
 TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  /**
+* This test will checks IN for NULLs based on whether COUNT (*) and 
COUNT (a) are equal. Due to
+* 
[[org.apache.flink.table.plan.rules.dataSet.DataSetAggregateWithNullValuesRule]]
 will
+* union a NULL row in to input DataSet for non-groupBy agg. That 
caused COUNT (*) and COUNT(a)
+* are not equal. So this test case ignored before FLINK-8355 be fixed.
--- End diff --

Maybe I did not clearly express what I mean. :) 




[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16318629#comment-16318629
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5241#discussion_r160442930
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala
 ---
@@ -278,7 +278,14 @@ class SetOperatorsITCase(
 TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  /**
+* This test will checks IN for NULLs based on whether COUNT (*) and 
COUNT (a) are equal. Due to
+* 
[[org.apache.flink.table.plan.rules.dataSet.DataSetAggregateWithNullValuesRule]]
 will
+* union a NULL row in to input DataSet for non-groupBy agg. That 
caused COUNT (*) and COUNT(a)
+* are not equal. So this test case ignored before FLINK-8355 be fixed.
--- End diff --

It is correct that the current state does not work for all queries and 
input data. There is a bug that needs to be fixed. However, it is working 
correctly for the *given* test.

The test is correct and is now failing due to this PR. Hence, the PR causes a 
regression. Your changes fix a bug but introduce another one.
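
The interaction discussed here can be sketched as follows. The model is an assumption built from this thread: the Javadoc in the diff says that `DataSetAggregateWithNullValuesRule` unions a NULL row into the input of a non-grouped aggregate, and the `Array[Int](0)` line removed in the `AggregateUtil` diff suggests that the old `COUNT(*)` counted field 0. All names in the block are hypothetical.

```scala
object NullRowPaddingSketch {
  // A row is modelled as a sequence of nullable fields (None = SQL NULL).
  type Row = Seq[Option[Int]]

  // Mimics the rule from the Javadoc: one all-NULL row is unioned into
  // the input of a non-grouped aggregate.
  def padWithNullRow(rows: Seq[Row], arity: Int): Seq[Row] =
    rows :+ Seq.fill[Option[Int]](arity)(None)

  // Assumed old behaviour: COUNT(*) counted non-NULL values of field 0,
  // so the padded NULL row was ignored.
  def countStarOld(rows: Seq[Row]): Long =
    rows.count(row => row.head.isDefined).toLong

  // Behaviour after the fix: every row counts, including the padded one.
  def countStarFixed(rows: Seq[Row]): Long = rows.size.toLong

  // COUNT(field i): NULL values are skipped.
  def countField(rows: Seq[Row], i: Int): Long =
    rows.count(row => row(i).isDefined).toLong
}
```

With two input rows and no NULLs, the padded input gives 2 for both `countStarOld` and `countField`, but 3 for `countStarFixed`. In this model the fixed `COUNT(*)` and `COUNT(a)` no longer agree as long as the padding rule is in place, which matches the regression described in this comment.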




[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16318622#comment-16318622
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5241#discussion_r160442170
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala
 ---
@@ -254,7 +254,7 @@ class JoinITCase(
 val table = 
CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a1, 'a2, 'a3)
 tEnv.registerTable("A", table)
 
-val sqlQuery2 = "SELECT * FROM (SELECT count(*) FROM A) CROSS JOIN A"
--- End diff --

Sounds good.  




[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16318607#comment-16318607
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5241#discussion_r160439631
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala
 ---
@@ -278,7 +278,14 @@ class SetOperatorsITCase(
 TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  /**
+* This test will checks IN for NULLs based on whether COUNT (*) and 
COUNT (a) are equal. Due to
+* 
[[org.apache.flink.table.plan.rules.dataSet.DataSetAggregateWithNullValuesRule]]
 will
+* union a NULL row in to input DataSet for non-groupBy agg. That 
caused COUNT (*) and COUNT(a)
+* are not equal. So this test case ignored before FLINK-8355 be fixed.
--- End diff --

The test was passing before because, before this PR, both `COUNT(*)` and 
`COUNT(a)` ignored the `NULL` row. That is a bug; please see SQL 92 and 
https://docs.microsoft.com/en-us/sql/t-sql/functions/count-transact-sql 
```
COUNT(*) returns the number of rows in a specified table without getting 
rid of duplicates. It counts each row separately. This includes rows that 
contain null values.
```
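
To make the quoted definition concrete, here is a minimal sketch of the two counting semantics. It is not Flink code: the `CountSemantics` object and the use of `Option[Int]` (with `None` standing in for SQL `NULL`) are assumptions made for illustration only.

```scala
// Hypothetical model of a single-column table; None stands for SQL NULL.
object CountSemantics {
  // COUNT(*): counts every row, including rows whose value is NULL.
  def countStar(rows: Seq[Option[Int]]): Long = rows.size.toLong

  // COUNT(a): counts only the rows where column a is not NULL.
  def countColumn(rows: Seq[Option[Int]]): Long = rows.count(_.isDefined).toLong

  def main(args: Array[String]): Unit = {
    val rows = Seq(Some(1), None, Some(3))
    println(s"COUNT(*) = ${countStar(rows)}, COUNT(a) = ${countColumn(rows)}")
  }
}
```

With one `NULL` among three rows, the two counts differ by exactly one, which is the difference discussed in this thread.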




[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16318603#comment-16318603
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5241#discussion_r160438113
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala
 ---
@@ -254,7 +254,7 @@ class JoinITCase(
 val table = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a1, 'a2, 'a3)
 tEnv.registerTable("A", table)
 
-val sqlQuery2 = "SELECT * FROM (SELECT count(*) FROM A) CROSS JOIN A"
--- End diff --

The test was working correctly before. If you remove all your changes, the 
test will pass. It started to fail when you added the count aggregate that is 
not null-aware. It does not matter which rules are applied during optimization 
as long as the query computes the correct result for the given input data. The 
test is correct and is failing due to the changes in the PR. Hence, the PR 
breaks the correctness of some queries that were working before.

IMO, we cannot merge it in the current state. We have to fix FLINK-8355 
before we can merge this PR. I have an idea how to fix FLINK-8355 and will add 
that to the ticket.




[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16318597#comment-16318597
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5241#discussion_r160437205
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala
 ---
@@ -278,7 +278,14 @@ class SetOperatorsITCase(
 TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  /**
+* This test will checks IN for NULLs based on whether COUNT (*) and COUNT (a) are equal. Due to
+* [[org.apache.flink.table.plan.rules.dataSet.DataSetAggregateWithNullValuesRule]] will
+* union a NULL row in to input DataSet for non-groupBy agg. That caused COUNT (*) and COUNT(a)
+* are not equal. So this test case ignored before FLINK-8355 be fixed.
--- End diff --

The rule or anything else that happens during optimization does not matter. 
The only things that count are the input data, the query, and the output data. 
Given the query and the input data, the current result is correct. 

The test was passing before. If it fails now, it is clear that the PR broke 
it.




[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16318558#comment-16318558
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5241#discussion_r160430205
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala
 ---
@@ -278,7 +278,14 @@ class SetOperatorsITCase(
 TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  /**
+* This test will checks IN for NULLs based on whether COUNT (*) and COUNT (a) are equal. Due to
+* [[org.apache.flink.table.plan.rules.dataSet.DataSetAggregateWithNullValuesRule]] will
+* union a NULL row in to input DataSet for non-groupBy agg. That caused COUNT (*) and COUNT(a)
+* are not equal. So this test case ignored before FLINK-8355 be fixed.
--- End diff --

I also expect `true` 6 times and `false` 9 times, but because of 
`DataSetAggregateWithNullValuesRule` we get `null` 9 times instead of `false`, 
as follows:
```
 expected: [false, false, false, false, false, false, false, false, false, 
true, true, true, true, true, true]
 received: [null, null, null, null, null, null, null, null, null, true, 
true, true, true, true, true] 
```
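
A rough sketch of how `false` can turn into `null` here, assuming (as the Javadoc in the diff says) that the `IN` check decides about NULLs by comparing `COUNT(*)` with `COUNT(a)` over the subquery. The `InWithCounts` object and its `in` function are hypothetical and only model three-valued logic with `Option[Boolean]`; they are not the plan that Flink or Calcite actually generates.

```scala
object InWithCounts {
  // Result of `x IN (subquery)`: Some(true), Some(false), or None for SQL NULL (unknown).
  def in(x: Int, values: Seq[Option[Int]], countStar: Long, countCol: Long): Option[Boolean] =
    if (countStar == 0L) Some(false)               // empty subquery: never a match
    else if (values.contains(Some(x))) Some(true)  // a real match wins
    else if (countCol < countStar) None            // the counts suggest a NULL: unknown
    else Some(false)                               // no match and no NULLs
}
```

With three non-null values and both counts equal to 3, a non-matching `x` yields `Some(false)`. If `COUNT(*)` is reported as 4 while `COUNT(a)` stays 3, the same `x` yields `None`, which corresponds to the `null` entries in the received output above.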




[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16318548#comment-16318548
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5241#discussion_r160428673
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1130,259 +1134,263 @@ object AggregateUtil {
 // create aggregate function instances by function type and aggregate field data type.
 aggregateCalls.zipWithIndex.foreach { case (aggregateCall, index) =>
   val argList: util.List[Integer] = aggregateCall.getArgList
-  if (argList.isEmpty) {
-if (aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
-  aggFieldIndexes(index) = Array[Int](0)
--- End diff --

I think it should work, but I do not want to deal with `SqlCountAggFunction` 
twice, so I removed `SqlCountAggFunction` from the case condition. 




[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16318538#comment-16318538
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5241#discussion_r160426130
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala
 ---
@@ -254,7 +254,7 @@ class JoinITCase(
 val table = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a1, 'a2, 'a3)
 tEnv.registerTable("A", table)
 
-val sqlQuery2 = "SELECT * FROM (SELECT count(*) FROM A) CROSS JOIN A"
--- End diff --

Hi @fhueske, I do not think the PR breaks the correctness of the test :). 
When we run this test case, the query matches 
`DataSetAggregateWithNullValuesRule`, which unions a NULL row into the data set, 
so `COUNT(a1)` returns `3` and `COUNT(*)` returns `4`. I think this 
is a bug in batch. The snippet is as follows:

![image](https://user-images.githubusercontent.com/22488084/34726497-250f4b30-f58f-11e7-92cf-595f9c9a64e0.png)
 
I think we can discuss this bug in FLINK-8355. What do you think?
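
The `3` versus `4` difference can be reproduced with a small model. This is only a sketch under the assumption stated in this thread, namely that the rule unions one all-NULL row into the input of a non-grouped aggregate; `NullRowUnion` and its members are hypothetical names, and only column `a1` is modeled.

```scala
object NullRowUnion {
  // Stand-in for column a1 of the small 3-tuple data set (three rows, no NULLs).
  val a1Values: Seq[Option[Int]] = Seq(Some(1), Some(2), Some(3))

  // Assumed effect of the rule: append one NULL row to the aggregate's input.
  def withNullRow(rows: Seq[Option[Int]]): Seq[Option[Int]] = rows :+ None

  // Null-aware COUNT(a1) skips the extra row; COUNT(*) counts it.
  def countColumn(rows: Seq[Option[Int]]): Long = rows.count(_.isDefined).toLong
  def countStar(rows: Seq[Option[Int]]): Long = rows.size.toLong
}
```

On the original three rows both counts are 3. After `withNullRow`, `countColumn` stays at 3 while `countStar` becomes 4, matching the numbers quoted above.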





[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16318501#comment-16318501
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5241#discussion_r160421752
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala
 ---
@@ -278,7 +278,14 @@ class SetOperatorsITCase(
 TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  /**
+* This test will checks IN for NULLs based on whether COUNT (*) and COUNT (a) are equal. Due to
+* [[org.apache.flink.table.plan.rules.dataSet.DataSetAggregateWithNullValuesRule]] will
+* union a NULL row in to input DataSet for non-groupBy agg. That caused COUNT (*) and COUNT(a)
+* are not equal. So this test case ignored before FLINK-8355 be fixed.
--- End diff --

I had a look at the test and think that it is correct. `SELECT a FROM 
Table3` returns `1, 2, 3` and `SELECT d FROM Table5` returns `1, 2, 2, 3, 3, 3, 
4, 4, 4, 4, 5, 5, 5, 5, 5`. So the result of the query should be 6 times `true` 
and 9 times `false`.

Which result would you expect?
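
The expected tally can be checked with a few lines of plain Scala. The values are the ones listed in this comment; the object and field names are only illustrative.

```scala
object InExpectation {
  // Values listed above for `SELECT a FROM Table3`.
  val a: Set[Int] = Set(1, 2, 3)
  // Values listed above for `SELECT d FROM Table5`.
  val d: Seq[Int] = Seq(1, 2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5, 5)

  // One boolean per row of d: is the value contained in a?
  val results: Seq[Boolean] = d.map(a.contains)
  val trueCount: Int = results.count(identity)
  val falseCount: Int = results.count(!_)
}
```

The values 1, 2 and 3 occur 1 + 2 + 3 = 6 times in `d`, and 4 and 5 occur 4 + 5 = 9 times.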




[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16318495#comment-16318495
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5241#discussion_r160420917
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala
 ---
@@ -254,7 +254,7 @@ class JoinITCase(
 val table = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a1, 'a2, 'a3)
 tEnv.registerTable("A", table)
 
-val sqlQuery2 = "SELECT * FROM (SELECT count(*) FROM A) CROSS JOIN A"
--- End diff --

The current result is correct. `CollectionDataSets.getSmall3TupleDataSet` 
has three rows. Hence `SELECT count(a1) FROM A` must be `3` and not `4`. 
The PR breaks the correctness of the test.




[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16318487#comment-16318487
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5241#discussion_r160419117
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala
 ---
@@ -278,7 +278,14 @@ class SetOperatorsITCase(
 TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  /**
+* This test will checks IN for NULLs based on whether COUNT (*) and COUNT (a) are equal. Due to
+* [[org.apache.flink.table.plan.rules.dataSet.DataSetAggregateWithNullValuesRule]] will
+* union a NULL row in to input DataSet for non-groupBy agg. That caused COUNT (*) and COUNT(a)
+* are not equal. So this test case ignored before FLINK-8355 be fixed.
--- End diff --

A workaround is to create a `transformToBatchAggregateFunctions`, or to add 
a parameter to `transformToAggregateFunctions` that identifies batch or 
stream, but that is not elegant :(.




[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16318475#comment-16318475
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5241#discussion_r160416641
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala
 ---
@@ -278,7 +278,14 @@ class SetOperatorsITCase(
 TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  /**
+* This test will checks IN for NULLs based on whether COUNT (*) and COUNT (a) are equal. Due to
+* [[org.apache.flink.table.plan.rules.dataSet.DataSetAggregateWithNullValuesRule]] will
+* union a NULL row in to input DataSet for non-groupBy agg. That caused COUNT (*) and COUNT(a)
+* are not equal. So this test case ignored before FLINK-8355 be fixed.
--- End diff --

I think this test case exposes a bug in batch, so we should re-enable it after 
FLINK-8355. 
You are right that we are not sure how to fix FLINK-8355 at the moment, but we 
are sure that adding a `null` row is not the correct (or best) way to deal with 
a `non-groupby` query. But I think we should fix FLINK-8355 after this PR is 
merged (when we know how to fix it). What do you think?




[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16318461#comment-16318461
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5241#discussion_r160414914
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala
 ---
@@ -254,7 +254,7 @@ class JoinITCase(
 val table = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a1, 'a2, 'a3)
 tEnv.registerTable("A", table)
 
-val sqlQuery2 = "SELECT * FROM (SELECT count(*) FROM A) CROSS JOIN A"
--- End diff --

Yes, when using `COUNT(*)` we must change the expected result. 
``` 
 expected: [3,1,1,Hi, 3,2,2,Hello, 3,3,2,Hello world]
 received: [4,1,1,Hi, 4,2,2,Hello, 4,3,2,Hello world] 
```




[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16318457#comment-16318457
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5241#discussion_r160413930
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -145,6 +145,14 @@ class DataStreamOverAggregate(
   inputSchema.typeInfo,
   Some(constants))
 
+val constantsTypeInfo =
+  Some(constants).map(_.map(generator.generateExpression(_))).getOrElse(Seq()).map(_.resultType)
+  val aggInputTypeInfo = constantsTypeInfo.++:(inputSchema.fieldTypeInfos)
+
+val aggregateInputType =
+  cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
--- End diff --

+1




[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16318456#comment-16318456
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5241#discussion_r160413762
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -359,12 +359,17 @@ class AggregationCodeGenerator(
 
   val accumulate: String = {
 for (i <- aggs.indices) yield {
-  j"""
- |${accTypes(i)} acc$i = (${accTypes(i)}) accs.getField($i);
- |${genDataViewFieldSetter(s"acc$i", i)}
- |${aggs(i)}.accumulate(
- |  acc$i,
- |  ${parametersCode(i)});""".stripMargin
--- End diff --

The code is very simple, but when `!parametersCode(i).isEmpty` it generates 
the following code: 
`.accumulate(acc0());`, which we do not want.
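
One way to picture the concern, assuming it is about what gets emitted between the accumulator and an empty argument list: build the call so that the separator only appears when there are parameters. The `accumulateCall` helper below is hypothetical and is not the code in `AggregationCodeGenerator`.

```scala
object AccumulateCallSketch {
  // Hypothetical helper: renders the generated accumulate call as a string.
  def accumulateCall(agg: String, acc: String, parametersCode: String): String =
    if (parametersCode.isEmpty) {
      // COUNT(*) / COUNT(constant): no arguments after the accumulator.
      s"$agg.accumulate($acc);"
    } else {
      // COUNT(column) and other aggregates: pass the input fields as well.
      s"$agg.accumulate($acc, $parametersCode);"
    }
}
```

Keeping the two shapes in separate branches avoids emitting a dangling separator or stray parentheses when the parameter list is empty.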




[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16317225#comment-16317225
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5241#discussion_r160251387
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CountAggFunction.scala
 ---
@@ -33,7 +33,20 @@ class CountAccumulator extends JTuple1[Long] {
 /**
   * built-in count aggregate function
   */
-class CountAggFunction extends AggregateFunction[JLong, CountAccumulator] {
+class CountAggFunction
+  extends AggregateFunction[JLong, CountAccumulator] {
+
+  // process argument is optimized by calcite.
--- End diff --

`calcite` -> `Calcite`




[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16317218#comment-16317218
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5241#discussion_r160250423
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -121,7 +121,7 @@ class AggregationCodeGenerator(
 
 // get parameter lists for aggregation functions
 val parametersCode = aggFields.map { inFields =>
-  val fields = inFields.map { f =>
+  val fields =  inFields.filter(index => index > -1).map { f =>
--- End diff --

remove double space




[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16317220#comment-16317220
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5241#discussion_r160251401
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CountAggFunction.scala
 ---
@@ -33,7 +33,20 @@ class CountAccumulator extends JTuple1[Long] {
 /**
   * built-in count aggregate function
   */
-class CountAggFunction extends AggregateFunction[JLong, CountAccumulator] {
+class CountAggFunction
+  extends AggregateFunction[JLong, CountAccumulator] {
+
+  // process argument is optimized by calcite.
+  // For instance count(42) or count(*) which will optimized to count().
+  def accumulate(acc: CountAccumulator): Unit = {
+acc.f0 += 1L
+  }
+
+  // process argument is optimized by calcite.
--- End diff --

`calcite` -> `Calcite`
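
For readers following the diff, the two `accumulate` variants can be sketched without any Flink dependency. The `Acc` class is a simplified stand-in for `CountAccumulator`, and the null check in the one-argument variant is an assumption about the part of the diff that is cut off above.

```scala
object CountSketch {
  // Simplified stand-in for CountAccumulator (the real class extends JTuple1[Long]).
  final class Acc { var count: Long = 0L }

  // COUNT(*) or COUNT(42): no argument is passed, so every row is counted.
  def accumulate(acc: Acc): Unit = acc.count += 1L

  // COUNT(column): assumed to skip null values.
  def accumulate(acc: Acc, value: Any): Unit =
    if (value != null) acc.count += 1L
}
```

Feeding the rows `1, null, 3` through the zero-argument variant counts 3, while the one-argument variant counts 2.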




[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16317223#comment-16317223
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5241#discussion_r160255883
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/AggFunctionTestBase.scala
 ---
@@ -146,14 +146,31 @@ abstract class AggFunctionTestBase[T, ACC] {
 val accumulator = aggregator.createAccumulator()
 vals.foreach(
   v =>
-accumulateFunc.invoke(aggregator, accumulator.asInstanceOf[Object], v.asInstanceOf[Object])
+if(accumulateFunc.getParameterCount == 1){
+  this.accumulateFunc.invoke(aggregator, accumulator.asInstanceOf[Object])
+}else{
--- End diff --

add spaces `} else {`
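
For context, the branch under review dispatches on the arity of `accumulate`. A minimal sketch of that idea follows; the classes `CountAcc`, `ZeroArgCount` and `OneArgCount` are hypothetical stand-ins, not the Flink test base:

```scala
import java.lang.reflect.Method

// Hypothetical accumulator and aggregate functions, for illustration only.
class CountAcc { var count: Long = 0L }

class ZeroArgCount {
  // COUNT(*) / COUNT(1): no value argument, every row is counted.
  def accumulate(acc: CountAcc): Unit = acc.count += 1L
}

class OneArgCount {
  // COUNT(col): only non-null values are counted.
  def accumulate(acc: CountAcc, value: Any): Unit =
    if (value != null) acc.count += 1L
}

object AccumulateDispatch {
  // Looks up the `accumulate` method and calls it with or without the value,
  // depending on how many parameters it declares.
  def accumulateAll(aggregator: AnyRef, values: Seq[Any]): Long = {
    val acc = new CountAcc
    val method: Method =
      aggregator.getClass.getMethods.find(_.getName == "accumulate").get
    values.foreach { v =>
      if (method.getParameterCount == 1) {
        method.invoke(aggregator, acc)
      } else {
        method.invoke(aggregator, acc, v.asInstanceOf[AnyRef])
      }
    }
    acc.count
  }
}
```

The sketch assumes exactly one public method named `accumulate` per class; an overloaded aggregate would need a more careful lookup.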




[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16317217#comment-16317217
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5241#discussion_r160250511
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -121,7 +121,7 @@ class AggregationCodeGenerator(
 
 // get parameter lists for aggregation functions
 val parametersCode = aggFields.map { inFields =>
-  val fields = inFields.map { f =>
+  val fields =  inFields.filter(index => index > -1).map { f =>
--- End diff --

`filter(index => index > -1)` can be shortened to `filter(_ > -1)`
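
A small sketch of the equivalence, assuming (as the diff suggests) that `-1` marks an aggregate without an input field; the `inFields` values are made up:

```scala
object PlaceholderFilter {
  // Hypothetical field indexes; -1 stands for "no input field", as in COUNT(*).
  val inFields: Array[Int] = Array(-1, 0, 2)

  // Explicit lambda, as written in the PR.
  val explicitLambda: Array[Int] = inFields.filter(index => index > -1)

  // Placeholder syntax, as suggested in the review.
  val placeholder: Array[Int] = inFields.filter(_ > -1)
}
```

Both forms drop the `-1` entry and keep the remaining indexes in order.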
  




[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16317228#comment-16317228
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5241#discussion_r160260046
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/GroupWindowITCase.scala
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import 
org.apache.flink.table.runtime.stream.sql.GroupWindowITCase.TimestampAndWatermarkWithOffset
+import org.apache.flink.table.runtime.utils.{StreamITCase, 
StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit._
+
+import scala.collection.mutable
+
+class GroupWindowITCase extends StreamingWithStateTestBase {
--- End diff --

Add these tests to `SqlITCase` instead of creating a new class. A new 
test class causes another mini cluster to be set up, which is quite 
expensive.




[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16317230#comment-16317230
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5241#discussion_r160255960
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/AggFunctionTestBase.scala
 ---
@@ -146,14 +146,31 @@ abstract class AggFunctionTestBase[T, ACC] {
 val accumulator = aggregator.createAccumulator()
 vals.foreach(
   v =>
-accumulateFunc.invoke(aggregator, 
accumulator.asInstanceOf[Object], v.asInstanceOf[Object])
+if(accumulateFunc.getParameterCount == 1){
--- End diff --

add spaces `if (accumulateFunc.getParameterCount == 1) {`




[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16317233#comment-16317233
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5241#discussion_r160271966
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1130,259 +1134,263 @@ object AggregateUtil {
 // create aggregate function instances by function type and aggregate 
field data type.
 aggregateCalls.zipWithIndex.foreach { case (aggregateCall, index) =>
   val argList: util.List[Integer] = aggregateCall.getArgList
-  if (argList.isEmpty) {
-if 
(aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
-  aggFieldIndexes(index) = Array[Int](0)
--- End diff --

Couldn't we just change this line to `aggFieldIndexes(index) = Array[Int](-1)` 
instead of touching each line of this method?




[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16317224#comment-16317224
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5241#discussion_r160265977
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -145,6 +145,14 @@ class DataStreamOverAggregate(
   inputSchema.typeInfo,
   Some(constants))
 
+val constantsTypeInfo =
+  
Some(constants).map(_.map(generator.generateExpression(_))).getOrElse(Seq()).map(_.resultType)
+  val aggInputTypeInfo = constantsTypeInfo.++:(inputSchema.fieldTypeInfos)
+
+val aggregateInputType =
+  cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
--- End diff --

It should be possible to create a `RelDataType` without going through code 
generation and `TypeInformation`. We have a `RelDataType` for the input row and 
`RelDataType`s for the `RexNode`s. 

Check if we can use `FlinkTypeFactory.createStructType()` to create a 
`RelDataType`.




[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16317227#comment-16317227
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5241#discussion_r160253454
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -359,12 +359,17 @@ class AggregationCodeGenerator(
 
   val accumulate: String = {
 for (i <- aggs.indices) yield {
-  j"""
- |${accTypes(i)} acc$i = (${accTypes(i)}) 
accs.getField($i);
- |${genDataViewFieldSetter(s"acc$i", i)}
- |${aggs(i)}.accumulate(
- |  acc$i,
- |  ${parametersCode(i)});""".stripMargin
--- End diff --

Change to 

```
 |${aggs(i)}.accumulate(
 |  acc$i
 |${if (!parametersCode(i).isEmpty) "," else ""}
 |  ${parametersCode(i)}
 |);""".stripMargin
```
  




[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16317234#comment-16317234
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5241#discussion_r160268630
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1130,259 +1134,263 @@ object AggregateUtil {
 // create aggregate function instances by function type and aggregate 
field data type.
 aggregateCalls.zipWithIndex.foreach { case (aggregateCall, index) =>
   val argList: util.List[Integer] = aggregateCall.getArgList
-  if (argList.isEmpty) {
-if 
(aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
-  aggFieldIndexes(index) = Array[Int](0)
-} else {
-  throw new TableException("Aggregate fields should not be empty.")
+
+  if (aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
+aggregates(index) = new CountAggFunction()
+if(argList.isEmpty) {
--- End diff --

add space




[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16317231#comment-16317231
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5241#discussion_r160268601
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1130,259 +1134,263 @@ object AggregateUtil {
 // create aggregate function instances by function type and aggregate 
field data type.
 aggregateCalls.zipWithIndex.foreach { case (aggregateCall, index) =>
   val argList: util.List[Integer] = aggregateCall.getArgList
-  if (argList.isEmpty) {
-if 
(aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
-  aggFieldIndexes(index) = Array[Int](0)
-} else {
-  throw new TableException("Aggregate fields should not be empty.")
+
+  if (aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
+aggregates(index) = new CountAggFunction()
+if(argList.isEmpty) {
+  aggFieldIndexes(index) = Array[Int](-1)
+}else{
--- End diff --

add spaces




[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16317219#comment-16317219
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5241#discussion_r160250581
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -139,7 +139,7 @@ class AggregationCodeGenerator(
 val classes = 
UserDefinedFunctionUtils.typeInfoToClass(physicalInputTypes)
 val constantClasses = 
UserDefinedFunctionUtils.typeInfoToClass(constantTypes)
 val methodSignaturesList = aggFields.map { inFields =>
-  inFields.map { f =>
+  inFields.filter(index => index > -1).map { f =>
--- End diff --

`filter(index => index > -1)` can be shortened to `filter(_ > -1)`




[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16317221#comment-16317221
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5241#discussion_r160257426
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala
 ---
@@ -254,7 +254,7 @@ class JoinITCase(
 val table = 
CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a1, 'a2, 'a3)
 tEnv.registerTable("A", table)
 
-val sqlQuery2 = "SELECT * FROM (SELECT count(*) FROM A) CROSS JOIN A"
--- End diff --

Why are you changing the query? Is it not working anymore?




[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16317232#comment-16317232
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5241#discussion_r160267929
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -145,6 +145,14 @@ class DataStreamOverAggregate(
   inputSchema.typeInfo,
   Some(constants))
 
+val constantsTypeInfo =
+  
Some(constants).map(_.map(generator.generateExpression(_))).getOrElse(Seq()).map(_.resultType)
+  val aggInputTypeInfo = constantsTypeInfo.++:(inputSchema.fieldTypeInfos)
+
+val aggregateInputType =
+  cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
--- End diff --

Something like:

```
val constantTypes = constants.map(_.getType)
val fieldTypes = input.getRowType.getFieldList.asScala.map(_.getType)
val aggInTypes = constantTypes ++ fieldTypes
val aggInNames = aggInTypes.indices.map("f" + _)

val aggInRowType = 
getCluster.getTypeFactory.createStructType(aggInTypes.asJava, aggInNames.asJava)
```





[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16317229#comment-16317229
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5241#discussion_r160259534
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala
 ---
@@ -278,7 +278,14 @@ class SetOperatorsITCase(
 TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  /**
+* This test will checks IN for NULLs based on whether COUNT (*) and 
COUNT (a) are equal. Due to
+* 
[[org.apache.flink.table.plan.rules.dataSet.DataSetAggregateWithNullValuesRule]]
 will
+* union a NULL row in to input DataSet for non-groupBy agg. That 
caused COUNT (*) and COUNT(a)
+* are not equal. So this test case ignored before FLINK-8355 be fixed.
--- End diff --

This PR introduces a regression: previously working queries will compute 
incorrect results once it is merged. 

IMO, we should only merge it if all existing tests pass. The problem is 
that FLINK-8355 cannot be easily fixed. We need to add a NULL tuple, because 
Flink's DataSet API won't compute any result if the input is empty. I am not 
sure how this can be resolved.
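
To make the discrepancy concrete, here is a minimal sketch with plain Scala collections. It is not the DataSet rule itself: `withNullRow` is a hypothetical helper that only imitates the appended NULL tuple, and rows are modeled as `Option[Int]` for a single column `a`:

```scala
object NullRowCountSketch {
  // COUNT(*): counts every row, including a row that only holds NULL.
  def countStar(rows: Seq[Option[Int]]): Long = rows.size.toLong

  // COUNT(a): counts only rows where `a` is not NULL.
  def countColumn(rows: Seq[Option[Int]]): Long = rows.count(_.isDefined).toLong

  // Imitates appending one NULL row so that a non-grouped aggregate
  // still produces a result for an empty input.
  def withNullRow(rows: Seq[Option[Int]]): Seq[Option[Int]] = rows :+ None
}
```

For an empty input, `countStar(withNullRow(rows))` is 1 while `countColumn(withNullRow(rows))` is 0, so the two counts no longer agree once COUNT(*) stops reading a field.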
  




[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16317222#comment-16317222
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5241#discussion_r160260174
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/GroupWindowITCase.scala
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import 
org.apache.flink.table.runtime.stream.sql.GroupWindowITCase.TimestampAndWatermarkWithOffset
+import org.apache.flink.table.runtime.utils.{StreamITCase, 
StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit._
+
+import scala.collection.mutable
+
+class GroupWindowITCase extends StreamingWithStateTestBase {
+
+  val data = List(
+(1000L, "1", "Hello"),
+(2000L, "2", "Hello"),
+(3000L, null.asInstanceOf[String], "Hello"),
+(4000L, "4", "Hello"),
+(5000L, null.asInstanceOf[String], "Hello"),
+(6000L, "6", "Hello"),
+(7000L, "7", "Hello World"),
+(8000L, "8", "Hello World"),
+(2L, "20", "Hello World"))
+
+  @Test
+  def testRowTimeTumbleWindow(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.testResults = mutable.MutableList()
+StreamITCase.clear
+env.setParallelism(1)
+
+val stream = env
+  .fromCollection(data)
+  .assignTimestampsAndWatermarks(
+new TimestampAndWatermarkWithOffset[(Long, String, String)](0L))
+val table = stream.toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+tEnv.registerTable("T1", table)
+
+val sqlQuery = "SELECT c, COUNT(*), COUNT(1), COUNT(b) FROM T1 " +
+  "GROUP BY TUMBLE(rowtime, interval '5' SECOND), c"
+
+val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+
+val expected = List("Hello World,2,2,2", "Hello World,1,1,1", 
"Hello,4,4,3", "Hello,2,2,1")
+assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testUnboundedGroupWindow(): Unit = {
--- End diff --

Rename to `testNonWindowedCount()`




[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16317226#comment-16317226
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5241#discussion_r160253551
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -383,12 +388,16 @@ class AggregationCodeGenerator(
 
   val retract: String = {
 for (i <- aggs.indices) yield {
-  j"""
- |${accTypes(i)} acc$i = (${accTypes(i)}) 
accs.getField($i);
- |${genDataViewFieldSetter(s"acc$i", i)}
- |${aggs(i)}.retract(
- |  acc$i,
- |  ${parametersCode(i)});""".stripMargin
--- End diff --

same as for accumulate




[jira] [Commented] (FLINK-8325) Add COUNT AGG support constant parameter, i.e. COUNT(*), COUNT(1)

2018-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16311503#comment-16311503
 ] 

ASF GitHub Bot commented on FLINK-8325:
---

GitHub user sunjincheng121 opened a pull request:

https://github.com/apache/flink/pull/5241

[FLINK-8325][table] Add COUNT(*),COUNT(1) supported

## What is the purpose of the change

* This PR adds support for COUNT(\*) and COUNT(1).

## Brief change log

  - Add a constructor parameter to `CountAggFunction`
  - Use `aggregateCall.argList` as the parameter when creating the `CountAggFunction` 
instance in `AggregateUtil.transformToAggregateFunctions`
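
A minimal sketch of the idea behind this change, under the assumption that the constructor parameter tells the function how many arguments the aggregate call has. `CountAcc`, `SketchCountAgg` and `argCount` are hypothetical names; this is not Flink's `CountAggFunction`, and the actual constructor signature in the PR may differ.

```scala
// Hypothetical accumulator holding the running count.
final class CountAcc(var count: Long = 0L)

// argCount mirrors the size of the aggregate call's argument list (an assumption):
// 0 for COUNT(*), 1 for COUNT(x) or COUNT(1).
final class SketchCountAgg(argCount: Int) {

  def createAccumulator(): CountAcc = new CountAcc()

  // With no arguments every row counts; otherwise a row counts only if
  // none of its argument values is null.
  def accumulate(acc: CountAcc, args: Seq[Any]): Unit = {
    require(args.length == argCount, s"expected $argCount argument(s)")
    if (args.forall(_ != null)) acc.count += 1L
  }

  // Retraction undoes exactly what accumulate did for the same row.
  def retract(acc: CountAcc, args: Seq[Any]): Unit = {
    require(args.length == argCount, s"expected $argCount argument(s)")
    if (args.forall(_ != null)) acc.count -= 1L
  }

  def getValue(acc: CountAcc): Long = acc.count
}
```

In this sketch `new SketchCountAgg(0)` behaves like COUNT(\*), and `new SketchCountAgg(1)` fed the constant `1` behaves like COUNT(1), since a constant is never null.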


## Verifying this change

This change added tests and can be verified as follows:

  - *Add `GroupWindowITCase` to test bounded and unbounded group aggregations.*
  - *Add `CountAggFunctionWithConstantTest` to test the COUNT aggregate 
function with a non-nullable parameter.*
  - *Modify some batch test cases to reflect the corrected results.*
 

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sunjincheng121/flink FLINK-8325-PR

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5241.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5241


commit 51621e29fa6cb0780716c884ab9b9f0c3f468b08
Author: 金竹 
Date:   2018-01-03T15:13:49Z

[FLINK-8325][table] Add COUNT(*),COUNT(1) supported






