[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-10-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16199360#comment-16199360
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4585


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-10-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198950#comment-16198950
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4585
  
Merging




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192711#comment-16192711
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r142903359
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1412,23 +1412,12 @@ object AggregateUtil {
 
 case collect: SqlAggFunction if collect.getKind == SqlKind.COLLECT =>
   aggregates(index) = sqlTypeName match {
-case TINYINT =>
-  new ByteCollectAggFunction
-case SMALLINT =>
-  new ShortCollectAggFunction
-case INTEGER =>
-  new IntCollectAggFunction
-case BIGINT =>
-  new LongCollectAggFunction
-case VARCHAR | CHAR =>
-  new StringCollectAggFunction
-case FLOAT =>
-  new FloatCollectAggFunction
-case DOUBLE =>
-  new DoubleCollectAggFunction
+case TINYINT | SMALLINT | INTEGER | BIGINT | VARCHAR | CHAR | FLOAT | DOUBLE =>
--- End diff --

I was rather thinking of removing the `match case` block completely and setting

```
aggregates(index) = new CollectAggFunction(FlinkTypeFactory.toTypeInfo(relDataType))
```




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192546#comment-16192546
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user suez1224 commented on the issue:

https://github.com/apache/flink/pull/4585
  
@fhueske Addressed your comments. PTAL. Much appreciated.




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-10-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191244#comment-16191244
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r142665388
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
 ---
@@ -92,6 +92,63 @@ class SqlITCase extends StreamingWithStateTestBase {
 assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
   }
 
+  @Test
+  def testUnboundedGroupByCollect(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
--- End diff --

Add `env.setStateBackend(this.getStateBackend)` to enforce serialization through the `MapView`.




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-10-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191246#comment-16191246
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r142664211
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
 ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils._
+import org.apache.flink.table.api.dataview.MapView
+import org.apache.flink.table.dataview.MapViewTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+import scala.collection.JavaConverters._
+
+/** The initial accumulator for Collect aggregate function */
+class CollectAccumulator[E](var f0:MapView[E, Integer]) {
+  def this() {
+  this(null)
+  }
+
+  def canEqual(a: Any) = a.isInstanceOf[CollectAccumulator[E]]
+
+  override def equals(that: Any): Boolean =
+that match {
+  case that: CollectAccumulator[E] => that.canEqual(this) && this.f0 == that.f0
+  case _ => false
+}
+}
+
+abstract class CollectAggFunction[E]
--- End diff --

I don't think we need to make this class abstract. Instead, we should add a 
constructor that asks for the `TypeInformation` of the value. Then we don't 
need to subclass the aggregation function, and we avoid most generic value 
types for non-primitive fields.




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-10-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191248#comment-16191248
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r142667720
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
 ---
@@ -92,6 +92,63 @@ class SqlITCase extends StreamingWithStateTestBase {
 assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
   }
 
+  @Test
+  def testUnboundedGroupByCollect(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.clear
+
+val sqlQuery = "SELECT b, COLLECT(a) FROM MyTable GROUP BY b"
+
+val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+tEnv.registerTable("MyTable", t)
+
+val result = tEnv.sql(sqlQuery).toRetractStream[Row]
+result.addSink(new StreamITCase.RetractingSink).setParallelism(1)
+env.execute()
+
+val expected = List(
+  "1,{1=1}",
+  "2,{2=1, 3=1}",
+  "3,{4=1, 5=1, 6=1}",
+  "4,{7=1, 8=1, 9=1, 10=1}",
+  "5,{11=1, 12=1, 13=1, 14=1, 15=1}",
+  "6,{16=1, 17=1, 18=1, 19=1, 20=1, 21=1}")
+assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+  }
+
+  @Test
+  def testUnboundedGroupByCollectWithObject(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
--- End diff --

Add `env.setStateBackend(this.getStateBackend)` to enforce serialization through the `MapView`.
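
For readers of the expected strings in the quoted test: each result pairs a grouping key `b` with a multiset of `a` values, printed as `element=count` entries. The following is a rough plain-Java sketch of that grouping, independent of Flink. `CollectByKeySketch` and `collectByKey` are hypothetical names, and the input rows in the usage note are assumed from the first two expected results rather than taken from `StreamTestData`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical illustration of SELECT b, COLLECT(a) ... GROUP BY b. */
final class CollectByKeySketch {

    /** Each row is {a, b}; groups by b and counts how often each a occurs. */
    static Map<Integer, Map<Integer, Integer>> collectByKey(int[][] rows) {
        Map<Integer, Map<Integer, Integer>> result = new TreeMap<>();
        for (int[] row : rows) {
            int a = row[0];
            int b = row[1];
            // One multiset (element -> count) per grouping key.
            result.computeIfAbsent(b, k -> new HashMap<>()).merge(a, 1, Integer::sum);
        }
        return result;
    }
}
```

Calling `collectByKey(new int[][] {{1, 1}, {2, 2}, {3, 2}})` yields one multiset per key: key 1 maps to a multiset holding 1 once, and key 2 maps to a multiset holding 2 and 3 once each.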




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-10-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191245#comment-16191245
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r142664563
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1410,6 +1410,26 @@ object AggregateUtil {
 case _: SqlCountAggFunction =>
   aggregates(index) = new CountAggFunction
 
+case collect: SqlAggFunction if collect.getKind == SqlKind.COLLECT =>
+  aggregates(index) = sqlTypeName match {
--- End diff --

We can pass the actual `TypeInformation` of the argument type here to the 
constructor of the `CollectAggFunction`; then we don't need to distinguish 
between the different argument types.




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-10-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191247#comment-16191247
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r142667939
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1410,6 +1410,26 @@ object AggregateUtil {
 case _: SqlCountAggFunction =>
   aggregates(index) = new CountAggFunction
 
+case collect: SqlAggFunction if collect.getKind == SqlKind.COLLECT =>
+  aggregates(index) = sqlTypeName match {
+case TINYINT =>
+  new ByteCollectAggFunction
+case SMALLINT =>
+  new ShortCollectAggFunction
+case INTEGER =>
+  new IntCollectAggFunction
+case BIGINT =>
+  new LongCollectAggFunction
+case VARCHAR | CHAR =>
+  new StringCollectAggFunction
+case FLOAT =>
+  new FloatCollectAggFunction
+case DOUBLE =>
+  new DoubleCollectAggFunction
+case _ =>
+  new ObjectCollectAggFunction
+  }
+
--- End diff --

We need to set `accTypes(index) = aggregates(index).getAccumulatorType` in 
order to activate the `MapView` feature.




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-29 Thread Shuyi Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16186602#comment-16186602
 ] 

Shuyi Chen commented on FLINK-7491:
---

[~fhueske] can you help take another look at the PR? I've addressed your 
comments. Much appreciated.



[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183941#comment-16183941
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141576005
  
--- Diff: docs/dev/table/sql.md ---
@@ -2107,6 +2108,17 @@ VAR_SAMP(value)
 Returns the sample variance (square of the sample standard deviation) of the numeric field across all input values.
   
 
+
+
+  
+  {% highlight text %}
+  COLLECT(value)
+  {% endhighlight %}
+  
+  
+  Returns a multiset of the values.
--- End diff --

done




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183942#comment-16183942
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141576017
  
--- Diff: docs/dev/table/sql.md ---
@@ -746,6 +746,7 @@ The SQL runtime is built on top of Flink's DataSet and DataStream APIs. Internal
 | `Types.PRIMITIVE_ARRAY`| `ARRAY` | e.g. `int[]`  |
 | `Types.OBJECT_ARRAY`   | `ARRAY` | e.g. `java.lang.Byte[]`|
 | `Types.MAP`| `MAP`   | `java.util.HashMap`|
+| `Types.MULTISET`   | `MULTISET`  | `java.util.HashMap`|
--- End diff --

done




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183603#comment-16183603
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141517953
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link TypeInformation} for the Multiset types of the Java API.
+ *
+ * @param <T> The type of the elements in the Multiset.
+ */
+@PublicEvolving
+public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
--- End diff --

Great! That makes things a lot easier :-)




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183532#comment-16183532
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141507401
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util
+import java.util.function.BiFunction
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils._
+import org.apache.flink.table.api.dataview.MapView
+import org.apache.flink.table.dataview.MapViewTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+
+/** The initial accumulator for Collect aggregate function */
+class CollectAccumulator[E](var f0:MapView[E, Integer]) {
+  def this() {
+this(null)
+  }
+
+  def canEqual(a: Any) = a.isInstanceOf[CollectAccumulator[E]]
+
+  override def equals(that: Any): Boolean =
+that match {
+  case that: CollectAccumulator[E] => that.canEqual(this) && this.f0 == that.f0
+  case _ => false
+}
+}
+
+abstract class CollectAggFunction[E]
+  extends AggregateFunction[util.Map[E, Integer], CollectAccumulator[E]] {
+
+  override def createAccumulator(): CollectAccumulator[E] = {
+val acc = new CollectAccumulator[E](new MapView[E, Integer](
+  getValueTypeInfo.asInstanceOf[TypeInformation[E]], BasicTypeInfo.INT_TYPE_INFO))
+acc
+  }
+
+  def accumulate(accumulator: CollectAccumulator[E], value: E): Unit = {
+if (value != null) {
+  if (accumulator.f0.contains(value)) {
+accumulator.f0.put(value, accumulator.f0.get(value) + 1)
+  } else {
+accumulator.f0.put(value, 1)
+  }
+}
+  }
+
+  override def getValue(accumulator: CollectAccumulator[E]): util.Map[E, Integer] = {
+val iterator = accumulator.f0.iterator
+if (iterator.hasNext) {
+  val map = new util.HashMap[E, Integer]()
+  while (iterator.hasNext) {
+val entry = iterator.next()
+map.put(entry.getKey, entry.getValue)
+  }
+  map
+} else {
+  null.asInstanceOf[util.Map[E, Integer]]
--- End diff --

According to the Calcite tests, this should return an empty Multiset instead of `null`.
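
A minimal sketch of the behavior this comment asks for, in plain Java without Flink. `MultisetAccumulatorSketch` is a hypothetical name, and a `HashMap` is assumed here as a stand-in for the `MapView`-backed accumulator in the diff. It shows `getValue` returning an empty map rather than `null` when nothing was accumulated, with `null` inputs skipped as in the quoted `accumulate` method.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical accumulator; a HashMap stands in for Flink's MapView. */
final class MultisetAccumulatorSketch<E> {

    private final Map<E, Integer> counts = new HashMap<>();

    /** Adds one occurrence of value; null values are skipped. */
    void accumulate(E value) {
        if (value != null) {
            counts.merge(value, 1, Integer::sum);
        }
    }

    /** Returns a copy of the multiset; empty (never null) if nothing was added. */
    Map<E, Integer> getValue() {
        return new HashMap<>(counts);
    }
}
```

Returning a copy keeps later calls to `accumulate` from changing a result that was already handed out; whether the real function needs that isolation is not stated in the thread.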




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183530#comment-16183530
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141507177
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link TypeInformation} for the Multiset types of the Java API.
+ *
+ * @param <T> The type of the elements in the Multiset.
+ */
+@PublicEvolving
+public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
--- End diff --

I took a look at the Calcite tests for the COLLECT function; null values are ignored.




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183531#comment-16183531
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141507197
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link TypeInformation} for the Multiset types of the Java API.
+ *
+ * @param <T> The type of the elements in the Multiset.
+ */
+@PublicEvolving
+public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
+
+   private static final long serialVersionUID = 1L;
+
+
+   public MultisetTypeInfo(Class<T> elementTypeClass) {
+       super(elementTypeClass, Integer.class);
+   }
+
+   public MultisetTypeInfo(TypeInformation<T> elementTypeInfo) {
+       super(elementTypeInfo, BasicTypeInfo.INT_TYPE_INFO);
+   }
+
+   // ------------------------------------------------------------------------
+   //  MultisetTypeInfo specific properties
+   // ------------------------------------------------------------------------
+
+   /**
+    * Gets the type information for the elements contained in the Multiset
+    */
+   public TypeInformation<T> getElementTypeInfo() {
+       return getKeyTypeInfo();
+   }
+
+   // ------------------------------------------------------------------------
+   //  TypeInformation implementation
+   // ------------------------------------------------------------------------
+
+   @Override
+   public boolean isBasicType() {
--- End diff --

done




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183444#comment-16183444
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141494139
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util
+import java.util.function.BiFunction
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils._
+import org.apache.flink.table.api.dataview.MapView
+import org.apache.flink.table.dataview.MapViewTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+
+/** The initial accumulator for Collect aggregate function */
+class CollectAccumulator[E](var f0:MapView[E, Integer]) {
+  def this() {
+this(null)
+  }
+
+  def canEqual(a: Any) = a.isInstanceOf[CollectAccumulator[E]]
+
+  override def equals(that: Any): Boolean =
--- End diff --

A normal Scala class still needs to override `equals`; a case class doesn't.




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183445#comment-16183445
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141494189
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util
+import java.util.function.BiFunction
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils._
+import org.apache.flink.table.api.dataview.MapView
+import org.apache.flink.table.dataview.MapViewTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+
+/** The initial accumulator for Collect aggregate function */
+class CollectAccumulator[E](var f0:MapView[E, Integer]) {
--- End diff --

done


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>






[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183381#comment-16183381
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141485981
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1414,8 +1414,29 @@ object AggregateUtil {
   aggregates(index) = udagg.getFunction
   accTypes(index) = udagg.accType
 
-case unSupported: SqlAggFunction =>
-  throw new TableException(s"unsupported Function: '${unSupported.getName}'")
+case other: SqlAggFunction =>
+  if (other.getKind == SqlKind.COLLECT) {
+    aggregates(index) = sqlTypeName match {
+      case TINYINT =>
+        new ByteCollectAggFunction
+      case SMALLINT =>
+        new ShortCollectAggFunction
+      case INTEGER =>
+        new IntCollectAggFunction
+      case BIGINT =>
+        new LongCollectAggFunction
+      case VARCHAR | CHAR =>
+        new StringCollectAggFunction
+      case FLOAT =>
+        new FloatCollectAggFunction
+      case DOUBLE =>
+        new DoubleCollectAggFunction
+      case _ =>
+        new ObjectCollectAggFunction
+    }
+  } else {
--- End diff --

done


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>






[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183380#comment-16183380
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141485942
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1414,8 +1414,29 @@ object AggregateUtil {
   aggregates(index) = udagg.getFunction
   accTypes(index) = udagg.accType
 
-case unSupported: SqlAggFunction =>
-  throw new TableException(s"unsupported Function: '${unSupported.getName}'")
+case other: SqlAggFunction =>
--- End diff --

done


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>






[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16183148#comment-16183148
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141449807
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util
+import java.util.function.BiFunction
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils._
+import org.apache.flink.table.api.dataview.MapView
+import org.apache.flink.table.dataview.MapViewTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+
+/** The initial accumulator for Collect aggregate function */
+class CollectAccumulator[E](var f0:MapView[E, Integer]) {
+  def this() {
+    this(null)
+  }
+
+  def canEqual(a: Any) = a.isInstanceOf[CollectAccumulator[E]]
+
+  override def equals(that: Any): Boolean =
+    that match {
+      case that: CollectAccumulator[E] => that.canEqual(this) && this.f0 == that.f0
+      case _ => false
+    }
+}
+
+abstract class CollectAggFunction[E]
+  extends AggregateFunction[util.Map[E, Integer], CollectAccumulator[E]] {
+
+  override def createAccumulator(): CollectAccumulator[E] = {
+    val acc = new CollectAccumulator[E](new MapView[E, Integer](
+      getValueTypeInfo.asInstanceOf[TypeInformation[E]], BasicTypeInfo.INT_TYPE_INFO))
+    acc
+  }
+
+  def accumulate(accumulator: CollectAccumulator[E], value: E): Unit = {
+    if (value != null) {
+      if (accumulator.f0.contains(value)) {
--- End diff --

Good catch. done.


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>






[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182615#comment-16182615
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141352765
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link TypeInformation} for the Multiset types of the Java API.
+ *
+ * @param <T> The type of the elements in the Multiset.
+ */
+@PublicEvolving
+public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
+
+   private static final long serialVersionUID = 1L;
+
+
+   public MultisetTypeInfo(Class<T> elementTypeClass) {
+       super(elementTypeClass, Integer.class);
+   }
+
+   public MultisetTypeInfo(TypeInformation<T> elementTypeInfo) {
+       super(elementTypeInfo, BasicTypeInfo.INT_TYPE_INFO);
+   }
+
+   // ------------------------------------------------------------------------
+   //  MultisetTypeInfo specific properties
+   // ------------------------------------------------------------------------
+
+   /**
+    * Gets the type information for the elements contained in the Multiset
+    */
+   public TypeInformation<T> getElementTypeInfo() {
+       return getKeyTypeInfo();
+   }
+
+   // ------------------------------------------------------------------------
+   //  TypeInformation implementation
+   // ------------------------------------------------------------------------
+
+   @Override
+   public boolean isBasicType() {
+       return false;
+   }
+
+   @Override
+   public boolean isTupleType() {
+       return false;
+   }
+
+   @Override
+   public int getArity() {
+       return 0;
+   }
+
+   @Override
+   public int getTotalFields() {
--- End diff --

Yes, `MapTypeInfo` should also return `1`. The method should return the 
number of nested fields as they are exposed to Flink's API. Both a Map and a 
Multiset are handled by Flink's APIs as atomic data types (you cannot 
reference a specific map entry as a key).
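
The counting rule described above can be illustrated with a small, self-contained sketch. It does not use Flink's classes: `TypeDesc`, `AtomicDesc`, `CompositeDesc`, and `OpaqueMapDesc` are hypothetical names introduced only for this illustration, under the assumption that composite types sum the fields of their children while maps and multisets count as one field.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified type descriptor; not Flink's TypeInformation. */
interface TypeDesc {
    /** Number of fields that can be addressed individually, counted recursively. */
    int totalFields();
}

/** A leaf type such as an int or a String: always exactly one field. */
final class AtomicDesc implements TypeDesc {
    @Override
    public int totalFields() {
        return 1;
    }
}

/** A tuple-like type: nested fields are addressable, so their counts add up. */
final class CompositeDesc implements TypeDesc {
    private final List<TypeDesc> fields;

    CompositeDesc(TypeDesc... fields) {
        this.fields = Arrays.asList(fields);
    }

    @Override
    public int totalFields() {
        int sum = 0;
        for (TypeDesc field : fields) {
            sum += field.totalFields();
        }
        return sum;
    }
}

/** A map or multiset: entries cannot be referenced individually, so it counts as one field. */
final class OpaqueMapDesc implements TypeDesc {
    private final TypeDesc keyType;
    private final TypeDesc valueType;

    OpaqueMapDesc(TypeDesc keyType, TypeDesc valueType) {
        this.keyType = keyType;
        this.valueType = valueType;
    }

    @Override
    public int totalFields() {
        // The key and value types are deliberately ignored here.
        return 1;
    }
}
```

In this sketch a multiset whose element type is itself a two-field composite still reports one field, and a composite of (atomic, multiset, atomic) reports three.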


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>






[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182125#comment-16182125
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141260762
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link TypeInformation} for the Multiset types of the Java API.
+ *
+ * @param <T> The type of the elements in the Multiset.
+ */
+@PublicEvolving
+public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
+
+   private static final long serialVersionUID = 1L;
+
--- End diff --

done


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>






[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182123#comment-16182123
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141260601
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link TypeInformation} for the Multiset types of the Java API.
+ *
+ * @param <T> The type of the elements in the Multiset.
+ */
+@PublicEvolving
+public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
+
+   private static final long serialVersionUID = 1L;
+
+
+   public MultisetTypeInfo(Class<T> elementTypeClass) {
+       super(elementTypeClass, Integer.class);
+   }
+
+   public MultisetTypeInfo(TypeInformation<T> elementTypeInfo) {
+       super(elementTypeInfo, BasicTypeInfo.INT_TYPE_INFO);
+   }
+
+   // ------------------------------------------------------------------------
+   //  MultisetTypeInfo specific properties
+   // ------------------------------------------------------------------------
+
+   /**
+    * Gets the type information for the elements contained in the Multiset
+    */
+   public TypeInformation<T> getElementTypeInfo() {
+       return getKeyTypeInfo();
+   }
+
+   // ------------------------------------------------------------------------
+   //  TypeInformation implementation
+   // ------------------------------------------------------------------------
+
+   @Override
+   public boolean isBasicType() {
+       return false;
+   }
+
+   @Override
+   public boolean isTupleType() {
+       return false;
+   }
+
+   @Override
+   public int getArity() {
+       return 0;
+   }
+
+   @Override
+   public int getTotalFields() {
--- End diff --

@fhueske So `MapTypeInfo` should also return 1? I am a bit confused here: 
what does "the number of logical fields in this type" mean for a map type?


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>






[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182098#comment-16182098
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141256073
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link TypeInformation} for the Multiset types of the Java API.
+ *
+ * @param <T> The type of the elements in the Multiset.
+ */
+@PublicEvolving
+public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
+
+   private static final long serialVersionUID = 1L;
+
+
+   public MultisetTypeInfo(Class<T> elementTypeClass) {
+       super(elementTypeClass, Integer.class);
+   }
+
+   public MultisetTypeInfo(TypeInformation<T> elementTypeInfo) {
+       super(elementTypeInfo, BasicTypeInfo.INT_TYPE_INFO);
+   }
+
+   // ------------------------------------------------------------------------
+   //  MultisetTypeInfo specific properties
+   // ------------------------------------------------------------------------
+
+   /**
+    * Gets the type information for the elements contained in the Multiset
+    */
+   public TypeInformation<T> getElementTypeInfo() {
+       return getKeyTypeInfo();
+   }
+
+   // ------------------------------------------------------------------------
+   //  TypeInformation implementation
+   // ------------------------------------------------------------------------
+
+   @Override
+   public boolean isBasicType() {
+       return false;
+   }
+
+   @Override
+   public boolean isTupleType() {
+       return false;
+   }
+
+   @Override
+   public int getArity() {
+       return 0;
+   }
+
+   @Override
+   public int getTotalFields() {
+       // similar as arrays, the multiset are "opaque" to the direct field addressing logic
+       // since the multiset's elements are not addressable, we do not expose them
+       return 1;
+   }
+
+   @SuppressWarnings("unchecked")
+   @Override
+   public Class<Map<T, Integer>> getTypeClass() {
+       return (Class<Map<T, Integer>>)(Class<?>)Map.class;
+   }
+
+   @Override
+   public boolean isKeyType() {
--- End diff --

done


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>






[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182097#comment-16182097
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141256026
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link TypeInformation} for the Multiset types of the Java API.
+ *
+ * @param <T> The type of the elements in the Multiset.
+ */
+@PublicEvolving
+public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
+
+   private static final long serialVersionUID = 1L;
+
+
+   public MultisetTypeInfo(Class<T> elementTypeClass) {
+       super(elementTypeClass, Integer.class);
+   }
+
+   public MultisetTypeInfo(TypeInformation<T> elementTypeInfo) {
+       super(elementTypeInfo, BasicTypeInfo.INT_TYPE_INFO);
+   }
+
+   // ------------------------------------------------------------------------
+   //  MultisetTypeInfo specific properties
+   // ------------------------------------------------------------------------
+
+   /**
+    * Gets the type information for the elements contained in the Multiset
+    */
+   public TypeInformation<T> getElementTypeInfo() {
+       return getKeyTypeInfo();
+   }
+
+   // ------------------------------------------------------------------------
+   //  TypeInformation implementation
+   // ------------------------------------------------------------------------
+
+   @Override
+   public boolean isBasicType() {
+       return false;
+   }
+
+   @Override
+   public boolean isTupleType() {
+       return false;
+   }
+
+   @Override
+   public int getArity() {
+       return 0;
+   }
+
+   @Override
+   public int getTotalFields() {
+       // similar as arrays, the multiset are "opaque" to the direct field addressing logic
+       // since the multiset's elements are not addressable, we do not expose them
+       return 1;
+   }
+
+   @SuppressWarnings("unchecked")
+   @Override
+   public Class<Map<T, Integer>> getTypeClass() {
--- End diff --

done


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>






[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182096#comment-16182096
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141255931
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link TypeInformation} for the Multiset types of the Java API.
+ *
+ * @param <T> The type of the elements in the Multiset.
+ */
+@PublicEvolving
+public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
+
+   private static final long serialVersionUID = 1L;
+
+
+   public MultisetTypeInfo(Class<T> elementTypeClass) {
+       super(elementTypeClass, Integer.class);
+   }
+
+   public MultisetTypeInfo(TypeInformation<T> elementTypeInfo) {
+       super(elementTypeInfo, BasicTypeInfo.INT_TYPE_INFO);
+   }
+
+   // ------------------------------------------------------------------------
+   //  MultisetTypeInfo specific properties
+   // ------------------------------------------------------------------------
+
+   /**
+    * Gets the type information for the elements contained in the Multiset
+    */
+   public TypeInformation<T> getElementTypeInfo() {
+       return getKeyTypeInfo();
+   }
+
+   // ------------------------------------------------------------------------
+   //  TypeInformation implementation
+   // ------------------------------------------------------------------------
+
+   @Override
+   public boolean isBasicType() {
+       return false;
+   }
+
+   @Override
+   public boolean isTupleType() {
+       return false;
+   }
+
+   @Override
+   public int getArity() {
--- End diff --

done


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>






[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182093#comment-16182093
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141255917
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link TypeInformation} for the Multiset types of the Java API.
+ *
+ * @param <T> The type of the elements in the Multiset.
+ */
+@PublicEvolving
+public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
+
+   private static final long serialVersionUID = 1L;
+
+
+   public MultisetTypeInfo(Class<T> elementTypeClass) {
+       super(elementTypeClass, Integer.class);
+   }
+
+   public MultisetTypeInfo(TypeInformation<T> elementTypeInfo) {
+       super(elementTypeInfo, BasicTypeInfo.INT_TYPE_INFO);
+   }
+
+   // ------------------------------------------------------------------------
+   //  MultisetTypeInfo specific properties
+   // ------------------------------------------------------------------------
+
+   /**
+    * Gets the type information for the elements contained in the Multiset
+    */
+   public TypeInformation<T> getElementTypeInfo() {
+       return getKeyTypeInfo();
+   }
+
+   // ------------------------------------------------------------------------
+   //  TypeInformation implementation
+   // ------------------------------------------------------------------------
+
+   @Override
+   public boolean isBasicType() {
+       return false;
+   }
+
+   @Override
+   public boolean isTupleType() {
--- End diff --

done


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>






[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181117#comment-16181117
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141105482
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util
+import java.util.function.BiFunction
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils._
+import org.apache.flink.table.api.dataview.MapView
+import org.apache.flink.table.dataview.MapViewTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+
+/** The initial accumulator for Collect aggregate function */
+class CollectAccumulator[E](var f0:MapView[E, Integer]) {
+  def this() {
+    this(null)
+  }
+
+  def canEqual(a: Any) = a.isInstanceOf[CollectAccumulator[E]]
+
+  override def equals(that: Any): Boolean =
+    that match {
+      case that: CollectAccumulator[E] => that.canEqual(this) && this.f0 == that.f0
+      case _ => false
+    }
+}
+
+abstract class CollectAggFunction[E]
+  extends AggregateFunction[util.Map[E, Integer], CollectAccumulator[E]] {
+
+  override def createAccumulator(): CollectAccumulator[E] = {
+    val acc = new CollectAccumulator[E](new MapView[E, Integer](
+      getValueTypeInfo.asInstanceOf[TypeInformation[E]], BasicTypeInfo.INT_TYPE_INFO))
+    acc
+  }
+
+  def accumulate(accumulator: CollectAccumulator[E], value: E): Unit = {
+    if (value != null) {
--- End diff --

If yes, we would need to check whether `MapView` supports `null` keys. If not, we 
could wrap the key in a `Row` of arity 1, because `Row` supports null serialization.
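
The wrapping idea can be sketched in plain Java without any Flink dependency. The names `SingleField` and `WrappedKeyCollector` are hypothetical; `SingleField` only stands in for a `Row` of arity 1, and a `HashMap` stands in for the `MapView`. The point is that the map key is never `null`, even when the collected value is.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical one-field wrapper, standing in for a Row of arity 1. */
final class SingleField<E> {
    private final E value; // may be null

    SingleField(E value) {
        this.value = value;
    }

    E get() {
        return value;
    }

    @Override
    public boolean equals(Object other) {
        return other instanceof SingleField
            && Objects.equals(value, ((SingleField<?>) other).value);
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(value);
    }
}

/** Counts elements, including null, by always using a non-null wrapper as the key. */
final class WrappedKeyCollector<E> {
    private final Map<SingleField<E>, Integer> counts = new HashMap<>();

    /** Adds one occurrence of the value; null is allowed. */
    void accumulate(E value) {
        counts.merge(new SingleField<>(value), 1, Integer::sum);
    }

    /** Returns how often the value was accumulated, or 0 if never. */
    int count(E value) {
        return counts.getOrDefault(new SingleField<>(value), 0);
    }
}
```

Whether this is needed at all depends on the answer to the question above about `null` support in SQL multisets; the sketch only shows that the wrapper keeps `null` out of the key position.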


> Support COLLECT Aggregate function in Flink SQL
> ---
>
> Key: FLINK-7491
> URL: https://issues.apache.org/jira/browse/FLINK-7491
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>






[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181113#comment-16181113
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141103652
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link TypeInformation} for the Multiset types of the Java API.
+ *
+ * @param <T> The type of the elements in the Multiset.
+ */
+@PublicEvolving
+public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
--- End diff --

Does SQL Multiset also support `null` values? If yes, we would need to wrap 
the `MapSerializer`.
Otherwise, we would have to rely on the key serializer to support `null`, 
which many serializers do not. A solution would be to wrap the `MapSerializer` 
and additionally serialize the count for `null` elements.
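
A minimal sketch of that idea, assuming String elements and plain `java.io` streams instead of Flink's serializer classes. `NullAwareMultisetCodec` is a hypothetical name and is not the `MapSerializer` wrapper itself. The count of the `null` element is written first, so the key encoding (`writeUTF` here) never has to represent `null`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical codec: stores the null element's count separately from the other entries.
final class NullAwareMultisetCodec {

    // Assumes a map implementation that permits a null key, such as HashMap.
    static byte[] serialize(Map<String, Integer> multiset) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            Integer nullCount = multiset.get(null);
            out.writeInt(nullCount == null ? 0 : nullCount); // count of null elements
            out.writeInt(multiset.size() - (multiset.containsKey(null) ? 1 : 0));
            for (Map.Entry<String, Integer> entry : multiset.entrySet()) {
                if (entry.getKey() != null) {
                    out.writeUTF(entry.getKey()); // key encoding only sees non-null keys
                    out.writeInt(entry.getValue());
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static Map<String, Integer> deserialize(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            Map<String, Integer> multiset = new HashMap<>();
            int nullCount = in.readInt();
            if (nullCount > 0) {
                multiset.put(null, nullCount); // restore the null element
            }
            int size = in.readInt();
            for (int i = 0; i < size; i++) {
                String key = in.readUTF();
                multiset.put(key, in.readInt());
            }
            return multiset;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A round trip through `serialize` and `deserialize` preserves both the `null` count and the regular entries. A stored count of 0 for `null` is treated as absent.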




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181110#comment-16181110
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141112340
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1414,8 +1414,29 @@ object AggregateUtil {
   aggregates(index) = udagg.getFunction
   accTypes(index) = udagg.accType
 
-case unSupported: SqlAggFunction =>
-  throw new TableException(s"unsupported Function: 
'${unSupported.getName}'")
+case other: SqlAggFunction =>
+  if (other.getKind == SqlKind.COLLECT) {
+aggregates(index) = sqlTypeName match {
+  case TINYINT =>
+new ByteCollectAggFunction
+  case SMALLINT =>
+new ShortCollectAggFunction
+  case INTEGER =>
+new IntCollectAggFunction
+  case BIGINT =>
+new LongCollectAggFunction
+  case VARCHAR | CHAR =>
+new StringCollectAggFunction
+  case FLOAT =>
+new FloatCollectAggFunction
+  case DOUBLE =>
+new DoubleCollectAggFunction
+  case _ =>
+new ObjectCollectAggFunction
+}
+  } else {
--- End diff --

The else case can be removed because we keep the catch-all case.




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181114#comment-16181114
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r14367
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1414,8 +1414,29 @@ object AggregateUtil {
   aggregates(index) = udagg.getFunction
   accTypes(index) = udagg.accType
 
-case unSupported: SqlAggFunction =>
--- End diff --

Since we add a dedicated case for `COLLECT`, this case should remain at the 
end of this `match`.




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181116#comment-16181116
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r14022
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1414,8 +1414,29 @@ object AggregateUtil {
   aggregates(index) = udagg.getFunction
   accTypes(index) = udagg.accType
 
-case unSupported: SqlAggFunction =>
-  throw new TableException(s"unsupported Function: 
'${unSupported.getName}'")
+case other: SqlAggFunction =>
--- End diff --

Change this case to `case collect: SqlAggFunction if collect.getKind == 
SqlKind.COLLECT =>` to have a dedicated case for this built-in function. Also 
move the case after `case _: SqlCountAggFunction` to keep all built-in functions 
together.




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181101#comment-16181101
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141096269
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link TypeInformation} for the Multiset types of the Java API.
+ *
+ * @param <T> The type of the elements in the Multiset.
+ */
+@PublicEvolving
+public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
+
+   private static final long serialVersionUID = 1L;
+
+
+   public MultisetTypeInfo(Class<T> elementTypeClass) {
+   super(elementTypeClass, Integer.class);
+   }
+
+   public MultisetTypeInfo(TypeInformation<T> elementTypeInfo) {
+   super(elementTypeInfo, BasicTypeInfo.INT_TYPE_INFO);
+   }
+
+   // 

+   //  MultisetTypeInfo specific properties
+   // 

+
+   /**
+* Gets the type information for the elements contained in the Multiset
+*/
+   public TypeInformation<T> getElementTypeInfo() {
+   return getKeyTypeInfo();
+   }
+
+   // 

+   //  TypeInformation implementation
+   // 

+
+   @Override
+   public boolean isBasicType() {
--- End diff --

implemented by `MapTypeInfo`, no need to override.




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181112#comment-16181112
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141113194
  
--- Diff: docs/dev/table/sql.md ---
@@ -2107,6 +2108,17 @@ VAR_SAMP(value)
 Returns the sample variance (square of the sample standard 
deviation) of the numeric field across all input values.
   
 
+
+
+  
+  {% highlight text %}
+  COLLECT(value)
+  {% endhighlight %}
+  
+  
+  Returns a multiset of the values.
--- End diff --

Be more specific about the handling of `null` values. Are they ignored? 
What is returned if only null values are added (`null` or empty multiset)?




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181105#comment-16181105
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141097303
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link TypeInformation} for the Multiset types of the Java API.
+ *
+ * @param <T> The type of the elements in the Multiset.
+ */
+@PublicEvolving
+public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
+
+   private static final long serialVersionUID = 1L;
+
+
+   public MultisetTypeInfo(Class<T> elementTypeClass) {
+   super(elementTypeClass, Integer.class);
+   }
+
+   public MultisetTypeInfo(TypeInformation<T> elementTypeInfo) {
+   super(elementTypeInfo, BasicTypeInfo.INT_TYPE_INFO);
+   }
+
+   // 

+   //  MultisetTypeInfo specific properties
+   // 

+
+   /**
+* Gets the type information for the elements contained in the Multiset
+*/
+   public TypeInformation<T> getElementTypeInfo() {
+   return getKeyTypeInfo();
+   }
+
+   // 

+   //  TypeInformation implementation
+   // 

+
+   @Override
+   public boolean isBasicType() {
+   return false;
+   }
+
+   @Override
+   public boolean isTupleType() {
+   return false;
+   }
+
+   @Override
+   public int getArity() {
+   return 0;
+   }
+
+   @Override
+   public int getTotalFields() {
+   // similar as arrays, the multiset are "opaque" to the direct 
field addressing logic
+   // since the multiset's elements are not addressable, we do not 
expose them
+   return 1;
+   }
+
+   @SuppressWarnings("unchecked")
+   @Override
+   public Class<Map<T, Integer>> getTypeClass() {
+   return (Class<Map<T, Integer>>)(Class<?>)Map.class;
+   }
+
+   @Override
+   public boolean isKeyType() {
--- End diff --

implemented by `MapTypeInfo`, no need to override.




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181115#comment-16181115
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141106084
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util
+import java.util.function.BiFunction
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.typeutils._
+import org.apache.flink.table.api.dataview.MapView
+import org.apache.flink.table.dataview.MapViewTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+
+/** The initial accumulator for Collect aggregate function */
+class CollectAccumulator[E](var f0:MapView[E, Integer]) {
+  def this() {
+this(null)
+  }
+
+  def canEqual(a: Any) = a.isInstanceOf[CollectAccumulator[E]]
+
+  override def equals(that: Any): Boolean =
+that match {
+  case that: CollectAccumulator[E] => that.canEqual(this) && this.f0 
== that.f0
+  case _ => false
+}
+}
+
+abstract class CollectAggFunction[E]
+  extends AggregateFunction[util.Map[E, Integer], CollectAccumulator[E]] {
+
+  override def createAccumulator(): CollectAccumulator[E] = {
+val acc = new CollectAccumulator[E](new MapView[E, Integer](
+  getValueTypeInfo.asInstanceOf[TypeInformation[E]], 
BasicTypeInfo.INT_TYPE_INFO))
+acc
+  }
+
+  def accumulate(accumulator: CollectAccumulator[E], value: E): Unit = {
+if (value != null) {
+  if (accumulator.f0.contains(value)) {
+accumulator.f0.put(value, accumulator.f0.get(value) + 1)
+  } else {
+accumulator.f0.put(value, 1)
+  }
+}
+  }
+
+  override def getValue(accumulator: CollectAccumulator[E]): util.Map[E, 
Integer] = {
+val iterator = accumulator.f0.iterator
+if (iterator.hasNext) {
+  val map = new util.HashMap[E, Integer]()
+  while (iterator.hasNext) {
+val entry = iterator.next()
+map.put(entry.getKey, entry.getValue)
+  }
+  map
+} else {
+  null.asInstanceOf[util.Map[E, Integer]]
--- End diff --

According to the specs of `COLLECT`, is `null` the correct return value, or 
should it be an empty Multiset?




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181109#comment-16181109
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141102418
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util
+import java.util.function.BiFunction
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.typeutils._
+import org.apache.flink.table.api.dataview.MapView
+import org.apache.flink.table.dataview.MapViewTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+
+/** The initial accumulator for Collect aggregate function */
+class CollectAccumulator[E](var f0:MapView[E, Integer]) {
+  def this() {
+this(null)
+  }
+
+  def canEqual(a: Any) = a.isInstanceOf[CollectAccumulator[E]]
+
+  override def equals(that: Any): Boolean =
+that match {
+  case that: CollectAccumulator[E] => that.canEqual(this) && this.f0 
== that.f0
+  case _ => false
+}
+}
+
+abstract class CollectAggFunction[E]
+  extends AggregateFunction[util.Map[E, Integer], CollectAccumulator[E]] {
+
+  override def createAccumulator(): CollectAccumulator[E] = {
+val acc = new CollectAccumulator[E](new MapView[E, Integer](
+  getValueTypeInfo.asInstanceOf[TypeInformation[E]], 
BasicTypeInfo.INT_TYPE_INFO))
+acc
+  }
+
+  def accumulate(accumulator: CollectAccumulator[E], value: E): Unit = {
+if (value != null) {
--- End diff --

I'm not familiar with the specs of the `Collect` function. Should it also 
support `null` values?




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181103#comment-16181103
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141104254
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util
+import java.util.function.BiFunction
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.typeutils._
+import org.apache.flink.table.api.dataview.MapView
+import org.apache.flink.table.dataview.MapViewTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+
+/** The initial accumulator for Collect aggregate function */
+class CollectAccumulator[E](var f0:MapView[E, Integer]) {
+  def this() {
+this(null)
+  }
+
+  def canEqual(a: Any) = a.isInstanceOf[CollectAccumulator[E]]
+
+  override def equals(that: Any): Boolean =
+that match {
+  case that: CollectAccumulator[E] => that.canEqual(this) && this.f0 
== that.f0
+  case _ => false
+}
+}
+
+abstract class CollectAggFunction[E]
+  extends AggregateFunction[util.Map[E, Integer], CollectAccumulator[E]] {
+
+  override def createAccumulator(): CollectAccumulator[E] = {
+val acc = new CollectAccumulator[E](new MapView[E, Integer](
+  getValueTypeInfo.asInstanceOf[TypeInformation[E]], 
BasicTypeInfo.INT_TYPE_INFO))
+acc
+  }
+
+  def accumulate(accumulator: CollectAccumulator[E], value: E): Unit = {
+if (value != null) {
+  if (accumulator.f0.contains(value)) {
--- End diff --

Calling `contains` and then `get` results in two map look-ups.
It is more efficient to call `get` once and check the result for `null`.
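
The single look-up variant can be sketched as follows. This is a minimal illustration, not the code from the pull request: a plain `java.util.Map` stands in for `MapView`, the class name `SingleLookupCount` is hypothetical, and the sketch assumes `get` returns `null` for an absent key, as `java.util.Map` does.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the accumulate logic; a java.util.Map replaces MapView.
final class SingleLookupCount {

    // Adds one occurrence of value using a single get instead of contains + get.
    static <E> void accumulate(Map<E, Integer> counts, E value) {
        if (value == null) {
            return; // null values are skipped, as in the reviewed code
        }
        Integer current = counts.get(value); // one look-up; null means "not seen yet"
        counts.put(value, current == null ? 1 : current + 1);
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<>();
        accumulate(counts, "a");
        accumulate(counts, "b");
        accumulate(counts, "a");
        System.out.println(counts.get("a") + " " + counts.get("b")); // prints "2 1"
    }
}
```

The `put` is still a second map access, but the read path drops from two look-ups to one.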




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181108#comment-16181108
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141100624
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util
+import java.util.function.BiFunction
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.typeutils._
+import org.apache.flink.table.api.dataview.MapView
+import org.apache.flink.table.dataview.MapViewTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+
+/** The initial accumulator for Collect aggregate function */
+class CollectAccumulator[E](var f0:MapView[E, Integer]) {
+  def this() {
+this(null)
+  }
+
+  def canEqual(a: Any) = a.isInstanceOf[CollectAccumulator[E]]
+
+  override def equals(that: Any): Boolean =
--- End diff --

Usually POJOs don't need to implement `equals`.




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181100#comment-16181100
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141093589
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link TypeInformation} for the Multiset types of the Java API.
+ *
+ * @param <T> The type of the elements in the Multiset.
+ */
+@PublicEvolving
+public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
+
+   private static final long serialVersionUID = 1L;
+
--- End diff --

Remove the extra blank line.




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181098#comment-16181098
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141096297
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link TypeInformation} for the Multiset types of the Java API.
+ *
+ * @param <T> The type of the elements in the Multiset.
+ */
+@PublicEvolving
+public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
+
+   private static final long serialVersionUID = 1L;
+
+
+   public MultisetTypeInfo(Class<T> elementTypeClass) {
+   super(elementTypeClass, Integer.class);
+   }
+
+   public MultisetTypeInfo(TypeInformation<T> elementTypeInfo) {
+   super(elementTypeInfo, BasicTypeInfo.INT_TYPE_INFO);
+   }
+
+   // 

+   //  MultisetTypeInfo specific properties
+   // 

+
+   /**
+* Gets the type information for the elements contained in the Multiset
+*/
+   public TypeInformation<T> getElementTypeInfo() {
+   return getKeyTypeInfo();
+   }
+
+   // 

+   //  TypeInformation implementation
+   // 

+
+   @Override
+   public boolean isBasicType() {
+   return false;
+   }
+
+   @Override
+   public boolean isTupleType() {
--- End diff --

implemented by `MapTypeInfo`, no need to override.




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181107#comment-16181107
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141096324
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link TypeInformation} for the Multiset types of the Java API.
+ *
+ * @param <T> The type of the elements in the Multiset.
+ */
+@PublicEvolving
+public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
+
+    private static final long serialVersionUID = 1L;
+
+
+    public MultisetTypeInfo(Class<T> elementTypeClass) {
+        super(elementTypeClass, Integer.class);
+    }
+
+    public MultisetTypeInfo(TypeInformation<T> elementTypeInfo) {
+        super(elementTypeInfo, BasicTypeInfo.INT_TYPE_INFO);
+    }
+
+    // ------------------------------------------------------------------------
+    //  MultisetTypeInfo specific properties
+    // ------------------------------------------------------------------------
+
+    /**
+     * Gets the type information for the elements contained in the Multiset
+     */
+    public TypeInformation<T> getElementTypeInfo() {
+        return getKeyTypeInfo();
+    }
+
+    // ------------------------------------------------------------------------
+    //  TypeInformation implementation
+    // ------------------------------------------------------------------------
+
+    @Override
+    public boolean isBasicType() {
+        return false;
+    }
+
+    @Override
+    public boolean isTupleType() {
+        return false;
+    }
+
+    @Override
+    public int getArity() {
--- End diff --

implemented by `MapTypeInfo`, no need to override.




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1618#comment-1618
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141095159
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link TypeInformation} for the Multiset types of the Java API.
+ *
+ * @param <T> The type of the elements in the Multiset.
+ */
+@PublicEvolving
+public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
--- End diff --

Add to `org.apache.flink.table.api.Types` class for easy creation of 
`TypeInformation`




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181102#comment-16181102
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141099803
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util
+import java.util.function.BiFunction
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils._
+import org.apache.flink.table.api.dataview.MapView
+import org.apache.flink.table.dataview.MapViewTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+
+/** The initial accumulator for Collect aggregate function */
+class CollectAccumulator[E](var f0:MapView[E, Integer]) {
--- End diff --

add space `var f0: MapView`




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181099#comment-16181099
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141093104
  
--- Diff: docs/dev/table/sql.md ---
@@ -746,6 +746,7 @@ The SQL runtime is built on top of Flink's DataSet and DataStream APIs. Internal
 | `Types.PRIMITIVE_ARRAY`| `ARRAY` | e.g. `int[]`  |
 | `Types.OBJECT_ARRAY`   | `ARRAY` | e.g. `java.lang.Byte[]`|
 | `Types.MAP`| `MAP`   | `java.util.HashMap`|
+| `Types.MULTISET`   | `MULTISET`  | `java.util.HashMap`|
--- End diff --

Should we explain how the `HashMap` is used to represent the multiset, 
i.e., that a multiset of `String` is a `HashMap<String, Integer>`?
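
To make the representation concrete, here is a minimal, self-contained sketch of a multiset stored as a map from each element to its number of occurrences, which is the layout implied by `MultisetTypeInfo` using `Integer` as the map's value type. The class name `MultisetSketch` and the helper `collect` are hypothetical and are not part of Flink; skipping `null` values is an assumption made for this illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not a Flink class.
public class MultisetSketch {

    // Builds the map-based multiset: element -> number of occurrences.
    static Map<String, Integer> collect(List<String> values) {
        Map<String, Integer> multiset = new HashMap<>();
        for (String value : values) {
            // Assumption for this sketch: null values are not counted.
            if (value != null) {
                // merge() stores 1 for a new element and adds 1 to an existing count.
                multiset.merge(value, 1, Integer::sum);
            }
        }
        return multiset;
    }

    public static void main(String[] args) {
        Map<String, Integer> multiset = collect(Arrays.asList("a", "b", "a"));
        System.out.println("count of a: " + multiset.get("a"));
        System.out.println("count of b: " + multiset.get("b"));
    }
}
```

With this layout, the multiset of the strings `a`, `b`, `a` holds two entries, and the value stored under each key is that element's multiplicity.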




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181104#comment-16181104
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141097202
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link TypeInformation} for the Multiset types of the Java API.
+ *
+ * @param <T> The type of the elements in the Multiset.
+ */
+@PublicEvolving
+public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
+
+    private static final long serialVersionUID = 1L;
+
+
+    public MultisetTypeInfo(Class<T> elementTypeClass) {
+        super(elementTypeClass, Integer.class);
+    }
+
+    public MultisetTypeInfo(TypeInformation<T> elementTypeInfo) {
+        super(elementTypeInfo, BasicTypeInfo.INT_TYPE_INFO);
+    }
+
+    // ------------------------------------------------------------------------
+    //  MultisetTypeInfo specific properties
+    // ------------------------------------------------------------------------
+
+    /**
+     * Gets the type information for the elements contained in the Multiset
+     */
+    public TypeInformation<T> getElementTypeInfo() {
+        return getKeyTypeInfo();
+    }
+
+    // ------------------------------------------------------------------------
+    //  TypeInformation implementation
+    // ------------------------------------------------------------------------
+
+    @Override
+    public boolean isBasicType() {
+        return false;
+    }
+
+    @Override
+    public boolean isTupleType() {
+        return false;
+    }
+
+    @Override
+    public int getArity() {
+        return 0;
+    }
+
+    @Override
+    public int getTotalFields() {
--- End diff --

The `getTotalFields()` implementation in `MapTypeInfo` (which returns 
`2`) is not correct.
Can you move your implementation to `MapTypeInfo`? Then we don't need to 
override it here.




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16181106#comment-16181106
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r141097263
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/MultisetTypeInfo.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link TypeInformation} for the Multiset types of the Java API.
+ *
+ * @param <T> The type of the elements in the Multiset.
+ */
+@PublicEvolving
+public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
+
+    private static final long serialVersionUID = 1L;
+
+
+    public MultisetTypeInfo(Class<T> elementTypeClass) {
+        super(elementTypeClass, Integer.class);
+    }
+
+    public MultisetTypeInfo(TypeInformation<T> elementTypeInfo) {
+        super(elementTypeInfo, BasicTypeInfo.INT_TYPE_INFO);
+    }
+
+    // ------------------------------------------------------------------------
+    //  MultisetTypeInfo specific properties
+    // ------------------------------------------------------------------------
+
+    /**
+     * Gets the type information for the elements contained in the Multiset
+     */
+    public TypeInformation<T> getElementTypeInfo() {
+        return getKeyTypeInfo();
+    }
+
+    // ------------------------------------------------------------------------
+    //  TypeInformation implementation
+    // ------------------------------------------------------------------------
+
+    @Override
+    public boolean isBasicType() {
+        return false;
+    }
+
+    @Override
+    public boolean isTupleType() {
+        return false;
+    }
+
+    @Override
+    public int getArity() {
+        return 0;
+    }
+
+    @Override
+    public int getTotalFields() {
+        // similar as arrays, the multiset are "opaque" to the direct field addressing logic
+        // since the multiset's elements are not addressable, we do not expose them
+        return 1;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public Class<Map<T, Integer>> getTypeClass() {
--- End diff --

implemented by `MapTypeInfo`, no need to override.




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16174428#comment-16174428
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r140176626
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util
+import java.util.function.BiFunction
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple1 => JTuple1}
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TupleTypeInfo}
+import org.apache.flink.table.functions.AggregateFunction
+
+import scala.collection.JavaConverters._
+
+/** The initial accumulator for Collect aggregate function */
+class CollectAccumulator[E] extends JTuple1[util.Map[E, Integer]]
+
+abstract class CollectAggFunction[E]
+  extends AggregateFunction[util.Map[E, Integer], CollectAccumulator[E]] {
+
+  @transient
+  private lazy val addFunction = new BiFunction[Integer, Integer, Integer] {
+    override def apply(t: Integer, u: Integer): Integer = t + u
+  }
+
+  override def createAccumulator(): CollectAccumulator[E] = {
+    val acc = new CollectAccumulator[E]()
+    acc.f0 = new util.HashMap[E, Integer]()
+    acc
+  }
+
+  def accumulate(accumulator: CollectAccumulator[E], value: E): Unit = {
+    if (value != null) {
+      if (accumulator.f0.containsKey(value)) {
+        val add = (x: Integer, y: Integer) => x + y
+        accumulator.f0.merge(value, 1, addFunction)
+      } else {
+        accumulator.f0.put(value, 1)
+      }
+    }
+  }
+
+  override def getValue(accumulator: CollectAccumulator[E]): util.Map[E, Integer] = {
+    if (accumulator.f0.size() > 0) {
+      new util.HashMap(accumulator.f0)
+    } else {
+      null.asInstanceOf[util.Map[E, Integer]]
+    }
+  }
+
+  def resetAccumulator(acc: CollectAccumulator[E]): Unit = {
+    acc.f0.clear()
+  }
+
+  override def getAccumulatorType: TypeInformation[CollectAccumulator[E]] = {
+    new TupleTypeInfo(
+      classOf[CollectAccumulator[E]],
+      new GenericTypeInfo[util.Map[E, Integer]](classOf[util.Map[E, Integer]]))
--- End diff --

@fhueske Thanks. I think that's exactly what the current code does. Please 
take another look.




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16174388#comment-16174388
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r140167679
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util
+import java.util.function.BiFunction
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple1 => JTuple1}
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TupleTypeInfo}
+import org.apache.flink.table.functions.AggregateFunction
+
+import scala.collection.JavaConverters._
+
+/** The initial accumulator for Collect aggregate function */
+class CollectAccumulator[E] extends JTuple1[util.Map[E, Integer]]
+
+abstract class CollectAggFunction[E]
+  extends AggregateFunction[util.Map[E, Integer], CollectAccumulator[E]] {
+
+  @transient
+  private lazy val addFunction = new BiFunction[Integer, Integer, Integer] {
+    override def apply(t: Integer, u: Integer): Integer = t + u
+  }
+
+  override def createAccumulator(): CollectAccumulator[E] = {
+    val acc = new CollectAccumulator[E]()
+    acc.f0 = new util.HashMap[E, Integer]()
+    acc
+  }
+
+  def accumulate(accumulator: CollectAccumulator[E], value: E): Unit = {
+    if (value != null) {
+      if (accumulator.f0.containsKey(value)) {
+        val add = (x: Integer, y: Integer) => x + y
+        accumulator.f0.merge(value, 1, addFunction)
+      } else {
+        accumulator.f0.put(value, 1)
+      }
+    }
+  }
+
+  override def getValue(accumulator: CollectAccumulator[E]): util.Map[E, Integer] = {
+    if (accumulator.f0.size() > 0) {
+      new util.HashMap(accumulator.f0)
+    } else {
+      null.asInstanceOf[util.Map[E, Integer]]
+    }
+  }
+
+  def resetAccumulator(acc: CollectAccumulator[E]): Unit = {
+    acc.f0.clear()
+  }
+
+  override def getAccumulatorType: TypeInformation[CollectAccumulator[E]] = {
+    new TupleTypeInfo(
+      classOf[CollectAccumulator[E]],
+      new GenericTypeInfo[util.Map[E, Integer]](classOf[util.Map[E, Integer]]))
--- End diff --

We could have an abstract method `getElementTypeInfo()` that returns the 
type info for the elements. Basic types can then be handled properly, and for 
`Object` we fall back to `GenericTypeInfo`.
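
The suggestion above can be sketched as a small template-method arrangement: the base class derives the accumulator type from an abstract `getElementTypeInfo()`, and each concrete subclass only supplies its element type. This is a self-contained illustration under stated assumptions, not Flink code: plain strings stand in for `TypeInformation`, and the class names `ElementTypeSketch`, `CollectAgg`, `StringCollectAgg` and `ObjectCollectAgg` are hypothetical.

```java
// Hypothetical illustration only; strings stand in for Flink's TypeInformation.
public class ElementTypeSketch {

    abstract static class CollectAgg<E> {
        // Each subclass reports the type descriptor of its elements.
        abstract String getElementTypeInfo();

        // The accumulator type is derived once, in the base class.
        String getAccumulatorType() {
            return "Map<" + getElementTypeInfo() + ", Integer>";
        }
    }

    // A basic element type gets a precise descriptor.
    static class StringCollectAgg extends CollectAgg<String> {
        @Override
        String getElementTypeInfo() {
            return "String";
        }
    }

    // Arbitrary objects fall back to a generic descriptor.
    static class ObjectCollectAgg extends CollectAgg<Object> {
        @Override
        String getElementTypeInfo() {
            return "Generic<Object>";
        }
    }

    public static void main(String[] args) {
        System.out.println(new StringCollectAgg().getAccumulatorType());
        System.out.println(new ObjectCollectAgg().getAccumulatorType());
    }
}
```

The point of the design is that only the element descriptor varies per subclass, so the map-with-`Integer`-counts structure is described in one place.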




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16174341#comment-16174341
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r140159387
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util
+import java.util.function.BiFunction
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple1 => JTuple1}
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TupleTypeInfo}
+import org.apache.flink.table.functions.AggregateFunction
+
+import scala.collection.JavaConverters._
+
+/** The initial accumulator for Collect aggregate function */
+class CollectAccumulator[E] extends JTuple1[util.Map[E, Integer]]
+
+abstract class CollectAggFunction[E]
+  extends AggregateFunction[util.Map[E, Integer], CollectAccumulator[E]] {
+
+  @transient
+  private lazy val addFunction = new BiFunction[Integer, Integer, Integer] {
+    override def apply(t: Integer, u: Integer): Integer = t + u
+  }
+
+  override def createAccumulator(): CollectAccumulator[E] = {
+    val acc = new CollectAccumulator[E]()
+    acc.f0 = new util.HashMap[E, Integer]()
+    acc
+  }
+
+  def accumulate(accumulator: CollectAccumulator[E], value: E): Unit = {
+    if (value != null) {
+      if (accumulator.f0.containsKey(value)) {
+        val add = (x: Integer, y: Integer) => x + y
+        accumulator.f0.merge(value, 1, addFunction)
+      } else {
+        accumulator.f0.put(value, 1)
+      }
+    }
+  }
+
+  override def getValue(accumulator: CollectAccumulator[E]): util.Map[E, Integer] = {
+    if (accumulator.f0.size() > 0) {
+      new util.HashMap(accumulator.f0)
+    } else {
+      null.asInstanceOf[util.Map[E, Integer]]
+    }
+  }
+
+  def resetAccumulator(acc: CollectAccumulator[E]): Unit = {
+    acc.f0.clear()
+  }
+
+  override def getAccumulatorType: TypeInformation[CollectAccumulator[E]] = {
+    new TupleTypeInfo(
+      classOf[CollectAccumulator[E]],
+      new GenericTypeInfo[util.Map[E, Integer]](classOf[util.Map[E, Integer]]))
--- End diff --

Changed to use `MapViewTypeInfo` here. However, if `E` is not a basic type, I can 
only use `GenericTypeInfo` (please see `ObjectCollectAggFunction`). Is there a 
better way? @fhueske




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16174340#comment-16174340
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r140159053
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util
+import java.util.function.BiFunction
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple1 => JTuple1}
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TupleTypeInfo}
+import org.apache.flink.table.functions.AggregateFunction
+
+import scala.collection.JavaConverters._
+
+/** The initial accumulator for Collect aggregate function */
+class CollectAccumulator[E] extends JTuple1[util.Map[E, Integer]]
--- End diff --

Please take another look; I've updated it to use `MapView`.




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173814#comment-16173814
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r140089533
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/MultisetRelDataType.scala
 ---
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.sql.`type`.MultisetSqlType
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+class MultisetRelDataType(
+val typeInfo: TypeInformation[_],
+elementType: RelDataType,
+isNullable: Boolean)
+  extends MultisetSqlType(
+elementType,
+isNullable) {
+
+  override def toString = s"MULTISET($typeInfo)"
--- End diff --

Done




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173810#comment-16173810
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r140089018
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
 ---
@@ -329,6 +329,35 @@ class AggregateITCase(
   }
 
   @Test
+  def testTumbleWindowAggregateWithCollect(): Unit = {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+val sqlQuery =
+  "SELECT b, COLLECT(b)" +
--- End diff --

Updated the documentation.

Table API ticket created: 
https://issues.apache.org/jira/browse/FLINK-7658?filter=-1




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173777#comment-16173777
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r140083074
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util
+import java.util.function.BiFunction
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple1 => JTuple1}
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TupleTypeInfo}
+import org.apache.flink.table.functions.AggregateFunction
+
+import scala.collection.JavaConverters._
+
+/** The initial accumulator for Collect aggregate function */
+class CollectAccumulator[E] extends JTuple1[util.Map[E, Integer]]
+
+abstract class CollectAggFunction[E]
+  extends AggregateFunction[util.Map[E, Integer], CollectAccumulator[E]] {
+
+  @transient
+  private lazy val addFunction = new BiFunction[Integer, Integer, Integer] 
{
+override def apply(t: Integer, u: Integer): Integer = t + u
+  }
+
+  override def createAccumulator(): CollectAccumulator[E] = {
+val acc = new CollectAccumulator[E]()
+acc.f0 = new util.HashMap[E, Integer]()
+acc
+  }
+
+  def accumulate(accumulator: CollectAccumulator[E], value: E): Unit = {
+if (value != null) {
+  if (accumulator.f0.containsKey(value)) {
+val add = (x: Integer, y: Integer) => x + y
--- End diff --

yes, removed.




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173486#comment-16173486
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r140009815
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util
+import java.util.function.BiFunction
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple1 => JTuple1}
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TupleTypeInfo}
+import org.apache.flink.table.functions.AggregateFunction
+
+import scala.collection.JavaConverters._
+
+/** The initial accumulator for Collect aggregate function */
+class CollectAccumulator[E] extends JTuple1[util.Map[E, Integer]]
--- End diff --

We can use a `MapView` here. This feature was recently added and 
automatically backs the Map with a MapState if possible. Otherwise, it uses a 
Java HashMap (as right now). The benefit of backing the accumulator by MapState 
is that only the keys and values that are accessed need to be deserialized. In 
contrast, a regular HashMap is completely de/serialized every time the 
accumulator is read. Using MapView would require that the accumulator is 
implemented as a POJO (instead of a Tuple1). 

Check this class for details 
[MapView](https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala)
 and let me know if you have questions.




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173484#comment-16173484
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r140022994
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util
+import java.util.function.BiFunction
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple1 => JTuple1}
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TupleTypeInfo}
+import org.apache.flink.table.functions.AggregateFunction
+
+import scala.collection.JavaConverters._
+
+/** The initial accumulator for Collect aggregate function */
+class CollectAccumulator[E] extends JTuple1[util.Map[E, Integer]]
+
+abstract class CollectAggFunction[E]
+  extends AggregateFunction[util.Map[E, Integer], CollectAccumulator[E]] {
+
+  @transient
+  private lazy val addFunction = new BiFunction[Integer, Integer, Integer] 
{
+override def apply(t: Integer, u: Integer): Integer = t + u
+  }
+
+  override def createAccumulator(): CollectAccumulator[E] = {
+val acc = new CollectAccumulator[E]()
+acc.f0 = new util.HashMap[E, Integer]()
+acc
+  }
+
+  def accumulate(accumulator: CollectAccumulator[E], value: E): Unit = {
+if (value != null) {
+  if (accumulator.f0.containsKey(value)) {
+val add = (x: Integer, y: Integer) => x + y
--- End diff --

`add` is not used, right?
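For illustration, the counting step in `accumulate` can be reduced to a single `Map.merge` call, which makes both the unused local `add` lambda and the `containsKey` branch unnecessary. This is a minimal plain-JDK sketch in Java, not the code from the PR: the class name `CollectAccumulateSketch` is hypothetical, and a bare `HashMap` stands in for the accumulator.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the accumulate logic; not the PR's class.
class CollectAccumulateSketch {

    /** Adds one occurrence of value to the element -> count map, skipping nulls. */
    static <E> void accumulate(Map<E, Integer> counts, E value) {
        if (value != null) {
            // merge() inserts 1 for a new key, otherwise sums the old count and 1.
            counts.merge(value, 1, Integer::sum);
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<>();
        accumulate(counts, "a");
        accumulate(counts, "b");
        accumulate(counts, "a");
        accumulate(counts, null); // ignored
        System.out.println(counts);
    }
}
```

`Map.merge` already covers the "key absent" case, so no separate `put` is needed.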




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173485#comment-16173485
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r140026944
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
 ---
@@ -329,6 +329,35 @@ class AggregateITCase(
   }
 
   @Test
+  def testTumbleWindowAggregateWithCollect(): Unit = {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+val sqlQuery =
+  "SELECT b, COLLECT(b)" +
--- End diff --

Collect should be added to the [SQL 
documentation](https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/sql.html#built-in-functions)
 under "Built-in Function" -> "Aggregate Functions"

Moreover, we should add `MULTISET` to the [supported data 
types](https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/sql.html#data-types).

It would also be nice if you could open a JIRA to add support for COLLECT 
to the Table API. We try to keep both in sync and it helps if we have a list of 
things that need to be added.




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173483#comment-16173483
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r140025368
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/MultisetRelDataType.scala
 ---
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.sql.`type`.MultisetSqlType
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+class MultisetRelDataType(
+val typeInfo: TypeInformation[_],
+elementType: RelDataType,
+isNullable: Boolean)
+  extends MultisetSqlType(
+elementType,
+isNullable) {
+
+  override def toString = s"MULTISET($typeInfo)"
--- End diff --

This should rather be `s"MULTISET($elementType)"`. `TypeInformation` is a Flink 
concept, whereas `RelDataType` belongs to the Calcite context.




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173487#comment-16173487
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r140024187
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CollectAggFunction.scala
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.aggfunctions
+
+import java.lang.{Iterable => JIterable}
+import java.util
+import java.util.function.BiFunction
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple1 => JTuple1}
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TupleTypeInfo}
+import org.apache.flink.table.functions.AggregateFunction
+
+import scala.collection.JavaConverters._
+
+/** The initial accumulator for Collect aggregate function */
+class CollectAccumulator[E] extends JTuple1[util.Map[E, Integer]]
+
+abstract class CollectAggFunction[E]
+  extends AggregateFunction[util.Map[E, Integer], CollectAccumulator[E]] {
+
+  @transient
+  private lazy val addFunction = new BiFunction[Integer, Integer, Integer] 
{
+override def apply(t: Integer, u: Integer): Integer = t + u
+  }
+
+  override def createAccumulator(): CollectAccumulator[E] = {
+val acc = new CollectAccumulator[E]()
+acc.f0 = new util.HashMap[E, Integer]()
+acc
+  }
+
+  def accumulate(accumulator: CollectAccumulator[E], value: E): Unit = {
+if (value != null) {
+  if (accumulator.f0.containsKey(value)) {
+val add = (x: Integer, y: Integer) => x + y
+accumulator.f0.merge(value, 1, addFunction)
+  } else {
+accumulator.f0.put(value, 1)
+  }
+}
+  }
+
+  override def getValue(accumulator: CollectAccumulator[E]): util.Map[E, 
Integer] = {
+if (accumulator.f0.size() > 0) {
+  new util.HashMap(accumulator.f0)
+} else {
+  null.asInstanceOf[util.Map[E, Integer]]
+}
+  }
+
+  def resetAccumulator(acc: CollectAccumulator[E]): Unit = {
+acc.f0.clear()
+  }
+
+  override def getAccumulatorType: TypeInformation[CollectAccumulator[E]] 
= {
+new TupleTypeInfo(
+  classOf[CollectAccumulator[E]],
+  new GenericTypeInfo[util.Map[E, Integer]](classOf[util.Map[E, 
Integer]]))
--- End diff --

Don't use a generic type here. This will result in a KryoSerializer, which 
can be quite inefficient and result in state that cannot be upgraded. Use 
`MapTypeInfo` instead.




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16168434#comment-16168434
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r139237307
  
--- Diff: flink-core/pom.xml ---
@@ -80,6 +80,13 @@ under the License.


 
+   
+   
+   org.apache.commons
--- End diff --

Thanks. I now use `java.util.Map` instead.




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16168430#comment-16168430
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r139237144
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 ---
@@ -211,6 +218,14 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) 
extends JavaTypeFactoryImp
 canonize(relType)
   }
 
+  override def createMultisetType(elementType: RelDataType, 
maxCardinality: Long): RelDataType = {
+val relType = new MultisetRelDataType(
--- End diff --

Added changes in FlinkRelNode & ExpressionReducer




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166289#comment-16166289
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r138889015
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 ---
@@ -211,6 +218,14 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) 
extends JavaTypeFactoryImp
 canonize(relType)
   }
 
+  override def createMultisetType(elementType: RelDataType, 
maxCardinality: Long): RelDataType = {
+val relType = new MultisetRelDataType(
--- End diff --

There are multiple locations where a new type has to be added, such as 
`FlinkRelNode`.




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166288#comment-16166288
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/4585#discussion_r13586
  
--- Diff: flink-core/pom.xml ---
@@ -80,6 +80,13 @@ under the License.


 
+   
+   
+   org.apache.commons
--- End diff --

We should not add additional dependencies to Flink just because of a new 
data type. There is also no clear reason for choosing this library. Couldn't we 
just use a regular Java Map? Otherwise, I would propose adding a class for our 
own type, as we did for `org.apache.flink.types.Row`. Calcite uses 
`List`, which is not very nice, but would also work.
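To make the two options mentioned above concrete, the following plain-JDK Java sketch shows that a multiset stored as an element-to-count `java.util.Map` and one stored as a flat `List` with repeats carry the same information and can be converted into each other. The class and method names (`MultisetRepresentations`, `toCounts`, `toList`) are hypothetical and appear in neither Flink nor Calcite.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper; illustrates the two representations only.
class MultisetRepresentations {

    /** Collapses a flat list with repeats into an element -> count map. */
    static <E> Map<E, Integer> toCounts(List<E> elements) {
        Map<E, Integer> counts = new HashMap<>();
        for (E element : elements) {
            counts.merge(element, 1, Integer::sum);
        }
        return counts;
    }

    /** Expands an element -> count map back into a flat list with repeats. */
    static <E> List<E> toList(Map<E, Integer> counts) {
        List<E> out = new ArrayList<>();
        for (Map.Entry<E, Integer> entry : counts.entrySet()) {
            for (int i = 0; i < entry.getValue(); i++) {
                out.add(entry.getKey());
            }
        }
        return out;
    }
}
```

The map form stores each distinct element once, whereas the list form grows with the total number of occurrences. The element order of the expanded list is unspecified because it follows `HashMap` iteration order.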




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-05 Thread Shuyi Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16154802#comment-16154802
 ] 

Shuyi Chen commented on FLINK-7491:
---

Thanks a lot, [~fhueske].



[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16152795#comment-16152795
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4585
  
Thanks @suez1224, I'm quite busy atm but will try to have a look soon.
Thanks, Fabian




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16151151#comment-16151151
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user suez1224 commented on the issue:

https://github.com/apache/flink/pull/4585
  
Hi @fhueske , I've filled out the PR template. Please take a look. Thanks a 
lot.




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-09-01 Thread Shuyi Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16150155#comment-16150155
 ] 

Shuyi Chen commented on FLINK-7491:
---

Hi [~fhue...@gmail.com], I've updated the PR description. Could you please help 
take a look at the PR?  Thanks.



[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-08-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16146843#comment-16146843
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4585
  
Hi @suez1224, please read and fill out the template in the PR description. 
Thank you.




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-08-28 Thread Shuyi Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16144282#comment-16144282
 ] 

Shuyi Chen commented on FLINK-7491:
---

Hi [~jark], thanks for the response. However, I am worried that with Array as 
the runtime type, multiset-specific operations will be slow. For example:

The MEMBER OF operator is O(1) for a multiset data structure and O(n) for an 
array.
The SUBMULTISET OF operator is O(m+n) for arrays, and O(m) for multisets when 
testing M < N.

Also, the actual type I am using is HashMultiset, which is backed by a Java 
HashMap, so I think it should perform reasonably well.
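The complexity argument above can be sketched in plain Java as follows. This is only an illustration under assumptions: the multiset is modeled as an element-to-count `java.util.Map`, and the class and method names (`MultisetMembershipSketch`, `memberOfMultiset`, `memberOfArray`, `subMultisetOf`) are hypothetical, not Flink or Calcite APIs.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch of MEMBER OF and SUBMULTISET OF on two runtime types.
class MultisetMembershipSketch {

    /** MEMBER OF on a map-backed multiset: one hash lookup, O(1) on average. */
    static <E> boolean memberOfMultiset(Map<E, Integer> multiset, E element) {
        return multiset.containsKey(element);
    }

    /** MEMBER OF on an array: linear scan over all n elements, O(n). */
    static <E> boolean memberOfArray(E[] array, E element) {
        for (E candidate : array) {
            if (Objects.equals(candidate, element)) {
                return true;
            }
        }
        return false;
    }

    /**
     * SUBMULTISET OF on map-backed multisets: each distinct element of m must
     * occur in n at least as often, so only the entries of m are visited.
     */
    static <E> boolean subMultisetOf(Map<E, Integer> m, Map<E, Integer> n) {
        for (Map.Entry<E, Integer> entry : m.entrySet()) {
            if (n.getOrDefault(entry.getKey(), 0) < entry.getValue()) {
                return false;
            }
        }
        return true;
    }
}
```

The O(1) figure is the average case for a hash lookup and depends on well-distributed `hashCode` values.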



[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-08-27 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16143104#comment-16143104
 ] 

Jark Wu commented on FLINK-7491:


Hi [~suez1224], what I meant by using {{Array}} instead of {{AbstractMultiSet}} 
concerns the runtime type, not the SQL type. For the SQL type, of course we 
should use {{MultisetSqlType}}. But for the runtime type, you are using a Guava 
{{AbstractMultiSet}} (collecting elements into the AbstractMultiSet), which I 
think performs poorly, and I think a Java array may be enough for this. 
Calcite's MultisetSqlType does not force us to use a particular Java type at 
runtime.



[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-08-25 Thread Shuyi Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16142277#comment-16142277
 ] 

Shuyi Chen commented on FLINK-7491:
---

Thanks for reviewing the PR, [~jark].

I think Multiset and Array are different, and they support different sets of 
operators (please see 
http://farrago.sourceforge.net/design/CollectionTypes.html). Also, the Calcite 
definition of the COLLECT SqlAggFunction explicitly requires the return type to 
be a Multiset (see below):
{code:java}
  /**
   * The COLLECT operator. Multiset aggregator function.
   */
  public static final SqlAggFunction COLLECT =
  new SqlAggFunction("COLLECT",
  null,
  SqlKind.COLLECT,
  ReturnTypes.TO_MULTISET,
  null,
  OperandTypes.ANY,
  SqlFunctionCategory.SYSTEM, false, false) {
  };
{code}

I am worried that if we use an Array to emulate a Multiset, we might run into 
performance problems for large multisets down the road, and potentially 
Calcite integration issues related to multisets. What do you think?
 



[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-08-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16141348#comment-16141348
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/4585
  
Hi @suez1224, thanks for the PR. I think we can use `Array` instead of 
`AbstractMultiSet`; `AbstractMultiSet` is too obscure for users. In that case, 
we do not need the MultiSetSerializer and MultiSetTypeInfo, and subsequent 
queries can apply UDFs to the array-typed field, passing it as the `eval(...)` 
parameter.




[jira] [Commented] (FLINK-7491) Support COLLECT Aggregate function in Flink SQL

2017-08-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16141276#comment-16141276
 ] 

ASF GitHub Bot commented on FLINK-7491:
---

GitHub user suez1224 opened a pull request:

https://github.com/apache/flink/pull/4585

[FLINK-7491] add MultiSetTypeInfo; add built-in Collect Aggregate Function 
for Flink SQL.

*Thank you very much for contributing to Apache Flink - we are happy that 
you want to help us improve Flink. To help the community review your 
contribution in the best possible way, please go through the checklist below, 
which will get the contribution into a shape in which it can be best reviewed.*

*Please understand that we do not do this to make contributions to Flink a 
hassle. In order to uphold a high standard of quality for code contributions, 
while at the same time managing a large number of contributions, we need 
contributors to prepare the contributions well, and give reviewers enough 
contextual information for the review. Please also understand that 
contributions that do not follow this guide will take longer to review and thus 
typically be picked up with lower priority by the community.*

## Contribution Checklist

  - Make sure that the pull request corresponds to a [JIRA 
issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are 
made for typos in JavaDoc or documentation files, which need no JIRA issue.
  
  - Name the pull request in the form "[FLINK-] [component] Title of 
the pull request", where *FLINK-* should be replaced by the actual issue 
number. Skip *component* if you are unsure about which is the best component.
  Typo fixes that have no associated JIRA issue should be named following 
this pattern: `[hotfix] [docs] Fix typo in event time introduction` or 
`[hotfix] [javadocs] Expand JavaDoc for PunctuatedWatermarkGenerator`.

  - Fill out the template below to describe the changes contributed by the 
pull request. That will give reviewers the context they need to do the review.
  
  - Make sure that the change passes the automated tests, i.e., `mvn clean 
verify` passes. You can set up Travis CI to do that following [this 
guide](http://flink.apache.org/contribute-code.html#best-practices).

  - Each pull request should address only one issue, not mix up code from 
multiple issues.
  
  - Each commit in the pull request has a meaningful commit message 
(including the JIRA id)

  - Once all items of the checklist are addressed, remove the above text 
and this checklist, leaving only the filled out template below.


**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change

*(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*


## Brief change log

*(for example:)*
  - *The TaskInfo is stored in the blob store at job creation time as a 
persistent artifact*
  - *Deployments RPC transmits only the blob storage reference*
  - *TaskManagers retrieve the TaskInfo from the blob cache*


## Verifying this change

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe 
tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
  - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
  - *Extended integration test for recovery after master (JobManager) 
failure*
  - *Added test that validates that TaskInfo is transferred only once 
across recoveries*
  - *Manually verified the change by running a 4-node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
  - The serializers: (yes / no / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)



You