[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16146022#comment-16146022
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4355


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16144957#comment-16144957
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4355
  
Thanks for the update @kaibozhou.
I think this PR is good to merge. 
I'll do some final small changes and will merge this.

Thanks, Fabian


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16144771#comment-16144771
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user kaibozhou commented on the issue:

https://github.com/apache/flink/pull/4355
  
All comments addressed.
Do you have time to merge it, @wuchong @fhueske?

Thanks.


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16144769#comment-16144769
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135697894
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView provides List functionality for accumulators used by user-defined aggregate functions
+  * {{AggregateFunction}}.
+  *
+  * A ListView can be backed by a Java ArrayList or a state backend, depending on the context in
+  * which the function is used.
+  *
+  * At runtime `ListView` will be replaced by a [[org.apache.flink.table.dataview.StateListView]]
+  * if it is backed by a state backend.
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *    public ListView<String> list;
+  *    public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction<Long, MyAccum> {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  *     MyAccum accum = new MyAccum();
+  *     accum.list = new ListView<>(Types.STRING);
+  *     accum.count = 0L;
+  *     return accum;
+  *   }
+  *
+  *   public void accumulate(MyAccum accumulator, String id) {
+  *     accumulator.list.add(id);
+  *     ... ...
+  *     accumulator.list.get()
+  *     ... ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  *     ... ...
+  *     accumulator.list.get()
+  *     ... ...
+  *     return accumulator.count;
+  *   }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](
+    @transient private[flink] val elementTypeInfo: TypeInformation[T])
+  extends DataView {
+
+  def this() = this(null)
+
+  private[flink] var list: util.List[T] = new util.ArrayList[T]()
--- End diff --

The MapView should also apply these modifications.

Very good suggestion.


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1610#comment-1610
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r13411
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView provides List functionality for accumulators used by 
user-defined aggregate functions
+  * {{AggregateFunction}}.
+  *
+  * A ListView can be backed by a Java ArrayList or a state backend, 
depending on the context in
+  * which the function is used.
+  *
+  * At runtime `ListView` will be replaced by a 
[[org.apache.flink.table.dataview.StateListView]]
+  * if it is backed by a state backend.
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public ListView list;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  * MyAccum accum = new MyAccum();
+  * accum.list = new ListView<>(Types.STRING);
+  * accum.count = 0L;
+  * return accum;
+  *   }
+  *
+  *   public void accumulate(MyAccum accumulator, String id) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  * return accumulator.count;
+  *   }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](
+@transient private[flink] val elementTypeInfo: TypeInformation[T])
+  extends DataView {
+
+  def this() = this(null)
+
+  private[flink] var list: util.List[T] = new util.ArrayList[T]()
--- End diff --

We can refactor the `ListView` constructors as follows:

```
class ListView[T] private[flink](
    @transient private[flink] val elementTypeInfo: TypeInformation[T],
    private[flink] val list: util.List[T])
  extends DataView {

  def this(elementTypeInfo: TypeInformation[T]) {
    this(elementTypeInfo, new util.ArrayList[T]())
  }

  def this() = {
    this(null, new util.ArrayList[T]())
  }

  ...
}
```

and call the primary constructor in the `ListViewSerializer` with `null` for the type information.
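A minimal, self-contained sketch of the suggested constructor layout. It does not use Flink: `TypeInfoStub` is a hypothetical stand-in for `TypeInformation`, and `ListViewSketch.wrap` plays the role of the serializer-side call that passes `null` type information together with an already-built list.

```scala
import java.util

// Hypothetical stand-in for Flink's TypeInformation, only to keep the sketch self-contained.
final case class TypeInfoStub(name: String)

// The primary constructor takes the backing list, so a serializer can hand over an
// already-built list instead of paying for an empty ArrayList that is thrown away.
class ListViewSketch[T] private (
    @transient val elementTypeInfo: TypeInfoStub,
    val list: util.List[T]) {

  // Constructor used by UDAGG authors: allocates a fresh, empty ArrayList.
  def this(elementTypeInfo: TypeInfoStub) = this(elementTypeInfo, new util.ArrayList[T]())

  // No-arg constructor, also allocating an empty ArrayList.
  def this() = this(null, new util.ArrayList[T]())

  def add(value: T): Unit = list.add(value)
}

object ListViewSketch {
  // Serializer-side path (hypothetical): wrap an existing list and pass null type info.
  def wrap[T](existing: util.List[T]): ListViewSketch[T] =
    new ListViewSketch[T](null, existing)
}
```

Only the companion object can reach the primary constructor here, which mirrors the `private[flink]` visibility in the proposal.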


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1613#comment-1613
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135550350
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView provides List functionality for accumulators used by 
user-defined aggregate functions
+  * {{AggregateFunction}}.
+  *
+  * A ListView can be backed by a Java ArrayList or a state backend, 
depending on the context in
+  * which the function is used.
+  *
+  * At runtime `ListView` will be replaced by a 
[[org.apache.flink.table.dataview.StateListView]]
+  * if it is backed by a state backend.
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public ListView list;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  * MyAccum accum = new MyAccum();
+  * accum.list = new ListView<>(Types.STRING);
+  * accum.count = 0L;
+  * return accum;
+  *   }
+  *
+  *   public void accumulate(MyAccum accumulator, String id) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  * return accumulator.count;
+  *   }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](
+@transient private[flink] val elementTypeInfo: TypeInformation[T])
+  extends DataView {
+
+  def this() = this(null)
+
+  private[flink] var list: util.List[T] = new util.ArrayList[T]()
--- End diff --

Right now an empty `ArrayList` is always created when a `ListView` is instantiated.
This is unnecessary overhead when the `ListView` is copied or deserialized 
using `ListViewSerializer` because the empty instance is immediately replaced.

We should add an option to create a `ListView` without an `ArrayList` 
instance. This means we have to move the creation of the `ArrayList` out of the 
primary constructor.

The same applies to the `MapView`.
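Applied to the `MapView`, the same idea could look roughly like the sketch below. `MapViewSketch` and `fromEntries` are hypothetical names; the point is that the deserialization-style path builds its map once and hands it to the primary constructor, so no empty `HashMap` is allocated and immediately discarded.

```scala
import java.util

// Hypothetical MapView-like class mirroring the ListView refactoring for maps.
class MapViewSketch[K, V] private (val map: util.Map[K, V]) {
  // User-facing constructor: allocates an empty HashMap.
  def this() = this(new util.HashMap[K, V]())

  def put(key: K, value: V): Unit = map.put(key, value)
  def get(key: K): V = map.get(key)
}

object MapViewSketch {
  // Deserialization-style factory: fills one map and wraps it directly.
  def fromEntries[K, V](entries: Seq[(K, V)]): MapViewSketch[K, V] = {
    val m = new util.HashMap[K, V]()
    entries.foreach { case (k, v) => m.put(k, v) }
    new MapViewSketch[K, V](m)
  }
}
```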


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1618#comment-1618
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135524948
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -42,6 +46,18 @@ class AggregationCodeGenerator(
 input: TypeInformation[_ <: Any])
   extends CodeGenerator(config, nullableInput, input) {
 
+  // set of statements for cleanup dataview that will be added only once
+  // we use a LinkedHashSet to keep the insertion order
+  private val reusableCleanupStatements = mutable.LinkedHashSet[String]()
+
+  /**
+* @return code block of statements that need to be placed in the cleanup() method of
--- End diff --

`RichFunction` does not have a `cleanup()` method. The `cleanup()` method 
is a method of `GeneratedAggregations`.
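For context, the reviewed field collects statements for the generated `cleanup()` method. A rough sketch of that collection pattern, with hypothetical class and method names: a `LinkedHashSet` keeps each statement once and preserves insertion order when the block is rendered.

```scala
import scala.collection.mutable

// Hypothetical collector for statements that end up in a generated cleanup() method.
class CleanupCollector {
  // LinkedHashSet: duplicates are dropped, insertion order is kept.
  private val reusableCleanupStatements = mutable.LinkedHashSet[String]()

  def addCleanup(stmt: String): Unit = reusableCleanupStatements += stmt

  // Renders the body that would be spliced into the generated cleanup() method.
  def reuseCleanupCode(): String = reusableCleanupStatements.mkString("\n")
}
```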


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16144439#comment-16144439
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135528903
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -161,13 +182,119 @@ class AggregationCodeGenerator(
 }
 }
 
+/**
+  * Create DataView Term, for example, acc1_map_dataview.
+  *
+  * @param aggIndex index of aggregate function
+  * @param fieldName field name of DataView
+  * @return term to access [[MapView]] or [[ListView]]
+  */
+def createDataViewTerm(aggIndex: Int, fieldName: String): String = {
+  s"acc${aggIndex}_${fieldName}_dataview"
+}
+
+/**
+  * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] to the open, cleanup,
+  * close and member area of the generated function.
+  *
+  */
+def addReusableDataViews: Unit = {
+  if (accConfig.isDefined) {
+val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.get
+  .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+  .toMap[String, StateDescriptor[_, _]]
+
+for (i <- aggs.indices) yield {
+  for (spec <- accConfig.get(i)) yield {
+val dataViewField = spec.field
+val dataViewTypeTerm = dataViewField.getType.getCanonicalName
+val desc = descMapping.getOrElse(spec.id,
+  throw new CodeGenException(s"Can not find DataView in accumulator by id: ${spec.id}"))
+
+// define the DataView variables
+val serializedData = AggregateUtil.serialize(desc)
--- End diff --

Move the `serialize` method to this class and rename it to `serializeStateDescriptor`.
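A possible shape for `serializeStateDescriptor`, assuming plain Java serialization followed by URL-safe Base64 so the result can be embedded as a string literal in generated code. This is a sketch, not Flink's implementation; it accepts any `java.io.Serializable` (Flink's `StateDescriptor` implements `Serializable`).

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.util.Base64

object DescriptorCodec {
  // Java-serialize the descriptor, then encode the bytes as URL-safe Base64 without
  // padding, so the string contains no characters that need escaping in generated code.
  def serializeStateDescriptor(desc: java.io.Serializable): String = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(desc) finally out.close()
    Base64.getUrlEncoder().withoutPadding().encodeToString(bytes.toByteArray())
  }
}
```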


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1614#comment-1614
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135648347
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -307,6 +312,92 @@ object UserDefinedFunctionUtils {
  // --
 
   /**
+* Remove StateView fields from accumulator type information.
+*
+* @param index index of aggregate function
+* @param aggFun aggregate function
+* @param accType accumulator type information, only support pojo type
+* @param isStateBackedDataViews is data views use state backend
+* @return mapping of accumulator type information and data view config which contains id,
+* field name and state descriptor
+*/
+  def removeStateViewFieldsFromAccTypeInfo(
+index: Int,
+aggFun: AggregateFunction[_, _],
+accType: TypeInformation[_],
+isStateBackedDataViews: Boolean)
+  : (TypeInformation[_], Option[Seq[DataViewSpec[_]]]) = {
+
+var hasDataView = false
+val acc = aggFun.createAccumulator()
+accType match {
+  case pojoType: PojoTypeInfo[_] if pojoType.getArity > 0 =>
+val arity = pojoType.getArity
+val newPojoFields = new util.ArrayList[PojoField]()
+val accumulatorSpecs = new mutable.ArrayBuffer[DataViewSpec[_]]
+for (i <- 0 until arity) {
+  val pojoField = pojoType.getPojoFieldAt(i)
+  val field = pojoField.getField
+  val fieldName = field.getName
+  field.setAccessible(true)
+
+  pojoField.getTypeInformation match {
+case map: MapViewTypeInfo[Any, Any] =>
+  val mapView = field.get(acc).asInstanceOf[MapView[_, _]]
+  if (mapView != null) {
+val keyTypeInfo = mapView.keyTypeInfo
+val valueTypeInfo = mapView.valueTypeInfo
+val newTypeInfo = if (keyTypeInfo != null && valueTypeInfo != null) {
+  hasDataView = true
+  new MapViewTypeInfo(keyTypeInfo, valueTypeInfo)
+} else {
+  map
+}
+
+var spec = MapViewSpec(
+  "agg" + index + "$" + fieldName, // generate unique name to be used as state name
+  field,
+  newTypeInfo)
+
+accumulatorSpecs += spec
+if (!isStateBackedDataViews) { // add data view field which not use state backend
+  newPojoFields.add(new PojoField(field, newTypeInfo))
+}
+  }
+
+case list: ListViewTypeInfo[Any] =>
+  val listView = field.get(acc).asInstanceOf[ListView[_]]
+  if (listView != null) {
+val elementTypeInfo = listView.elementTypeInfo
+val newTypeInfo = if (elementTypeInfo != null) {
+  hasDataView = true
+  new ListViewTypeInfo(elementTypeInfo)
+} else {
+  list
+}
+
+var spec = ListViewSpec(
+  "agg" + index + "$" + fieldName, // generate unique name to be used as state name
+  field,
+  newTypeInfo)
+
+accumulatorSpecs += spec
+if (!isStateBackedDataViews) { // add data view field which not use state backend
+  newPojoFields.add(new PojoField(field, newTypeInfo))
+}
+  }
+
+case _ => newPojoFields.add(pojoField)
+  }
+}
+(new PojoTypeInfo(accType.getTypeClass, newPojoFields), Some(accumulatorSpecs))
+
+  case _ if !hasDataView => (accType, None)
+  case _ => throw new TableException("MapView and ListView only support in PoJo class")
--- End diff --

This case will never be reached. `hasDataView` is only set to `true` in the 
`case pojoType: PojoTypeInfo[_]` case. Hence, it will always be false when we 
come to this point.
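A simplified, hypothetical model of the control flow being discussed (`AccType`, `PojoAcc`, `OtherAcc` and `classify` are illustrative names). The flag is assigned only inside the POJO branch, so every later case sees its initial `false` value and the `!hasDataView` guard always matches.

```scala
// Hypothetical model of the accumulator types seen by the match.
sealed trait AccType
final case class PojoAcc(fields: Seq[String]) extends AccType
final case class OtherAcc(name: String) extends AccType

object DataViewCheck {
  // Returns a label naming the branch that handled the accumulator type.
  def classify(accType: AccType, isDataView: String => Boolean): String = {
    var hasDataView = false
    accType match {
      case PojoAcc(fields) if fields.nonEmpty =>
        // The flag is only ever set inside this branch...
        hasDataView = fields.exists(isDataView)
        if (hasDataView) "pojo-with-dataviews" else "pojo"
      // ...so in every remaining case it is still false and this guard always matches.
      case _ if !hasDataView => "passthrough"
      case _ => "unreachable"
    }
  }
}
```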


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue 

[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1615#comment-1615
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135534660
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -161,13 +182,119 @@ class AggregationCodeGenerator(
 }
 }
 
+/**
+  * Create DataView Term, for example, acc1_map_dataview.
+  *
+  * @param aggIndex index of aggregate function
+  * @param fieldName field name of DataView
+  * @return term to access [[MapView]] or [[ListView]]
+  */
+def createDataViewTerm(aggIndex: Int, fieldName: String): String = {
+  s"acc${aggIndex}_${fieldName}_dataview"
+}
+
+/**
+  * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] to the open, cleanup,
+  * close and member area of the generated function.
+  *
+  */
+def addReusableDataViews: Unit = {
--- End diff --

Add parentheses to the method. Only methods without side effects should be declared without parentheses.
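The convention in a nutshell, with illustrative names only: a side-effecting method is declared and invoked with `()`, a pure accessor without.

```scala
import scala.collection.mutable

class GeneratorSketch {
  private val members = mutable.ArrayBuffer[String]()

  // Side-effecting: mutates generator state, so it is declared with an empty parameter list.
  def addReusableDataViews(): Unit = members += "transient Object acc0_map_dataview = null;"

  // Pure accessor: declared without parentheses.
  def memberCount: Int = members.size
}
```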


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1617#comment-1617
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135541895
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
 ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.api.common.typeutils._
+import org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, ListSerializer}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+  * A serializer for [[ListView]]. The serializer relies on an element
+  * serializer for the serialization of the list's elements.
+  *
+  * The serialization format for the list is as follows: four bytes for the length of the list,
+  * followed by the serialized representation of each element.
+  *
+  * @param listSerializer List serializer.
+  * @tparam T The type of element in the list.
+  */
+class ListViewSerializer[T](val listSerializer: ListSerializer[T])
+  extends TypeSerializer[ListView[T]] {
+
+  override def isImmutableType: Boolean = false
+
+  override def duplicate(): TypeSerializer[ListView[T]] = {
+new ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]])
+  }
+
+  override def createInstance(): ListView[T] = {
+new ListView[T]
+  }
+
+  override def copy(from: ListView[T]): ListView[T] = {
+val listview = new ListView[T]
+listview.list = from.list
--- End diff --

We should create a copy of `from.list` using the `ListSerializer`. 
Otherwise we share the instance.
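A sketch of a non-sharing `copy`, assuming a simplified holder class (`SimpleListView` and `SimpleListViewCopy` are hypothetical). `copyElement` stands in for the element serializer's `copy`; in the real `ListViewSerializer` one would presumably delegate to `listSerializer.copy(from.list)` instead.

```scala
import java.util

// Hypothetical, simplified stand-in for ListView.
class SimpleListView[T](var list: util.List[T])

object SimpleListViewCopy {
  // Builds a new backing list and copies every element, so source and copy share nothing.
  def copy[T](from: SimpleListView[T], copyElement: T => T): SimpleListView[T] = {
    val newList = new util.ArrayList[T](from.list.size())
    val it = from.list.iterator()
    while (it.hasNext) newList.add(copyElement(it.next()))
    new SimpleListView[T](newList)
  }
}
```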


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1619#comment-1619
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135539685
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -249,7 +258,8 @@ object AggregateUtil {
   outputArity,
   needRetract,
   needMerge = false,
-  needReset = true
+  needReset = true,
--- End diff --

`needReset` can be `false`.
`resetAccumulator()` is not called by any of the window operators. Not sure why this was `true` before...


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1611#comment-1611
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135528733
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -161,13 +182,119 @@ class AggregationCodeGenerator(
 }
 }
 
+/**
+  * Create DataView Term, for example, acc1_map_dataview.
+  *
+  * @param aggIndex index of aggregate function
+  * @param fieldName field name of DataView
+  * @return term to access [[MapView]] or [[ListView]]
+  */
+def createDataViewTerm(aggIndex: Int, fieldName: String): String = {
+  s"acc${aggIndex}_${fieldName}_dataview"
+}
+
+/**
+  * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] to the open, cleanup,
+  * close and member area of the generated function.
+  *
+  */
+def addReusableDataViews: Unit = {
+  if (accConfig.isDefined) {
+val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.get
+  .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+  .toMap[String, StateDescriptor[_, _]]
+
+for (i <- aggs.indices) yield {
+  for (spec <- accConfig.get(i)) yield {
+val dataViewField = spec.field
+val dataViewTypeTerm = dataViewField.getType.getCanonicalName
+val desc = descMapping.getOrElse(spec.id,
+  throw new CodeGenException(s"Can not find DataView in accumulator by id: ${spec.id}"))
+
+// define the DataView variables
+val serializedData = AggregateUtil.serialize(desc)
+val dataViewFieldTerm = createDataViewTerm(i, dataViewField.getName)
+val field =
+  s"""
+ |transient $dataViewTypeTerm $dataViewFieldTerm = null;
+  """.stripMargin
+reusableMemberStatements.add(field)
+
+// create DataViews
+val descFieldTerm = s"${dataViewFieldTerm}_desc"
+val descClassQualifier = classOf[StateDescriptor[_, _]].getCanonicalName
+val descDeserializeCode =
+  s"""
+ |$descClassQualifier $descFieldTerm = ($descClassQualifier)
+ |  ${AggregateUtil.getClass.getName.stripSuffix("$")}
--- End diff --

Implement the deserialization directly in the generated code. Moreover, we should use the user code classloader for the deserialization, which is accessible via the `RuntimeContext`.
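A sketch of deserialization through an explicit class loader. In Flink the loader would presumably come from `RuntimeContext.getUserCodeClassLoader()` (Flink's `InstantiationUtil.deserializeObject(bytes, classLoader)` covers a similar need); here it is just a parameter, and `ClassLoaderObjectInputStream` and `DescriptorDecoder` are hypothetical names.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, ObjectInputStream, ObjectOutputStream, ObjectStreamClass}

// ObjectInputStream that resolves classes through a caller-supplied ClassLoader.
class ClassLoaderObjectInputStream(in: InputStream, loader: ClassLoader)
    extends ObjectInputStream(in) {
  override protected def resolveClass(desc: ObjectStreamClass): Class[_] =
    try Class.forName(desc.getName(), false, loader)
    catch { case _: ClassNotFoundException => super.resolveClass(desc) }
}

object DescriptorDecoder {
  // Deserializes bytes, resolving every class through the given loader.
  def deserialize[T](bytes: Array[Byte], loader: ClassLoader): T = {
    val in = new ClassLoaderObjectInputStream(new ByteArrayInputStream(bytes), loader)
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // Helper used only to produce input for the example.
  def serialize(obj: java.io.Serializable): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    bytes.toByteArray()
  }
}
```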


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1612#comment-1612
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135561193
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java
 ---
@@ -237,9 +248,13 @@ public void resetAccumulator(CountDistinctAccum acc) {
//Overloaded retract method
public void retract(CountDistinctAccum accumulator, long id) {
try {
-   if (!accumulator.map.contains(String.valueOf(id))) {
-   accumulator.map.remove(String.valueOf(id));
-   accumulator.count -= 1;
+   Integer cnt = accumulator.map.get(String.valueOf(id));
+   if (cnt != null) {
+   cnt -= 1;
+   if (cnt <= 0) {
+   accumulator.map.remove(String.valueOf(id));
+   accumulator.count -= 1;
+   }
--- End diff --

We should update the count if it is > 0:

```
else {
  accumulator.map.put(String.valueOf(id), cnt);
}
```
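The suggested fix can be sketched as a plain-Java model of the accumulator, with a `java.util.HashMap` standing in for Flink's `MapView` (an assumption so the example runs without Flink; `DistinctAcc` and `CountDistinctSketch` are hypothetical names). It assumes `accumulate` increments a per-key counter. The key detail is that `cnt` is a local copy, so a decremented value that is still positive must be written back with `put`.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified stand-in for CountDistinctAccum; a HashMap replaces the MapView. */
final class DistinctAcc {
    final Map<String, Integer> map = new HashMap<>();
    long count;
}

final class CountDistinctSketch {
    static void accumulate(DistinctAcc acc, long id) {
        String key = String.valueOf(id);
        Integer cnt = acc.map.get(key);
        if (cnt == null) {
            acc.map.put(key, 1);
            acc.count += 1; // first occurrence of this id
        } else {
            acc.map.put(key, cnt + 1);
        }
    }

    static void retract(DistinctAcc acc, long id) {
        String key = String.valueOf(id);
        Integer cnt = acc.map.get(key);
        if (cnt != null) {
            cnt -= 1;
            if (cnt <= 0) {
                acc.map.remove(key);
                acc.count -= 1; // last occurrence retracted
            } else {
                // cnt is a local copy: without put() the map keeps the old value.
                acc.map.put(key, cnt);
            }
        }
    }
}
```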



[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1616#comment-1616
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135587110
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1398,14 +1412,29 @@ object AggregateUtil {
   }
 }
 
+val accSpecs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size)
+
 // create accumulator type information for every aggregate function
 aggregates.zipWithIndex.foreach { case (agg, index) =>
-  if (null == accTypes(index)) {
+  if (accTypes(index) != null) {
+val (accType, specs) = removeStateViewFieldsFromAccTypeInfo(index,
+  agg,
+  accTypes(index),
+  isStateBackedDataViews)
+if (specs.isDefined) {
+  accSpecs(index) = specs.get
+  accTypes(index) = accType
+} else {
+  accSpecs(index) = Seq()
+  accTypes(index) = getAccumulatorTypeOfAggregateFunction(agg)
--- End diff --

There is no need to overwrite `accTypes(index)` here.



[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16143722#comment-16143722
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4355
  
Hi @wuchong, 

I think we don't need to call `open()` and `close()` in 
`AggregateAggFunction`. `GeneratedAggregations` is an internal class that is 
not exposed to users. If a `GeneratedAggregations` that requires `open()` or 
`close()` were passed to an `AggregateAggFunction`, it would be a bug in the 
translation logic, and a user couldn't do anything to prevent it.

+1 for refactoring `AggregateCodeGenerator`.



[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16143682#comment-16143682
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135504282
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/DataViewSpec.scala
 ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.reflect.Field
+
+import org.apache.flink.api.common.state.{ListStateDescriptor, MapStateDescriptor, StateDescriptor}
+import org.apache.flink.table.dataview.{ListViewTypeInfo, MapViewTypeInfo}
+
+/**
+  * Data view specification.
+  *
+  * @tparam ACC type extends [[DataView]]
+  */
+trait DataViewSpec[ACC <: DataView] {
+  def id: String
+  def field: Field
+  def toStateDescriptor: StateDescriptor[_, _]
--- End diff --

Very good point, you are right! 
We need to generate the state descriptors here, serialize them and ship 
them.

Thanks for the clarification.
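The approach agreed on here, building each descriptor on the client where the user's `TypeInformation` is available and shipping it in serialized form, might look roughly like this in plain Java. The object is serialized and encoded as URL-safe Base64 without padding (the same form commons-codec's `encodeBase64URLSafeString` produces), so the result can be pasted into generated source as a string literal without escaping. `DescriptorShipping`, `genDescriptorMember` and the `_serialized` field suffix are hypothetical, and in the test a `java.util.ArrayList` stands in for a `StateDescriptor`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Base64;

final class DescriptorShipping {
    /** Serializes an object on the client and encodes it as URL-safe Base64 without padding. */
    static String encode(Serializable descriptor) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(descriptor);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // The alphabet is [A-Za-z0-9_-], so no escaping is needed inside a Java string literal.
        return Base64.getUrlEncoder().withoutPadding().encodeToString(bytes.toByteArray());
    }

    /** Emits a member declaration for the generated class that carries the encoded descriptor. */
    static String genDescriptorMember(String descFieldTerm, String encoded) {
        return "private static final String " + descFieldTerm + "_serialized = \"" + encoded + "\";";
    }
}
```

At runtime the generated code would decode this constant in `open()`, using the user-code classloader.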



[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16143681#comment-16143681
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135503957
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala
 ---
@@ -100,6 +108,17 @@ abstract class GeneratedAggregations extends Function {
 * aggregated results
 */
   def resetAccumulator(accumulators: Row)
+
+  /**
+* Cleanup for the accumulators.
+*/
+  def cleanup()
+
+  /**
+* Tear-down method for [[org.apache.flink.table.functions.AggregateFunction]].
+* It can be used for clean up work. By default, this method does nothing.
+*/
+  def close()
--- End diff --

Sorry, I overlooked the `close()` calls. If the method is used, we should 
keep it of course.



[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16141406#comment-16141406
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user kaibozhou commented on the issue:

https://github.com/apache/flink/pull/4355
  
Thanks @fhueske for the suggestions. 
All comments have been addressed.



[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16141208#comment-16141208
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135184770
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java
 ---
@@ -135,4 +138,177 @@ public void retract(WeightedAvgAccum accumulator, int iValue, int iWeight) {
accumulator.count -= iWeight;
}
}
+
+   /**
+* CountDistinct accumulator.
+*/
+   public static class CountDistinctAccum {
+   public MapView<String, Integer> map;
+   public long count;
+   }
+
+   /**
+* CountDistinct aggregate.
+*/
+   public static class CountDistinct extends AggregateFunction<Long, CountDistinctAccum> {
+
+   @Override
+   public CountDistinctAccum createAccumulator() {
+   CountDistinctAccum accum = new CountDistinctAccum();
+   accum.map = new MapView<>(Types.STRING, Types.INT);
+   accum.count = 0L;
+   return accum;
+   }
+
+   //Overloaded accumulate method
+   public void accumulate(CountDistinctAccum accumulator, String id) {
+   try {
+   if (!accumulator.map.contains(id)) {
+   accumulator.map.put(id, 1);
+   accumulator.count += 1;
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
+   }
+   }
+
+   //Overloaded accumulate method
+   public void accumulate(CountDistinctAccum accumulator, long id) {
+   try {
+   if (!accumulator.map.contains(String.valueOf(id))) {
+   accumulator.map.put(String.valueOf(id), 1);
+   accumulator.count += 1;
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
+   }
+   }
+
+   @Override
+   public Long getValue(CountDistinctAccum accumulator) {
+   return accumulator.count;
+   }
+   }
+
+   /**
+* CountDistinct aggregate with merge.
+*/
+   public static class CountDistinctWithMerge extends CountDistinct {
+
+   //Overloaded merge method
+   public void merge(CountDistinctAccum acc, Iterable<CountDistinctAccum> it) {
+   Iterator<CountDistinctAccum> iter = it.iterator();
+   while (iter.hasNext()) {
+   CountDistinctAccum mergeAcc = iter.next();
+   acc.count += mergeAcc.count;
+
+   try {
+   Iterator<String> mapItr = mergeAcc.map.keys().iterator();
+   while (mapItr.hasNext()) {
+   String key = mapItr.next();
+   if (!acc.map.contains(key)) {
+   acc.map.put(key, 1);
+   }
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
+   }
+   }
+   }
+   }
+
+   /**
+* CountDistinct aggregate with merge and reset.
+*/
+   public static class CountDistinctWithMergeAndReset extends CountDistinctWithMerge {
+
+   //Overloaded retract method
+   public void resetAccumulator(CountDistinctAccum acc) {
+   acc.map.clear();
+   acc.count = 0;
+   }
+   }
+
+   /**
+* CountDistinct aggregate with retract.
+*/
+   public static class CountDistinctWithRetractAndReset extends CountDistinct {
+
+   //Overloaded retract method
+   public void retract(CountDistinctAccum accumulator, long id) {
+   try {
+   if (!accumulator.map.contains(String.valueOf(id))) {
+   accumulator.map.remove(String.valueOf(id));
--- End diff --

The code here is the opposite. It should be:
```
if
```

[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16141143#comment-16141143
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135181433
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala
 ---
@@ -100,6 +108,17 @@ abstract class GeneratedAggregations extends Function {
 * aggregated results
 */
   def resetAccumulator(accumulators: Row)
+
+  /**
+* Cleanup for the accumulators.
+*/
+  def cleanup()
+
+  /**
+* Tear-down method for [[org.apache.flink.table.functions.AggregateFunction]].
+* It can be used for clean up work. By default, this method does nothing.
+*/
+  def close()
--- End diff --

The `close()` method corresponds to the `open()` method: it is called whenever 
`open()` was called before, e.g., in `ProcTimeUnboundedOver`'s `close()` method. 
Otherwise, the UDAGG's `close()` would never be called. 

I modified the test case `testGroupAggregateWithStateBackend` to check that 
`close()` was called.

What do you think?
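The pairing described here can be illustrated with a small lifecycle model. `Lifecycle`, `ForwardingOperator` and `RecordingFunction` are hypothetical, not Flink classes: the operator forwards `close()` to the wrapped function only if it previously forwarded `open()`, and a recording test double lets a test assert that `close()` actually ran, in the spirit of the modified `testGroupAggregateWithStateBackend`.

```java
/** Hypothetical lifecycle contract mirroring the open()/close() pair. */
interface Lifecycle {
    void open();
    void close();
}

/** Hypothetical operator that forwards its own lifecycle to the generated aggregations. */
final class ForwardingOperator {
    private final Lifecycle function;
    private boolean opened;

    ForwardingOperator(Lifecycle function) {
        this.function = function;
    }

    void open() {
        function.open();
        opened = true;
    }

    void close() {
        // Only tear down what was actually set up, and only once.
        if (opened) {
            opened = false;
            function.close();
        }
    }
}

/** Test double that records which lifecycle methods were invoked. */
final class RecordingFunction implements Lifecycle {
    int opens;
    int closes;

    @Override
    public void open() {
        opens++;
    }

    @Override
    public void close() {
        closes++;
    }
}
```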



[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16141140#comment-16141140
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/4355
  
The newly created methods `open(ctx)` and `close()` of `GeneratedAggregations` 
are not called by `AggregateAggFunction`, which is used in window aggregates. I 
suggest calling the `open(ctx)` method but passing a `RuntimeContext` whose 
methods all throw an exception telling users that 
`User Defined AggregateFunction is not supported to call open() and close() in window`. 
But this can be addressed in another issue.
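One way to build a context whose every method throws is a JDK dynamic proxy. The sketch assumes a hypothetical, much-reduced `TaskContext` interface in place of Flink's `RuntimeContext`; `UnsupportedContexts.throwingProxy` is likewise a made-up helper. Note that `toString()`, `equals()` and `hashCode()` on the proxy go through the same handler and therefore also throw.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

/** Hypothetical, much-reduced stand-in for Flink's RuntimeContext. */
interface TaskContext {
    String getTaskName();
    int getIndexOfThisSubtask();
}

final class UnsupportedContexts {
    /** Returns an implementation of iface whose every method throws with the given message. */
    static <T> T throwingProxy(Class<T> iface, String message) {
        InvocationHandler handler = (proxy, method, args) -> {
            // Unchecked, so the proxy rethrows it as-is instead of wrapping it.
            throw new UnsupportedOperationException(message + " (called " + method.getName() + "())");
        };
        Object proxy = Proxy.newProxyInstance(
                iface.getClassLoader(), new Class<?>[] {iface}, handler);
        return iface.cast(proxy);
    }
}
```

The same helper works for any interface, so it would not need updating if the context interface gains methods.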

BTW, I think `AggregateCodeGenerator#generateAggregations` is too long, with 
500+ LOC. I would like to refactor it if you have no objection, @kaibozhou 
@fhueske. I have created 
[FLINK-7509](https://issues.apache.org/jira/browse/FLINK-7509).



[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16141123#comment-16141123
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135179322
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/DataViewSpec.scala
 ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.reflect.Field
+
+import org.apache.flink.api.common.state.{ListStateDescriptor, MapStateDescriptor, StateDescriptor}
+import org.apache.flink.table.dataview.{ListViewTypeInfo, MapViewTypeInfo}
+
+/**
+  * Data view specification.
+  *
+  * @tparam ACC type extends [[DataView]]
+  */
+trait DataViewSpec[ACC <: DataView] {
+  def id: String
+  def field: Field
+  def toStateDescriptor: StateDescriptor[_, _]
--- End diff --

Maybe this method is also needed. State descriptors cannot be code-generated 
because the `TypeInformation` is passed in by users.



[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16141121#comment-16141121
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135179101
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -178,13 +294,15 @@ class AggregationCodeGenerator(
|  ${aggMapping(i)},
|  (${accTypes(i)}) accs.getField($i));""".stripMargin
   } else {
+val setDataView = genDataViewFieldSetter(s"acc$i", i)
 j"""
|org.apache.flink.table.functions.AggregateFunction baseClass$i =
|  (org.apache.flink.table.functions.AggregateFunction) ${aggs(i)};
-   |
+   |${accTypes(i)} acc$i = (${accTypes(i)}) accs.getField($i);
+   |$setDataView
--- End diff --

yes



[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16141120#comment-16141120
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135179087
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -161,13 +182,108 @@ class AggregationCodeGenerator(
 }
 }
 
+/**
+  * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] to the open, cleanup,
+  * close and member area of the generated function.
+  *
+  */
+def addReusableDataViews: Unit = {
+  if (accConfig != null && accConfig.isStateBackedDataViews) {
+val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.accSpecs
+  .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+  .toMap[String, StateDescriptor[_, _]]
+
+for (i <- aggs.indices) yield {
+  for (spec <- accConfig.accSpecs(i)) yield {
+val dataViewField = spec.field
+val dataViewTypeTerm = dataViewField.getType.getCanonicalName
+val desc = descMapping.getOrElse(spec.id,
+  throw new CodeGenException(s"Can not find ListView in accumulator by id: ${spec.id}"))
+
+// define the DataView variables
+val serializedData = AggregateUtil.serialize(desc)
--- End diff --

State descriptors need the `TypeInformation` passed in by the user, so we cannot 
code-generate them.



[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16141085#comment-16141085
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135173258
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1469,4 +1526,26 @@ object AggregateUtil {
   private def gcd(a: Long, b: Long): Long = {
 if (b == 0) a else gcd(b, a % b)
   }
+
+  @throws[Exception]
+  def serialize(stateDescriptor: StateDescriptor[_, _]): String = {
+val byteArray = InstantiationUtil.serializeObject(stateDescriptor)
+Base64.encodeBase64URLSafeString(byteArray)
+  }
+
+  @throws[Exception]
+  def deserialize(data: String): StateDescriptor[_, _] = {
+val byteData = Base64.decodeBase64(data)
+InstantiationUtil.deserializeObject[StateDescriptor[_, _]](
+  byteData,
+  Thread.currentThread.getContextClassLoader)
+  }
+
+  def createDataViewTerm(aggIndex: Int, fieldName: String): String = {
+s"acc${aggIndex}_${fieldName}_dataview"
+  }
 }
+
+case class DataViewConfig(accSpecs: Array[Seq[DataViewSpec[_]]], isStateBackedDataViews: Boolean)
--- End diff --

Yes, this class is not needed anymore.



[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16141078#comment-16141078
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135172148
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -759,10 +786,12 @@ object AggregateUtil {
 : GroupCombineFunction[Row, Row] = {
 
 val needRetract = false
-val (aggFieldIndexes, aggregates, accTypes) = transformToAggregateFunctions(
+val isStateBackedDataViews = false
+val (aggFieldIndexes, aggregates, accTypes, accSpecs) = transformToAggregateFunctions(
--- End diff --

It makes sense.



[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16141075#comment-16141075
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135171980
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -356,6 +486,9 @@ class AggregationCodeGenerator(
""".stripMargin
 
   if (needMerge) {
+if (accConfig != null && accConfig.isStateBackedDataViews) {
+  throw new CodeGenException("DataView doesn't support merge when the backend uses state.")
--- End diff --

I found that this exception would be thrown everywhere 
`generateAggregations(..)` is called. I think we can include `funcName` in the 
exception message.
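A minimal sketch of the suggestion, assuming the name of the generated function is available to the generator as `funcName`. `IllegalStateException` stands in for Flink's `CodeGenException`, and `MergeGuard` is a hypothetical helper.

```java
/** Hypothetical guard mirroring the needMerge check in the code generator. */
final class MergeGuard {
    static void checkMergeSupported(String funcName, boolean needMerge, boolean stateBackedDataViews) {
        if (needMerge && stateBackedDataViews) {
            // Naming the function tells the user which aggregation triggered the error.
            throw new IllegalStateException(
                    "DataView doesn't support merge when the backend uses state. Function: " + funcName);
        }
    }
}
```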



[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140995#comment-16140995
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135163096
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java
 ---
@@ -135,4 +138,177 @@ public void retract(WeightedAvgAccum accumulator, int iValue, int iWeight) {
accumulator.count -= iWeight;
}
}
+
+   /**
+* CountDistinct accumulator.
+*/
+   public static class CountDistinctAccum {
+   public MapView<String, Integer> map;
+   public long count;
+   }
+
+   /**
+* CountDistinct aggregate.
+*/
+   public static class CountDistinct extends AggregateFunction<Long, CountDistinctAccum> {
+
+   @Override
+   public CountDistinctAccum createAccumulator() {
+   CountDistinctAccum accum = new CountDistinctAccum();
+   accum.map = new MapView<>(Types.STRING, Types.INT);
+   accum.count = 0L;
+   return accum;
+   }
+
+   //Overloaded accumulate method
+   public void accumulate(CountDistinctAccum accumulator, String id) {
+   try {
+   if (!accumulator.map.contains(id)) {
+   accumulator.map.put(id, 1);
+   accumulator.count += 1;
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
+   }
+   }
+
+   //Overloaded accumulate method
+   public void accumulate(CountDistinctAccum accumulator, long id) {
+   try {
+   if (!accumulator.map.contains(String.valueOf(id))) {
+   accumulator.map.put(String.valueOf(id), 1);
+   accumulator.count += 1;
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
+   }
+   }
+
+   @Override
+   public Long getValue(CountDistinctAccum accumulator) {
+   return accumulator.count;
+   }
+   }
+
+   /**
+* CountDistinct aggregate with merge.
+*/
+   public static class CountDistinctWithMerge extends CountDistinct {
+
+   //Overloaded merge method
+   public void merge(CountDistinctAccum acc, Iterable<CountDistinctAccum> it) {
+   Iterator<CountDistinctAccum> iter = it.iterator();
+   while (iter.hasNext()) {
+   CountDistinctAccum mergeAcc = iter.next();
+   acc.count += mergeAcc.count;
+
+   try {
+   Iterator<String> mapItr = mergeAcc.map.keys().iterator();
+   while (mapItr.hasNext()) {
+   String key = mapItr.next();
+   if (!acc.map.contains(key)) {
+   acc.map.put(key, 1);
+   }
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
+   }
+   }
+   }
+   }
+
+   /**
+* CountDistinct aggregate with merge and reset.
+*/
+   public static class CountDistinctWithMergeAndReset extends CountDistinctWithMerge {
+
+   //Overloaded retract method
+   public void resetAccumulator(CountDistinctAccum acc) {
+   acc.map.clear();
+   acc.count = 0;
+   }
+   }
+
+   /**
+* CountDistinct aggregate with retract.
+*/
+   public static class CountDistinctWithRetractAndReset extends CountDistinct {
+
+   //Overloaded retract method
+   public void retract(CountDistinctAccum accumulator, long id) {
+   try {
+   if (!accumulator.map.contains(String.valueOf(id))) {
+   accumulator.map.remove(String.valueOf(id));
+   accumulator.count -= 1;
+   }
+   } 

[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140991#comment-16140991
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135162070
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1469,4 +1526,26 @@ object AggregateUtil {
   private def gcd(a: Long, b: Long): Long = {
 if (b == 0) a else gcd(b, a % b)
   }
+
+  @throws[Exception]
+  def serialize(stateDescriptor: StateDescriptor[_, _]): String = {
+val byteArray = InstantiationUtil.serializeObject(stateDescriptor)
+Base64.encodeBase64URLSafeString(byteArray)
+  }
+
+  @throws[Exception]
+  def deserialize(data: String): StateDescriptor[_, _] = {
+val byteData = Base64.decodeBase64(data)
+InstantiationUtil.deserializeObject[StateDescriptor[_, _]](
+  byteData,
+  Thread.currentThread.getContextClassLoader)
+  }
+
+  def createDataViewTerm(aggIndex: Int, fieldName: String): String = {
--- End diff --

Yes, I just missed this during the code refactoring.



[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140796#comment-16140796
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135126762
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala
 ---
@@ -100,6 +108,17 @@ abstract class GeneratedAggregations extends Function {
 * aggregated results
 */
   def resetAccumulator(accumulators: Row)
+
+  /**
+* Cleanup for the accumulators.
+*/
+  def cleanup()
+
+  /**
+* Tear-down method for [[org.apache.flink.table.functions.AggregateFunction]].
+* It can be used for clean up work. By default, this method does nothing.
+*/
+  def close()
--- End diff --

I think `close()` is never called. So we can remove it, right?



[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140788#comment-16140788
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135121779
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1469,4 +1526,26 @@ object AggregateUtil {
   private def gcd(a: Long, b: Long): Long = {
 if (b == 0) a else gcd(b, a % b)
   }
+
+  @throws[Exception]
+  def serialize(stateDescriptor: StateDescriptor[_, _]): String = {
+val byteArray = InstantiationUtil.serializeObject(stateDescriptor)
+Base64.encodeBase64URLSafeString(byteArray)
+  }
+
+  @throws[Exception]
+  def deserialize(data: String): StateDescriptor[_, _] = {
+val byteData = Base64.decodeBase64(data)
+InstantiationUtil.deserializeObject[StateDescriptor[_, _]](
+  byteData,
+  Thread.currentThread.getContextClassLoader)
+  }
+
+  def createDataViewTerm(aggIndex: Int, fieldName: String): String = {
+s"acc${aggIndex}_${fieldName}_dataview"
+  }
 }
+
+case class DataViewConfig(accSpecs: Array[Seq[DataViewSpec[_]]], isStateBackedDataViews: Boolean)
--- End diff --

We only need a `DataViewConfig` if the views are backed by state, right? So
we can remove the `boolean` `isStateBackedDataViews` field and make it an 
optional parameter of the code generator.
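The `createDataViewTerm` helper in the diff above derives each generated member name from the aggregate index and the accumulator field name. A small Java sketch of the same naming, plus a hypothetical `memberDeclaration` helper that builds the `transient ... = null;` member line the code generator emits for each view (as in the `AggregationCodeGenerator` diff quoted elsewhere in this thread):

```java
// Mirrors the naming scheme of the quoted Scala helper createDataViewTerm:
// acc<aggIndex>_<fieldName>_dataview.
// memberDeclaration is a hypothetical helper, not part of the PR.
final class DataViewTerms {
    static String createDataViewTerm(int aggIndex, String fieldName) {
        return "acc" + aggIndex + "_" + fieldName + "_dataview";
    }

    static String memberDeclaration(String dataViewTypeTerm, int aggIndex, String fieldName) {
        // The member starts out null; the quoted generator adds the code that
        // creates the state-backed view to the open area of the function.
        return "transient " + dataViewTypeTerm + " "
            + createDataViewTerm(aggIndex, fieldName) + " = null;";
    }
}
```

For example, `createDataViewTerm(0, "map")` yields `acc0_map_dataview`, so two aggregates with a field of the same name still get distinct members.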


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140784#comment-16140784
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135107609
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala
 ---
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.MapViewTypeInfoFactory
+
+/**
+  * MapView provides Map functionality for accumulators used by user-defined aggregate functions
+  * [[org.apache.flink.table.functions.AggregateFunction]].
+  *
+  * A MapView can be backed by a Java HashMap or a state backend, depending on the context in
+  * which the function is used.
+  *
+  * At runtime `MapView` will be replaced by a [[org.apache.flink.table.dataview.StateMapView]]
+  * when use state backend.
+  * when use state backend.
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public MapView<String, Integer> map;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction<Long, MyAccum> {
+  *
+  *@Override
+  *public MyAccum createAccumulator() {
+  *  MyAccum accum = new MyAccum();
+  *  accum.map = new MapView<>(Types.STRING, Types.INT);
+  *  accum.count = 0L;
+  *  return accum;
+  *}
+  *
+  *public void accumulate(MyAccum accumulator, String id) {
+  *  try {
+  *  if (!accumulator.map.contains(id)) {
+  *accumulator.map.put(id, 1);
+  *accumulator.count++;
+  *  }
+  *  } catch (Exception e) {
+  *e.printStackTrace();
+  *  }
+  *}
+  *
+  *@Override
+  *public Long getValue(MyAccum accumulator) {
+  *  return accumulator.count;
+  *}
+  *  }
+  *
+  * }}}
+  *
+  * @param keyTypeInfo key type information
+  * @param valueTypeInfo value type information
+  * @tparam K key type
+  * @tparam V value type
+  */
+@TypeInfo(classOf[MapViewTypeInfoFactory[_, _]])
+class MapView[K, V](
+@transient private[flink] val keyTypeInfo: TypeInformation[K],
+@transient private[flink] val valueTypeInfo: TypeInformation[V])
+  extends DataView {
+
+  def this() = this(null, null)
+
+  private[flink] var map = new util.HashMap[K, V]()
+
+  /**
+* Returns the value to which the specified key is mapped, or { @code null } if this map
+* contains no mapping for the key.
+*
+* @param key The key of the mapping.
+* @return The value of the mapping with the given key.
+* @throws Exception Thrown if the system cannot get data.
+*/
+  @throws[Exception]
+  def get(key: K): V = map.get(key)
+
+  /**
+* Put a value with the given key into the map.
+*
+* @param key   The key of the mapping.
+* @param value The new value of the mapping.
+* @throws Exception Thrown if the system cannot put data.
+*/
+  @throws[Exception]
+  def put(key: K, value: V): Unit = map.put(key, value)
+
+  /**
+* Copies all of the mappings from the specified map to this map view.
+*
+* @param map The mappings to be stored in this map.
+* @throws Exception Thrown if the system cannot access the map.
+*/
+  @throws[Exception]
+  def putAll(map: util.Map[K, V]): Unit = this.map.putAll(map)
+
+  /**
+* Deletes the mapping of the given key.
+*
+* @param key The key of the mapping.
  

[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140799#comment-16140799
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135128744
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java
 ---
@@ -135,4 +138,177 @@ public void retract(WeightedAvgAccum accumulator, int iValue, int iWeight) {
accumulator.count -= iWeight;
}
}
+
+   /**
+* CountDistinct accumulator.
+*/
+   public static class CountDistinctAccum {
+   public MapView<String, Integer> map;
+   public long count;
+   }
+
+   /**
+* CountDistinct aggregate.
+*/
+   public static class CountDistinct extends AggregateFunction<Long, CountDistinctAccum> {
+
+   @Override
+   public CountDistinctAccum createAccumulator() {
+   CountDistinctAccum accum = new CountDistinctAccum();
+   accum.map = new MapView<>(Types.STRING, Types.INT);
+   accum.count = 0L;
+   return accum;
+   }
+
+   //Overloaded accumulate method
+   public void accumulate(CountDistinctAccum accumulator, String id) {
+   try {
+   if (!accumulator.map.contains(id)) {
+   accumulator.map.put(id, 1);
+   accumulator.count += 1;
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
+   }
+   }
+
+   //Overloaded accumulate method
+   public void accumulate(CountDistinctAccum accumulator, long id) {
+   try {
+   if (!accumulator.map.contains(String.valueOf(id))) {
+   accumulator.map.put(String.valueOf(id), 1);
+   accumulator.count += 1;
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
+   }
+   }
+
+   @Override
+   public Long getValue(CountDistinctAccum accumulator) {
+   return accumulator.count;
+   }
+   }
+
+   /**
+* CountDistinct aggregate with merge.
+*/
+   public static class CountDistinctWithMerge extends CountDistinct {
+
+   //Overloaded merge method
+   public void merge(CountDistinctAccum acc, Iterable<CountDistinctAccum> it) {
+   Iterator<CountDistinctAccum> iter = it.iterator();
+   while (iter.hasNext()) {
+   CountDistinctAccum mergeAcc = iter.next();
+   acc.count += mergeAcc.count;
+
+   try {
+   Iterator<String> mapItr = mergeAcc.map.keys().iterator();
+   while (mapItr.hasNext()) {
+   String key = mapItr.next();
+   if (!acc.map.contains(key)) {
+   acc.map.put(key, 1);
+   }
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
+   }
+   }
+   }
+   }
+
+   /**
+* CountDistinct aggregate with merge and reset.
+*/
+   public static class CountDistinctWithMergeAndReset extends CountDistinctWithMerge {
+
+   //Overloaded retract method
+   public void resetAccumulator(CountDistinctAccum acc) {
+   acc.map.clear();
+   acc.count = 0;
+   }
+   }
+
+   /**
+* CountDistinct aggregate with retract.
+*/
+   public static class CountDistinctWithRetractAndReset extends CountDistinct {
+
+   //Overloaded retract method
+   public void retract(CountDistinctAccum accumulator, long id) {
+   try {
+   if (!accumulator.map.contains(String.valueOf(id))) {
+   accumulator.map.remove(String.valueOf(id));
+   accumulator.count -= 1;
+   }
+   } 
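In the `retract()` method quoted above, the guard `!accumulator.map.contains(...)` appears inverted: it removes the key only when the key is absent, yet still decrements `count`. In addition, a map of `key -> 1` cannot tell whether a key was accumulated more than once, so retracting one of two identical inputs would drop the key entirely. A plain-Java sketch of retraction-safe count-distinct semantics, assuming a `HashMap` as a stand-in for `MapView` and storing a per-key occurrence count (the class names are hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical accumulator: a HashMap stands in for MapView, and each value is
// the number of times its key is currently accumulated.
class CountDistinctSketchAcc {
    final Map<String, Integer> occurrences = new HashMap<>();
}

final class CountDistinctSketch {
    static void accumulate(CountDistinctSketchAcc acc, String id) {
        acc.occurrences.merge(id, 1, Integer::sum);
    }

    // Only keys that are present can be retracted; a key is dropped once its
    // occurrence count reaches zero.
    static void retract(CountDistinctSketchAcc acc, String id) {
        Integer n = acc.occurrences.get(id);
        if (n == null) {
            return; // nothing to retract
        }
        if (n == 1) {
            acc.occurrences.remove(id);
        } else {
            acc.occurrences.put(id, n - 1);
        }
    }

    // The distinct count is simply the number of keys still present.
    static long getValue(CountDistinctSketchAcc acc) {
        return acc.occurrences.size();
    }
}
```

With a real `MapView` the same logic would go through its `get`, `put`, and `remove` methods, which are declared to throw `Exception` when the view is state-backed.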

[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140780#comment-16140780
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135107137
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala
 ---
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.MapViewTypeInfoFactory
+
+/**
+  * MapView provides Map functionality for accumulators used by user-defined aggregate functions
+  * [[org.apache.flink.table.functions.AggregateFunction]].
+  *
+  * A MapView can be backed by a Java HashMap or a state backend, depending on the context in
+  * which the function is used.
+  *
+  * At runtime `MapView` will be replaced by a [[org.apache.flink.table.dataview.StateMapView]]
+  * when use state backend.
--- End diff --

`when use state backend` -> `if it is backed by a state backend.`


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140797#comment-16140797
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135131075
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -79,13 +95,15 @@ class AggregationCodeGenerator(
 outputArity: Int,
 needRetract: Boolean,
 needMerge: Boolean,
-needReset: Boolean)
+needReset: Boolean,
+accConfig: DataViewConfig)
--- End diff --

make this an `Option[Array[Seq[DataViewSpec[_]]]]` with `None` default
value?
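The suggestion relies on a Scala `Option` parameter with a `None` default. Expressed in Java, which has no default arguments, the idea looks roughly like this: an overloaded constructor supplies the empty value, and the mere presence of view specs signals state-backed views, so no separate boolean flag is needed. `ViewSpec` and `GeneratorSketch` are hypothetical stand-ins for `DataViewSpec` and `AggregationCodeGenerator`:

```java
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for DataViewSpec: only the view id is modelled.
final class ViewSpec {
    final String id;

    ViewSpec(String id) { this.id = id; }
}

// Hypothetical stand-in for the code generator. Per-aggregate view specs are
// optional: present means the views are state-backed, absent means the
// heap-backed views are kept as they are.
final class GeneratorSketch {
    private final Optional<List<List<ViewSpec>>> accViewSpecs;

    // Java has no default arguments; this overload plays the role of `= None`.
    GeneratorSketch() { this(Optional.empty()); }

    GeneratorSketch(Optional<List<List<ViewSpec>>> accViewSpecs) {
        this.accViewSpecs = accViewSpecs;
    }

    boolean isStateBacked() { return accViewSpecs.isPresent(); }

    // Number of views that would need state-backed members in generated code.
    int stateBackedViewCount() {
        return accViewSpecs.map(specs -> specs.stream().mapToInt(List::size).sum()).orElse(0);
    }
}
```

Callers that never use state-backed views (for example the batch operators) keep calling the short constructor and need no changes.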


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140803#comment-16140803
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135135051
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -161,13 +182,108 @@ class AggregationCodeGenerator(
 }
 }
 
+/**
+  * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] to the open, cleanup,
+  * close and member area of the generated function.
+  *
+  */
+def addReusableDataViews: Unit = {
+  if (accConfig != null && accConfig.isStateBackedDataViews) {
+val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.accSpecs
+  .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+  .toMap[String, StateDescriptor[_, _]]
+
+for (i <- aggs.indices) yield {
+  for (spec <- accConfig.accSpecs(i)) yield {
+val dataViewField = spec.field
+val dataViewTypeTerm = dataViewField.getType.getCanonicalName
+val desc = descMapping.getOrElse(spec.id,
+  throw new CodeGenException(s"Can not find ListView in accumulator by id: ${spec.id}"))
+
+// define the DataView variables
+val serializedData = AggregateUtil.serialize(desc)
+val dataViewFieldTerm = AggregateUtil.createDataViewTerm(i, dataViewField.getName)
+val field =
+  s"""
+ |transient $dataViewTypeTerm $dataViewFieldTerm = null;
+ |""".stripMargin
+reusableMemberStatements.add(field)
+
+// create DataViews
+val descFieldTerm = s"${dataViewFieldTerm}_desc"
+val descClassQualifier = classOf[StateDescriptor[_, _]].getCanonicalName
+val descDeserialize =
+  s"""
+ |$descClassQualifier $descFieldTerm = ($descClassQualifier)
+ |  ${AggregateUtil.getClass.getName.stripSuffix("$")}
+ |  .deserialize("$serializedData");
+ """.stripMargin
+val createDataView = if (dataViewField.getType == classOf[MapView[_, _]]) {
+  s"""
+ |$descDeserialize
+ |$dataViewFieldTerm = new org.apache.flink.table.dataview.StateMapView(
+ |  $contextTerm.getMapState((
+ |    org.apache.flink.api.common.state.MapStateDescriptor)$descFieldTerm));
+   """.stripMargin
+} else if (dataViewField.getType == classOf[ListView[_]]) {
+  s"""
+ |$descDeserialize
+ |$dataViewFieldTerm = new org.apache.flink.table.dataview.StateListView(
+ |  $contextTerm.getListState((
+ |    org.apache.flink.api.common.state.ListStateDescriptor)$descFieldTerm));
+   """.stripMargin
--- End diff --

indent


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140808#comment-16140808
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135139164
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -356,6 +486,9 @@ class AggregationCodeGenerator(
""".stripMargin
 
   if (needMerge) {
+if (accConfig != null && accConfig.isStateBackedDataViews) {
+  throw new CodeGenException("DataView doesn't support merge when the backend uses state.")
--- End diff --

Can we throw this exception earlier (e.g., in `AggregateUtil`) and give 
more details about the aggregation function?
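A sketch of such an early check, assuming it runs before code generation and has access to each aggregate function's name and to whether its accumulator uses state-backed views. `MergeSupportCheck` and its parameters are hypothetical, and the `IllegalStateException` is a placeholder for whatever exception type the module uses:

```java
import java.util.List;

// Hypothetical early validation: fail fast, before any code is generated,
// and name the aggregate function that cannot be merged.
final class MergeSupportCheck {
    static void checkMergeSupport(List<String> aggFunctionNames,
                                  List<Boolean> usesStateBackedViews,
                                  boolean needMerge) {
        if (!needMerge) {
            return; // nothing to validate
        }
        for (int i = 0; i < aggFunctionNames.size(); i++) {
            if (usesStateBackedViews.get(i)) {
                throw new IllegalStateException(
                    "Aggregate function '" + aggFunctionNames.get(i) + "' (index " + i
                        + ") uses state-backed DataViews, which do not support merge.");
            }
        }
    }
}
```

Compared with the generic message in the generator, the user learns which function in the query is responsible.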


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140812#comment-16140812
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135125391
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1469,4 +1526,26 @@ object AggregateUtil {
   private def gcd(a: Long, b: Long): Long = {
 if (b == 0) a else gcd(b, a % b)
   }
+
+  @throws[Exception]
+  def serialize(stateDescriptor: StateDescriptor[_, _]): String = {
--- End diff --

Do we need this method? Instantiation of state descriptors can be code
generated.
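For context, the `serialize` method quoted above (and its `deserialize` counterpart) Java-serializes the descriptor and encodes the bytes as URL-safe Base64 without padding, so the result can be embedded as a string literal in generated code. The same round trip using only JDK classes, standing in for `InstantiationUtil` and commons-codec (`DescriptorCodec` is a hypothetical name, and unlike the quoted code this sketch does not pass the thread's context class loader):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Base64;

final class DescriptorCodec {
    // Java serialization, then URL-safe Base64 without '=' padding.
    static String serialize(Serializable obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return Base64.getUrlEncoder().withoutPadding().encodeToString(bytes.toByteArray());
    }

    // The JDK URL decoder accepts input without padding.
    static Object deserialize(String data) {
        byte[] bytes = Base64.getUrlDecoder().decode(data);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Generating the descriptor's constructor call directly, as suggested, would avoid embedding serialized bytes in the generated source.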


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140793#comment-16140793
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135112227
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import java.util
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, ListSerializer}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+  * A serializer for [[ListView]]. The serializer relies on an element
+  * serializer for the serialization of the list's elements.
+  *
+  * The serialization format for the list is as follows: four bytes for the length of the list,
+  * followed by the serialized representation of each element.
+  *
+  * @param listSerializer List serializer.
+  * @tparam T The type of element in the list.
+  */
+@Internal
--- End diff --

Please remove the annotation.
We don't use the `@Internal`, `@Public` and `@PublicEvolving` annotations
in the `flink-table` module.
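The format documented in the quoted `ListViewSerializer` scaladoc (a four-byte length followed by each element) can be illustrated with plain `java.io` streams. Here strings written with `writeUTF` stand in for the element serializer, and `ListFormatSketch` is a hypothetical name:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

final class ListFormatSketch {
    static byte[] serialize(List<String> list) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(list.size());   // four bytes: number of elements
            for (String element : list) {
                out.writeUTF(element);   // stand-in for the element serializer
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static List<String> deserialize(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int size = in.readInt();     // read the length prefix first
            List<String> list = new ArrayList<>(size);
            for (int i = 0; i < size; i++) {
                list.add(in.readUTF());
            }
            return list;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Serializing `["x", "yz"]` gives 11 bytes: the 4-byte length, then each string with its own 2-byte length prefix added by `writeUTF`.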


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140805#comment-16140805
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135112266
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewTypeInfo.scala
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.common.typeutils.base.ListSerializer
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+  * [[ListView]] type information.
+  *
+  * @param elementType element type information
+  * @tparam T element type
+  */
+@PublicEvolving
--- End diff --

Please remove the annotation.


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140809#comment-16140809
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135112662
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala
 ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import java.util
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import org.apache.flink.api.common.typeutils.base.{MapSerializer, MapSerializerConfigSnapshot}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.MapView
+
+/**
+  * A serializer for [[MapView]]. The serializer relies on a key serializer and a value
+  * serializer for the serialization of the map's key-value pairs.
+  *
+  * The serialization format for the map is as follows: four bytes for the length of the map,
+  * followed by the serialized representation of each key-value pair. To allow null values,
+  * each value is prefixed by a null marker.
+  *
+  * @param mapSerializer Map serializer.
+  * @tparam K The type of the keys in the map.
+  * @tparam V The type of the values in the map.
+  */
+@Internal
+class MapViewSerializer[K, V](val mapSerializer: MapSerializer[K, V])
+  extends TypeSerializer[MapView[K, V]] {
+
+  override def isImmutableType: Boolean = false
+
+  override def duplicate(): TypeSerializer[MapView[K, V]] =
+new MapViewSerializer[K, V](
+  mapSerializer.duplicate().asInstanceOf[MapSerializer[K, V]])
+
+  override def createInstance(): MapView[K, V] = {
+val mapview = new MapView[K, V]
+mapview.putAll(mapSerializer.createInstance())
+mapview
+  }
+
+  override def copy(from: MapView[K, V]): MapView[K, V] = {
+val mapview = new MapView[K, V]
+mapview.putAll(mapSerializer.copy(from.map))
--- End diff --

Same as for `ListViewSerializer`. Copying one list into another is more
expensive than just replacing the list instance.
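The point can be sketched in plain Java: rather than creating a fresh view (which allocates its own empty map) and then `putAll`-ing an already copied map into it, the copied map can be installed as the new view's backing instance directly. `SimpleMapView` and `SimpleMapViewCopier` are hypothetical stand-ins for `MapView` and `MapViewSerializer`, and the per-element copy functions stand in for the key and value serializers' `copy`:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for MapView: a view over a mutable backing map.
class SimpleMapView<K, V> {
    Map<K, V> map;

    SimpleMapView() { this(new HashMap<>()); }

    SimpleMapView(Map<K, V> map) { this.map = map; }
}

// Hypothetical stand-in for the serializer's copy(): the backing map is
// copied once and handed to the new view, with no second putAll pass.
final class SimpleMapViewCopier<K, V> {
    private final UnaryOperator<K> keyCopier;
    private final UnaryOperator<V> valueCopier;

    SimpleMapViewCopier(UnaryOperator<K> keyCopier, UnaryOperator<V> valueCopier) {
        this.keyCopier = keyCopier;
        this.valueCopier = valueCopier;
    }

    SimpleMapView<K, V> copy(SimpleMapView<K, V> from) {
        Map<K, V> copied = new HashMap<>(from.map.size() * 2);
        for (Map.Entry<K, V> e : from.map.entrySet()) {
            copied.put(keyCopier.apply(e.getKey()), valueCopier.apply(e.getValue()));
        }
        return new SimpleMapView<>(copied); // replace the instance, don't copy into one
    }
}
```

The same reasoning applies to `createInstance()`: the instance produced by the underlying map serializer can be wrapped directly instead of being copied into a new view.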


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140804#comment-16140804
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135134994
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -161,13 +182,108 @@ class AggregationCodeGenerator(
 }
 }
 
+/**
+  * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] to the open, cleanup,
+  * close and member area of the generated function.
+  *
+  */
+def addReusableDataViews: Unit = {
+  if (accConfig != null && accConfig.isStateBackedDataViews) {
+val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.accSpecs
+  .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+  .toMap[String, StateDescriptor[_, _]]
+
+for (i <- aggs.indices) yield {
+  for (spec <- accConfig.accSpecs(i)) yield {
+val dataViewField = spec.field
+val dataViewTypeTerm = dataViewField.getType.getCanonicalName
+val desc = descMapping.getOrElse(spec.id,
+  throw new CodeGenException(s"Can not find ListView in accumulator by id: ${spec.id}"))
+
+// define the DataView variables
+val serializedData = AggregateUtil.serialize(desc)
+val dataViewFieldTerm = AggregateUtil.createDataViewTerm(i, dataViewField.getName)
+val field =
+  s"""
+ |transient $dataViewTypeTerm $dataViewFieldTerm = null;
+ |""".stripMargin
+reusableMemberStatements.add(field)
+
+// create DataViews
+val descFieldTerm = s"${dataViewFieldTerm}_desc"
+val descClassQualifier = classOf[StateDescriptor[_, _]].getCanonicalName
+val descDeserialize =
+  s"""
+ |$descClassQualifier $descFieldTerm = ($descClassQualifier)
+ |  ${AggregateUtil.getClass.getName.stripSuffix("$")}
+ |  .deserialize("$serializedData");
+ """.stripMargin
--- End diff --

indent


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140790#comment-16140790
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135112753
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewTypeInfo.scala
 ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.common.typeutils.base.MapSerializer
+import org.apache.flink.table.api.dataview.MapView
+
+/**
+  * [[MapView]] type information.
+  *
+  * @param keyType key type information
+  * @param valueType value type information
+  * @tparam K key type
+  * @tparam V value type
+  */
+@PublicEvolving
--- End diff --

Please remove the annotation


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140792#comment-16140792
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135124134
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -759,10 +786,12 @@ object AggregateUtil {
 : GroupCombineFunction[Row, Row] = {
 
 val needRetract = false
-val (aggFieldIndexes, aggregates, accTypes) = transformToAggregateFunctions(
+val isStateBackedDataViews = false
+val (aggFieldIndexes, aggregates, accTypes, accSpecs) = transformToAggregateFunctions(
--- End diff --

`transformToAggregateFunctions` has a default value for 
`isStateBackedDataViews`, so we don't need to pass it here. Moreover, we 
only need to pass data view information to the code generator if the data views 
are backed by state (if we make it an optional parameter). So most of the 
changes here can be reverted (and also for all other operators that do not 
support state-backed views).


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140791#comment-16140791
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135110707
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import java.util
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, ListSerializer}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+  * A serializer for [[ListView]]. The serializer relies on an element
+  * serializer for the serialization of the list's elements.
+  *
+  * The serialization format for the list is as follows: four bytes for the length of the lost,
+  * followed by the serialized representation of each element.
+  *
+  * @param listSerializer List serializer.
+  * @tparam T The type of element in the list.
+  */
+@Internal
+class ListViewSerializer[T](val listSerializer: ListSerializer[T])
+  extends TypeSerializer[ListView[T]] {
+
+  override def isImmutableType: Boolean = false
+
+  override def duplicate(): TypeSerializer[ListView[T]] = {
+new ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]])
+  }
+
+  override def createInstance(): ListView[T] = {
+val listview = new ListView[T]
+listview.addAll(listSerializer.createInstance())
+listview
+  }
+
+  override def copy(from: ListView[T]): ListView[T] = {
+val listview = new ListView[T]
+listview.addAll(listSerializer.copy(from.list))
+listview
+  }
+
+  override def copy(from: ListView[T], reuse: ListView[T]): ListView[T] = copy(from)
+
+  override def getLength: Int = -1
+
+  override def serialize(record: ListView[T], target: DataOutputView): Unit = {
+listSerializer.serialize(record.list, target)
+  }
+
+  override def deserialize(source: DataInputView): ListView[T] = {
+val listview = new ListView[T]
+listview.addAll(listSerializer.deserialize(source))
--- End diff --

same as before. We could directly set the list.
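A minimal JDK-only sketch of that suggestion, assuming a hypothetical `SimpleListView` whose backing list is an assignable field (this is not Flink's `ListView`, `TypeSerializer`, or `DataInputView` API). It writes the layout described in the doc comment above (four bytes for the length, then each element; `writeUTF` is an assumed element encoding) and, when reading, hands the freshly built list to the view instead of copying it again with `addAll()`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ListView whose backing list is a plain, assignable field.
class SimpleListView<T> {
    List<T> list = new ArrayList<>();
}

// Hypothetical codec: int length, then each element (writeUTF is an assumed encoding).
class StringListViewCodec {
    static byte[] toBytes(SimpleListView<String> view) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            out.writeInt(view.list.size());
            for (String element : view.list) {
                out.writeUTF(element);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    static SimpleListView<String> fromBytes(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            int size = in.readInt();
            List<String> elements = new ArrayList<>(size);
            for (int i = 0; i < size; i++) {
                elements.add(in.readUTF());
            }
            SimpleListView<String> view = new SimpleListView<>();
            view.list = elements; // set the list directly instead of calling addAll()
            return view;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Assigning the list skips a second pass over the elements; it relies on the serializer being allowed to write the view's list field, which is the access the review is asking about.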


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140781#comment-16140781
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135108408
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -1458,9 +1459,10 @@ abstract class CodeGenerator(
 * Adds a reusable [[UserDefinedFunction]] to the member area of the generated [[Function]].
 *
 * @param function [[UserDefinedFunction]] object to be instantiated during runtime
+* @param contextTerm [[RuntimeContext]] term
--- End diff --

`term to access the [[RuntimeContext]]`


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140779#comment-16140779
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135109902
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import java.util
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, ListSerializer}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+  * A serializer for [[ListView]]. The serializer relies on an element
+  * serializer for the serialization of the list's elements.
+  *
+  * The serialization format for the list is as follows: four bytes for the length of the lost,
+  * followed by the serialized representation of each element.
+  *
+  * @param listSerializer List serializer.
+  * @tparam T The type of element in the list.
+  */
+@Internal
+class ListViewSerializer[T](val listSerializer: ListSerializer[T])
+  extends TypeSerializer[ListView[T]] {
+
+  override def isImmutableType: Boolean = false
+
+  override def duplicate(): TypeSerializer[ListView[T]] = {
+new ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]])
+  }
+
+  override def createInstance(): ListView[T] = {
+val listview = new ListView[T]
+listview.addAll(listSerializer.createInstance())
--- End diff --

I think this call is not necessary. The new list instance is empty, so 
nothing is added.


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140795#comment-16140795
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135128323
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java
 ---
@@ -135,4 +138,177 @@ public void retract(WeightedAvgAccum accumulator, int iValue, int iWeight) {
accumulator.count -= iWeight;
}
}
+
+   /**
+* CountDistinct accumulator.
+*/
+   public static class CountDistinctAccum {
+   public MapView<String, Integer> map;
+   public long count;
+   }
+
+   /**
+* CountDistinct aggregate.
+*/
+   public static class CountDistinct extends AggregateFunction<Long, CountDistinctAccum> {
+
+   @Override
+   public CountDistinctAccum createAccumulator() {
+   CountDistinctAccum accum = new CountDistinctAccum();
+   accum.map = new MapView<>(Types.STRING, Types.INT);
+   accum.count = 0L;
+   return accum;
+   }
+
+   //Overloaded accumulate method
+   public void accumulate(CountDistinctAccum accumulator, String id) {
+   try {
+   if (!accumulator.map.contains(id)) {
+   accumulator.map.put(id, 1);
+   accumulator.count += 1;
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
+   }
+   }
+
+   //Overloaded accumulate method
+   public void accumulate(CountDistinctAccum accumulator, long id) {
+   try {
+   if (!accumulator.map.contains(String.valueOf(id))) {
+   accumulator.map.put(String.valueOf(id), 1);
+   accumulator.count += 1;
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
+   }
+   }
+
+   @Override
+   public Long getValue(CountDistinctAccum accumulator) {
+   return accumulator.count;
+   }
+   }
+
+   /**
+* CountDistinct aggregate with merge.
+*/
+   public static class CountDistinctWithMerge extends CountDistinct {
+
+   //Overloaded merge method
+   public void merge(CountDistinctAccum acc, Iterable<CountDistinctAccum> it) {
+   Iterator<CountDistinctAccum> iter = it.iterator();
+   while (iter.hasNext()) {
+   CountDistinctAccum mergeAcc = iter.next();
+   acc.count += mergeAcc.count;
+
+   try {
+   Iterator<String> mapItr = mergeAcc.map.keys().iterator();
+   while (mapItr.hasNext()) {
+   String key = mapItr.next();
+   if (!acc.map.contains(key)) {
+   acc.map.put(key, 1);
+   }
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
+   }
+   }
+   }
+   }
+
+   /**
+* CountDistinct aggregate with merge and reset.
+*/
+   public static class CountDistinctWithMergeAndReset extends CountDistinctWithMerge {
+
+   //Overloaded retract method
+   public void resetAccumulator(CountDistinctAccum acc) {
+   acc.map.clear();
+   acc.count = 0;
+   }
+   }
+
+   /**
+* CountDistinct aggregate with retract.
+*/
+   public static class CountDistinctWithRetractAndReset extends CountDistinct {
+
+   //Overloaded retract method
+   public void retract(CountDistinctAccum accumulator, long id) {
+   try {
+   if (!accumulator.map.contains(String.valueOf(id))) {
+   accumulator.map.remove(String.valueOf(id));
--- End diff --

shouldn't a count distinct with retraction increment the counter value in 
the MapView in 
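The comment is cut off in the archive. One reading is that the `MapView` value should count how often each key has been seen, so that a retraction only drops a key once its counter reaches zero. (As quoted, the `remove` call only runs for keys that are absent, so it cannot remove anything.) A minimal sketch of the reference-counting idea follows; a `HashMap` stands in for `MapView<String, Integer>`, and `RefCountDistinctAcc`/`RefCountDistinct` are hypothetical names, not part of the PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical accumulator; the HashMap stands in for MapView<String, Integer>
// and maps each distinct value to how many times it is currently present.
class RefCountDistinctAcc {
    final Map<String, Integer> counts = new HashMap<>();
    long count = 0L;
}

// Hypothetical COUNT DISTINCT logic with retraction based on per-key counters.
class RefCountDistinct {
    static void accumulate(RefCountDistinctAcc acc, String id) {
        Integer seen = acc.counts.get(id);
        if (seen == null) {
            acc.counts.put(id, 1);
            acc.count += 1; // first occurrence: one more distinct value
        } else {
            acc.counts.put(id, seen + 1);
        }
    }

    static void retract(RefCountDistinctAcc acc, String id) {
        Integer seen = acc.counts.get(id);
        if (seen == null) {
            return; // nothing to retract for an unknown value
        }
        if (seen == 1) {
            acc.counts.remove(id);
            acc.count -= 1; // last occurrence retracted: one fewer distinct value
        } else {
            acc.counts.put(id, seen - 1);
        }
    }

    static long getValue(RefCountDistinctAcc acc) {
        return acc.count;
    }
}
```

With this bookkeeping, `count` always equals the number of keys held in the map.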

[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140801#comment-16140801
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135125473
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1469,4 +1526,26 @@ object AggregateUtil {
   private def gcd(a: Long, b: Long): Long = {
 if (b == 0) a else gcd(b, a % b)
   }
+
+  @throws[Exception]
+  def serialize(stateDescriptor: StateDescriptor[_, _]): String = {
+val byteArray = InstantiationUtil.serializeObject(stateDescriptor)
+Base64.encodeBase64URLSafeString(byteArray)
+  }
+
+  @throws[Exception]
+  def deserialize(data: String): StateDescriptor[_, _] = {
+val byteData = Base64.decodeBase64(data)
+InstantiationUtil.deserializeObject[StateDescriptor[_, _]](
+  byteData,
+  Thread.currentThread.getContextClassLoader)
+  }
+
+  def createDataViewTerm(aggIndex: Int, fieldName: String): String = {
--- End diff --

Move this method to `AggregationCodeGenerator`?


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140807#comment-16140807
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135125423
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1469,4 +1526,26 @@ object AggregateUtil {
   private def gcd(a: Long, b: Long): Long = {
 if (b == 0) a else gcd(b, a % b)
   }
+
+  @throws[Exception]
+  def serialize(stateDescriptor: StateDescriptor[_, _]): String = {
+val byteArray = InstantiationUtil.serializeObject(stateDescriptor)
+Base64.encodeBase64URLSafeString(byteArray)
+  }
+
+  @throws[Exception]
+  def deserialize(data: String): StateDescriptor[_, _] = {
--- End diff --

do we need this method? Instantiation of state descriptors can be code 
generated.
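For reference, the round trip that the diff's `serialize`/`deserialize` pair appears to perform (Java serialization via `InstantiationUtil`, then URL-safe Base64 via commons-codec) can be sketched with the JDK alone. `DescriptorCodec` is a hypothetical name, `java.util.Base64`'s URL encoder without padding stands in for `encodeBase64URLSafeString`, and the test uses an `ArrayList` in place of a `StateDescriptor`. Generating code that constructs the descriptor directly, as suggested, would make the decode step unnecessary.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Base64;

// Hypothetical helper: Java serialization plus URL-safe Base64 without padding.
class DescriptorCodec {
    static String serialize(Serializable obj) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Encoding happens after the stream is closed, so all bytes are flushed.
        return Base64.getUrlEncoder().withoutPadding().encodeToString(bos.toByteArray());
    }

    static Object deserialize(String data) {
        byte[] bytes = Base64.getUrlDecoder().decode(data);
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The encoded string is safe to embed in a generated Java string literal, which is how the quoted code generator uses it.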


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140786#comment-16140786
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135106244
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView provides List functionality for accumulators used by user-defined aggregate functions
+  * {{AggregateFunction}}.
+  *
+  * A ListView can be backed by a Java ArrayList or a state backend, depending on the context in
+  * which the function is used.
+  *
+  * At runtime `ListView` will be replaced by a [[org.apache.flink.table.dataview.StateListView]]
+  * when use state backend..
--- End diff --

`when use state backend..` -> `if it is backed by a state backend.`
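To illustrate what the doc comment describes (a view backed either by an `ArrayList` or by a state backend, swapped at runtime), here is a hypothetical sketch. `ListViewLike`, `HeapListView` and `KeyedStoreListView` are made-up names, and the `HashMap` keyed by the current key only imitates keyed state; it is not Flink's `StateListView`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical list-view abstraction (not Flink's ListView API).
interface ListViewLike<T> {
    void add(T value);
    Iterable<T> get();
    void clear();
}

// Heap-backed variant: a plain ArrayList held inside the accumulator.
class HeapListView<T> implements ListViewLike<T> {
    private final List<T> list = new ArrayList<>();
    public void add(T value) { list.add(value); }
    public Iterable<T> get() { return list; }
    public void clear() { list.clear(); }
}

// Imitation of a state-backed variant: the data lives outside the accumulator,
// in a store scoped by the key the runtime is currently processing.
class KeyedStoreListView<T> implements ListViewLike<T> {
    private final Map<String, List<T>> store;
    private String currentKey;

    KeyedStoreListView(Map<String, List<T>> store) { this.store = store; }

    void setCurrentKey(String key) { this.currentKey = key; }

    public void add(T value) {
        store.computeIfAbsent(currentKey, k -> new ArrayList<>()).add(value);
    }

    public Iterable<T> get() {
        return store.getOrDefault(currentKey, Collections.emptyList());
    }

    public void clear() { store.remove(currentKey); }
}
```

Because the aggregate function only talks to the interface, the runtime can substitute the state-backed variant without changes to the function itself.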


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140783#comment-16140783
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135112047
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala
 ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import java.util
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import org.apache.flink.api.common.typeutils.base.{MapSerializer, MapSerializerConfigSnapshot}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.MapView
+
+/**
+  * A serializer for [[MapView]]. The serializer relies on a key serializer and a value
+  * serializer for the serialization of the map's key-value pairs.
+  *
+  * The serialization format for the map is as follows: four bytes for the length of the map,
+  * followed by the serialized representation of each key-value pair. To allow null values,
+  * each value is prefixed by a null marker.
+  *
+  * @param mapSerializer Map serializer.
+  * @tparam K The type of the keys in the map.
+  * @tparam V The type of the values in the map.
+  */
+@Internal
--- End diff --

Please remove the annotation
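The `MapViewSerializer` doc comment in the diff describes a length-prefixed map format with a null marker before each value. A JDK-only sketch of that layout for `String` keys and `Integer` values, assuming `writeUTF` for keys and a one-byte boolean as the null marker (in the real serializer these encodings come from the wrapped key and value serializers); `StringIntMapCodec` is a hypothetical name:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical codec for the documented layout:
// int size, then for each entry: key, null marker, value (only if non-null).
class StringIntMapCodec {
    static byte[] serialize(Map<String, Integer> map) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            out.writeInt(map.size());
            for (Map.Entry<String, Integer> entry : map.entrySet()) {
                out.writeUTF(entry.getKey());      // assumed key encoding
                boolean isNull = entry.getValue() == null;
                out.writeBoolean(isNull);          // assumed null marker
                if (!isNull) {
                    out.writeInt(entry.getValue());
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    static Map<String, Integer> deserialize(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            int size = in.readInt();
            Map<String, Integer> map = new LinkedHashMap<>();
            for (int i = 0; i < size; i++) {
                String key = in.readUTF();
                boolean isNull = in.readBoolean();
                // The value is only read when the marker says it is present.
                map.put(key, isNull ? null : in.readInt());
            }
            return map;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```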


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140782#comment-16140782
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135110511
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import java.util
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, ListSerializer}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+  * A serializer for [[ListView]]. The serializer relies on an element
+  * serializer for the serialization of the list's elements.
+  *
+  * The serialization format for the list is as follows: four bytes for the length of the lost,
+  * followed by the serialized representation of each element.
+  *
+  * @param listSerializer List serializer.
+  * @tparam T The type of element in the list.
+  */
+@Internal
+class ListViewSerializer[T](val listSerializer: ListSerializer[T])
+  extends TypeSerializer[ListView[T]] {
+
+  override def isImmutableType: Boolean = false
+
+  override def duplicate(): TypeSerializer[ListView[T]] = {
+new ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]])
+  }
+
+  override def createInstance(): ListView[T] = {
+val listview = new ListView[T]
+listview.addAll(listSerializer.createInstance())
+listview
+  }
+
+  override def copy(from: ListView[T]): ListView[T] = {
+val listview = new ListView[T]
+listview.addAll(listSerializer.copy(from.list))
--- End diff --

`addAll()` adds overhead because all elements are copied from one list to 
the other. Can't the serializer have direct access to the list field of the 
`ListView` and set the list copy as the `ListView`'s list?
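A sketch of the direct-assignment approach, which also covers the earlier `createInstance()` remark, assuming the serializer may write the view's list field. `CopyableListView` and `CopyableListViewSerializer` are hypothetical stand-ins; elements are copied by reference, which is only safe for immutable element types (a real serializer would copy each element with the element serializer).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ListView with a directly assignable backing list.
class CopyableListView<T> {
    List<T> list = new ArrayList<>();
}

// Hypothetical serializer methods illustrating the two review remarks.
class CopyableListViewSerializer<T> {
    // A fresh view already holds an empty list, so nothing needs to be added.
    CopyableListView<T> createInstance() {
        return new CopyableListView<>();
    }

    // One copy of the backing list, assigned directly; addAll() on top of a copied
    // list would move the elements a second time. Elements are shared by reference.
    CopyableListView<T> copy(CopyableListView<T> from) {
        CopyableListView<T> view = new CopyableListView<>();
        view.list = new ArrayList<>(from.list);
        return view;
    }
}
```

The copy still gets its own list instance, so later changes to one view do not show up in the other.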


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140798#comment-16140798
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135135021
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -161,13 +182,108 @@ class AggregationCodeGenerator(
 }
 }
 
+/**
+  * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] to the open, cleanup,
+  * close and member area of the generated function.
+  *
+  */
+def addReusableDataViews: Unit = {
+  if (accConfig != null && accConfig.isStateBackedDataViews) {
+val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.accSpecs
+  .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+  .toMap[String, StateDescriptor[_, _]]
+
+for (i <- aggs.indices) yield {
+  for (spec <- accConfig.accSpecs(i)) yield {
+val dataViewField = spec.field
+val dataViewTypeTerm = dataViewField.getType.getCanonicalName
+val desc = descMapping.getOrElse(spec.id,
+  throw new CodeGenException(s"Can not find ListView in accumulator by id: ${spec.id}"))
+
+// define the DataView variables
+val serializedData = AggregateUtil.serialize(desc)
+val dataViewFieldTerm = AggregateUtil.createDataViewTerm(i, dataViewField.getName)
+val field =
+  s"""
+ |transient $dataViewTypeTerm $dataViewFieldTerm = null;
+ |""".stripMargin
+reusableMemberStatements.add(field)
+
+// create DataViews
+val descFieldTerm = s"${dataViewFieldTerm}_desc"
+val descClassQualifier = classOf[StateDescriptor[_, _]].getCanonicalName
+val descDeserialize =
+  s"""
+ |$descClassQualifier $descFieldTerm = ($descClassQualifier)
+ |  ${AggregateUtil.getClass.getName.stripSuffix("$")}
+ |  .deserialize("$serializedData");
+ """.stripMargin
+val createDataView = if (dataViewField.getType == classOf[MapView[_, _]]) {
+  s"""
+ |$descDeserialize
+ |$dataViewFieldTerm = new org.apache.flink.table.dataview.StateMapView(
+ |  $contextTerm.getMapState((
+ |  org.apache.flink.api.common.state.MapStateDescriptor)$descFieldTerm));
+   """.stripMargin
--- End diff --

indent


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140802#comment-16140802
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135124865
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1398,14 +1440,29 @@ object AggregateUtil {
   }
 }
 
+val accSpecs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size)
+
 // create accumulator type information for every aggregate function
 aggregates.zipWithIndex.foreach { case (agg, index) =>
-  if (null == accTypes(index)) {
+  if (accTypes(index) != null) {
+val (accType, specs) = removeStateViewFieldsFromAccTypeInfo(index,
+  agg,
+  accTypes(index),
+  isStateBackedDataViews)
+if (specs.isDefined) {
+  accSpecs(index) = specs.get
+  accTypes(index) = accType
+} else {
+  accTypes(index) = getAccumulatorTypeOfAggregateFunction(agg)
--- End diff --

Use the same order as above:
```
accSpecs(index) = ...
accTypes(index) = ...
```


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140811#comment-16140811
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135133385
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -161,13 +182,108 @@ class AggregationCodeGenerator(
 }
 }
 
+/**
+  * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] to the open, cleanup,
+  * close and member area of the generated function.
+  *
+  */
+def addReusableDataViews: Unit = {
+  if (accConfig != null && accConfig.isStateBackedDataViews) {
+val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.accSpecs
+  .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+  .toMap[String, StateDescriptor[_, _]]
+
+for (i <- aggs.indices) yield {
+  for (spec <- accConfig.accSpecs(i)) yield {
+val dataViewField = spec.field
+val dataViewTypeTerm = dataViewField.getType.getCanonicalName
+val desc = descMapping.getOrElse(spec.id,
+  throw new CodeGenException(s"Can not find ListView in accumulator by id: ${spec.id}"))
--- End diff --

The code is not `ListView` specific. Change to `DataView`?


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140806#comment-16140806
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135140565
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/DataViewSpec.scala ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.reflect.Field
+
+import org.apache.flink.api.common.state.{ListStateDescriptor, MapStateDescriptor, StateDescriptor}
+import org.apache.flink.table.dataview.{ListViewTypeInfo, MapViewTypeInfo}
+
+/**
+  * Data view specification.
+  *
+  * @tparam ACC type extends [[DataView]]
+  */
+trait DataViewSpec[ACC <: DataView] {
+  def id: String
+  def field: Field
+  def toStateDescriptor: StateDescriptor[_, _]
--- End diff --

Do we need this method? State descriptors can be instantiated by generated code.




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140810#comment-16140810
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135138596
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala ---
@@ -178,13 +294,15 @@ class AggregationCodeGenerator(
              |  ${aggMapping(i)},
              |  (${accTypes(i)}) accs.getField($i));""".stripMargin
         } else {
+          val setDataView = genDataViewFieldSetter(s"acc$i", i)
           j"""
              |org.apache.flink.table.functions.AggregateFunction baseClass$i =
              |  (org.apache.flink.table.functions.AggregateFunction) ${aggs(i)};
-             |
+             |${accTypes(i)} acc$i = (${accTypes(i)}) accs.getField($i);
+             |$setDataView
--- End diff --

directly call `genDataViewFieldSetter(s"acc$i", i)` here? 
Would make it easier to follow the `acc$i` variables.




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140789#comment-16140789
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135109036
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import java.util
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, ListSerializer}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+  * A serializer for [[ListView]]. The serializer relies on an element
+  * serializer for the serialization of the list's elements.
+  *
+  * The serialization format for the list is as follows: four bytes for the length of the lost,
--- End diff --

lost -> list, remove the `` tag
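The layout described in the `ListViewSerializer` scaladoc (four bytes for the list length, then each element written by the element serializer) can be sketched in plain Java as below. This is a minimal illustration under stated assumptions, not the PR's serializer: `ElementCodec`, `STRING_CODEC` and `ListViewFormatSketch` are hypothetical stand-ins for Flink's element `TypeSerializer`, and `java.io.DataOutputStream` replaces Flink's `DataOutputView`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

final class ListViewFormatSketch {

    // Hypothetical stand-in for the element serializer the list serializer delegates to.
    interface ElementCodec<T> {
        void write(T value, DataOutputStream out) throws IOException;
        T read(DataInputStream in) throws IOException;
    }

    static final ElementCodec<String> STRING_CODEC = new ElementCodec<String>() {
        @Override
        public void write(String value, DataOutputStream out) throws IOException {
            out.writeUTF(value); // 2-byte length followed by modified UTF-8 bytes
        }

        @Override
        public String read(DataInputStream in) throws IOException {
            return in.readUTF();
        }
    };

    // Layout: a 4-byte int element count, then every element in list order.
    static <T> byte[] serialize(List<T> list, ElementCodec<T> codec) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(list.size());
            for (T element : list) {
                codec.write(element, out);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads the count first, then exactly that many elements.
    static <T> List<T> deserialize(byte[] data, ElementCodec<T> codec) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int size = in.readInt();
            List<T> list = new ArrayList<>(size);
            for (int i = 0; i < size; i++) {
                list.add(codec.read(in));
            }
            return list;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Writing the count up front lets the reader pre-size the list and know exactly how many elements to consume without an end marker.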




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140787#comment-16140787
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135112454
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import java.util
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import org.apache.flink.api.common.typeutils.base.{MapSerializer, MapSerializerConfigSnapshot}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.MapView
+
+/**
+  * A serializer for [[MapView]]. The serializer relies on a key serializer and a value
+  * serializer for the serialization of the map's key-value pairs.
+  *
+  * The serialization format for the map is as follows: four bytes for the length of the map,
+  * followed by the serialized representation of each key-value pair. To allow null values,
+  * each value is prefixed by a null marker.
+  *
+  * @param mapSerializer Map serializer.
+  * @tparam K The type of the keys in the map.
+  * @tparam V The type of the values in the map.
+  */
+@Internal
+class MapViewSerializer[K, V](val mapSerializer: MapSerializer[K, V])
+  extends TypeSerializer[MapView[K, V]] {
+
+  override def isImmutableType: Boolean = false
+
+  override def duplicate(): TypeSerializer[MapView[K, V]] =
+    new MapViewSerializer[K, V](
+      mapSerializer.duplicate().asInstanceOf[MapSerializer[K, V]])
+
+  override def createInstance(): MapView[K, V] = {
+    val mapview = new MapView[K, V]
+    mapview.putAll(mapSerializer.createInstance())
--- End diff --

No need to add anything to the new instance. The map should be empty.
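Both the map layout from the `MapViewSerializer` scaladoc (a four-byte entry count, then each key followed by a null-marked value) and the point that a fresh instance should simply be empty can be sketched in plain Java. Assumptions: keys are non-null `String`s, values are nullable `Integer`s, `DataOutputStream` stands in for Flink's `DataOutputView`, and `MapViewFormatSketch` with its methods is hypothetical rather than the PR's code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

final class MapViewFormatSketch {

    // A new instance is just an empty map; nothing is copied into it.
    static Map<String, Integer> createInstance() {
        return new HashMap<>();
    }

    // Layout: int entry count, then per entry: key, boolean null marker, value if non-null.
    static byte[] serialize(Map<String, Integer> map) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(map.size());
            for (Map.Entry<String, Integer> entry : map.entrySet()) {
                out.writeUTF(entry.getKey());
                Integer value = entry.getValue();
                out.writeBoolean(value == null); // null marker precedes every value
                if (value != null) {
                    out.writeInt(value);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static Map<String, Integer> deserialize(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int size = in.readInt();
            Map<String, Integer> map = createInstance();
            for (int i = 0; i < size; i++) {
                String key = in.readUTF();
                boolean isNull = in.readBoolean();
                // The int is read only when the marker says a value follows.
                map.put(key, isNull ? null : in.readInt());
            }
            return map;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```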




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140785#comment-16140785
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135107708
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView provides List functionality for accumulators used by user-defined aggregate functions
+  * {{AggregateFunction}}.
+  *
+  * A ListView can be backed by a Java ArrayList or a state backend, depending on the context in
+  * which the function is used.
+  *
+  * At runtime `ListView` will be replaced by a [[org.apache.flink.table.dataview.StateListView]]
+  * when use state backend..
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public ListView list;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  * MyAccum accum = new MyAccum();
+  * accum.list = new ListView<>(Types.STRING);
+  * accum.count = 0L;
+  * return accum;
+  *   }
+  *
+  *   public void accumulate(MyAccum accumulator, String id) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  * return accumulator.count;
+  *   }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](
+    @transient private[flink] val elementTypeInfo: TypeInformation[T])
+  extends DataView {
+
+  def this() = this(null)
+
+  private[flink] val list = new util.ArrayList[T]()
+
+  /**
+    * Returns an iterable of the list.
+    *
+    * @throws Exception Thrown if the system cannot get data.
+    * @return The iterable of the list or { @code null} if the list is empty.
+    */
+  @throws[Exception]
+  def get: JIterable[T] = {
+    if (!list.isEmpty) {
+      list
+    } else {
+      null
+    }
+  }
+
+  /**
+    * Adding the given value to the list.
+    *
+    * @throws Exception Thrown if the system cannot add data.
+    * @param value element to be appended to this list
+    */
+  @throws[Exception]
+  def add(value: T): Unit = list.add(value)
+
+  /**
+    * Copies all of the elements from the specified list to this list view.
+    *
+    * @throws Exception Thrown if the system cannot add all data.
+    * @param list The list to be stored in this list view.
+    */
+  @throws[Exception]
+  def addAll(list: util.List[T]): Unit = this.list.addAll(list)
+
+  /**
+    * Removes all of the elements from this list.
+    *
+    * The list will be empty after this call returns.
+    */
+  override def clear(): Unit = list.clear()
+
+  override def equals(other: Any): Boolean = other match {
+    case that: ListView[_] =>

`case that: ListView[T] =>`?
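On the `equals` question: generic type arguments are erased at runtime, so a type test can only check the class, not `T`. In Scala, `case that: ListView[T]` compiles with an unchecked warning and matches the same values as `case that: ListView[_]`. The Java sketch below uses a hypothetical `ListViewSketch` class, not the PR's `ListView`, to show the same effect: the wildcard form is the only runtime-checkable test, and two empty views with different element types compare equal.

```java
import java.util.ArrayList;
import java.util.List;

final class ListViewSketch<T> {
    private final List<T> list = new ArrayList<>();

    void add(T value) {
        list.add(value);
    }

    @Override
    public boolean equals(Object other) {
        // `other instanceof ListViewSketch<T>` does not compile because T is erased;
        // the wildcard type is the only form that can be checked at runtime.
        if (!(other instanceof ListViewSketch<?>)) {
            return false;
        }
        ListViewSketch<?> that = (ListViewSketch<?>) other;
        return list.equals(that.list);
    }

    @Override
    public int hashCode() {
        return list.hashCode();
    }
}
```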



[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140800#comment-16140800
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135117112
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
@@ -307,6 +312,139 @@ object UserDefinedFunctionUtils {
   // 
--
 
   /**
+    * Analyze the constructor to get the type information of the MapView or ListView type variables
+    * inside the accumulate.
+    *
+    * @param aggFun aggregate function
+    * @return the data view specification
+    */
+  def getDataViewTypeInfoFromConstructor(
+    aggFun: AggregateFunction[_, _])
+  : mutable.HashMap[String, TypeInformation[_]] = {
+
+    val resultMap = new mutable.HashMap[String, TypeInformation[_]]
+    val acc = aggFun.createAccumulator()
+    val fields: util.List[Field] = TypeExtractor.getAllDeclaredFields(acc.getClass, true)
+    for (i <- 0 until fields.size()) {
+      val field = fields.get(i)
+      field.setAccessible(true)
+      if (classOf[DataView].isAssignableFrom(field.getType)) {
+        if (field.getType == classOf[MapView[_, _]]) {
+          val mapView = field.get(acc).asInstanceOf[MapView[_, _]]
+          if (mapView != null) {
+            val keyTypeInfo = mapView.keyTypeInfo
+            val valueTypeInfo = mapView.valueTypeInfo
+
+            if (keyTypeInfo != null && valueTypeInfo != null) {
+              resultMap.put(field.getName, new MapViewTypeInfo(keyTypeInfo, valueTypeInfo))
+            }
+          } else {
+            resultMap.put(field.getName, null)
+          }
+        } else if (field.getType == classOf[ListView[_]]) {
+          val listView = field.get(acc).asInstanceOf[ListView[_]]
+          val elementTypeInfo = listView.elementTypeInfo
+
+          if (elementTypeInfo != null) {
+            resultMap.put(field.getName, new ListViewTypeInfo(elementTypeInfo))
+          }
+        }
+      }
+    }
+
+    resultMap
+  }
+
+  /**
+    * Remove StateView fields from accumulator type information.
+    *
+    * @param index index of aggregate function
+    * @param aggFun aggregate function
+    * @param accType accumulator type information, only support pojo type
+    * @param isStateBackedDataViews is data views use state backend
+    * @return mapping of accumulator type information and data view config which contains id,
+    *         field name and state descriptor
+    */
+  def removeStateViewFieldsFromAccTypeInfo(
+    index: Int,
+    aggFun: AggregateFunction[_, _],
+    accType: TypeInformation[_],
+    isStateBackedDataViews: Boolean)
+  : (TypeInformation[_], Option[Seq[DataViewSpec[_]]]) = {
+
+    var hasDataView = false
+    val acc = aggFun.createAccumulator()
+    accType match {
+      case pojoType: PojoTypeInfo[_] if pojoType.getArity > 0 =>
+        val arity = pojoType.getArity
+        val newPojoFields = new util.ArrayList[PojoField]()
+        val accumulatorSpecs = new mutable.ArrayBuffer[DataViewSpec[_]]
+        for (i <- 0 until arity) {
+          val pojoField = pojoType.getPojoFieldAt(i)
+          val field = pojoField.getField
+          val fieldName = field.getName
+          field.setAccessible(true)
+
+          pojoField.getTypeInformation match {
+            case map: MapViewTypeInfo[Any, Any] =>
+              val mapView = field.get(acc).asInstanceOf[MapView[_, _]]
+              if (mapView != null) {
+                val keyTypeInfo = mapView.keyTypeInfo
+                val valueTypeInfo = mapView.valueTypeInfo
+                val newTypeInfo = if (keyTypeInfo != null && valueTypeInfo != null) {
+                  hasDataView = true
+                  new MapViewTypeInfo(keyTypeInfo, valueTypeInfo)
+                } else {
+                  map
+                }
+
+                var spec = MapViewSpec(
+                  "agg" + index + "$" + fieldName, // generate unique name to be used as state name
+                  field,
+                  newTypeInfo)
+
+                accumulatorSpecs += spec
+                if (!isStateBackedDataViews) { // add data view field which not use state backend
+                  newPojoFields.add(new PojoField(field, newTypeInfo))
+                }
+              }
+
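The field analysis in `getDataViewTypeInfoFromConstructor` and `removeStateViewFieldsFromAccTypeInfo` (create an accumulator, walk its declared fields, keep those whose type is a `DataView`, read the type information stored in the view instance, and derive a unique state name `"agg" + index + "$" + fieldName`) can be sketched with plain Java reflection. `DataViewStub`, `ListViewStub`, `SampleAccum` and `DataViewFieldScanner` are hypothetical stand-ins, and a `Class` object replaces Flink's `TypeInformation`; unlike the Scala code, this sketch simply skips views that are null or carry no type information.

```java
import java.lang.reflect.Field;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-ins for DataView / ListView; a Class object replaces TypeInformation.
abstract class DataViewStub {
    abstract Class<?> elementType();
}

final class ListViewStub<T> extends DataViewStub {
    private final Class<T> elementType;

    ListViewStub(Class<T> elementType) {
        this.elementType = elementType;
    }

    @Override
    Class<?> elementType() {
        return elementType;
    }
}

// Example accumulator with one data view field and one plain field.
final class SampleAccum {
    ListViewStub<String> list = new ListViewStub<>(String.class);
    long count;
}

final class DataViewFieldScanner {
    // Returns state name -> element type for each non-null data view field that carries
    // type information, walking superclasses as well as the accumulator class itself.
    static Map<String, Class<?>> scan(int aggIndex, Object accumulator) {
        Map<String, Class<?>> specs = new LinkedHashMap<>();
        for (Class<?> c = accumulator.getClass(); c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field field : c.getDeclaredFields()) {
                if (!DataViewStub.class.isAssignableFrom(field.getType())) {
                    continue; // plain fields such as `count` are skipped
                }
                field.setAccessible(true);
                try {
                    DataViewStub view = (DataViewStub) field.get(accumulator);
                    if (view != null && view.elementType() != null) {
                        specs.put("agg" + aggIndex + "$" + field.getName(), view.elementType());
                    }
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
        return specs;
    }
}
```

The type information has to come from the accumulator instance because the field's declared type (`ListViewStub<String>`) is erased to `ListViewStub` at runtime.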

[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140794#comment-16140794
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135134452
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala ---
@@ -161,13 +182,108 @@ class AggregationCodeGenerator(
       }
     }
 
+    /**
+      * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] to the open, cleanup,
+      * close and member area of the generated function.
+      *
+      */
+    def addReusableDataViews: Unit = {
+      if (accConfig != null && accConfig.isStateBackedDataViews) {
+        val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.accSpecs
+          .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+          .toMap[String, StateDescriptor[_, _]]
+
+        for (i <- aggs.indices) yield {
+          for (spec <- accConfig.accSpecs(i)) yield {
+            val dataViewField = spec.field
+            val dataViewTypeTerm = dataViewField.getType.getCanonicalName
+            val desc = descMapping.getOrElse(spec.id,
+              throw new CodeGenException(s"Can not find ListView in accumulator by id: ${spec.id}"))
+
+            // define the DataView variables
+            val serializedData = AggregateUtil.serialize(desc)
--- End diff --

Why do we need to serialize and deserialize state descriptors? Can't we 
just generate the code to instantiate them? IMO, that would be more 
straightforward and easier to debug.
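Generating the descriptor instantiation directly, as suggested, could look roughly like the Java sketch below: it emits Java source text for the generated function instead of a `deserialize("...")` call over a serialized descriptor. It only builds strings. `DescriptorCodeGenSketch`, `genListStateInit`, `serializerTerm` and `ctxTerm` are hypothetical names; the emitted code assumes Flink's `ListStateDescriptor(String, TypeSerializer)` constructor and `RuntimeContext#getListState`, and wrapping the returned state in a list view is left out.

```java
final class DescriptorCodeGenSketch {

    // Emits Java source that instantiates a ListStateDescriptor and obtains the ListState
    // from the runtime context, so the whole setup is readable in the generated code.
    static String genListStateInit(int aggIndex, String fieldName, String serializerTerm, String ctxTerm) {
        String stateName = "agg" + aggIndex + "$" + fieldName; // unique state name per agg/field
        String descTerm = "acc" + aggIndex + "_" + fieldName + "_dataview_desc";
        String stateTerm = "acc" + aggIndex + "_" + fieldName + "_state";
        String descClass = "org.apache.flink.api.common.state.ListStateDescriptor";
        String stateClass = "org.apache.flink.api.common.state.ListState";
        return String.join("\n",
            descClass + " " + descTerm + " =",
            "    new " + descClass + "(\"" + stateName + "\", " + serializerTerm + ");",
            stateClass + " " + stateTerm + " = " + ctxTerm + ".getListState(" + descTerm + ");");
    }
}
```

Because the descriptor's name and serializer appear literally in the generated source, a failure in the generated function can be traced without decoding a serialized blob.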




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16137878#comment-16137878
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user kaibozhou commented on the issue:

https://github.com/apache/flink/pull/4355
  
@fhueske @wuchong 

Thank you for your suggestions. I have updated the PR and added test cases.
Do you have time to look at this?

Thanks, Kaibo




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16137860#comment-16137860
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134653998
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
@@ -1646,4 +1649,86 @@ abstract class CodeGenerator(
 
     fieldTerm
   }
+
+  /**
+    * Adds a reusable class to the member area of the generated [[Function]].
+    */
+  def addReusableClass(clazz: Class[_], fieldTerm: String): Unit = {
+    val field =
+      s"""
+         |transient ${clazz.getCanonicalName} $fieldTerm = null;
+         |""".stripMargin
+    reusableMemberStatements.add(field)
+  }
+
+  /**
+    * Adds a reusable [[DataViewConfig]] to the member area of the generated [[Function]].
+    *
+    * @param indices indices of aggregate functions.
+    * @param ctxTerm field name of runtime context.
+    * @param accConfig data view config which contains id, field and StateDescriptos.
+    * @return statements to create [[MapView]] or [[ListView]].
+    */
+  def addReusableDataViewConfig(
+      indices: Range,
+      ctxTerm: String,
+      accConfig: Option[DataViewConfig])
+    : String = {
+    if (accConfig.isDefined && accConfig.get.isStateBackedDataViews) {
+      val initDataViews = new StringBuilder
+      val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.get.accSpecs
+        .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+        .toMap[String, StateDescriptor[_, _]]
+
+      for (i <- indices) yield {
+        for (spec <- accConfig.get.accSpecs(i)) yield {
+          val dataViewField = spec.field
+          val dataViewTypeTerm = dataViewField.getType.getCanonicalName
+          val desc = descMapping.getOrElse(spec.id,
+            throw new CodeGenException(s"Can not find ListView in accumulator by id: ${spec.id}"))
+
+          val serializedData = AggregateUtil.serialize(desc)
+          val dataViewFieldTerm = s"acc${i}_${dataViewField.getName}_dataview"
+          val field =
+            s"""
+               |transient $dataViewTypeTerm $dataViewFieldTerm = null;
+               |""".stripMargin
+          reusableMemberStatements.add(field)
+
+          val descFieldTerm = s"${dataViewFieldTerm}_desc"
+          val descClassQualifier = classOf[StateDescriptor[_, _]].getCanonicalName
+          val descDeserialize =
+            s"""
+               |$descClassQualifier $descFieldTerm = ($descClassQualifier)
+               |  ${AggregateUtil.getClass.getName.stripSuffix("$")}
+               |  .deserialize("$serializedData");
+             """.stripMargin
+
+          val init = if (dataViewField.getType == classOf[MapView[_, _]]) {
+            s"""
+               |$descDeserialize
+               |$dataViewFieldTerm =
+               |  org.apache.flink.table.dataview.StateViewUtils.createMapView($descFieldTerm,
--- End diff --

Agreed, it can be code-generated now.




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16137857#comment-16137857
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134653928
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
@@ -1646,4 +1649,86 @@ abstract class CodeGenerator(
 
     fieldTerm
   }
+
+  /**
+    * Adds a reusable class to the member area of the generated [[Function]].
+    */
+  def addReusableClass(clazz: Class[_], fieldTerm: String): Unit = {
+    val field =
+      s"""
+         |transient ${clazz.getCanonicalName} $fieldTerm = null;
+         |""".stripMargin
+    reusableMemberStatements.add(field)
+  }
+
+  /**
+    * Adds a reusable [[DataViewConfig]] to the member area of the generated [[Function]].
+    *
+    * @param indices indices of aggregate functions.
+    * @param ctxTerm field name of runtime context.
+    * @param accConfig data view config which contains id, field and StateDescriptos.
+    * @return statements to create [[MapView]] or [[ListView]].
+    */
+  def addReusableDataViewConfig(
--- End diff --

It's a good idea; the code will be cleaner after the refactoring.




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16137854#comment-16137854
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134653871
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
@@ -1646,4 +1649,86 @@ abstract class CodeGenerator(
 
     fieldTerm
   }
+
+  /**
+    * Adds a reusable class to the member area of the generated [[Function]].
+    */
+  def addReusableClass(clazz: Class[_], fieldTerm: String): Unit = {
--- End diff --

Yes, RuntimeContext does not need to be a member variable.




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16136604#comment-16136604
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user kaibozhou commented on the issue:

https://github.com/apache/flink/pull/4355
  
In MapViewSerializer.scala, the method `override def serialize(record: MapView[K, V], target: DataOutputView)` needs to access the `map` var defined in MapView.scala, because `mapSerializer.serialize(record.map, target)` only accepts a `java.util.Map`. Otherwise, we would need to create a new `java.util.Map` just to call this method, which is not very efficient.

The problem is that `private[flink] var map` will be visible to Java users. In fact, we do not want users to see the internal implementation of MapView.

Maybe using the `protected` keyword in Java can solve the problem.
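Scala's qualified `private[flink]` is emitted as a public member in bytecode, which is why Java users can see `map`. A minimal Java sketch of the Java-side alternative, under the assumption that the view and its serializer share a package: a package-private field (Java's `protected` also grants package access, plus subclass access) lets the serializer hand the internal map straight to a `java.util.Map`-based serializer without copying, while code in other packages cannot reach it. `MapViewSketch` and `MapViewSerializerSketch` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

class MapViewSketch<K, V> {
    // Package-private: visible to MapViewSerializerSketch in the same package,
    // hidden from user code in other packages.
    final Map<K, V> map = new HashMap<>();

    public void put(K key, V value) {
        map.put(key, value);
    }

    public V get(K key) {
        return map.get(key);
    }
}

final class MapViewSerializerSketch {
    // Returns the view's backing map itself (no copy), which could be passed directly
    // to a serializer that only accepts java.util.Map.
    static <K, V> Map<K, V> backingMap(MapViewSketch<K, V> record) {
        return record.map;
    }
}
```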






[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16134772#comment-16134772
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134118408
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView provides List functionality for accumulators used by user-defined aggregate functions
+  * {{AggregateFunction}}.
+  *
+  * A ListView can be backed by a Java ArrayList or a state backend, depending on the context in
+  * which the function is used.
+  *
+  * At runtime `ListView` will be replaced by a [[org.apache.flink.table.dataview.StateListView]]
+  * when use state backend..
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public ListView list;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  * MyAccum accum = new MyAccum();
+  * accum.list = new ListView<>(Types.STRING);
+  * accum.count = 0L;
+  * return accum;
+  *   }
+  *
+  *   public void accumulate(MyAccum accumulator, String id) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  * return accumulator.count;
+  *   }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](val elementTypeInfo: TypeInformation[T]) extends DataView {
+
+  def this() = this(null)
+
+  val list = new util.ArrayList[T]()
--- End diff --

Make the list private; otherwise, Java users can access it.
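A minimal Java sketch of what the suggestion amounts to: the backing `ArrayList` is a private field, so Java callers can only go through the view's methods. `HeapListViewSketch` and its method set are hypothetical and are not the API this PR defines.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical heap-backed list view used only to illustrate the review
// comment; it is not Flink's ListView class.
class HeapListViewSketch<T> {

    // Private backing list: Java callers can only reach it through the
    // methods below, never as a public field.
    private final List<T> list = new ArrayList<>();

    // Appends an element to the view.
    void add(T value) {
        list.add(value);
    }

    // Returns a read-only wrapper; it reflects later add() calls but
    // cannot be used to modify the backing list.
    Iterable<T> get() {
        return Collections.unmodifiableList(list);
    }

    // Removes all elements.
    void clear() {
        list.clear();
    }
}
```

Returning an unmodifiable wrapper from `get()` keeps callers from mutating the list behind the view's back, so a different backing (for example, state) could later be swapped in without changing callers.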


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16134776#comment-16134776
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134137525
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
@@ -1646,4 +1649,86 @@ abstract class CodeGenerator(
 
 fieldTerm
   }
+
+  /**
+* Adds a reusable class to the member area of the generated [[Function]].
+*/
+  def addReusableClass(clazz: Class[_], fieldTerm: String): Unit = {
+val field =
+  s"""
+ |transient ${clazz.getCanonicalName} $fieldTerm = null;
+ |""".stripMargin
+reusableMemberStatements.add(field)
+  }
+
+  /**
+* Adds a reusable [[DataViewConfig]] to the member area of the generated [[Function]].
+*
+* @param indices indices of aggregate functions.
+* @param ctxTerm field name of runtime context.
+* @param accConfig data view config which contains id, field and StateDescriptors.
+* @return statements to create [[MapView]] or [[ListView]].
+*/
+  def addReusableDataViewConfig(
+  indices: Range,
+  ctxTerm: String,
+  accConfig: Option[DataViewConfig])
+: String = {
+if (accConfig.isDefined && accConfig.get.isStateBackedDataViews) {
+  val initDataViews = new StringBuilder
+  val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.get.accSpecs
+.flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+.toMap[String, StateDescriptor[_, _]]
+
+  for (i <- indices) yield {
+for (spec <- accConfig.get.accSpecs(i)) yield {
+  val dataViewField = spec.field
+  val dataViewTypeTerm = dataViewField.getType.getCanonicalName
+  val desc = descMapping.getOrElse(spec.id,
+throw new CodeGenException(s"Can not find ListView in accumulator by id: ${spec.id}"))
+
+  val serializedData = AggregateUtil.serialize(desc)
+  val dataViewFieldTerm = s"acc${i}_${dataViewField.getName}_dataview"
+  val field =
+s"""
+   |transient $dataViewTypeTerm $dataViewFieldTerm = null;
+   |""".stripMargin
+  reusableMemberStatements.add(field)
+
+  val descFieldTerm = s"${dataViewFieldTerm}_desc"
+  val descClassQualifier = classOf[StateDescriptor[_, _]].getCanonicalName
+  val descDeserialize =
+s"""
+   |$descClassQualifier $descFieldTerm = ($descClassQualifier)
+   |  ${AggregateUtil.getClass.getName.stripSuffix("$")}
+   |  .deserialize("$serializedData");
+ """.stripMargin
+
+  val init = if (dataViewField.getType == classOf[MapView[_, _]]) {
+s"""
+   |$descDeserialize
+   |$dataViewFieldTerm =
+   |  org.apache.flink.table.dataview.StateViewUtils.createMapView($descFieldTerm,
--- End diff --

I think we do not need `StateViewUtils` here. We can create the MapView
directly in the generated code, because we already have the RuntimeContext
and the StateDescriptor.
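To make the suggestion concrete, the hypothetical Java helper below only builds the source text that the generated function could contain instead of a `StateViewUtils.createMapView(...)` call. `RuntimeContext#getMapState(MapStateDescriptor)` and `RuntimeContext#getListState(ListStateDescriptor)` are Flink's keyed-state accessors; the `StateMapView`/`StateListView` constructors that take the returned state handle, and the term names used in the example, are assumptions for illustration.

```java
// Hypothetical code-generation helper: it only builds Java source text for the
// statement that creates a state-backed data view inside the generated function.
// ASSUMPTION: StateMapView / StateListView have constructors that accept the
// MapState / ListState handle returned by the RuntimeContext.
final class DataViewCreationCodeSketch {

    private DataViewCreationCodeSketch() {}

    // Emits: <field> = new StateMapView(<ctx>.getMapState((MapStateDescriptor) <desc>));
    static String createMapViewCode(String fieldTerm, String descTerm, String ctxTerm) {
        return fieldTerm + " = new org.apache.flink.table.dataview.StateMapView("
            + ctxTerm + ".getMapState("
            + "(org.apache.flink.api.common.state.MapStateDescriptor) " + descTerm + "));";
    }

    // ListView counterpart, based on RuntimeContext#getListState.
    static String createListViewCode(String fieldTerm, String descTerm, String ctxTerm) {
        return fieldTerm + " = new org.apache.flink.table.dataview.StateListView("
            + ctxTerm + ".getListState("
            + "(org.apache.flink.api.common.state.ListStateDescriptor) " + descTerm + "));";
    }
}
```

The descriptor term is assumed to hold a descriptor that was already deserialized by generated code, as in the diff above; only the final creation step changes.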


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16134773#comment-16134773
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134118741
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala ---
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.MapViewTypeInfoFactory
+
+/**
+  * MapView provides Map functionality for accumulators used by user-defined aggregate functions
+  * [[org.apache.flink.table.functions.AggregateFunction]].
+  *
+  * A MapView can be backed by a Java HashMap or a state backend, depending on the context in
+  * which the function is used.
+  *
+  * At runtime `MapView` will be replaced by a [[org.apache.flink.table.dataview.StateMapView]]
+  * when a state backend is used.
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *    public MapView<String, Integer> map;
+  *    public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction<Long, MyAccum> {
+  *
+  *    @Override
+  *    public MyAccum createAccumulator() {
+  *      MyAccum accum = new MyAccum();
+  *      accum.map = new MapView<>(Types.STRING, Types.INT);
+  *      accum.count = 0L;
+  *      return accum;
+  *    }
+  *
+  *    public void accumulate(MyAccum accumulator, String id) {
+  *      try {
+  *        if (!accumulator.map.contains(id)) {
+  *          accumulator.map.put(id, 1);
+  *          accumulator.count++;
+  *        }
+  *      } catch (Exception e) {
+  *        e.printStackTrace();
+  *      }
+  *    }
+  *
+  *    @Override
+  *    public Long getValue(MyAccum accumulator) {
+  *      return accumulator.count;
+  *    }
+  *  }
+  *
+  * }}}
+  *
+  * @param keyTypeInfo key type information
+  * @param valueTypeInfo value type information
+  * @tparam K key type
+  * @tparam V value type
+  */
+@TypeInfo(classOf[MapViewTypeInfoFactory[_, _]])
+class MapView[K, V](
+   private[flink] val keyTypeInfo: TypeInformation[K],
+   private[flink] val valueTypeInfo: TypeInformation[V])
--- End diff --

I'm not sure whether adding `private[flink]` is a good idea, because the
members are actually `public` for Java users.

Also, please make them `@transient`.
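The two requests address different things. Scala's `private[flink]` is enforced only by the Scala compiler; on the JVM such members are emitted as public, which is why Java users can still reach them. `@transient` (Java's `transient` modifier) instead excludes a field from Java serialization. The sketch below illustrates only that second effect, using a hypothetical class in which a `String` stands in for the `TypeInformation` constructor argument.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;

// Hypothetical stand-in for a map view. The String field "keyType" plays the
// role of the TypeInformation constructor argument in the diff.
class TransientFieldSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    // transient: skipped by Java serialization, so it is null after a round trip.
    transient String keyType;

    // Non-transient: serialized together with the object.
    private final HashMap<String, Integer> map = new HashMap<>();

    TransientFieldSketch(String keyType) {
        this.keyType = keyType;
    }

    void put(String key, Integer value) {
        map.put(key, value);
    }

    Integer get(String key) {
        return map.get(key);
    }

    // Writes the object with plain Java serialization and reads it back.
    static TransientFieldSketch roundTrip(TransientFieldSketch original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (TransientFieldSketch) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }
}
```

After the round trip the map contents survive while the transient type field comes back as `null`.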


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16134769#comment-16134769
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134118851
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView encapsulates the operations of a list.
+  *
+  * None of the methods in this class are implemented; users do not need to care about whether it is
+  * backed by a Java ArrayList or a state backend. It will be replaced by a {@link StateListView} or a
+  * {@link HeapListView}.
+  *
+  * 
+  * NOTE: Users are not recommended to extend this class.
+  * 
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *    public ListView<String> list;
+  *    public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction<Long, MyAccum> {
+  *
+  *    @Override
+  *    public MyAccum createAccumulator() {
+  *      MyAccum accum = new MyAccum();
+  *      accum.list = new ListView<>(Types.STRING);
+  *      accum.count = 0L;
+  *      return accum;
+  *    }
+  *
+  *    // Overloaded accumulate method
+  *    public void accumulate(MyAccum accumulator, String id) {
+  *      accumulator.list.add(id);
+  *      // ...
+  *      Iterable<String> ids = accumulator.list.get();
+  *      // ...
+  *    }
+  *
+  *    @Override
+  *    public Long getValue(MyAccum accumulator) {
+  *      // ...
+  *      Iterable<String> ids = accumulator.list.get();
+  *      // ...
+  *      return accumulator.count;
+  *    }
+  *  }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](val elementTypeInfo: TypeInformation[T]) extends DataView {
--- End diff --

Please make `elementTypeInfo` `@transient`. Also, do we want the type info
to be accessible to users?


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16134783#comment-16134783
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134135181
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala ---
@@ -162,13 +172,66 @@ class AggregationCodeGenerator(
   }
 }
 
+def genDataViewFieldSetter(accTerm: String, specs: Seq[DataViewSpec[_]]): String = {
+  if (accConfig.isDefined && accConfig.get.isStateBackedDataViews) {
+val setters = for (spec <- specs) yield {
+  val field = spec.field
+  val dataViewTerm = s"${accTerm}_${field.getName}_dataview"
+  val fieldSetter = if (Modifier.isPublic(field.getModifiers)) {
+s"$accTerm.${field.getName} = $dataViewTerm;"
+  } else {
+val fieldTerm = addReusablePrivateFieldAccess(field.getDeclaringClass, field.getName)
+s"${reflectiveFieldWriteAccess(fieldTerm, field, accTerm, dataViewTerm)};"
+  }
+
+  s"""
+ |$fieldSetter
+""".stripMargin
+}
+setters.mkString("\n")
+  } else {
+""
+  }
+}
+
+def genCleanUpDataView: String = {
+  if (accConfig.isDefined && accConfig.get.isStateBackedDataViews) {
+val cleanUpDataViews = new StringBuilder
+for (i <- aggs.indices) yield {
+  val setters = for (spec <- accConfig.get.accSpecs(i)) yield {
+val dataViewTerm = s"acc${i}_${spec.field.getName}_dataview"
+val cleanUp =
+  s"""
+|$dataViewTerm.clear();
+  """.stripMargin
+cleanUpDataViews.append(cleanUp)
+  }
+}
+
+cleanUpDataViews.toString()
+  } else {
+""
+  }
+}
+
+def genInitialize: String = {
+
+j"""
+   |  public final void initialize(
--- End diff --

I would like to rename the method to `open(ctx)`, so that we can use the
`reusableOpenStatements` and `reuseOpenCode()` of `CodeGenerator` to generate
the body of `open`. Currently, `genInitialize` is easily confused with
`reuseInitCode()`.
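A rough Java sketch of the suggested structure, assuming a generator that collects statements in an insertion-ordered set and splices them into a generated `open(ctx)` method. The names `reusableOpenStatements` and `reuseOpenCode()` follow the comment; the class, the `addReusableOpenStatement` method and the exact signature of the emitted method are hypothetical.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical generator fragment. The names reusableOpenStatements and
// reuseOpenCode() follow the review comment; the real CodeGenerator API may differ.
class OpenMethodCodeSketch {

    // Insertion-ordered and de-duplicated: a statement registered twice is
    // emitted only once, in the order it was first added.
    private final Set<String> reusableOpenStatements = new LinkedHashSet<>();

    void addReusableOpenStatement(String statement) {
        reusableOpenStatements.add(statement);
    }

    // Concatenates all registered statements into the body of open().
    String reuseOpenCode() {
        return String.join("\n", reusableOpenStatements);
    }

    // Emits the whole open(ctx) method of the generated aggregations class.
    String genOpen() {
        return "public final void open(org.apache.flink.api.common.functions.RuntimeContext ctx)"
            + " throws Exception {\n"
            + reuseOpenCode()
            + "\n}";
    }
}
```

With this shape, data view creation becomes just another registered open statement rather than a separate `initialize` code path.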


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16134777#comment-16134777
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134118759
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala ---
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.MapViewTypeInfoFactory
+
+/**
+  * MapView provides Map functionality for accumulators used by user-defined aggregate functions
+  * [[org.apache.flink.table.functions.AggregateFunction]].
+  *
+  * A MapView can be backed by a Java HashMap or a state backend, depending on the context in
+  * which the function is used.
+  *
+  * At runtime `MapView` will be replaced by a [[org.apache.flink.table.dataview.StateMapView]]
+  * when a state backend is used.
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *    public MapView<String, Integer> map;
+  *    public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction<Long, MyAccum> {
+  *
+  *    @Override
+  *    public MyAccum createAccumulator() {
+  *      MyAccum accum = new MyAccum();
+  *      accum.map = new MapView<>(Types.STRING, Types.INT);
+  *      accum.count = 0L;
+  *      return accum;
+  *    }
+  *
+  *    public void accumulate(MyAccum accumulator, String id) {
+  *      try {
+  *        if (!accumulator.map.contains(id)) {
+  *          accumulator.map.put(id, 1);
+  *          accumulator.count++;
+  *        }
+  *      } catch (Exception e) {
+  *        e.printStackTrace();
+  *      }
+  *    }
+  *
+  *    @Override
+  *    public Long getValue(MyAccum accumulator) {
+  *      return accumulator.count;
+  *    }
+  *  }
+  *
+  * }}}
+  *
+  * @param keyTypeInfo key type information
+  * @param valueTypeInfo value type information
+  * @tparam K key type
+  * @tparam V value type
+  */
+@TypeInfo(classOf[MapViewTypeInfoFactory[_, _]])
+class MapView[K, V](
+   private[flink] val keyTypeInfo: TypeInformation[K],
+   private[flink] val valueTypeInfo: TypeInformation[V])
+  extends DataView {
+
+  def this() = this(null, null)
+
+  val map = new util.HashMap[K, V]()
+
+  /**
+* Returns the value to which the specified key is mapped, or {@code null} if this map
+* contains no mapping for the key.
+*
+* @param key The key of the mapping.
+* @return The value of the mapping with the given key.
+* @throws Exception Thrown if the system cannot get data.
+*/
+  @throws[Exception]
+  def get(key: K): V = map.get(key)
+
+  /**
+* Put a value with the given key into the map.
+*
+* @param key   The key of the mapping.
+* @param value The new value of the mapping.
+* @throws Exception Thrown if the system cannot put data.
+*/
+  @throws[Exception]
+  def put(key: K, value: V): Unit = map.put(key, value)
+
+  /**
+* Copies all of the mappings from the specified map to this map view.
+*
+* @param map The mappings to be stored in this map.
+* @throws Exception Thrown if the system cannot access the map.
+*/
+  @throws[Exception]
+  def putAll(map: util.Map[K, V]): Unit = map.putAll(map)
+
+  /**
+* Deletes the mapping of the given key.
+*
+* @param key The key of the mapping.
+* @throws Exception Thrown if the 

[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16134786#comment-16134786
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134138469
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
@@ -1646,4 +1649,86 @@ abstract class CodeGenerator(
 
 fieldTerm
   }
+
+  /**
+* Adds a reusable class to the member area of the generated [[Function]].
+*/
+  def addReusableClass(clazz: Class[_], fieldTerm: String): Unit = {
+val field =
+  s"""
+ |transient ${clazz.getCanonicalName} $fieldTerm = null;
+ |""".stripMargin
+reusableMemberStatements.add(field)
+  }
+
+  /**
+* Adds a reusable [[DataViewConfig]] to the member area of the generated [[Function]].
+*
+* @param indices indices of aggregate functions.
+* @param ctxTerm field name of runtime context.
+* @param accConfig data view config which contains id, field and StateDescriptors.
+* @return statements to create [[MapView]] or [[ListView]].
+*/
+  def addReusableDataViewConfig(
--- End diff --

`addReusableDataViewConfig` should be in `AggregationCodeGenerator`. I would
also like to change this method to `addReusableDataView(spec: DataViewSpec): String`,
where the returned String is the term of the data view member variable. The
data view creation code can be added to `reusableDataViewStatements`, and the
code in `reusableDataViewStatements` can finally be added to `initialize(ctx)`
(or `open(ctx)`, as I suggested).
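A hedged Java sketch of the proposed `addReusableDataView(spec): String` shape: it declares the member once, queues the creation code in `reusableDataViewStatements` for later inclusion in `initialize(ctx)`/`open(ctx)`, and returns the member term. The term format mirrors the `acc${i}_${name}_dataview` pattern from the diff; `DataViewSpecSketch`, `DataViewMemberSketch` and the `creationCode` callback are hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Hypothetical spec: which accumulator (by index) holds a data view in which
// field, and the fully qualified Java type of that field.
final class DataViewSpecSketch {
    final int aggIndex;
    final String fieldName;
    final String viewType;

    DataViewSpecSketch(int aggIndex, String fieldName, String viewType) {
        this.aggIndex = aggIndex;
        this.fieldName = fieldName;
        this.viewType = viewType;
    }
}

// Hypothetical fragment of an aggregation code generator following the
// suggested addReusableDataView(spec): String shape.
class DataViewMemberSketch {
    final List<String> reusableMemberStatements = new ArrayList<>();
    final List<String> reusableDataViewStatements = new ArrayList<>();
    private final Set<String> declaredTerms = new LinkedHashSet<>();

    // Declares the member once, queues its creation code for open(ctx), and
    // returns the member term so callers can reference the view.
    String addReusableDataView(DataViewSpecSketch spec, Function<String, String> creationCode) {
        String term = "acc" + spec.aggIndex + "_" + spec.fieldName + "_dataview";
        if (declaredTerms.add(term)) {
            reusableMemberStatements.add("transient " + spec.viewType + " " + term + " = null;");
            reusableDataViewStatements.add(creationCode.apply(term));
        }
        return term;
    }
}
```

Returning the term instead of a block of statements lets callers such as the field setters and the cleanup code reference the view without knowing how it was created.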


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16134780#comment-16134780
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134151688
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/StateViewUtils.scala ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.dataview
+
+import java.util
+import java.lang.{Iterable => JIterable}
+
+import org.apache.flink.api.common.functions.RuntimeContext
+import org.apache.flink.api.common.state._
+import org.apache.flink.table.api.dataview.{ListView, MapView}
+
+/**
+  * State view utils to create [[StateListView]] or [[StateMapView]].
+  */
+object StateViewUtils {
--- End diff --

We may not need this, as we can generate the creation code.


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16134781#comment-16134781
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134152736
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
@@ -307,6 +312,119 @@ object UserDefinedFunctionUtils {
   // 
--
 
   /**
+* get data view type information from accumulator constructor.
+*
+* @param aggFun aggregate function
+* @return the data view specification
+*/
+  def getDataViewTypeInfoFromConstructor(
+aggFun: AggregateFunction[_, _])
+  : mutable.HashMap[String, TypeInformation[_]] = {
+
+val resultMap = new mutable.HashMap[String, TypeInformation[_]]
+val acc = aggFun.createAccumulator()
+val fields: util.List[Field] = TypeExtractor.getAllDeclaredFields(acc.getClass, true)
--- End diff --

Yes, I think we can use the accumulator type info instead of the field information.


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16134778#comment-16134778
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134137708
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
@@ -1646,4 +1649,86 @@ abstract class CodeGenerator(
 
 fieldTerm
   }
+
+  /**
+* Adds a reusable class to the member area of the generated [[Function]].
+*/
+  def addReusableClass(clazz: Class[_], fieldTerm: String): Unit = {
+val field =
+  s"""
+ |transient ${clazz.getCanonicalName} $fieldTerm = null;
+ |""".stripMargin
+reusableMemberStatements.add(field)
+  }
+
+  /**
+* Adds a reusable [[DataViewConfig]] to the member area of the generated [[Function]].
+*
+* @param indices indices of aggregate functions.
+* @param ctxTerm field name of runtime context.
+* @param accConfig data view config which contains id, field and StateDescriptors.
+* @return statements to create [[MapView]] or [[ListView]].
+*/
+  def addReusableDataViewConfig(
+  indices: Range,
+  ctxTerm: String,
+  accConfig: Option[DataViewConfig])
+: String = {
+if (accConfig.isDefined && accConfig.get.isStateBackedDataViews) {
+  val initDataViews = new StringBuilder
+  val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.get.accSpecs
+.flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+.toMap[String, StateDescriptor[_, _]]
+
+  for (i <- indices) yield {
+for (spec <- accConfig.get.accSpecs(i)) yield {
+  val dataViewField = spec.field
+  val dataViewTypeTerm = dataViewField.getType.getCanonicalName
+  val desc = descMapping.getOrElse(spec.id,
+throw new CodeGenException(s"Can not find ListView in accumulator by id: ${spec.id}"))
+
+  val serializedData = AggregateUtil.serialize(desc)
+  val dataViewFieldTerm = s"acc${i}_${dataViewField.getName}_dataview"
+  val field =
+s"""
+   |transient $dataViewTypeTerm $dataViewFieldTerm = null;
+   |""".stripMargin
+  reusableMemberStatements.add(field)
+
+  val descFieldTerm = s"${dataViewFieldTerm}_desc"
+  val descClassQualifier = classOf[StateDescriptor[_, _]].getCanonicalName
+  val descDeserialize =
+s"""
+   |$descClassQualifier $descFieldTerm = ($descClassQualifier)
+   |  ${AggregateUtil.getClass.getName.stripSuffix("$")}
+   |  .deserialize("$serializedData");
+ """.stripMargin
+
+  val init = if (dataViewField.getType == classOf[MapView[_, _]]) {
+s"""
+   |$descDeserialize
+   |$dataViewFieldTerm =
+   |  org.apache.flink.table.dataview.StateViewUtils.createMapView($descFieldTerm,
+   |  $ctxTerm);
+   """.stripMargin
+  } else if (dataViewField.getType == classOf[ListView[_]]) {
+s"""
+   |$descDeserialize
+   |$dataViewFieldTerm =
+   |  org.apache.flink.table.dataview.StateViewUtils.createListView($descFieldTerm,
--- End diff --

Same as above: we can generate the creation code.


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16134774#comment-16134774
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134134801
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala ---
@@ -100,6 +107,11 @@ abstract class GeneratedAggregations extends Function {
 * aggregated results
 */
   def resetAccumulator(accumulators: Row)
+
+  /**
+* Clean up for the accumulators.
+*/
+  def cleanUp()
--- End diff --

`cleanup` is also a single word, so we do not need an uppercase `U` here.
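With the suggested spelling, the abstract method becomes `cleanup()`. As a sketch of what its generated body could look like, modelled on the `genCleanUpDataView` diff quoted earlier in this thread (one `clear()` call per state-backed data view), here is a hypothetical Java helper; its name and output layout are illustrative only.

```java
import java.util.List;

// Hypothetical helper modelled on genCleanUpDataView in the
// AggregationCodeGenerator diff: it emits a cleanup() method that calls
// clear() on every state-backed data view member.
final class CleanupCodeSketch {

    private CleanupCodeSketch() {}

    static String genCleanup(List<String> dataViewTerms) {
        StringBuilder body = new StringBuilder();
        for (String term : dataViewTerms) {
            body.append("  ").append(term).append(".clear();\n");
        }
        return "public final void cleanup() {\n" + body + "}";
    }
}
```

With no state-backed views the helper still emits an empty `cleanup()` method, so the abstract method is always implemented.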


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16134782#comment-16134782
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134139167
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
@@ -1646,4 +1649,86 @@ abstract class CodeGenerator(
 
 fieldTerm
   }
+
+  /**
+* Adds a reusable class to the member area of the generated [[Function]].
+*/
+  def addReusableClass(clazz: Class[_], fieldTerm: String): Unit = {
--- End diff --

Do we need this? It is only used to add the `RuntimeContext` to the member
area, but `RuntimeContext` is only used in `initialize`.


[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16134770#comment-16134770
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134118744
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala ---
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.MapViewTypeInfoFactory
+
+/**
+  * MapView provides Map functionality for accumulators used by user-defined aggregate functions
+  * [[org.apache.flink.table.functions.AggregateFunction]].
+  *
+  * A MapView can be backed by a Java HashMap or a state backend, depending on the context in
+  * which the function is used.
+  *
+  * At runtime `MapView` will be replaced by a [[org.apache.flink.table.dataview.StateMapView]]
+  * when a state backend is used.
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *    public MapView<String, Integer> map;
+  *    public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction<Long, MyAccum> {
+  *
+  *    @Override
+  *    public MyAccum createAccumulator() {
+  *      MyAccum accum = new MyAccum();
+  *      accum.map = new MapView<>(Types.STRING, Types.INT);
+  *      accum.count = 0L;
+  *      return accum;
+  *    }
+  *
+  *    public void accumulate(MyAccum accumulator, String id) {
+  *      try {
+  *        if (!accumulator.map.contains(id)) {
+  *          accumulator.map.put(id, 1);
+  *          accumulator.count++;
+  *        }
+  *      } catch (Exception e) {
+  *        e.printStackTrace();
+  *      }
+  *    }
+  *
+  *    @Override
+  *    public Long getValue(MyAccum accumulator) {
+  *      return accumulator.count;
+  *    }
+  *  }
+  *
+  * }}}
+  *
+  * @param keyTypeInfo key type information
+  * @param valueTypeInfo value type information
+  * @tparam K key type
+  * @tparam V value type
+  */
+@TypeInfo(classOf[MapViewTypeInfoFactory[_, _]])
+class MapView[K, V](
+   private[flink] val keyTypeInfo: TypeInformation[K],
+   private[flink] val valueTypeInfo: TypeInformation[V])
+  extends DataView {
+
+  def this() = this(null, null)
+
+  val map = new util.HashMap[K, V]()
--- End diff --

please make the `map` private
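For illustration, here is a minimal Java sketch of what a private backing map provides. `SimpleMapView` is a hypothetical, heap-only stand-in and not Flink's `MapView`. Its method names are assumptions modeled on the operations the Scaladoc example uses (`contains`, `put`). Because the `HashMap` is private, every access goes through the view methods. That is what would let a runtime substitute a state-backed implementation without user code noticing.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified heap-only stand-in for a MapView (not Flink's class).
// The backing HashMap is private, so user code can only go through the view methods;
// that is what allows a runtime to substitute a state-backed implementation later.
class SimpleMapView<K, V> {
  // Private backing map: not reachable from user code.
  private final Map<K, V> map = new HashMap<>();

  public V get(K key) {
    return map.get(key);
  }

  public void put(K key, V value) {
    map.put(key, value);
  }

  public boolean contains(K key) {
    return map.containsKey(key);
  }

  public void remove(K key) {
    map.remove(key);
  }

  public void clear() {
    map.clear();
  }
}
```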




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16134771#comment-16134771
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134119014
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -162,13 +172,66 @@ class AggregationCodeGenerator(
   }
 }
 
+def genDataViewFieldSetter(accTerm: String, specs: Seq[DataViewSpec[_]]): String = {
+  if (accConfig.isDefined && accConfig.get.isStateBackedDataViews) {
+val setters = for (spec <- specs) yield {
+  val field = spec.field
+  val dataViewTerm = s"${accTerm}_${field.getName}_dataview"
--- End diff --

I find that the data view term is defined in many places. Can we create a method to generate the term name, such as `createDataViewTerm(index: Int, fieldName: String)`?
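Here is a sketch of the suggested helper. It assumes the accumulator term for aggregate `i` is `acc` followed by the index. That is an assumption: the diff only shows the pattern `${accTerm}_${field.getName}_dataview`. `DataViewTerms` is a hypothetical holder class. The sketch is written in Java for illustration, although the generator itself is Scala.

```java
// Hypothetical helper: one place that derives the generated variable name for a
// data view field, so every code-generation site produces the same identifier.
final class DataViewTerms {
  private DataViewTerms() {}

  // Assumption: the accumulator term for aggregate `index` is "acc" + index.
  // The "_<fieldName>_dataview" suffix follows the pattern quoted in the diff.
  static String createDataViewTerm(int index, String fieldName) {
    return "acc" + index + "_" + fieldName + "_dataview";
  }
}
```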




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16134779#comment-16134779
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134151559
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView provides List functionality for accumulators used by user-defined aggregate functions
+  * [[org.apache.flink.table.functions.AggregateFunction]].
+  *
+  * A ListView can be backed by a Java ArrayList or a state backend, depending on the context in
+  * which the function is used.
+  *
+  * At runtime `ListView` will be replaced by a [[org.apache.flink.table.dataview.StateListView]]
+  * when a state backend is used.
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *    public ListView<String> list;
+  *    public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction<Long, MyAccum> {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  *     MyAccum accum = new MyAccum();
+  *     accum.list = new ListView<>(Types.STRING);
+  *     accum.count = 0L;
+  *     return accum;
+  *   }
+  *
+  *   public void accumulate(MyAccum accumulator, String id) {
+  *     accumulator.list.add(id);
+  *     // ...
+  *     Iterable<String> ids = accumulator.list.get();
+  *     // ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  *     // ...
+  *     return accumulator.count;
+  *   }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](val elementTypeInfo: TypeInformation[T]) extends DataView {
+
+  def this() = this(null)
+
+  val list = new util.ArrayList[T]()
+
+  /**
+* Returns an iterable of the list.
+*
+* @return The iterable of the list or {@code null} if the list is empty.
+*/
+  def get: JIterable[T] = {
+if (!list.isEmpty) {
+  list
+} else {
+  null
+}
+  }
+
+  /**
+* Adds the given value to the list.
+*
+* @param value element to be appended to this list
+*/
+  def add(value: T): Unit = list.add(value)
+
+  /**
+* Removes all of the elements from this list.
+*
+* The list will be empty after this call returns.
+*/
+  override def clear(): Unit = list.clear()
+
+  /**
+* Copy from a list instance.
+*
+* @param t List instance.
+* @return A copy of this list instance
+*/
+  def copyFrom(t: util.List[T]): ListView[T] = {
--- End diff --

The `copyFrom` method can be accessed by users. We should avoid this.
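This Java sketch shows one way to keep a copy helper out of user reach. It assumes that runtime code lives in the same package as the view, and gives the helper no access modifier so it is package-private. `SimpleListView` and `copyOf` are hypothetical names. This differs from Scala's `private[flink]`, which still compiles to a member that is public from Java. The `get` method follows the documented contract of returning `null` for an empty list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical heap-backed list view; only get/add/clear are meant for users.
class SimpleListView<T> {
  private final List<T> list = new ArrayList<>();

  // Mirrors the documented contract: null when the list is empty.
  public Iterable<T> get() {
    return list.isEmpty() ? null : list;
  }

  public void add(T value) {
    list.add(value);
  }

  public void clear() {
    list.clear();
  }

  // Package-private (no modifier): callable by runtime code in the same package,
  // but not part of the API visible to user code in other packages.
  static <E> SimpleListView<E> copyOf(List<E> source) {
    SimpleListView<E> view = new SimpleListView<>();
    view.list.addAll(source);
    return view;
  }
}
```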




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16134784#comment-16134784
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134118959
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -162,13 +172,66 @@ class AggregationCodeGenerator(
   }
 }
 
+def genDataViewFieldSetter(accTerm: String, specs: Seq[DataViewSpec[_]]): String = {
+  if (accConfig.isDefined && accConfig.get.isStateBackedDataViews) {
+val setters = for (spec <- specs) yield {
+  val field = spec.field
+  val dataViewTerm = s"${accTerm}_${field.getName}_dataview"
+  val fieldSetter = if (Modifier.isPublic(field.getModifiers)) {
+s"$accTerm.${field.getName} = $dataViewTerm;"
+  } else {
+val fieldTerm = addReusablePrivateFieldAccess(field.getDeclaringClass, field.getName)
+s"${reflectiveFieldWriteAccess(fieldTerm, field, accTerm, dataViewTerm)};"
+  }
+
+  s"""
+ |$fieldSetter
+""".stripMargin
--- End diff --

indent




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16134775#comment-16134775
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134118886
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -105,6 +108,13 @@ class AggregationCodeGenerator(
   inFields => for (f <- inFields) yield javaClasses(f)
 }
 
+// define runtimeContext as member variable
+val ctxTerm = s"runtimeContext"
--- End diff --

I think we do not need to make `runtimeContext` a member variable.
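Here is a minimal sketch of the alternative. It assumes a hypothetical `StateContext` that hands out state by name. The context is only a parameter of `initialize`, used once to bind the view, and is never stored in a field. `GeneratedAggsSketch`, `getMapState`, and the state name are illustrative and are not members of the actual generated code.

```java
import java.util.Map;

// Hypothetical stand-in for a runtime context that hands out state by name.
interface StateContext {
  Map<String, Long> getMapState(String name);
}

// Hypothetical generated aggregations: the context is a parameter of initialize()
// and is only used there to bind the state-backed view; no field keeps it.
class GeneratedAggsSketch {
  private Map<String, Long> distinctView; // bound once in initialize()

  void initialize(StateContext ctx) {
    distinctView = ctx.getMapState("acc0_map_dataview");
  }

  void accumulate(String value) {
    // Count occurrences per value in the bound view.
    distinctView.merge(value, 1L, Long::sum);
  }

  int distinctCount() {
    return distinctView.size();
  }
}
```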




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16134785#comment-16134785
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134153349
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -80,7 +82,8 @@ class AggregationCodeGenerator(
 outputArity: Int,
 needRetract: Boolean,
 needMerge: Boolean,
-needReset: Boolean)
+needReset: Boolean,
+accConfig: Option[DataViewConfig])
--- End diff --

It seems that `accConfig` is always `Some(x)`. Do we need the `Option`?




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16134468#comment-16134468
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134118240
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -108,30 +112,38 @@ object AggregateUtil {
   outputArity,
   needRetract = false,
   needMerge = false,
-  needReset = false
+  needReset = false,
+  accConfig = Some(DataViewConfig(accSpecs, isUseState))
 )
 
+val accConfig = accSpecs
+  .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+  .toMap[String, StateDescriptor[_, _]]
+
 if (isRowTimeType) {
   if (isRowsClause) {
 // ROWS unbounded over process function
 new RowTimeUnboundedRowsOver(
   genFunction,
   aggregationStateType,
   CRowTypeInfo(inputTypeInfo),
-  queryConfig)
+  queryConfig,
+  accConfig)
--- End diff --

+1 to do this. A great improvement! 
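The `flatMap`/`toMap` chain in the diff can be sketched in Java as follows. The sketch assumes a hypothetical `ViewSpec` that carries a spec id and, for simplicity, a descriptor name instead of a real `StateDescriptor`. The merge function keeps the last value on duplicate ids, which matches Scala's `toMap`. Without a merge function, `Collectors.toMap` would throw `IllegalStateException` on a duplicate id.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical spec: an id plus the name of the state descriptor it maps to.
final class ViewSpec {
  final String id;
  final String descriptorName;

  ViewSpec(String id, String descriptorName) {
    this.id = id;
    this.descriptorName = descriptorName;
  }
}

final class AccConfigSketch {
  private AccConfigSketch() {}

  // Flattens the per-aggregate spec lists into one id -> descriptor map.
  // On duplicate ids the later entry wins, like Scala's toMap.
  static Map<String, String> toDescriptorMap(List<List<ViewSpec>> accSpecs) {
    return accSpecs.stream()
        .flatMap(List::stream)
        .collect(Collectors.toMap(s -> s.id, s -> s.descriptorName, (first, second) -> second));
  }
}
```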




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16134467#comment-16134467
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134118203
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -307,6 +312,119 @@ object UserDefinedFunctionUtils {
   // --
 
   /**
+* get data view type information from accumulator constructor.
+*
+* @param aggFun aggregate function
+* @return the data view specification
+*/
+  def getDataViewTypeInfoFromConstructor(
+aggFun: AggregateFunction[_, _])
+  : mutable.HashMap[String, TypeInformation[_]] = {
+
+val resultMap = new mutable.HashMap[String, TypeInformation[_]]
+val acc = aggFun.createAccumulator()
+val fields: util.List[Field] = TypeExtractor.getAllDeclaredFields(acc.getClass, true)
+for (i <- 0 until fields.size()) {
+  val field = fields.get(i)
+  field.setAccessible(true)
+  if (classOf[DataView].isAssignableFrom(field.getType)) {
+if (field.getType == classOf[MapView[_, _]]) {
+  val mapView = field.get(acc)
+  val keyTypeInfo = getFieldValue(classOf[MapView[_, _]], mapView, "keyTypeInfo")
--- End diff --

Adding `private[flink]` to the key and value type infos will make them public in Java. I'm not sure whether that is a good idea. I would prefer not to expose them to users (Java users), and the reflection only happens at compile time, which I think is fine.
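Reading the type infos reflectively, as the comment proposes, keeps them private to user code. Below is a Java sketch of a `getFieldValue`-style helper. Its signature is an assumption modeled on the call in the diff, not the PR's implementation. `TypedView` is a hypothetical class with a private field.

```java
import java.lang.reflect.Field;

// Hypothetical helper in the spirit of the getFieldValue call in the diff: read a
// private field by name without widening its declared visibility.
final class ReflectiveAccess {
  private ReflectiveAccess() {}

  static Object getFieldValue(Class<?> clazz, Object instance, String fieldName) {
    try {
      Field field = clazz.getDeclaredField(fieldName);
      field.setAccessible(true); // suppresses access checks for this Field object only
      return field.get(instance);
    } catch (NoSuchFieldException | IllegalAccessException e) {
      throw new IllegalStateException("Cannot read field " + fieldName, e);
    }
  }
}

// Hypothetical view whose type information stays private to user code.
class TypedView {
  private final String keyTypeInfo;

  TypedView(String keyTypeInfo) {
    this.keyTypeInfo = keyTypeInfo;
  }
}
```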




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16132042#comment-16132042
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user kaibozhou commented on the issue:

https://github.com/apache/flink/pull/4355
  
1. Use HeapMapView/HeapListView as the default implementation.
2. Add `initialize` and `cleanUp` methods to `GeneratedAggregations`.




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16132016#comment-16132016
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r133925097
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -405,6 +481,17 @@ class AggregationCodeGenerator(
   }
 }
 
+val aggFuncCode = Seq(
+  genSetAggregationResults,
+  genAccumulate,
+  genRetract,
+  genCreateAccumulators,
+  genSetForwardedFields,
+  genSetConstantFlags,
+  genCreateOutputRow,
+  genMergeAccumulatorsPair,
+  genResetAccumulator).mkString("\n")
--- End diff --

It makes sense. I have looked at `ProcessFunctionWithCleanupState`: `cleanUp` should be called whenever `cleanupState` is called.
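Here is a sketch of the pairing agreed on above, with hypothetical names. Every path that clears the process function's own state also calls `cleanUp()` on the aggregations. As a result, state behind the data views is released at the same time. A plain list stands in for the function's own state.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the generated aggregations' cleanup hook.
interface AggregationsSketch {
  void cleanUp();
}

// Hypothetical process function: clearing its own state always clears the
// aggregations' view-backed state as well.
class CleanupProcessSketch {
  private final AggregationsSketch aggs;
  private final List<String> ownState = new ArrayList<>();

  CleanupProcessSketch(AggregationsSketch aggs) {
    this.aggs = aggs;
  }

  void addToOwnState(String s) {
    ownState.add(s);
  }

  void cleanupState() {
    ownState.clear();
    aggs.cleanUp(); // keep both kinds of state in lockstep
  }

  int ownStateSize() {
    return ownState.size();
  }
}
```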




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16132017#comment-16132017
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r133925176
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -179,13 +214,19 @@ class AggregationCodeGenerator(
|  ${aggMapping(i)},
|  (${accTypes(i)}) accs.getField($i));""".stripMargin
   } else {
+val setDataView = if (accConfig.isDefined && accConfig.get.isUseState) {
--- End diff --

Yes, it makes sense.




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16132010#comment-16132010
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r133924725
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java
 ---
@@ -135,4 +138,172 @@ public void retract(WeightedAvgAccum accumulator, int iValue, int iWeight) {
accumulator.count -= iWeight;
}
}
+
+   /**
+* CountDistinct accumulator.
+*/
+   public static class CountDistinctAccum {
+   public MapView map;
+   public long count;
+   }
+
+   /**
+* CountDistinct aggregate.
+*/
+   public static class CountDistinct extends AggregateFunction {
--- End diff --

Yes, `CountDistinct` is only used for test cases here.
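Since `CountDistinct` is only a test aggregate, here is a self-contained Java sketch of the same idea that also supports retraction. A `HashMap` stands in for `MapView`. The sketch assumes a per-value occurrence count rather than a presence flag. That count is what lets `retract` lower the distinct count only when the last occurrence of a value is removed. All names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical accumulator: per-value occurrence counts plus the distinct count.
class CountDistinctAccumSketch {
  final Map<String, Integer> counts = new HashMap<>();
  long distinct;
}

// Hypothetical distinct count with an accumulate/retract/getValue shape.
class CountDistinctSketch {
  CountDistinctAccumSketch createAccumulator() {
    return new CountDistinctAccumSketch();
  }

  void accumulate(CountDistinctAccumSketch acc, String id) {
    Integer prev = acc.counts.get(id);
    if (prev == null) {
      acc.counts.put(id, 1); // first occurrence: a new distinct value
      acc.distinct++;
    } else {
      acc.counts.put(id, prev + 1);
    }
  }

  void retract(CountDistinctAccumSketch acc, String id) {
    Integer prev = acc.counts.get(id);
    if (prev == null) {
      return; // nothing to retract
    }
    if (prev == 1) {
      acc.counts.remove(id); // last occurrence gone: one fewer distinct value
      acc.distinct--;
    } else {
      acc.counts.put(id, prev - 1);
    }
  }

  Long getValue(CountDistinctAccumSketch acc) {
    return acc.distinct;
  }
}
```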




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16109207#comment-16109207
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130655818
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView encapsulates the operations of a list.
+  *
+  * None of the methods in this class are implemented; users do not need to care whether it is
+  * backed by a Java ArrayList or a state backend. It will be replaced by a {@link StateListView}
+  * or a {@link HeapListView}.
+  *
+  * NOTE: It is not recommended that users extend this class.
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *    public ListView<String> list;
+  *    public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction<Long, MyAccum> {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  *     MyAccum accum = new MyAccum();
+  *     accum.list = new ListView<>(Types.STRING);
+  *     accum.count = 0L;
+  *     return accum;
+  *   }
+  *
+  *   // Overloaded accumulate method
+  *   public void accumulate(MyAccum accumulator, String id) {
+  *     accumulator.list.add(id);
+  *     // ...
+  *     Iterable<String> ids = accumulator.list.get();
+  *     // ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  *     // ...
+  *     return accumulator.count;
+  *   }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](val elementTypeInfo: TypeInformation[T]) extends DataView {
+
+  def this() = this(null)
+
+  /**
+* Returns an iterable of the list.
+*
+* @return The iterable of the list or {@code null} if the list is empty.
+*/
+  def get: JIterable[T] = throw new UnsupportedOperationException("Unsupported operation!")
--- End diff --

We try to implement public interfaces in a way that Java users do not need to deal with Scala classes. However, it should be fine in this case because

1. Scala's `UnsupportedOperationException` is a type alias for Java's `UnsupportedOperationException`, and
2. the exception will go away if we implement the method by default, backed by an `ArrayList`.
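Here is a sketch of point 2, with hypothetical names. The base class works out of the box on an `ArrayList` instead of throwing `UnsupportedOperationException`. A state-backed replacement overrides the methods, and a plain `List` stands in for keyed state here. This illustrates the design choice only and does not show how the PR wires the replacement.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical heap-backed default: methods work without a state backend
// instead of throwing UnsupportedOperationException.
class ListViewSketch<T> {
  private final List<T> list = new ArrayList<>();

  public Iterable<T> get() {
    return list.isEmpty() ? null : list;
  }

  public void add(T value) {
    list.add(value);
  }
}

// Hypothetical state-backed replacement: overrides the defaults and writes to
// another store (a plain List stands in for keyed state here).
class StateListViewSketch<T> extends ListViewSketch<T> {
  private final List<T> backend;

  StateListViewSketch(List<T> backend) {
    this.backend = backend;
  }

  @Override
  public Iterable<T> get() {
    return backend.isEmpty() ? null : backend;
  }

  @Override
  public void add(T value) {
    backend.add(value);
  }
}
```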




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16109199#comment-16109199
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130655010
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView encapsulates the operations of a list.
+  *
+  * None of the methods in this class are implemented; users do not need to care whether it is
+  * backed by a Java ArrayList or a state backend. It will be replaced by a {@link StateListView}
+  * or a {@link HeapListView}.
+  *
+  * NOTE: It is not recommended that users extend this class.
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *    public ListView<String> list;
+  *    public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction<Long, MyAccum> {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  *     MyAccum accum = new MyAccum();
+  *     accum.list = new ListView<>(Types.STRING);
+  *     accum.count = 0L;
+  *     return accum;
+  *   }
+  *
+  *   // Overloaded accumulate method
+  *   public void accumulate(MyAccum accumulator, String id) {
+  *     accumulator.list.add(id);
+  *     // ...
+  *     Iterable<String> ids = accumulator.list.get();
+  *     // ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  *     // ...
+  *     return accumulator.count;
+  *   }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](val elementTypeInfo: TypeInformation[T]) extends DataView {
--- End diff --

Hmm, not sure how many users would actually look into the definition of 
`ListView` and also recognize the interface as a state interface. Such users 
would probably also see the Java/ScalaDocs that explain the purpose of the 
class.

Anyway, I have no strong opinion about this and I'm OK to keep it as it is.




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16109185#comment-16109185
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130653790
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView encapsulates the operations of a list.
+  *
+  * None of the methods in this class are implemented; users do not need to care whether it is
+  * backed by a Java ArrayList or a state backend. It will be replaced by a {@link StateListView}
+  * or a {@link HeapListView}.
+  *
+  * NOTE: It is not recommended that users extend this class.
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *    public ListView<String> list;
+  *    public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction<Long, MyAccum> {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  *     MyAccum accum = new MyAccum();
+  *     accum.list = new ListView<>(Types.STRING);
+  *     accum.count = 0L;
+  *     return accum;
+  *   }
+  *
+  *   // Overloaded accumulate method
+  *   public void accumulate(MyAccum accumulator, String id) {
+  *     accumulator.list.add(id);
+  *     // ...
+  *     Iterable<String> ids = accumulator.list.get();
+  *     // ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  *     // ...
+  *     return accumulator.count;
+  *   }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
--- End diff --

OK, then let's keep it as it is.




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16109025#comment-16109025
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130628208
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala ---
@@ -0,0 +1,103 @@
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](val elementTypeInfo: TypeInformation[T]) extends DataView {
+
+  def this() = this(null)
+
+  /**
+    * Returns an iterable of the list.
+    *
+    * @return The iterable of the list or {@code null} if the list is empty.
+    */
+  def get: JIterable[T] = throw new UnsupportedOperationException("Unsupported operation!")
+
+  /**
+    * Adds the given value to the list.
+    *
+    * @param value element to be appended to this list
+    */
+  def add(value: T): Unit = throw new UnsupportedOperationException("Unsupported operation!")
--- End diff --

I think it's a good idea, and I will try it.
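The diff declares `get` and `add` as placeholders that throw, with the real behavior supplied by a heap-backed or state-backed view at runtime. A minimal Java sketch of that pattern, assuming hypothetical `ListView` and `HeapListView` stand-ins (not the Flink classes), with `get` returning `null` for an empty list as the doc comment states:

```java
import java.util.ArrayList;
import java.util.List;

public class ListViewSketch {

    // Hypothetical user-facing view: every operation is a placeholder that
    // fails until the runtime substitutes a concrete implementation.
    public static class ListView<T> {
        public Iterable<T> get() {
            throw new UnsupportedOperationException("Unsupported operation!");
        }

        public void add(T value) {
            throw new UnsupportedOperationException("Unsupported operation!");
        }
    }

    // Hypothetical heap-backed replacement, similar in spirit to HeapListView.
    public static class HeapListView<T> extends ListView<T> {
        private final List<T> list = new ArrayList<>();

        @Override
        public Iterable<T> get() {
            // Follows the documented contract: null if the list is empty.
            return list.isEmpty() ? null : list;
        }

        @Override
        public void add(T value) {
            list.add(value);
        }
    }

    public static void main(String[] args) {
        ListView<String> placeholder = new ListView<>();
        try {
            placeholder.add("a");
        } catch (UnsupportedOperationException e) {
            System.out.println("placeholder rejected add: " + e.getMessage());
        }

        // What a runtime would hand to the aggregate function instead.
        ListView<String> view = new HeapListView<>();
        view.add("a");
        view.add("b");
        for (String s : view.get()) {
            System.out.println(s);
        }
    }
}
```

One plausible benefit of keeping the placeholder methods throwing is that an accidentally unreplaced view fails loudly instead of silently dropping elements.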




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16109010#comment-16109010
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130625494
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
@@ -307,6 +312,119 @@ object UserDefinedFunctionUtils {
   // --
 
   /**
+    * Gets the data view type information from the accumulator constructor.
+    *
+    * @param aggFun aggregate function
+    * @return the data view specification
+    */
+  def getDataViewTypeInfoFromConstructor(
+      aggFun: AggregateFunction[_, _])
+    : mutable.HashMap[String, TypeInformation[_]] = {
+
+    val resultMap = new mutable.HashMap[String, TypeInformation[_]]
+    val acc = aggFun.createAccumulator()
+    val fields: util.List[Field] = TypeExtractor.getAllDeclaredFields(acc.getClass, true)
+    for (i <- 0 until fields.size()) {
+      val field = fields.get(i)
+      field.setAccessible(true)
+      if (classOf[DataView].isAssignableFrom(field.getType)) {
+        if (field.getType == classOf[MapView[_, _]]) {
+          val mapView = field.get(acc)
+          val keyTypeInfo = getFieldValue(classOf[MapView[_, _]], mapView, "keyTypeInfo")
--- End diff --

Yes, it's a good idea.
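`getDataViewTypeInfoFromConstructor` walks the accumulator's fields by reflection and picks out the `DataView` ones. A simplified Java sketch of that scan, assuming a hypothetical `DataView` marker interface and a `ListView` that records its element type as a `Class` instead of a `TypeInformation`. Unlike `TypeExtractor.getAllDeclaredFields(acc.getClass, true)`, which also walks superclasses, `Class.getDeclaredFields()` only returns fields declared on the accumulator class itself.

```java
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

public class DataViewFieldScan {

    // Hypothetical marker interface standing in for Flink's DataView.
    public interface DataView {}

    // Hypothetical list view carrying its element type as a Class
    // (Flink's ListView carries a TypeInformation instead).
    public static class ListView<T> implements DataView {
        private final Class<T> elementType;

        public ListView(Class<T> elementType) {
            this.elementType = elementType;
        }

        public Class<T> getElementType() {
            return elementType;
        }
    }

    // Accumulator with one DataView field and one plain field.
    public static class MyAccum {
        public ListView<String> list = new ListView<>(String.class);
        public long count;
    }

    // Returns a map from each DataView field name to that view's element type.
    public static Map<String, Class<?>> dataViewTypes(Object acc) {
        Map<String, Class<?>> result = new HashMap<>();
        // Only fields declared directly on the accumulator class are scanned.
        for (Field field : acc.getClass().getDeclaredFields()) {
            if (!DataView.class.isAssignableFrom(field.getType())) {
                continue; // plain fields such as `count` are skipped
            }
            field.setAccessible(true);
            try {
                Object view = field.get(acc);
                if (view instanceof ListView) {
                    result.put(field.getName(), ((ListView<?>) view).getElementType());
                }
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("Cannot read field " + field.getName(), e);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Class<?>> types = dataViewTypes(new MyAccum());
        System.out.println(types);
    }
}
```

Reading the type through a public accessor, as here, avoids looking up a private field by name; whether that matches the change agreed on in this thread is not stated in the comment.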




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16108933#comment-16108933
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130612699
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala ---
@@ -0,0 +1,103 @@
+  /**
+    * Returns an iterable of the list.
+    *
+    * @return The iterable of the list or {@code null} if the list is empty.
+    */
+  def get: JIterable[T] = throw new UnsupportedOperationException("Unsupported operation!")
--- End diff --

There is no explicit reference to the java.lang package, so this is Scala's
UnsupportedOperationException. What's the difference?




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16108923#comment-16108923
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130611417
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala ---
@@ -0,0 +1,103 @@
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](val elementTypeInfo: TypeInformation[T]) extends DataView {
--- End diff --

IMO, this would make ListView look more like a state interface. Although the
interface of ListView is the same as that of ListState, we do not want users to
think of it as state.



