[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-29 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4355


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-28 Thread kaibozhou
Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135697894
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView provides List functionality for accumulators used by 
user-defined aggregate functions
+  * {{AggregateFunction}}.
+  *
+  * A ListView can be backed by a Java ArrayList or a state backend, 
depending on the context in
+  * which the function is used.
+  *
+  * At runtime `ListView` will be replaced by a 
[[org.apache.flink.table.dataview.StateListView]]
+  * if it is backed by a state backend.
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public ListView list;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  * MyAccum accum = new MyAccum();
+  * accum.list = new ListView<>(Types.STRING);
+  * accum.count = 0L;
+  * return accum;
+  *   }
+  *
+  *   public void accumulate(MyAccum accumulator, String id) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  * return accumulator.count;
+  *   }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](
+@transient private[flink] val elementTypeInfo: TypeInformation[T])
+  extends DataView {
+
+  def this() = this(null)
+
+  private[flink] var list: util.List[T] = new util.ArrayList[T]()
--- End diff --

The same modification should be applied to `MapView` as well.

Very good suggestion.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135541895
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
 ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.api.common.typeutils._
+import 
org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, 
ListSerializer}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+  * A serializer for [[ListView]]. The serializer relies on an element
+  * serializer for the serialization of the list's elements.
+  *
+  * The serialization format for the list is as follows: four bytes for 
the length of the list,
+  * followed by the serialized representation of each element.
+  *
+  * @param listSerializer List serializer.
+  * @tparam T The type of element in the list.
+  */
+class ListViewSerializer[T](val listSerializer: ListSerializer[T])
+  extends TypeSerializer[ListView[T]] {
+
+  override def isImmutableType: Boolean = false
+
+  override def duplicate(): TypeSerializer[ListView[T]] = {
+    new ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]])
+  }
+
+  override def createInstance(): ListView[T] = {
+    new ListView[T]
+  }
+
+  override def copy(from: ListView[T]): ListView[T] = {
+    val listview = new ListView[T]
+    listview.list = from.list
--- End diff --

We should create a copy of `from.list` using the `ListSerializer`. 
Otherwise we share the instance.
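
A minimal sketch of the difference, assuming a simplified, hypothetical `SimpleListView` class in place of the real `ListView`. Here `new ArrayList<>(...)` stands in for the serializer-based copy suggested above; it only copies the list structure, whereas copying through a list serializer would be the place to copy the elements as well.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for ListView; not the Flink class.
class SimpleListView<T> {
    List<T> list = new ArrayList<>();

    // Shallow copy: the new view points at the same backing list as the source.
    static <T> SimpleListView<T> shallowCopy(SimpleListView<T> from) {
        SimpleListView<T> copy = new SimpleListView<>();
        copy.list = from.list;
        return copy;
    }

    // Independent copy: the new view gets its own backing list.
    // Assumption: new ArrayList<>(...) is used only for illustration here.
    static <T> SimpleListView<T> independentCopy(SimpleListView<T> from) {
        SimpleListView<T> copy = new SimpleListView<>();
        copy.list = new ArrayList<>(from.list);
        return copy;
    }
}
```

With the shallow copy, adding an element to the copy also changes the source, which is the sharing problem described in the comment.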




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135534660
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -161,13 +182,119 @@ class AggregationCodeGenerator(
 }
 }
 
+/**
+  * Create DataView Term, for example, acc1_map_dataview.
+  *
+  * @param aggIndex index of aggregate function
+  * @param fieldName field name of DataView
+  * @return term to access [[MapView]] or [[ListView]]
+  */
+def createDataViewTerm(aggIndex: Int, fieldName: String): String = {
+  s"acc${aggIndex}_${fieldName}_dataview"
+}
+
+/**
+  * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] 
to the open, cleanup,
+  * close and member area of the generated function.
+  *
+  */
+def addReusableDataViews: Unit = {
--- End diff --

Add parentheses to the method. Only methods without side effects should be 
declared without parentheses.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135550350
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView provides List functionality for accumulators used by 
user-defined aggregate functions
+  * {{AggregateFunction}}.
+  *
+  * A ListView can be backed by a Java ArrayList or a state backend, 
depending on the context in
+  * which the function is used.
+  *
+  * At runtime `ListView` will be replaced by a 
[[org.apache.flink.table.dataview.StateListView]]
+  * if it is backed by a state backend.
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public ListView list;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  * MyAccum accum = new MyAccum();
+  * accum.list = new ListView<>(Types.STRING);
+  * accum.count = 0L;
+  * return accum;
+  *   }
+  *
+  *   public void accumulate(MyAccum accumulator, String id) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  * return accumulator.count;
+  *   }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](
+@transient private[flink] val elementTypeInfo: TypeInformation[T])
+  extends DataView {
+
+  def this() = this(null)
+
+  private[flink] var list: util.List[T] = new util.ArrayList[T]()
--- End diff --

Right now, an empty `ArrayList` is always created when a `ListView` is 
instantiated. This is unnecessary overhead when the `ListView` is copied or 
deserialized using `ListViewSerializer`, because the empty instance is 
immediately replaced.

We should add an option to create a `ListView` without an `ArrayList` 
instance. This means we have to move the creation of the `ArrayList` out of the 
primary constructor.

The same applies to the `MapView`.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135528903
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -161,13 +182,119 @@ class AggregationCodeGenerator(
 }
 }
 
+/**
+  * Create DataView Term, for example, acc1_map_dataview.
+  *
+  * @param aggIndex index of aggregate function
+  * @param fieldName field name of DataView
+  * @return term to access [[MapView]] or [[ListView]]
+  */
+def createDataViewTerm(aggIndex: Int, fieldName: String): String = {
+  s"acc${aggIndex}_${fieldName}_dataview"
+}
+
+/**
+  * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] 
to the open, cleanup,
+  * close and member area of the generated function.
+  *
+  */
+def addReusableDataViews: Unit = {
+  if (accConfig.isDefined) {
+val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.get
+  .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+  .toMap[String, StateDescriptor[_, _]]
+
+for (i <- aggs.indices) yield {
+  for (spec <- accConfig.get(i)) yield {
+val dataViewField = spec.field
+val dataViewTypeTerm = dataViewField.getType.getCanonicalName
+val desc = descMapping.getOrElse(spec.id,
+  throw new CodeGenException(s"Can not find DataView in 
accumulator by id: ${spec.id}"))
+
+// define the DataView variables
+val serializedData = AggregateUtil.serialize(desc)
--- End diff --

Move the `serialize` method to this class and rename it to 
`serializeStateDescriptor`.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r13411
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView provides List functionality for accumulators used by 
user-defined aggregate functions
+  * {{AggregateFunction}}.
+  *
+  * A ListView can be backed by a Java ArrayList or a state backend, 
depending on the context in
+  * which the function is used.
+  *
+  * At runtime `ListView` will be replaced by a 
[[org.apache.flink.table.dataview.StateListView]]
+  * if it is backed by a state backend.
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public ListView list;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  * MyAccum accum = new MyAccum();
+  * accum.list = new ListView<>(Types.STRING);
+  * accum.count = 0L;
+  * return accum;
+  *   }
+  *
+  *   public void accumulate(MyAccum accumulator, String id) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  * return accumulator.count;
+  *   }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](
+@transient private[flink] val elementTypeInfo: TypeInformation[T])
+  extends DataView {
+
+  def this() = this(null)
+
+  private[flink] var list: util.List[T] = new util.ArrayList[T]()
--- End diff --

We can refactor the `ListView` constructors as follows:

```
class ListView[T] private[flink](
    @transient private[flink] val elementTypeInfo: TypeInformation[T],
    private[flink] val list: util.List[T])
  extends DataView {

  def this(elementTypeInfo: TypeInformation[T]) = {
    this(elementTypeInfo, new util.ArrayList[T]())
  }

  def this() = {
    this(null, new util.ArrayList[T]())
  }

  ...
}
```

and call the primary constructor in the `ListViewSerializer` with `null` for 
the type information.
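
As a rough illustration of why the extra constructor helps, here is a sketch using a hypothetical `StringListViewSketch` class (not the Flink `ListView`). The wire format follows the description in the `ListViewSerializer` Scaladoc quoted earlier in this thread: four bytes for the list length, followed by each element. The use of `writeUTF` for elements is an assumption made only to keep the example self-contained.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ListView<String>; not the Flink class.
class StringListViewSketch {
    final List<String> list;

    // Constructor for users: allocates the backing list.
    StringListViewSketch() {
        this(new ArrayList<>());
    }

    // Internal constructor: adopts an existing list, so no empty list is thrown away.
    StringListViewSketch(List<String> list) {
        this.list = list;
    }

    // Writes a four-byte length followed by each element.
    static byte[] serialize(StringListViewSketch view) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(view.list.size());
            for (String element : view.list) {
                out.writeUTF(element);
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the elements into a list and hands that list to the internal constructor.
    static StringListViewSketch deserialize(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            int size = in.readInt();
            List<String> elements = new ArrayList<>(size);
            for (int i = 0; i < size; i++) {
                elements.add(in.readUTF());
            }
            return new StringListViewSketch(elements);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The deserialization path never allocates a list that is discarded right away, which is the overhead the refactoring is meant to avoid.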




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135539685
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -249,7 +258,8 @@ object AggregateUtil {
   outputArity,
   needRetract,
   needMerge = false,
-  needReset = true
+  needReset = true,
--- End diff --

`needReset` can be `false`.
`resetAccumulator()` is not called by any of the window operators. Not 
sure why this was `true` before...




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135528733
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -161,13 +182,119 @@ class AggregationCodeGenerator(
 }
 }
 
+/**
+  * Create DataView Term, for example, acc1_map_dataview.
+  *
+  * @param aggIndex index of aggregate function
+  * @param fieldName field name of DataView
+  * @return term to access [[MapView]] or [[ListView]]
+  */
+def createDataViewTerm(aggIndex: Int, fieldName: String): String = {
+  s"acc${aggIndex}_${fieldName}_dataview"
+}
+
+/**
+  * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] 
to the open, cleanup,
+  * close and member area of the generated function.
+  *
+  */
+def addReusableDataViews: Unit = {
+  if (accConfig.isDefined) {
+val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.get
+  .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+  .toMap[String, StateDescriptor[_, _]]
+
+for (i <- aggs.indices) yield {
+  for (spec <- accConfig.get(i)) yield {
+val dataViewField = spec.field
+val dataViewTypeTerm = dataViewField.getType.getCanonicalName
+val desc = descMapping.getOrElse(spec.id,
+  throw new CodeGenException(s"Can not find DataView in 
accumulator by id: ${spec.id}"))
+
+// define the DataView variables
+val serializedData = AggregateUtil.serialize(desc)
+val dataViewFieldTerm = createDataViewTerm(i, 
dataViewField.getName)
+val field =
+  s"""
+ |transient $dataViewTypeTerm $dataViewFieldTerm = 
null;
+  """.stripMargin
+reusableMemberStatements.add(field)
+
+// create DataViews
+val descFieldTerm = s"${dataViewFieldTerm}_desc"
+val descClassQualifier = classOf[StateDescriptor[_, 
_]].getCanonicalName
+val descDeserializeCode =
+  s"""
+ |$descClassQualifier $descFieldTerm = 
($descClassQualifier)
+ |  ${AggregateUtil.getClass.getName.stripSuffix("$")}
--- End diff --

The deserialization should be implemented directly in the generated code. 
Moreover, we should use the user code classloader for the deserialization, 
which is accessible via the `RuntimeContext`.
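
A minimal sketch of class-loader-aware Java deserialization, using only JDK classes. The helper name `ClassLoaderAwareDeserializer` and its methods are hypothetical; in generated code the class loader argument would presumably come from `getRuntimeContext().getUserCodeClassLoader()`, and the byte array would be the serialized state descriptor.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;

// Hypothetical helper; names are illustrative and not part of Flink.
class ClassLoaderAwareDeserializer {

    // ObjectInputStream that resolves classes against a caller-supplied class loader.
    private static final class LoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader classLoader;

        LoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
            super(in);
            this.classLoader = classLoader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                return Class.forName(desc.getName(), false, classLoader);
            } catch (ClassNotFoundException e) {
                // Fall back to the default lookup, which also handles primitive types.
                return super.resolveClass(desc);
            }
        }
    }

    static byte[] serialize(Object value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException("Serialization failed", e);
        }
    }

    static Object deserialize(byte[] data, ClassLoader userCodeClassLoader) {
        try (ObjectInputStream in =
                new LoaderObjectInputStream(new ByteArrayInputStream(data), userCodeClassLoader)) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Deserialization failed", e);
        }
    }
}
```

Resolving classes through the supplied loader matters when the serialized object refers to classes that only exist in the user's job jar and are not visible to the default class loader.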




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135648347
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -307,6 +312,92 @@ object UserDefinedFunctionUtils {
   // 
--
 
   /**
+* Remove StateView fields from accumulator type information.
+*
+* @param index index of aggregate function
+* @param aggFun aggregate function
+* @param accType accumulator type information, only support pojo type
+* @param isStateBackedDataViews is data views use state backend
+* @return mapping of accumulator type information and data view config 
which contains id,
+* field name and state descriptor
+*/
+  def removeStateViewFieldsFromAccTypeInfo(
+index: Int,
+aggFun: AggregateFunction[_, _],
+accType: TypeInformation[_],
+isStateBackedDataViews: Boolean)
+  : (TypeInformation[_], Option[Seq[DataViewSpec[_]]]) = {
+
+var hasDataView = false
+val acc = aggFun.createAccumulator()
+accType match {
+  case pojoType: PojoTypeInfo[_] if pojoType.getArity > 0 =>
+val arity = pojoType.getArity
+val newPojoFields = new util.ArrayList[PojoField]()
+val accumulatorSpecs = new mutable.ArrayBuffer[DataViewSpec[_]]
+for (i <- 0 until arity) {
+  val pojoField = pojoType.getPojoFieldAt(i)
+  val field = pojoField.getField
+  val fieldName = field.getName
+  field.setAccessible(true)
+
+  pojoField.getTypeInformation match {
+case map: MapViewTypeInfo[Any, Any] =>
+  val mapView = field.get(acc).asInstanceOf[MapView[_, _]]
+  if (mapView != null) {
+val keyTypeInfo = mapView.keyTypeInfo
+val valueTypeInfo = mapView.valueTypeInfo
+val newTypeInfo = if (keyTypeInfo != null && valueTypeInfo 
!= null) {
+  hasDataView = true
+  new MapViewTypeInfo(keyTypeInfo, valueTypeInfo)
+} else {
+  map
+}
+
+var spec = MapViewSpec(
+  "agg" + index + "$" + fieldName, // generate unique name 
to be used as state name
+  field,
+  newTypeInfo)
+
+accumulatorSpecs += spec
+if (!isStateBackedDataViews) { // add data view field 
which not use state backend
+  newPojoFields.add(new PojoField(field, newTypeInfo))
+}
+  }
+
+case list: ListViewTypeInfo[Any] =>
+  val listView = field.get(acc).asInstanceOf[ListView[_]]
+  if (listView != null) {
+val elementTypeInfo = listView.elementTypeInfo
+val newTypeInfo = if (elementTypeInfo != null) {
+  hasDataView = true
+  new ListViewTypeInfo(elementTypeInfo)
+} else {
+  list
+}
+
+var spec = ListViewSpec(
+  "agg" + index + "$" + fieldName, // generate unique name 
to be used as state name
+  field,
+  newTypeInfo)
+
+accumulatorSpecs += spec
+if (!isStateBackedDataViews) { // add data view field 
which not use state backend
+  newPojoFields.add(new PojoField(field, newTypeInfo))
+}
+  }
+
+case _ => newPojoFields.add(pojoField)
+  }
+}
+(new PojoTypeInfo(accType.getTypeClass, newPojoFields), 
Some(accumulatorSpecs))
+
+  case _ if !hasDataView => (accType, None)
+  case _ => throw new TableException("MapView and ListView only 
support in PoJo class")
--- End diff --

This case will never be reached. `hasDataView` is only set to `true` in the 
`case pojoType: PojoTypeInfo[_]` case. Hence, it will always be false when we 
come to this point.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135587110
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1398,14 +1412,29 @@ object AggregateUtil {
   }
 }
 
+val accSpecs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size)
+
 // create accumulator type information for every aggregate function
 aggregates.zipWithIndex.foreach { case (agg, index) =>
-  if (null == accTypes(index)) {
+  if (accTypes(index) != null) {
+val (accType, specs) = removeStateViewFieldsFromAccTypeInfo(index,
+  agg,
+  accTypes(index),
+  isStateBackedDataViews)
+if (specs.isDefined) {
+  accSpecs(index) = specs.get
+  accTypes(index) = accType
+} else {
+  accSpecs(index) = Seq()
+  accTypes(index) = getAccumulatorTypeOfAggregateFunction(agg)
--- End diff --

No need to overwrite `accTypes(index)`.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135561193
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java
 ---
@@ -237,9 +248,13 @@ public void resetAccumulator(CountDistinctAccum acc) {
         //Overloaded retract method
         public void retract(CountDistinctAccum accumulator, long id) {
             try {
-                if (!accumulator.map.contains(String.valueOf(id))) {
-                    accumulator.map.remove(String.valueOf(id));
-                    accumulator.count -= 1;
+                Integer cnt = accumulator.map.get(String.valueOf(id));
+                if (cnt != null) {
+                    cnt -= 1;
+                    if (cnt <= 0) {
+                        accumulator.map.remove(String.valueOf(id));
+                        accumulator.count -= 1;
+                    }
--- End diff --

We should write the decremented count back to the map if it is still > 0:

```
else {
  accumulator.map.put(String.valueOf(id), cnt);
}
```
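
A self-contained sketch of the retract logic with the write-back in place. `CountDistinctSketch` is a hypothetical stand-in for the test accumulator: a plain `HashMap` replaces `MapView`, and the per-key increment in `accumulate` is an assumption about how the counts are maintained.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the test accumulator; a HashMap replaces MapView.
class CountDistinctSketch {
    final Map<String, Integer> map = new HashMap<>();
    long count = 0L;

    // Tracks how often each id was seen; count is the number of distinct ids.
    void accumulate(String id) {
        Integer cnt = map.get(id);
        if (cnt == null) {
            map.put(id, 1);
            count += 1;
        } else {
            map.put(id, cnt + 1);
        }
    }

    // Decrements the per-id counter and drops the id once it reaches zero.
    void retract(String id) {
        Integer cnt = map.get(id);
        if (cnt != null) {
            cnt -= 1;
            if (cnt <= 0) {
                map.remove(id);
                count -= 1;
            } else {
                // Without this write-back the stored counter would never decrease.
                map.put(id, cnt);
            }
        }
    }
}
```

Because `cnt` is a local `Integer`, decrementing it does not change the value stored in the map; the explicit `put` is what persists the new counter.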




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135524948
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -42,6 +46,18 @@ class AggregationCodeGenerator(
 input: TypeInformation[_ <: Any])
   extends CodeGenerator(config, nullableInput, input) {
 
+  // set of statements for cleanup dataview that will be added only once
+  // we use a LinkedHashSet to keep the insertion order
+  private val reusableCleanupStatements = mutable.LinkedHashSet[String]()
+
+  /**
+* @return code block of statements that need to be placed in the 
cleanup() method of
--- End diff --

`RichFunction` does not have a `cleanup()` method. The `cleanup()` method 
is a method of `GeneratedAggregations`.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135504282
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/DataViewSpec.scala
 ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.reflect.Field
+
+import org.apache.flink.api.common.state.{ListStateDescriptor, 
MapStateDescriptor, StateDescriptor}
+import org.apache.flink.table.dataview.{ListViewTypeInfo, MapViewTypeInfo}
+
+/**
+  * Data view specification.
+  *
+  * @tparam ACC type extends [[DataView]]
+  */
+trait DataViewSpec[ACC <: DataView] {
+  def id: String
+  def field: Field
+  def toStateDescriptor: StateDescriptor[_, _]
--- End diff --

Very good point, you are right! 
We need to generate the state descriptors here, serialize them and ship 
them.

Thanks for the clarification.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135503957
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala
 ---
@@ -100,6 +108,17 @@ abstract class GeneratedAggregations extends Function {
 * aggregated results
 */
   def resetAccumulator(accumulators: Row)
+
+  /**
+* Cleanup for the accumulators.
+*/
+  def cleanup()
+
+  /**
+* Tear-down method for 
[[org.apache.flink.table.functions.AggregateFunction]].
+* It can be used for clean up work. By default, this method does 
nothing.
+*/
+  def close()
--- End diff --

Sorry, I overlooked the `close()` calls. If the method is used, we should 
keep it of course.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread kaibozhou
Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135184770
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java
 ---
@@ -135,4 +138,177 @@ public void retract(WeightedAvgAccum accumulator, int 
iValue, int iWeight) {
accumulator.count -= iWeight;
}
}
+
+   /**
+* CountDistinct accumulator.
+*/
+   public static class CountDistinctAccum {
+   public MapView map;
+   public long count;
+   }
+
+   /**
+* CountDistinct aggregate.
+*/
+   public static class CountDistinct extends AggregateFunction {
+
+   @Override
+   public CountDistinctAccum createAccumulator() {
+   CountDistinctAccum accum = new CountDistinctAccum();
+   accum.map = new MapView<>(Types.STRING, Types.INT);
+   accum.count = 0L;
+   return accum;
+   }
+
+   //Overloaded accumulate method
+   public void accumulate(CountDistinctAccum accumulator, String 
id) {
+   try {
+   if (!accumulator.map.contains(id)) {
+   accumulator.map.put(id, 1);
+   accumulator.count += 1;
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
+   }
+   }
+
+   //Overloaded accumulate method
+   public void accumulate(CountDistinctAccum accumulator, long id) 
{
+   try {
+   if 
(!accumulator.map.contains(String.valueOf(id))) {
+   accumulator.map.put(String.valueOf(id), 
1);
+   accumulator.count += 1;
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
+   }
+   }
+
+   @Override
+   public Long getValue(CountDistinctAccum accumulator) {
+   return accumulator.count;
+   }
+   }
+
+   /**
+* CountDistinct aggregate with merge.
+*/
+   public static class CountDistinctWithMerge extends CountDistinct {
+
+   //Overloaded merge method
+   public void merge(CountDistinctAccum acc, 
Iterable it) {
+   Iterator iter = it.iterator();
+   while (iter.hasNext()) {
+   CountDistinctAccum mergeAcc = iter.next();
+   acc.count += mergeAcc.count;
+
+   try {
+   Iterator mapItr = 
mergeAcc.map.keys().iterator();
+   while (mapItr.hasNext()) {
+   String key = mapItr.next();
+   if (!acc.map.contains(key)) {
+   acc.map.put(key, 1);
+   }
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
+   }
+   }
+   }
+   }
+
+   /**
+* CountDistinct aggregate with merge and reset.
+*/
+   public static class CountDistinctWithMergeAndReset extends 
CountDistinctWithMerge {
+
+   //Overloaded retract method
+   public void resetAccumulator(CountDistinctAccum acc) {
+   acc.map.clear();
+   acc.count = 0;
+   }
+   }
+
+   /**
+* CountDistinct aggregate with retract.
+*/
+   public static class CountDistinctWithRetractAndReset extends 
CountDistinct {
+
+   //Overloaded retract method
+   public void retract(CountDistinctAccum accumulator, long id) {
+   try {
+   if 
(!accumulator.map.contains(String.valueOf(id))) {
+   
accumulator.map.remove(String.valueOf(id));
--- End diff --

The condition here is inverted. It should be:
```
if (accumulator.map.contains(String.valueOf(id))) {
    accumulator.count -= 1;
    accumulator.map.remove(String.valueOf(id));
}
```

 

[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread kaibozhou
Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135181433
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala
 ---
@@ -100,6 +108,17 @@ abstract class GeneratedAggregations extends Function {
 * aggregated results
 */
   def resetAccumulator(accumulators: Row)
+
+  /**
+* Cleanup for the accumulators.
+*/
+  def cleanup()
+
+  /**
+* Tear-down method for 
[[org.apache.flink.table.functions.AggregateFunction]].
+* It can be used for clean up work. By default, this method does 
nothing.
+*/
+  def close()
--- End diff --

The `close()` method corresponds to the `open()` method. It is called 
wherever `open()` was called before, e.g. in ProcTimeUnboundedOver's `close()` 
method. Otherwise, the UDAGG's `close()` would never be called. 

I modified the test case testGroupAggregateWithStateBackend to verify that 
`close()` is called.

What do you think?
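
A minimal sketch of the pairing described above, assuming a simplified 
lifecycle. The names `LifecycleFunction`, `RecordingAggregations` and 
`DelegatingOperatorSketch` are hypothetical and are not classes from this PR; 
the point is only that an operator which forwards `open()` must also forward 
`close()`:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the open/close part of a generated function. */
interface LifecycleFunction {
    void open();
    void close();
}

/** Records lifecycle calls so a test can check that close() was reached. */
final class RecordingAggregations implements LifecycleFunction {
    final List<String> calls = new ArrayList<>();

    @Override
    public void open() {
        calls.add("open");
    }

    @Override
    public void close() {
        calls.add("close");
    }
}

/** Hypothetical operator: every forwarded open() has a matching close(). */
final class DelegatingOperatorSketch {
    private final LifecycleFunction function;

    DelegatingOperatorSketch(LifecycleFunction function) {
        this.function = function;
    }

    void open() {
        function.open();
    }

    void close() {
        // Without this forwarding call the function's close() never runs.
        function.close();
    }
}
```

A test in this style would open and close the operator and then inspect the 
recorded calls.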




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread kaibozhou
Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135179322
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/DataViewSpec.scala
 ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.reflect.Field
+
+import org.apache.flink.api.common.state.{ListStateDescriptor, 
MapStateDescriptor, StateDescriptor}
+import org.apache.flink.table.dataview.{ListViewTypeInfo, MapViewTypeInfo}
+
+/**
+  * Data view specification.
+  *
+  * @tparam ACC type extends [[DataView]]
+  */
+trait DataViewSpec[ACC <: DataView] {
+  def id: String
+  def field: Field
+  def toStateDescriptor: StateDescriptor[_, _]
--- End diff --

Maybe this method is still needed. State descriptors cannot be code generated 
because the TypeInformation is passed in by users.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread kaibozhou
Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135179087
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -161,13 +182,108 @@ class AggregationCodeGenerator(
 }
 }
 
+/**
+  * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] 
to the open, cleanup,
+  * close and member area of the generated function.
+  *
+  */
+def addReusableDataViews: Unit = {
+  if (accConfig != null && accConfig.isStateBackedDataViews) {
+val descMapping: Map[String, StateDescriptor[_, _]] = 
accConfig.accSpecs
+  .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+  .toMap[String, StateDescriptor[_, _]]
+
+for (i <- aggs.indices) yield {
+  for (spec <- accConfig.accSpecs(i)) yield {
+val dataViewField = spec.field
+val dataViewTypeTerm = dataViewField.getType.getCanonicalName
+val desc = descMapping.getOrElse(spec.id,
+  throw new CodeGenException(s"Can not find ListView in 
accumulator by id: ${spec.id}"))
+
+// define the DataView variables
+val serializedData = AggregateUtil.serialize(desc)
--- End diff --

State descriptors need the TypeInformation that is passed in by the user, so 
we cannot code generate the state descriptor. 
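
A rough sketch of the serialize-and-ship idea, using only JDK classes. This is 
an assumption-laden stand-in: the PR itself uses `InstantiationUtil` and the 
commons-codec `Base64` class, while `DescriptorShippingSketch` below is a 
hypothetical name and uses plain Java serialization with the default class 
loader:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Base64;

/** Hypothetical helper: turns a serializable object into a string literal. */
final class DescriptorShippingSketch {

    /** Serializes the object and encodes it as unpadded URL-safe Base64. */
    static String serialize(Serializable descriptor) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(descriptor);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return Base64.getUrlEncoder().withoutPadding()
            .encodeToString(buffer.toByteArray());
    }

    /** Decodes the string and restores the object on the receiving side. */
    static Object deserialize(String data) {
        byte[] bytes = Base64.getUrlDecoder().decode(data);
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The resulting string contains only characters that are safe inside a Java 
string literal, which is what makes it possible to embed it in generated code.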




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread kaibozhou
Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135179101
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -178,13 +294,15 @@ class AggregationCodeGenerator(
|  ${aggMapping(i)},
|  (${accTypes(i)}) accs.getField($i));""".stripMargin
   } else {
+val setDataView = genDataViewFieldSetter(s"acc$i", i)
 j"""
|org.apache.flink.table.functions.AggregateFunction 
baseClass$i =
|  (org.apache.flink.table.functions.AggregateFunction) 
${aggs(i)};
-   |
+   |${accTypes(i)} acc$i = (${accTypes(i)}) 
accs.getField($i);
+   |$setDataView
--- End diff --

yes




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread kaibozhou
Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135173258
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1469,4 +1526,26 @@ object AggregateUtil {
   private def gcd(a: Long, b: Long): Long = {
 if (b == 0) a else gcd(b, a % b)
   }
+
+  @throws[Exception]
+  def serialize(stateDescriptor: StateDescriptor[_, _]): String = {
+val byteArray = InstantiationUtil.serializeObject(stateDescriptor)
+Base64.encodeBase64URLSafeString(byteArray)
+  }
+
+  @throws[Exception]
+  def deserialize(data: String): StateDescriptor[_, _] = {
+val byteData = Base64.decodeBase64(data)
+InstantiationUtil.deserializeObject[StateDescriptor[_, _]](
+  byteData,
+  Thread.currentThread.getContextClassLoader)
+  }
+
+  def createDataViewTerm(aggIndex: Int, fieldName: String): String = {
+s"acc${aggIndex}_${fieldName}_dataview"
+  }
 }
+
+case class DataViewConfig(accSpecs: Array[Seq[DataViewSpec[_]]], 
isStateBackedDataViews: Boolean)
--- End diff --

Yes, this class is not needed now.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread kaibozhou
Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135172148
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -759,10 +786,12 @@ object AggregateUtil {
 : GroupCombineFunction[Row, Row] = {
 
 val needRetract = false
-val (aggFieldIndexes, aggregates, accTypes) = 
transformToAggregateFunctions(
+val isStateBackedDataViews = false
+val (aggFieldIndexes, aggregates, accTypes, accSpecs) = 
transformToAggregateFunctions(
--- End diff --

That makes sense.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread kaibozhou
Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135171980
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -356,6 +486,9 @@ class AggregationCodeGenerator(
""".stripMargin
 
   if (needMerge) {
+if (accConfig != null && accConfig.isStateBackedDataViews) {
+  throw new CodeGenException("DataView doesn't support merge when 
the backend uses state.")
--- End diff --

I found that the exception will be thrown everywhere 
generateAggregations(..) is called.
I think we can print funcName in the exception message.
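
A small sketch of what including the function name could look like. 
`MergeSupportCheck` is a hypothetical helper, and `IllegalStateException` 
stands in for the `CodeGenException` used in the diff; the message wording 
after the first sentence is an assumption:

```java
/** Hypothetical helper mirroring the check in the diff above. */
final class MergeSupportCheck {

    /** Fails with a message naming the generated function being built. */
    static void check(String funcName, boolean needMerge, boolean stateBacked) {
        if (needMerge && stateBacked) {
            throw new IllegalStateException(
                "DataView doesn't support merge when the backend uses state. "
                    + "Generated function: " + funcName);
        }
    }
}
```

With the name in the message, the caller of `generateAggregations(..)` that 
triggered the failure can be identified from the exception alone.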




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread kaibozhou
Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135163096
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java
 ---
@@ -135,4 +138,177 @@ public void retract(WeightedAvgAccum accumulator, int 
iValue, int iWeight) {
accumulator.count -= iWeight;
}
}
+
+   /**
+* CountDistinct accumulator.
+*/
+   public static class CountDistinctAccum {
+   public MapView map;
+   public long count;
+   }
+
+   /**
+* CountDistinct aggregate.
+*/
+   public static class CountDistinct extends AggregateFunction {
+
+   @Override
+   public CountDistinctAccum createAccumulator() {
+   CountDistinctAccum accum = new CountDistinctAccum();
+   accum.map = new MapView<>(Types.STRING, Types.INT);
+   accum.count = 0L;
+   return accum;
+   }
+
+   //Overloaded accumulate method
+   public void accumulate(CountDistinctAccum accumulator, String 
id) {
+   try {
+   if (!accumulator.map.contains(id)) {
+   accumulator.map.put(id, 1);
+   accumulator.count += 1;
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
+   }
+   }
+
+   //Overloaded accumulate method
+   public void accumulate(CountDistinctAccum accumulator, long id) 
{
+   try {
+   if 
(!accumulator.map.contains(String.valueOf(id))) {
+   accumulator.map.put(String.valueOf(id), 
1);
+   accumulator.count += 1;
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
+   }
+   }
+
+   @Override
+   public Long getValue(CountDistinctAccum accumulator) {
+   return accumulator.count;
+   }
+   }
+
+   /**
+* CountDistinct aggregate with merge.
+*/
+   public static class CountDistinctWithMerge extends CountDistinct {
+
+   //Overloaded merge method
+   public void merge(CountDistinctAccum acc, 
Iterable it) {
+   Iterator iter = it.iterator();
+   while (iter.hasNext()) {
+   CountDistinctAccum mergeAcc = iter.next();
+   acc.count += mergeAcc.count;
+
+   try {
+   Iterator mapItr = 
mergeAcc.map.keys().iterator();
+   while (mapItr.hasNext()) {
+   String key = mapItr.next();
+   if (!acc.map.contains(key)) {
+   acc.map.put(key, 1);
+   }
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
+   }
+   }
+   }
+   }
+
+   /**
+* CountDistinct aggregate with merge and reset.
+*/
+   public static class CountDistinctWithMergeAndReset extends 
CountDistinctWithMerge {
+
+   //Overloaded retract method
+   public void resetAccumulator(CountDistinctAccum acc) {
+   acc.map.clear();
+   acc.count = 0;
+   }
+   }
+
+   /**
+* CountDistinct aggregate with retract.
+*/
+   public static class CountDistinctWithRetractAndReset extends 
CountDistinct {
+
+   //Overloaded retract method
+   public void retract(CountDistinctAccum accumulator, long id) {
+   try {
+   if 
(!accumulator.map.contains(String.valueOf(id))) {
+   
accumulator.map.remove(String.valueOf(id));
+   accumulator.count -= 1;
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
+   }
+   }
+
+   //Overloaded retract method
+   public void resetAccumulator(CountDistinctAccum acc) {

[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread kaibozhou
Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135162070
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1469,4 +1526,26 @@ object AggregateUtil {
   private def gcd(a: Long, b: Long): Long = {
 if (b == 0) a else gcd(b, a % b)
   }
+
+  @throws[Exception]
+  def serialize(stateDescriptor: StateDescriptor[_, _]): String = {
+val byteArray = InstantiationUtil.serializeObject(stateDescriptor)
+Base64.encodeBase64URLSafeString(byteArray)
+  }
+
+  @throws[Exception]
+  def deserialize(data: String): StateDescriptor[_, _] = {
+val byteData = Base64.decodeBase64(data)
+InstantiationUtil.deserializeObject[StateDescriptor[_, _]](
+  byteData,
+  Thread.currentThread.getContextClassLoader)
+  }
+
+  def createDataViewTerm(aggIndex: Int, fieldName: String): String = {
--- End diff --

Yes, this was just an omission during code refactoring.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135112454
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala
 ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import java.util
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import org.apache.flink.api.common.typeutils.base.{MapSerializer, 
MapSerializerConfigSnapshot}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.MapView
+
+/**
+  * A serializer for [[MapView]]. The serializer relies on a key 
serializer and a value
+  * serializer for the serialization of the map's key-value pairs.
+  *
+  * The serialization format for the map is as follows: four bytes for 
the length of the map,
+  * followed by the serialized representation of each key-value pair. To 
allow null values,
+  * each value is prefixed by a null marker.
+  *
+  * @param mapSerializer Map serializer.
+  * @tparam K The type of the keys in the map.
+  * @tparam V The type of the values in the map.
+  */
+@Internal
+class MapViewSerializer[K, V](val mapSerializer: MapSerializer[K, V])
+  extends TypeSerializer[MapView[K, V]] {
+
+  override def isImmutableType: Boolean = false
+
+  override def duplicate(): TypeSerializer[MapView[K, V]] =
+new MapViewSerializer[K, V](
+  mapSerializer.duplicate().asInstanceOf[MapSerializer[K, V]])
+
+  override def createInstance(): MapView[K, V] = {
+val mapview = new MapView[K, V]
+mapview.putAll(mapSerializer.createInstance())
--- End diff --

No need to add anything to the new instance. The map should be empty.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135128323
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java
 ---
@@ -135,4 +138,177 @@ public void retract(WeightedAvgAccum accumulator, int 
iValue, int iWeight) {
accumulator.count -= iWeight;
}
}
+
+   /**
+* CountDistinct accumulator.
+*/
+   public static class CountDistinctAccum {
+   public MapView map;
+   public long count;
+   }
+
+   /**
+* CountDistinct aggregate.
+*/
+   public static class CountDistinct extends AggregateFunction {
+
+   @Override
+   public CountDistinctAccum createAccumulator() {
+   CountDistinctAccum accum = new CountDistinctAccum();
+   accum.map = new MapView<>(Types.STRING, Types.INT);
+   accum.count = 0L;
+   return accum;
+   }
+
+   //Overloaded accumulate method
+   public void accumulate(CountDistinctAccum accumulator, String 
id) {
+   try {
+   if (!accumulator.map.contains(id)) {
+   accumulator.map.put(id, 1);
+   accumulator.count += 1;
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
+   }
+   }
+
+   //Overloaded accumulate method
+   public void accumulate(CountDistinctAccum accumulator, long id) 
{
+   try {
+   if 
(!accumulator.map.contains(String.valueOf(id))) {
+   accumulator.map.put(String.valueOf(id), 
1);
+   accumulator.count += 1;
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
+   }
+   }
+
+   @Override
+   public Long getValue(CountDistinctAccum accumulator) {
+   return accumulator.count;
+   }
+   }
+
+   /**
+* CountDistinct aggregate with merge.
+*/
+   public static class CountDistinctWithMerge extends CountDistinct {
+
+   //Overloaded merge method
+   public void merge(CountDistinctAccum acc, 
Iterable it) {
+   Iterator iter = it.iterator();
+   while (iter.hasNext()) {
+   CountDistinctAccum mergeAcc = iter.next();
+   acc.count += mergeAcc.count;
+
+   try {
+   Iterator mapItr = 
mergeAcc.map.keys().iterator();
+   while (mapItr.hasNext()) {
+   String key = mapItr.next();
+   if (!acc.map.contains(key)) {
+   acc.map.put(key, 1);
+   }
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
+   }
+   }
+   }
+   }
+
+   /**
+* CountDistinct aggregate with merge and reset.
+*/
+   public static class CountDistinctWithMergeAndReset extends 
CountDistinctWithMerge {
+
+   //Overloaded retract method
+   public void resetAccumulator(CountDistinctAccum acc) {
+   acc.map.clear();
+   acc.count = 0;
+   }
+   }
+
+   /**
+* CountDistinct aggregate with retract.
+*/
+   public static class CountDistinctWithRetractAndReset extends 
CountDistinct {
+
+   //Overloaded retract method
+   public void retract(CountDistinctAccum accumulator, long id) {
+   try {
+   if 
(!accumulator.map.contains(String.valueOf(id))) {
+   
accumulator.map.remove(String.valueOf(id));
--- End diff --

Shouldn't a count distinct with retraction increment the counter value in 
the MapView in `accumulate` and decrement the counter in `retract`? Only when 
the counter reaches 0 should the map entry be removed and `accumulator.count` 
be decremented.
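
The reference-counting scheme described above can be sketched as follows. 
This is a minimal illustration, not the code of the PR: a plain `HashMap` 
stands in for `MapView`, and the class name `RefCountingCountDistinct` is 
hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch only: a HashMap stands in for the accumulator's MapView. */
final class RefCountingCountDistinct {
    // Number of times each key is currently present.
    private final Map<String, Integer> refCounts = new HashMap<>();
    // Number of distinct keys currently present.
    private long count = 0L;

    void accumulate(String id) {
        Integer current = refCounts.get(id);
        if (current == null) {
            refCounts.put(id, 1);
            count += 1; // first occurrence of this key
        } else {
            refCounts.put(id, current + 1);
        }
    }

    void retract(String id) {
        Integer current = refCounts.get(id);
        if (current == null) {
            return; // nothing to retract for an unknown key
        }
        if (current == 1) {
            refCounts.remove(id);
            count -= 1; // last occurrence is gone
        } else {
            refCounts.put(id, current - 1);
        }
    }

    long getValue() {
        return count;
    }
}
```

Accumulating "a" twice and retracting it once leaves the distinct count 
unchanged; only the second retraction removes the key.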



[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135140565
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/DataViewSpec.scala
 ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.reflect.Field
+
+import org.apache.flink.api.common.state.{ListStateDescriptor, 
MapStateDescriptor, StateDescriptor}
+import org.apache.flink.table.dataview.{ListViewTypeInfo, MapViewTypeInfo}
+
+/**
+  * Data view specification.
+  *
+  * @tparam ACC type extends [[DataView]]
+  */
+trait DataViewSpec[ACC <: DataView] {
+  def id: String
+  def field: Field
+  def toStateDescriptor: StateDescriptor[_, _]
--- End diff --

Do we need this method? State descriptors can be instantiated by generated 
code.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135126762
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala
 ---
@@ -100,6 +108,17 @@ abstract class GeneratedAggregations extends Function {
 * aggregated results
 */
   def resetAccumulator(accumulators: Row)
+
+  /**
+* Cleanup for the accumulators.
+*/
+  def cleanup()
+
+  /**
+* Tear-down method for 
[[org.apache.flink.table.functions.AggregateFunction]].
+* It can be used for clean up work. By default, this method does 
nothing.
+*/
+  def close()
--- End diff --

I think `close()` is never called. So we can remove it, right?




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135109036
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import java.util
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import 
org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, 
ListSerializer}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+  * A serializer for [[ListView]]. The serializer relies on an element
+  * serializer for the serialization of the list's elements.
+  *
+  * The serialization format for the list is as follows: four bytes for 
the length of the lost,
--- End diff --

lost -> list, remove the `` tag




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135121779
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1469,4 +1526,26 @@ object AggregateUtil {
   private def gcd(a: Long, b: Long): Long = {
 if (b == 0) a else gcd(b, a % b)
   }
+
+  @throws[Exception]
+  def serialize(stateDescriptor: StateDescriptor[_, _]): String = {
+val byteArray = InstantiationUtil.serializeObject(stateDescriptor)
+Base64.encodeBase64URLSafeString(byteArray)
+  }
+
+  @throws[Exception]
+  def deserialize(data: String): StateDescriptor[_, _] = {
+val byteData = Base64.decodeBase64(data)
+InstantiationUtil.deserializeObject[StateDescriptor[_, _]](
+  byteData,
+  Thread.currentThread.getContextClassLoader)
+  }
+
+  def createDataViewTerm(aggIndex: Int, fieldName: String): String = {
+s"acc${aggIndex}_${fieldName}_dataview"
+  }
 }
+
+case class DataViewConfig(accSpecs: Array[Seq[DataViewSpec[_]]], 
isStateBackedDataViews: Boolean)
--- End diff --

We only need a `DataViewConfig` if the views are backed by state, right? So 
we can remove the `boolean` `isStateBackedDataViews` field and make it an 
optional parameter of the code generator.
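
One way to read this suggestion, sketched in Java with hypothetical names 
(`ViewSpecsSketch`, `GeneratorSketch`); the real generator is Scala and would 
presumably use `Option`. The presence of the specs replaces the boolean flag:

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical stand-in for the data view specs of all accumulators. */
final class ViewSpecsSketch {
    final List<String> specIds;

    ViewSpecsSketch(List<String> specIds) {
        this.specIds = specIds;
    }
}

/** Hypothetical generator entry point taking the specs as an Optional. */
final class GeneratorSketch {

    /** State-backed views are planned only when specs are supplied. */
    static String planViews(Optional<ViewSpecsSketch> specs) {
        return specs
            .map(s -> "state-backed:" + String.join(",", s.specIds))
            .orElse("heap-backed");
    }
}
```

Callers that do not use state-backed views pass an empty `Optional`, so no 
separate flag has to be kept in sync with the specs.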




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135112753
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewTypeInfo.scala
 ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.common.typeutils.base.MapSerializer
+import org.apache.flink.table.api.dataview.MapView
+
+/**
+  * [[MapView]] type information.
+  *
+  * @param keyType key type information
+  * @param valueType value type information
+  * @tparam K key type
+  * @tparam V value type
+  */
+@PublicEvolving
--- End diff --

Please remove the annotation




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135125473
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1469,4 +1526,26 @@ object AggregateUtil {
   private def gcd(a: Long, b: Long): Long = {
 if (b == 0) a else gcd(b, a % b)
   }
+
+  @throws[Exception]
+  def serialize(stateDescriptor: StateDescriptor[_, _]): String = {
+val byteArray = InstantiationUtil.serializeObject(stateDescriptor)
+Base64.encodeBase64URLSafeString(byteArray)
+  }
+
+  @throws[Exception]
+  def deserialize(data: String): StateDescriptor[_, _] = {
+val byteData = Base64.decodeBase64(data)
+InstantiationUtil.deserializeObject[StateDescriptor[_, _]](
+  byteData,
+  Thread.currentThread.getContextClassLoader)
+  }
+
+  def createDataViewTerm(aggIndex: Int, fieldName: String): String = {
--- End diff --

Move this method to `AggregationCodeGenerator`?




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135131075
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -79,13 +95,15 @@ class AggregationCodeGenerator(
 outputArity: Int,
 needRetract: Boolean,
 needMerge: Boolean,
-needReset: Boolean)
+needReset: Boolean,
+accConfig: DataViewConfig)
--- End diff --

Make this an `Option[Array[Seq[DataViewSpec[_]]]]` with `None` as the default 
value?
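
A minimal sketch of what this suggestion could look like, assuming simplified stand-in types. `DataViewSpec` here is a hypothetical two-field case class and `CodeGeneratorSketch` is not the real `AggregationCodeGenerator`; only the `Option` parameter with a `None` default and the `acc<index>_<field>_dataview` naming are taken from the diff and the comment above.

```scala
// Hypothetical, simplified stand-ins; these are not the actual Flink classes.
final case class DataViewSpec(id: String, fieldName: String)

class CodeGeneratorSketch(
    needReset: Boolean,
    // None means the data views live on the heap and no state code is generated.
    accConfig: Option[Array[Seq[DataViewSpec]]] = None) {

  // Names of the members that would be generated for state-backed views.
  def dataViewTerms: Seq[String] = accConfig match {
    case Some(specs) =>
      for {
        (aggSpecs, aggIndex) <- specs.toSeq.zipWithIndex
        spec <- aggSpecs
      } yield s"acc${aggIndex}_${spec.fieldName}_dataview"
    case None => Seq.empty
  }
}

object OptionalConfigDemo {
  def main(args: Array[String]): Unit = {
    // Operators without state-backed views simply omit the argument.
    val heapBacked = new CodeGeneratorSketch(needReset = true)
    val stateBacked = new CodeGeneratorSketch(
      needReset = true,
      accConfig = Some(Array(Seq(DataViewSpec("agg0.map", "map")))))
    println(heapBacked.dataViewTerms)
    println(stateBacked.dataViewTerms)
  }
}
```

With this shape, call sites that do not support state-backed views need no change, and the separate `isStateBackedDataViews` flag and the `null` check become unnecessary.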




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135133385
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -161,13 +182,108 @@ class AggregationCodeGenerator(
 }
 }
 
+/**
+  * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] 
to the open, cleanup,
+  * close and member area of the generated function.
+  *
+  */
+def addReusableDataViews: Unit = {
+  if (accConfig != null && accConfig.isStateBackedDataViews) {
+val descMapping: Map[String, StateDescriptor[_, _]] = 
accConfig.accSpecs
+  .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+  .toMap[String, StateDescriptor[_, _]]
+
+for (i <- aggs.indices) yield {
+  for (spec <- accConfig.accSpecs(i)) yield {
+val dataViewField = spec.field
+val dataViewTypeTerm = dataViewField.getType.getCanonicalName
+val desc = descMapping.getOrElse(spec.id,
+  throw new CodeGenException(s"Can not find ListView in 
accumulator by id: ${spec.id}"))
--- End diff --

The code is not `ListView` specific. Change to `DataView`?




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135125391
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1469,4 +1526,26 @@ object AggregateUtil {
   private def gcd(a: Long, b: Long): Long = {
 if (b == 0) a else gcd(b, a % b)
   }
+
+  @throws[Exception]
+  def serialize(stateDescriptor: StateDescriptor[_, _]): String = {
--- End diff --

Do we need this method? The instantiation of state descriptors can be code 
generated.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135135021
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -161,13 +182,108 @@ class AggregationCodeGenerator(
 }
 }
 
+/**
+  * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] 
to the open, cleanup,
+  * close and member area of the generated function.
+  *
+  */
+def addReusableDataViews: Unit = {
+  if (accConfig != null && accConfig.isStateBackedDataViews) {
+val descMapping: Map[String, StateDescriptor[_, _]] = 
accConfig.accSpecs
+  .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+  .toMap[String, StateDescriptor[_, _]]
+
+for (i <- aggs.indices) yield {
+  for (spec <- accConfig.accSpecs(i)) yield {
+val dataViewField = spec.field
+val dataViewTypeTerm = dataViewField.getType.getCanonicalName
+val desc = descMapping.getOrElse(spec.id,
+  throw new CodeGenException(s"Can not find ListView in 
accumulator by id: ${spec.id}"))
+
+// define the DataView variables
+val serializedData = AggregateUtil.serialize(desc)
+val dataViewFieldTerm = AggregateUtil.createDataViewTerm(i, 
dataViewField.getName)
+val field =
+  s"""
+ |transient $dataViewTypeTerm $dataViewFieldTerm = null;
+ |""".stripMargin
+reusableMemberStatements.add(field)
+
+// create DataViews
+val descFieldTerm = s"${dataViewFieldTerm}_desc"
+val descClassQualifier = classOf[StateDescriptor[_, 
_]].getCanonicalName
+val descDeserialize =
+  s"""
+ |$descClassQualifier $descFieldTerm = 
($descClassQualifier)
+ |  ${AggregateUtil.getClass.getName.stripSuffix("$")}
+ |  .deserialize("$serializedData");
+ """.stripMargin
+val createDataView = if (dataViewField.getType == 
classOf[MapView[_, _]]) {
+  s"""
+ |$descDeserialize
+ |$dataViewFieldTerm = new 
org.apache.flink.table.dataview.StateMapView(
+ |  $contextTerm.getMapState((
+ |
org.apache.flink.api.common.state.MapStateDescriptor)$descFieldTerm));
+   """.stripMargin
--- End diff --

Please fix the indentation.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135134452
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -161,13 +182,108 @@ class AggregationCodeGenerator(
 }
 }
 
+/**
+  * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] 
to the open, cleanup,
+  * close and member area of the generated function.
+  *
+  */
+def addReusableDataViews: Unit = {
+  if (accConfig != null && accConfig.isStateBackedDataViews) {
+val descMapping: Map[String, StateDescriptor[_, _]] = 
accConfig.accSpecs
+  .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+  .toMap[String, StateDescriptor[_, _]]
+
+for (i <- aggs.indices) yield {
+  for (spec <- accConfig.accSpecs(i)) yield {
+val dataViewField = spec.field
+val dataViewTypeTerm = dataViewField.getType.getCanonicalName
+val desc = descMapping.getOrElse(spec.id,
+  throw new CodeGenException(s"Can not find ListView in 
accumulator by id: ${spec.id}"))
+
+// define the DataView variables
+val serializedData = AggregateUtil.serialize(desc)
--- End diff --

Why do we need to serialize and deserialize state descriptors? Can't we 
just generate the code to instantiate them? IMO, that would be more 
straightforward and easier to debug.
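
For context, the round trip being questioned can be sketched with only the JDK. This is an illustration under assumptions, not the PR's code: the diff uses `InstantiationUtil` and a commons-codec style `Base64` helper, while the sketch below swaps in `ObjectOutputStream` and `java.util.Base64`, and `DescriptorRoundTrip` is a hypothetical name.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.Base64

object DescriptorRoundTrip {
  // Java-serializes the object and encodes the bytes as a URL-safe Base64 string.
  def serialize(value: java.io.Serializable): String = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    Base64.getUrlEncoder.withoutPadding.encodeToString(bytes.toByteArray)
  }

  // Decodes the string and reads the object back; the caller picks the expected type.
  def deserialize[T](data: String): T = {
    val in = new ObjectInputStream(
      new ByteArrayInputStream(Base64.getUrlDecoder.decode(data)))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}

object DescriptorRoundTripDemo {
  def main(args: Array[String]): Unit = {
    val encoded = DescriptorRoundTrip.serialize("acc0_map_dataview")
    // The generated source would embed this opaque string literal.
    println(encoded)
    println(DescriptorRoundTrip.deserialize[String](encoded))
  }
}
```

The encoded value is an opaque literal inside the generated function, which is the debugging cost raised above; a generated constructor call would instead be readable in the generated source.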




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135110707
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import java.util
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import 
org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, 
ListSerializer}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+  * A serializer for [[ListView]]. The serializer relies on an element
+  * serializer for the serialization of the list's elements.
+  *
+  * The serialization format for the list is as follows: four bytes for 
the length of the lost,
+  * followed by the serialized representation of each element.
+  *
+  * @param listSerializer List serializer.
+  * @tparam T The type of element in the list.
+  */
+@Internal
+class ListViewSerializer[T](val listSerializer: ListSerializer[T])
+  extends TypeSerializer[ListView[T]] {
+
+  override def isImmutableType: Boolean = false
+
+  override def duplicate(): TypeSerializer[ListView[T]] = {
+new 
ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]])
+  }
+
+  override def createInstance(): ListView[T] = {
+val listview = new ListView[T]
+listview.addAll(listSerializer.createInstance())
+listview
+  }
+
+  override def copy(from: ListView[T]): ListView[T] = {
+val listview = new ListView[T]
+listview.addAll(listSerializer.copy(from.list))
+listview
+  }
+
+  override def copy(from: ListView[T], reuse: ListView[T]): ListView[T] = 
copy(from)
+
+  override def getLength: Int = -1
+
+  override def serialize(record: ListView[T], target: DataOutputView): 
Unit = {
+listSerializer.serialize(record.list, target)
+  }
+
+  override def deserialize(source: DataInputView): ListView[T] = {
+val listview = new ListView[T]
+listview.addAll(listSerializer.deserialize(source))
--- End diff --

Same as before: we could set the list directly.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135125423
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1469,4 +1526,26 @@ object AggregateUtil {
   private def gcd(a: Long, b: Long): Long = {
 if (b == 0) a else gcd(b, a % b)
   }
+
+  @throws[Exception]
+  def serialize(stateDescriptor: StateDescriptor[_, _]): String = {
+val byteArray = InstantiationUtil.serializeObject(stateDescriptor)
+Base64.encodeBase64URLSafeString(byteArray)
+  }
+
+  @throws[Exception]
+  def deserialize(data: String): StateDescriptor[_, _] = {
--- End diff --

Do we need this method? The instantiation of state descriptors can be code 
generated.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135124865
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -1398,14 +1440,29 @@ object AggregateUtil {
   }
 }
 
+val accSpecs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size)
+
 // create accumulator type information for every aggregate function
 aggregates.zipWithIndex.foreach { case (agg, index) =>
-  if (null == accTypes(index)) {
+  if (accTypes(index) != null) {
+val (accType, specs) = removeStateViewFieldsFromAccTypeInfo(index,
+  agg,
+  accTypes(index),
+  isStateBackedDataViews)
+if (specs.isDefined) {
+  accSpecs(index) = specs.get
+  accTypes(index) = accType
+} else {
+  accTypes(index) = getAccumulatorTypeOfAggregateFunction(agg)
--- End diff --

Use the same order as above:
```
accSpecs(index) = ...
accTypes(index) = 
```




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135134994
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -161,13 +182,108 @@ class AggregationCodeGenerator(
 }
 }
 
+/**
+  * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] 
to the open, cleanup,
+  * close and member area of the generated function.
+  *
+  */
+def addReusableDataViews: Unit = {
+  if (accConfig != null && accConfig.isStateBackedDataViews) {
+val descMapping: Map[String, StateDescriptor[_, _]] = 
accConfig.accSpecs
+  .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+  .toMap[String, StateDescriptor[_, _]]
+
+for (i <- aggs.indices) yield {
+  for (spec <- accConfig.accSpecs(i)) yield {
+val dataViewField = spec.field
+val dataViewTypeTerm = dataViewField.getType.getCanonicalName
+val desc = descMapping.getOrElse(spec.id,
+  throw new CodeGenException(s"Can not find ListView in 
accumulator by id: ${spec.id}"))
+
+// define the DataView variables
+val serializedData = AggregateUtil.serialize(desc)
+val dataViewFieldTerm = AggregateUtil.createDataViewTerm(i, 
dataViewField.getName)
+val field =
+  s"""
+ |transient $dataViewTypeTerm $dataViewFieldTerm = null;
+ |""".stripMargin
+reusableMemberStatements.add(field)
+
+// create DataViews
+val descFieldTerm = s"${dataViewFieldTerm}_desc"
+val descClassQualifier = classOf[StateDescriptor[_, 
_]].getCanonicalName
+val descDeserialize =
+  s"""
+ |$descClassQualifier $descFieldTerm = 
($descClassQualifier)
+ |  ${AggregateUtil.getClass.getName.stripSuffix("$")}
+ |  .deserialize("$serializedData");
+ """.stripMargin
--- End diff --

Please fix the indentation.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135112227
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import java.util
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import 
org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, 
ListSerializer}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+  * A serializer for [[ListView]]. The serializer relies on an element
+  * serializer for the serialization of the list's elements.
+  *
+  * The serialization format for the list is as follows: four bytes for 
the length of the lost,
+  * followed by the serialized representation of each element.
+  *
+  * @param listSerializer List serializer.
+  * @tparam T The type of element in the list.
+  */
+@Internal
--- End diff --

Please remove the annotation.
We don't use the `@Internal`, `@Public`, and `@PublicEvolving` annotations 
in the `flink-table` module.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135124134
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -759,10 +786,12 @@ object AggregateUtil {
 : GroupCombineFunction[Row, Row] = {
 
 val needRetract = false
-val (aggFieldIndexes, aggregates, accTypes) = 
transformToAggregateFunctions(
+val isStateBackedDataViews = false
+val (aggFieldIndexes, aggregates, accTypes, accSpecs) = 
transformToAggregateFunctions(
--- End diff --

`transformToAggregateFunctions` has a default value for 
`isStateBackedDataViews`, so we don't need to pass the parameter here. Moreover, 
we only need to pass data view information to the code generator if the data 
views are backed by state (if we make it an optional parameter). So most of the 
changes here can be reverted (and likewise for all other operators that do not 
support state-backed views).




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135107609
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala
 ---
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.MapViewTypeInfoFactory
+
+/**
+  * MapView provides Map functionality for accumulators used by 
user-defined aggregate functions
+  * [[org.apache.flink.table.functions.AggregateFunction]].
+  *
+  * A MapView can be backed by a Java HashMap or a state backend, 
depending on the context in
+  * which the function is used.
+  *
+  * At runtime `MapView` will be replaced by a 
[[org.apache.flink.table.dataview.StateMapView]]
+  * when use state backend.
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public MapView map;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *@Override
+  *public MyAccum createAccumulator() {
+  *  MyAccum accum = new MyAccum();
+  *  accum.map = new MapView<>(Types.STRING, Types.INT);
+  *  accum.count = 0L;
+  *  return accum;
+  *}
+  *
+  *public void accumulate(MyAccum accumulator, String id) {
+  *  try {
+  *  if (!accumulator.map.contains(id)) {
+  *accumulator.map.put(id, 1);
+  *accumulator.count++;
+  *  }
+  *  } catch (Exception e) {
+  *e.printStackTrace();
+  *  }
+  *}
+  *
+  *@Override
+  *public Long getValue(MyAccum accumulator) {
+  *  return accumulator.count;
+  *}
+  *  }
+  *
+  * }}}
+  *
+  * @param keyTypeInfo key type information
+  * @param valueTypeInfo value type information
+  * @tparam K key type
+  * @tparam V value type
+  */
+@TypeInfo(classOf[MapViewTypeInfoFactory[_, _]])
+class MapView[K, V](
+@transient private[flink] val keyTypeInfo: TypeInformation[K],
+@transient private[flink] val valueTypeInfo: TypeInformation[V])
+  extends DataView {
+
+  def this() = this(null, null)
+
+  private[flink] var map = new util.HashMap[K, V]()
+
+  /**
+* Returns the value to which the specified key is mapped, or { @code 
null } if this map
+* contains no mapping for the key.
+*
+* @param key The key of the mapping.
+* @return The value of the mapping with the given key.
+* @throws Exception Thrown if the system cannot get data.
+*/
+  @throws[Exception]
+  def get(key: K): V = map.get(key)
+
+  /**
+* Put a value with the given key into the map.
+*
+* @param key   The key of the mapping.
+* @param value The new value of the mapping.
+* @throws Exception Thrown if the system cannot put data.
+*/
+  @throws[Exception]
+  def put(key: K, value: V): Unit = map.put(key, value)
+
+  /**
+* Copies all of the mappings from the specified map to this map view.
+*
+* @param map The mappings to be stored in this map.
+* @throws Exception Thrown if the system cannot access the map.
+*/
+  @throws[Exception]
+  def putAll(map: util.Map[K, V]): Unit = this.map.putAll(map)
+
+  /**
+* Deletes the mapping of the given key.
+*
+* @param key The key of the mapping.
+* @throws Exception Thrown if the system cannot access the map.
+*/
+  @throws[Exception]
+  def remove(key: K): Unit = map.remove(key)
+
+  /**
+* Returns whether there exists the given mapping.

[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135110511
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import java.util
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import 
org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, 
ListSerializer}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+  * A serializer for [[ListView]]. The serializer relies on an element
+  * serializer for the serialization of the list's elements.
+  *
+  * The serialization format for the list is as follows: four bytes for 
the length of the lost,
+  * followed by the serialized representation of each element.
+  *
+  * @param listSerializer List serializer.
+  * @tparam T The type of element in the list.
+  */
+@Internal
+class ListViewSerializer[T](val listSerializer: ListSerializer[T])
+  extends TypeSerializer[ListView[T]] {
+
+  override def isImmutableType: Boolean = false
+
+  override def duplicate(): TypeSerializer[ListView[T]] = {
+new 
ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]])
+  }
+
+  override def createInstance(): ListView[T] = {
+val listview = new ListView[T]
+listview.addAll(listSerializer.createInstance())
+listview
+  }
+
+  override def copy(from: ListView[T]): ListView[T] = {
+val listview = new ListView[T]
+listview.addAll(listSerializer.copy(from.list))
--- End diff --

`addAll()` adds overhead because all elements are copied from one list to 
the other. Can't the serializer have direct access to the list field of the 
`ListView` and set the list copy as the `ListView`'s list?
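
A minimal sketch of the difference, assuming a simplified stand-in for `ListView`. `SimpleListView` and both helper methods are hypothetical, and `new util.ArrayList[T](from.list)` stands in for whatever `listSerializer.copy` returns.

```scala
import java.util

// Hypothetical, simplified stand-in for ListView; not the Flink class.
class SimpleListView[T] {
  // A plain var keeps the sketch small; the real field could be private[flink].
  var list: util.List[T] = new util.ArrayList[T]()
  def add(value: T): Unit = list.add(value)
}

object ListViewCopySketch {
  // Current approach: copy the list, then copy every element again via addAll.
  def copyWithAddAll[T](from: SimpleListView[T]): SimpleListView[T] = {
    val copiedList = new util.ArrayList[T](from.list)
    val view = new SimpleListView[T]
    view.list.addAll(copiedList)
    view
  }

  // Suggested approach: hand the already copied list to the view directly.
  def copyWithDirectSet[T](from: SimpleListView[T]): SimpleListView[T] = {
    val copiedList = new util.ArrayList[T](from.list)
    val view = new SimpleListView[T]
    view.list = copiedList
    view
  }
}
```

Both variants return an independent view with equal contents; the second avoids the extra pass over the elements and the discarded initial list.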




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135112047
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala
 ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import java.util
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import org.apache.flink.api.common.typeutils.base.{MapSerializer, 
MapSerializerConfigSnapshot}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.MapView
+
+/**
+  * A serializer for [[MapView]]. The serializer relies on a key 
serializer and a value
+  * serializer for the serialization of the map's key-value pairs.
+  *
+  * The serialization format for the map is as follows: four bytes for 
the length of the map,
+  * followed by the serialized representation of each key-value pair. To 
allow null values,
+  * each value is prefixed by a null marker.
+  *
+  * @param mapSerializer Map serializer.
+  * @tparam K The type of the keys in the map.
+  * @tparam V The type of the values in the map.
+  */
+@Internal
--- End diff --

Please remove the annotation




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135112662
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/MapViewSerializer.scala
 ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import java.util
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import org.apache.flink.api.common.typeutils.base.{MapSerializer, 
MapSerializerConfigSnapshot}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.MapView
+
+/**
+  * A serializer for [[MapView]]. The serializer relies on a key 
serializer and a value
+  * serializer for the serialization of the map's key-value pairs.
+  *
+  * The serialization format for the map is as follows: four bytes for 
the length of the map,
+  * followed by the serialized representation of each key-value pair. To 
allow null values,
+  * each value is prefixed by a null marker.
+  *
+  * @param mapSerializer Map serializer.
+  * @tparam K The type of the keys in the map.
+  * @tparam V The type of the values in the map.
+  */
+@Internal
+class MapViewSerializer[K, V](val mapSerializer: MapSerializer[K, V])
+  extends TypeSerializer[MapView[K, V]] {
+
+  override def isImmutableType: Boolean = false
+
+  override def duplicate(): TypeSerializer[MapView[K, V]] =
+new MapViewSerializer[K, V](
+  mapSerializer.duplicate().asInstanceOf[MapSerializer[K, V]])
+
+  override def createInstance(): MapView[K, V] = {
+val mapview = new MapView[K, V]
+mapview.putAll(mapSerializer.createInstance())
+mapview
+  }
+
+  override def copy(from: MapView[K, V]): MapView[K, V] = {
+val mapview = new MapView[K, V]
+mapview.putAll(mapSerializer.copy(from.map))
--- End diff --

Same as for `ListViewSerializer`. Copying one map into another is more 
expensive than just replacing the map instance.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135109902
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewSerializer.scala
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import java.util
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeutils._
+import 
org.apache.flink.api.common.typeutils.base.{CollectionSerializerConfigSnapshot, 
ListSerializer}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+  * A serializer for [[ListView]]. The serializer relies on an element
+  * serializer for the serialization of the list's elements.
+  *
+  * The serialization format for the list is as follows: four bytes for 
the length of the lost,
+  * followed by the serialized representation of each element.
+  *
+  * @param listSerializer List serializer.
+  * @tparam T The type of element in the list.
+  */
+@Internal
+class ListViewSerializer[T](val listSerializer: ListSerializer[T])
+  extends TypeSerializer[ListView[T]] {
+
+  override def isImmutableType: Boolean = false
+
+  override def duplicate(): TypeSerializer[ListView[T]] = {
+new 
ListViewSerializer[T](listSerializer.duplicate().asInstanceOf[ListSerializer[T]])
+  }
+
+  override def createInstance(): ListView[T] = {
+val listview = new ListView[T]
+listview.addAll(listSerializer.createInstance())
--- End diff --

I think this call is not necessary. The new list instance is empty, so 
nothing is added.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135117112
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -307,6 +312,139 @@ object UserDefinedFunctionUtils {
   // 
--
 
   /**
+* Analyze the constructor to get the type information of the MapView 
or ListView type variables
+* inside the accumulate.
+*
+* @param aggFun aggregate function
+* @return the data view specification
+*/
+  def getDataViewTypeInfoFromConstructor(
+aggFun: AggregateFunction[_, _])
+  : mutable.HashMap[String, TypeInformation[_]] = {
+
+val resultMap = new mutable.HashMap[String, TypeInformation[_]]
+val acc = aggFun.createAccumulator()
+val fields: util.List[Field] = 
TypeExtractor.getAllDeclaredFields(acc.getClass, true)
+for (i <- 0 until fields.size()) {
+  val field = fields.get(i)
+  field.setAccessible(true)
+  if (classOf[DataView].isAssignableFrom(field.getType)) {
+if (field.getType == classOf[MapView[_, _]]) {
+  val mapView = field.get(acc).asInstanceOf[MapView[_, _]]
+  if (mapView != null) {
+val keyTypeInfo = mapView.keyTypeInfo
+val valueTypeInfo = mapView.valueTypeInfo
+
+if (keyTypeInfo != null && valueTypeInfo != null) {
+  resultMap.put(field.getName, new 
MapViewTypeInfo(keyTypeInfo, valueTypeInfo))
+}
+  } else {
+resultMap.put(field.getName, null)
+  }
+} else if (field.getType == classOf[ListView[_]]) {
+  val listView = field.get(acc).asInstanceOf[ListView[_]]
+  val elementTypeInfo = listView.elementTypeInfo
+
+  if (elementTypeInfo != null) {
+resultMap.put(field.getName, new 
ListViewTypeInfo(elementTypeInfo))
+  }
+}
+  }
+}
+
+resultMap
+  }
+
+  /**
+* Remove StateView fields from accumulator type information.
+*
+* @param index index of aggregate function
+* @param aggFun aggregate function
+* @param accType accumulator type information, only support pojo type
+* @param isStateBackedDataViews is data views use state backend
+* @return mapping of accumulator type information and data view config 
which contains id,
+* field name and state descriptor
+*/
+  def removeStateViewFieldsFromAccTypeInfo(
+index: Int,
+aggFun: AggregateFunction[_, _],
+accType: TypeInformation[_],
+isStateBackedDataViews: Boolean)
+  : (TypeInformation[_], Option[Seq[DataViewSpec[_]]]) = {
+
+var hasDataView = false
+val acc = aggFun.createAccumulator()
+accType match {
+  case pojoType: PojoTypeInfo[_] if pojoType.getArity > 0 =>
+val arity = pojoType.getArity
+val newPojoFields = new util.ArrayList[PojoField]()
+val accumulatorSpecs = new mutable.ArrayBuffer[DataViewSpec[_]]
+for (i <- 0 until arity) {
+  val pojoField = pojoType.getPojoFieldAt(i)
+  val field = pojoField.getField
+  val fieldName = field.getName
+  field.setAccessible(true)
+
+  pojoField.getTypeInformation match {
+case map: MapViewTypeInfo[Any, Any] =>
+  val mapView = field.get(acc).asInstanceOf[MapView[_, _]]
+  if (mapView != null) {
+val keyTypeInfo = mapView.keyTypeInfo
+val valueTypeInfo = mapView.valueTypeInfo
+val newTypeInfo = if (keyTypeInfo != null && valueTypeInfo 
!= null) {
+  hasDataView = true
+  new MapViewTypeInfo(keyTypeInfo, valueTypeInfo)
+} else {
+  map
+}
+
+var spec = MapViewSpec(
+  "agg" + index + "$" + fieldName, // generate unique name 
to be used as state name
+  field,
+  newTypeInfo)
+
+accumulatorSpecs += spec
+if (!isStateBackedDataViews) { // add data view field 
which not use state backend
+  newPojoFields.add(new PojoField(field, newTypeInfo))
+}
+  }
+
+case list: ListViewTypeInfo[Any] =>
+  val listView = field.get(acc).asInstanceOf[ListView[_]]
+  if (listView != null) {
+val elementTypeInfo = listView.elementTypeInfo

[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135135051
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -161,13 +182,108 @@ class AggregationCodeGenerator(
 }
 }
 
+/**
+  * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] 
to the open, cleanup,
+  * close and member area of the generated function.
+  *
+  */
+def addReusableDataViews: Unit = {
+  if (accConfig != null && accConfig.isStateBackedDataViews) {
+val descMapping: Map[String, StateDescriptor[_, _]] = 
accConfig.accSpecs
+  .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+  .toMap[String, StateDescriptor[_, _]]
+
+for (i <- aggs.indices) yield {
+  for (spec <- accConfig.accSpecs(i)) yield {
+val dataViewField = spec.field
+val dataViewTypeTerm = dataViewField.getType.getCanonicalName
+val desc = descMapping.getOrElse(spec.id,
+  throw new CodeGenException(s"Can not find ListView in 
accumulator by id: ${spec.id}"))
+
+// define the DataView variables
+val serializedData = AggregateUtil.serialize(desc)
+val dataViewFieldTerm = AggregateUtil.createDataViewTerm(i, 
dataViewField.getName)
+val field =
+  s"""
+ |transient $dataViewTypeTerm $dataViewFieldTerm = null;
+ |""".stripMargin
+reusableMemberStatements.add(field)
+
+// create DataViews
+val descFieldTerm = s"${dataViewFieldTerm}_desc"
+val descClassQualifier = classOf[StateDescriptor[_, 
_]].getCanonicalName
+val descDeserialize =
+  s"""
+ |$descClassQualifier $descFieldTerm = 
($descClassQualifier)
+ |  ${AggregateUtil.getClass.getName.stripSuffix("$")}
+ |  .deserialize("$serializedData");
+ """.stripMargin
+val createDataView = if (dataViewField.getType == 
classOf[MapView[_, _]]) {
+  s"""
+ |$descDeserialize
+ |$dataViewFieldTerm = new 
org.apache.flink.table.dataview.StateMapView(
+ |  $contextTerm.getMapState((
+ |
org.apache.flink.api.common.state.MapStateDescriptor)$descFieldTerm));
+   """.stripMargin
+} else if (dataViewField.getType == classOf[ListView[_]]) {
+  s"""
+ |$descDeserialize
+ |$dataViewFieldTerm = new 
org.apache.flink.table.dataview.StateListView(
+ |  $contextTerm.getListState((
+ |
org.apache.flink.api.common.state.ListStateDescriptor)$descFieldTerm));
+   """.stripMargin
--- End diff --

indent




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135107708
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView provides List functionality for accumulators used by 
user-defined aggregate functions
+  * {{AggregateFunction}}.
+  *
+  * A ListView can be backed by a Java ArrayList or a state backend, 
depending on the context in
+  * which the function is used.
+  *
+  * At runtime `ListView` will be replaced by a 
[[org.apache.flink.table.dataview.StateListView]]
+  * when use state backend..
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public ListView list;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  * MyAccum accum = new MyAccum();
+  * accum.list = new ListView<>(Types.STRING);
+  * accum.count = 0L;
+  * return accum;
+  *   }
+  *
+  *   public void accumulate(MyAccum accumulator, String id) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  * return accumulator.count;
+  *   }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](
+@transient private[flink] val elementTypeInfo: TypeInformation[T])
+  extends DataView {
+
+  def this() = this(null)
+
+  private[flink] val list = new util.ArrayList[T]()
+
+  /**
+* Returns an iterable of the list.
+*
+* @throws Exception Thrown if the system cannot get data.
+* @return The iterable of the list or { @code null} if the list is 
empty.
+*/
+  @throws[Exception]
+  def get: JIterable[T] = {
+if (!list.isEmpty) {
+  list
+} else {
+  null
+}
+  }
+
+  /**
+* Adding the given value to the list.
+*
+* @throws Exception Thrown if the system cannot add data.
+* @param value element to be appended to this list
+*/
+  @throws[Exception]
+  def add(value: T): Unit = list.add(value)
+
+  /**
+* Copies all of the elements from the specified list to this list view.
+*
+* @throws Exception Thrown if the system cannot add all data.
+* @param list The list to be stored in this list view.
+*/
+  @throws[Exception]
+  def addAll(list: util.List[T]): Unit = this.list.addAll(list)
+
+  /**
+* Removes all of the elements from this list.
+*
+* The list will be empty after this call returns.
+*/
+  override def clear(): Unit = list.clear()
+
+  override def equals(other: Any): Boolean = other match {
+case that: ListView[_] =>
--- End diff --

`case that: ListView[T] =>`?
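
One caveat, as far as I can tell: the element type is erased at runtime, so 
neither form of the pattern can actually check `T`. The same situation in 
Java, as a minimal sketch with a hypothetical `PlainListView` class (not the 
class from this PR):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ListView; not the Flink class.
class PlainListView<T> {
    final List<T> list = new ArrayList<>();

    void add(T value) {
        list.add(value);
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        // Only the raw class can be tested at runtime; the element type is erased.
        if (!(other instanceof PlainListView<?>)) {
            return false;
        }
        PlainListView<?> that = (PlainListView<?>) other;
        // Equality is therefore decided by the contents of the two lists.
        return list.equals(that.list);
    }

    @Override
    public int hashCode() {
        return list.hashCode();
    }
}
```

With this definition two empty views compare equal even if they were declared 
with different element types, because only the contents are compared.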




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135138596
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -178,13 +294,15 @@ class AggregationCodeGenerator(
|  ${aggMapping(i)},
|  (${accTypes(i)}) accs.getField($i));""".stripMargin
   } else {
+val setDataView = genDataViewFieldSetter(s"acc$i", i)
 j"""
|org.apache.flink.table.functions.AggregateFunction 
baseClass$i =
|  (org.apache.flink.table.functions.AggregateFunction) 
${aggs(i)};
-   |
+   |${accTypes(i)} acc$i = (${accTypes(i)}) 
accs.getField($i);
+   |$setDataView
--- End diff --

Could we call `genDataViewFieldSetter(s"acc$i", i)` directly here? 
That would make it easier to follow the `acc$i` variables.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135139164
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -356,6 +486,9 @@ class AggregationCodeGenerator(
""".stripMargin
 
   if (needMerge) {
+if (accConfig != null && accConfig.isStateBackedDataViews) {
+  throw new CodeGenException("DataView doesn't support merge when 
the backend uses state.")
--- End diff --

Can we throw this exception earlier (e.g., in `AggregateUtil`) and give 
more details about the aggregation function?
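
A rough sketch of what an earlier check could look like, in Java. `AggSpec` 
and `MergeValidation` are hypothetical names used only for illustration; the 
real check would use whatever information `AggregateUtil` has about the 
aggregate functions.

```java
import java.util.List;

// Hypothetical description of one aggregate function; not a Flink class.
class AggSpec {
    final String functionName;
    final boolean usesDataView;

    AggSpec(String functionName, boolean usesDataView) {
        this.functionName = functionName;
        this.usesDataView = usesDataView;
    }
}

// Hypothetical helper that runs before any code is generated.
class MergeValidation {
    // Fails fast and names the offending aggregate function in the message.
    static void validate(List<AggSpec> aggs, boolean needMerge, boolean stateBackedDataViews) {
        if (!needMerge || !stateBackedDataViews) {
            return;
        }
        for (AggSpec agg : aggs) {
            if (agg.usesDataView) {
                throw new IllegalStateException(
                    "Aggregate function '" + agg.functionName
                        + "' uses a DataView that is backed by state, "
                        + "which does not support merge.");
            }
        }
    }
}
```

Checking before code generation means the user sees the name of the function 
that caused the problem instead of a generic code generation error.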




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135128744
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java
 ---
@@ -135,4 +138,177 @@ public void retract(WeightedAvgAccum accumulator, int 
iValue, int iWeight) {
accumulator.count -= iWeight;
}
}
+
+   /**
+* CountDistinct accumulator.
+*/
+   public static class CountDistinctAccum {
+   public MapView map;
+   public long count;
+   }
+
+   /**
+* CountDistinct aggregate.
+*/
+   public static class CountDistinct extends AggregateFunction {
+
+   @Override
+   public CountDistinctAccum createAccumulator() {
+   CountDistinctAccum accum = new CountDistinctAccum();
+   accum.map = new MapView<>(Types.STRING, Types.INT);
+   accum.count = 0L;
+   return accum;
+   }
+
+   //Overloaded accumulate method
+   public void accumulate(CountDistinctAccum accumulator, String 
id) {
+   try {
+   if (!accumulator.map.contains(id)) {
+   accumulator.map.put(id, 1);
+   accumulator.count += 1;
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
+   }
+   }
+
+   //Overloaded accumulate method
+   public void accumulate(CountDistinctAccum accumulator, long id) 
{
+   try {
+   if 
(!accumulator.map.contains(String.valueOf(id))) {
+   accumulator.map.put(String.valueOf(id), 
1);
+   accumulator.count += 1;
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
+   }
+   }
+
+   @Override
+   public Long getValue(CountDistinctAccum accumulator) {
+   return accumulator.count;
+   }
+   }
+
+   /**
+* CountDistinct aggregate with merge.
+*/
+   public static class CountDistinctWithMerge extends CountDistinct {
+
+   //Overloaded merge method
+   public void merge(CountDistinctAccum acc, 
Iterable it) {
+   Iterator iter = it.iterator();
+   while (iter.hasNext()) {
+   CountDistinctAccum mergeAcc = iter.next();
+   acc.count += mergeAcc.count;
+
+   try {
+   Iterator mapItr = 
mergeAcc.map.keys().iterator();
+   while (mapItr.hasNext()) {
+   String key = mapItr.next();
+   if (!acc.map.contains(key)) {
+   acc.map.put(key, 1);
+   }
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
+   }
+   }
+   }
+   }
+
+   /**
+* CountDistinct aggregate with merge and reset.
+*/
+   public static class CountDistinctWithMergeAndReset extends 
CountDistinctWithMerge {
+
+   //Overloaded retract method
+   public void resetAccumulator(CountDistinctAccum acc) {
+   acc.map.clear();
+   acc.count = 0;
+   }
+   }
+
+   /**
+* CountDistinct aggregate with retract.
+*/
+   public static class CountDistinctWithRetractAndReset extends 
CountDistinct {
+
+   //Overloaded retract method
+   public void retract(CountDistinctAccum accumulator, long id) {
+   try {
+   if 
(!accumulator.map.contains(String.valueOf(id))) {
+   
accumulator.map.remove(String.valueOf(id));
+   accumulator.count -= 1;
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
+   }
+   }
+
+   //Overloaded retract method
+   public void resetAccumulator(CountDistinctAccum acc) {
  

[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135112266
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/ListViewTypeInfo.scala
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.common.typeutils.base.ListSerializer
+import org.apache.flink.table.api.dataview.ListView
+
+/**
+  * [[ListView]] type information.
+  *
+  * @param elementType element type information
+  * @tparam T element type
+  */
+@PublicEvolving
--- End diff --

Please remove the annotation.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135108408
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -1458,9 +1459,10 @@ abstract class CodeGenerator(
 * Adds a reusable [[UserDefinedFunction]] to the member area of the 
generated [[Function]].
 *
 * @param function [[UserDefinedFunction]] object to be instantiated 
during runtime
+* @param contextTerm [[RuntimeContext]] term
--- End diff --

`term to access the [[RuntimeContext]]`




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135106244
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView provides List functionality for accumulators used by 
user-defined aggregate functions
+  * {{AggregateFunction}}.
+  *
+  * A ListView can be backed by a Java ArrayList or a state backend, 
depending on the context in
+  * which the function is used.
+  *
+  * At runtime `ListView` will be replaced by a 
[[org.apache.flink.table.dataview.StateListView]]
+  * when use state backend..
--- End diff --

`when use state backend..` -> `if it is backed by a state backend.`




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r135107137
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala
 ---
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.MapViewTypeInfoFactory
+
+/**
+  * MapView provides Map functionality for accumulators used by 
user-defined aggregate functions
+  * [[org.apache.flink.table.functions.AggregateFunction]].
+  *
+  * A MapView can be backed by a Java HashMap or a state backend, 
depending on the context in
+  * which the function is used.
+  *
+  * At runtime `MapView` will be replaced by a 
[[org.apache.flink.table.dataview.StateMapView]]
+  * when use state backend.
--- End diff --

`when use state backend` -> `if it is backed by a state backend.`




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-22 Thread kaibozhou
Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134653998
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -1646,4 +1649,86 @@ abstract class CodeGenerator(
 
 fieldTerm
   }
+
+  /**
+* Adds a reusable class to the member area of the generated 
[[Function]].
+*/
+  def addReusableClass(clazz: Class[_], fieldTerm: String): Unit = {
+val field =
+  s"""
+ |transient ${clazz.getCanonicalName} $fieldTerm = null;
+ |""".stripMargin
+reusableMemberStatements.add(field)
+  }
+
+  /**
+* Adds a reusable [[DataViewConfig]] to the member area of the 
generated [[Function]].
+*
+* @param indices indices of aggregate functions.
+* @param ctxTerm field name of runtime context.
+* @param accConfig data view config which contains id, field and 
StateDescriptos.
+* @return statements to create [[MapView]] or [[ListView]].
+*/
+  def addReusableDataViewConfig(
+  indices: Range,
+  ctxTerm: String,
+  accConfig: Option[DataViewConfig])
+: String = {
+if (accConfig.isDefined && accConfig.get.isStateBackedDataViews) {
+  val initDataViews = new StringBuilder
+  val descMapping: Map[String, StateDescriptor[_, _]] = 
accConfig.get.accSpecs
+.flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+.toMap[String, StateDescriptor[_, _]]
+
+  for (i <- indices) yield {
+for (spec <- accConfig.get.accSpecs(i)) yield {
+  val dataViewField = spec.field
+  val dataViewTypeTerm = dataViewField.getType.getCanonicalName
+  val desc = descMapping.getOrElse(spec.id,
+throw new CodeGenException(s"Can not find ListView in 
accumulator by id: ${spec.id}"))
+
+  val serializedData = AggregateUtil.serialize(desc)
+  val dataViewFieldTerm = 
s"acc${i}_${dataViewField.getName}_dataview"
+  val field =
+s"""
+   |transient $dataViewTypeTerm $dataViewFieldTerm = null;
+   |""".stripMargin
+  reusableMemberStatements.add(field)
+
+  val descFieldTerm = s"${dataViewFieldTerm}_desc"
+  val descClassQualifier = classOf[StateDescriptor[_, 
_]].getCanonicalName
+  val descDeserialize =
+s"""
+   |$descClassQualifier $descFieldTerm = 
($descClassQualifier)
+   |  ${AggregateUtil.getClass.getName.stripSuffix("$")}
+   |  .deserialize("$serializedData");
+ """.stripMargin
+
+  val init = if (dataViewField.getType == classOf[MapView[_, _]]) {
+s"""
+   |$descDeserialize
+   |$dataViewFieldTerm =
+   |  
org.apache.flink.table.dataview.StateViewUtils.createMapView($descFieldTerm,
--- End diff --

Agreed, it can be code-generated now.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-22 Thread kaibozhou
Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134653928
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -1646,4 +1649,86 @@ abstract class CodeGenerator(
 
 fieldTerm
   }
+
+  /**
+* Adds a reusable class to the member area of the generated 
[[Function]].
+*/
+  def addReusableClass(clazz: Class[_], fieldTerm: String): Unit = {
+val field =
+  s"""
+ |transient ${clazz.getCanonicalName} $fieldTerm = null;
+ |""".stripMargin
+reusableMemberStatements.add(field)
+  }
+
+  /**
+* Adds a reusable [[DataViewConfig]] to the member area of the 
generated [[Function]].
+*
+* @param indices indices of aggregate functions.
+* @param ctxTerm field name of runtime context.
+* @param accConfig data view config which contains id, field and 
StateDescriptos.
+* @return statements to create [[MapView]] or [[ListView]].
+*/
+  def addReusableDataViewConfig(
--- End diff --

It's a good idea; the code will be cleaner after the refactoring.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-22 Thread kaibozhou
Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134653871
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -1646,4 +1649,86 @@ abstract class CodeGenerator(
 
 fieldTerm
   }
+
+  /**
+* Adds a reusable class to the member area of the generated 
[[Function]].
+*/
+  def addReusableClass(clazz: Class[_], fieldTerm: String): Unit = {
--- End diff --

Yes, `RuntimeContext` does not need to be a member variable.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134135181
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -162,13 +172,66 @@ class AggregationCodeGenerator(
   }
 }
 
+def genDataViewFieldSetter(accTerm: String, specs: 
Seq[DataViewSpec[_]]): String = {
+  if (accConfig.isDefined && accConfig.get.isStateBackedDataViews) {
+val setters = for (spec <- specs) yield {
+  val field = spec.field
+  val dataViewTerm = s"${accTerm}_${field.getName}_dataview"
+  val fieldSetter = if (Modifier.isPublic(field.getModifiers)) {
+s"$accTerm.${field.getName} = $dataViewTerm;"
+  } else {
+val fieldTerm = 
addReusablePrivateFieldAccess(field.getDeclaringClass, field.getName)
+s"${reflectiveFieldWriteAccess(fieldTerm, field, accTerm, 
dataViewTerm)};"
+  }
+
+  s"""
+ |$fieldSetter
+""".stripMargin
+}
+setters.mkString("\n")
+  } else {
+""
+  }
+}
+
+def genCleanUpDataView: String = {
+  if (accConfig.isDefined && accConfig.get.isStateBackedDataViews) {
+val cleanUpDataViews = new StringBuilder
+for (i <- aggs.indices) yield {
+  val setters = for (spec <- accConfig.get.accSpecs(i)) yield {
+val dataViewTerm = s"acc${i}_${spec.field.getName}_dataview"
+val cleanUp =
+  s"""
+|$dataViewTerm.clear();
+  """.stripMargin
+cleanUpDataViews.append(cleanUp)
+  }
+}
+
+cleanUpDataViews.toString()
+  } else {
+""
+  }
+}
+
+def genInitialize: String = {
+
+j"""
+   |  public final void initialize(
--- End diff --

I would like to rename the method to `open(ctx)`, so that we can use the 
`reusableOpenStatements` and `reuseOpenCode()` of `CodeGenerator` to 
generate the content of `open`. Currently, `genInitialize` is easily 
confused with `reuseInitCode()`.
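
To illustrate the suggestion above, here is a minimal Java sketch of the pattern: statements are registered in a reusable collection and emitted once into a generated `open(ctx)` method. Only the names `reusableOpenStatements`, `reuseOpenCode()` and `open(ctx)` come from the comment. The class `OpenCodeSketch`, the helpers `addReusableOpenStatement` and `genOpen`, and the choice of a `LinkedHashSet` are assumptions made for illustration, not the actual `CodeGenerator` implementation.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical miniature of a code generator; not Flink's CodeGenerator.
final class OpenCodeSketch {
    // Insertion-ordered and de-duplicated: a statement registered twice is emitted once.
    private final Set<String> reusableOpenStatements = new LinkedHashSet<>();

    // Hypothetical helper that registers one statement for the body of open().
    void addReusableOpenStatement(String statement) {
        reusableOpenStatements.add(statement);
    }

    // Joins all registered statements, in registration order.
    String reuseOpenCode() {
        return String.join("\n", reusableOpenStatements);
    }

    // Emits the source of the generated method; ctxTypeTerm is supplied by the caller.
    String genOpen(String ctxTypeTerm) {
        return "public final void open(" + ctxTypeTerm + " ctx) throws Exception {\n"
            + reuseOpenCode() + "\n"
            + "}";
    }
}
```

With this shape, any part of the generator can contribute initialization code without knowing who else does, and the method name no longer collides conceptually with the existing init code.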




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134151688
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/StateViewUtils.scala
 ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.dataview
+
+import java.util
+import java.lang.{Iterable => JIterable}
+
+import org.apache.flink.api.common.functions.RuntimeContext
+import org.apache.flink.api.common.state._
+import org.apache.flink.table.api.dataview.{ListView, MapView}
+
+/**
+  * State view utils to create [[StateListView]] or [[StateMapView]]..
+  */
+object StateViewUtils {
--- End diff --

We may not need this, as we can code generate the creation code.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134134801
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala
 ---
@@ -100,6 +107,11 @@ abstract class GeneratedAggregations extends Function {
 * aggregated results
 */
   def resetAccumulator(accumulators: Row)
+
+  /**
+* Clean up for the accumulators.
+*/
+  def cleanUp()
--- End diff --

`cleanup` is also a word, so we do not need an uppercase `U` here.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134153349
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -80,7 +82,8 @@ class AggregationCodeGenerator(
 outputArity: Int,
 needRetract: Boolean,
 needMerge: Boolean,
-needReset: Boolean)
+needReset: Boolean,
+accConfig: Option[DataViewConfig])
--- End diff --

It seems that `accConfig` is always `Some(x)`, do we need the `Option`?




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134137708
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -1646,4 +1649,86 @@ abstract class CodeGenerator(
 
 fieldTerm
   }
+
+  /**
+* Adds a reusable class to the member area of the generated 
[[Function]].
+*/
+  def addReusableClass(clazz: Class[_], fieldTerm: String): Unit = {
+val field =
+  s"""
+ |transient ${clazz.getCanonicalName} $fieldTerm = null;
+ |""".stripMargin
+reusableMemberStatements.add(field)
+  }
+
+  /**
+* Adds a reusable [[DataViewConfig]] to the member area of the 
generated [[Function]].
+*
+* @param indices indices of aggregate functions.
+* @param ctxTerm field name of runtime context.
+* @param accConfig data view config which contains id, field and 
StateDescriptos.
+* @return statements to create [[MapView]] or [[ListView]].
+*/
+  def addReusableDataViewConfig(
+  indices: Range,
+  ctxTerm: String,
+  accConfig: Option[DataViewConfig])
+: String = {
+if (accConfig.isDefined && accConfig.get.isStateBackedDataViews) {
+  val initDataViews = new StringBuilder
+  val descMapping: Map[String, StateDescriptor[_, _]] = 
accConfig.get.accSpecs
+.flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+.toMap[String, StateDescriptor[_, _]]
+
+  for (i <- indices) yield {
+for (spec <- accConfig.get.accSpecs(i)) yield {
+  val dataViewField = spec.field
+  val dataViewTypeTerm = dataViewField.getType.getCanonicalName
+  val desc = descMapping.getOrElse(spec.id,
+throw new CodeGenException(s"Can not find ListView in 
accumulator by id: ${spec.id}"))
+
+  val serializedData = AggregateUtil.serialize(desc)
+  val dataViewFieldTerm = 
s"acc${i}_${dataViewField.getName}_dataview"
+  val field =
+s"""
+   |transient $dataViewTypeTerm $dataViewFieldTerm = null;
+   |""".stripMargin
+  reusableMemberStatements.add(field)
+
+  val descFieldTerm = s"${dataViewFieldTerm}_desc"
+  val descClassQualifier = classOf[StateDescriptor[_, 
_]].getCanonicalName
+  val descDeserialize =
+s"""
+   |$descClassQualifier $descFieldTerm = 
($descClassQualifier)
+   |  ${AggregateUtil.getClass.getName.stripSuffix("$")}
+   |  .deserialize("$serializedData");
+ """.stripMargin
+
+  val init = if (dataViewField.getType == classOf[MapView[_, _]]) {
+s"""
+   |$descDeserialize
+   |$dataViewFieldTerm =
+   |  
org.apache.flink.table.dataview.StateViewUtils.createMapView($descFieldTerm,
+   |  $ctxTerm);
+   """.stripMargin
+  } else if (dataViewField.getType == classOf[ListView[_]]) {
+s"""
+   |$descDeserialize
+   |$dataViewFieldTerm =
+   |  
org.apache.flink.table.dataview.StateViewUtils.createListView($descFieldTerm,
--- End diff --

Same as above, we can code gen the creation.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134118741
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala
 ---
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.MapViewTypeInfoFactory
+
+/**
+  * MapView provides Map functionality for accumulators used by 
user-defined aggregate functions
+  * [[org.apache.flink.table.functions.AggregateFunction]].
+  *
+  * A MapView can be backed by a Java HashMap or a state backend, 
depending on the context in
+  * which the function is used.
+  *
+  * At runtime `MapView` will be replaced by a 
[[org.apache.flink.table.dataview.StateMapView]]
+  * when use state backend.
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public MapView map;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *@Override
+  *public MyAccum createAccumulator() {
+  *  MyAccum accum = new MyAccum();
+  *  accum.map = new MapView<>(Types.STRING, Types.INT);
+  *  accum.count = 0L;
+  *  return accum;
+  *}
+  *
+  *public void accumulate(MyAccum accumulator, String id) {
+  *  try {
+  *  if (!accumulator.map.contains(id)) {
+  *accumulator.map.put(id, 1);
+  *accumulator.count++;
+  *  }
+  *  } catch (Exception e) {
+  *e.printStackTrace();
+  *  }
+  *}
+  *
+  *@Override
+  *public Long getValue(MyAccum accumulator) {
+  *  return accumulator.count;
+  *}
+  *  }
+  *
+  * }}}
+  *
+  * @param keyTypeInfo key type information
+  * @param valueTypeInfo value type information
+  * @tparam K key type
+  * @tparam V value type
+  */
+@TypeInfo(classOf[MapViewTypeInfoFactory[_, _]])
+class MapView[K, V](
+   private[flink] val keyTypeInfo: TypeInformation[K],
+   private[flink] val valueTypeInfo: TypeInformation[V])
--- End diff --

I'm not sure whether it is good to add `private[flink]`, because the 
fields are actually `public` for Java users.

Also, please make them `@transient`.
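
As background for the `@transient` request, the following Java sketch shows what a transient field means under plain Java serialization: the field is skipped when the object is written and comes back as `null`, while the other fields survive. Scala's `@transient` annotation marks the underlying JVM field the same way. `ViewSketch` and all of its members are hypothetical stand-ins rather than Flink's `MapView`, and the thread does not say which serialization path the reviewer has in mind.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;

// Hypothetical stand-in for a view class; not Flink's MapView.
final class ViewSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    // Construction-time metadata; transient, so Java serialization skips it.
    transient String keyTypeDescription;

    // The actual contents; serialized as usual.
    final HashMap<String, Integer> map = new HashMap<>();

    ViewSketch(String keyTypeDescription) {
        this.keyTypeDescription = keyTypeDescription;
    }

    // Serializes and deserializes the instance in memory.
    static ViewSketch roundTrip(ViewSketch in) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(in);
            }
            try (ObjectInputStream input =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ViewSketch) input.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

After `ViewSketch.roundTrip(view)`, the copy's `map` still holds its entries, but `keyTypeDescription` is `null`, so code that reads such a field after deserialization has to tolerate its absence.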




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134118886
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -105,6 +108,13 @@ class AggregationCodeGenerator(
   inFields => for (f <- inFields) yield javaClasses(f)
 }
 
+// define runtimeContext as member variable
+val ctxTerm = s"runtimeContext"
--- End diff --

I think we do not need to make the `runtimeContext` a member variable.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134138469
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -1646,4 +1649,86 @@ abstract class CodeGenerator(
 
 fieldTerm
   }
+
+  /**
+* Adds a reusable class to the member area of the generated 
[[Function]].
+*/
+  def addReusableClass(clazz: Class[_], fieldTerm: String): Unit = {
+val field =
+  s"""
+ |transient ${clazz.getCanonicalName} $fieldTerm = null;
+ |""".stripMargin
+reusableMemberStatements.add(field)
+  }
+
+  /**
+* Adds a reusable [[DataViewConfig]] to the member area of the 
generated [[Function]].
+*
+* @param indices indices of aggregate functions.
+* @param ctxTerm field name of runtime context.
+* @param accConfig data view config which contains id, field and 
StateDescriptos.
+* @return statements to create [[MapView]] or [[ListView]].
+*/
+  def addReusableDataViewConfig(
--- End diff --

`addReusableDataViewConfig` should be in `AggregationCodeGenerator`. I 
would also like to change this method to 
`addReusableDataView(spec: DataViewSpec): String`, where the returned String 
is the term of the dataview member variable. The dataview creation code can be 
added to `reusableDataViewStatements`, and the code of 
`reusableDataViewStatements` can finally be added to `initialize(ctx)` (or 
`open(ctx)`, as I suggested).
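
The proposed method shape can be sketched in Java as follows. The names `addReusableDataView`, `reusableDataViewStatements` and `reusableMemberStatements`, as well as the `acc<i>_<field>_dataview` naming scheme, are taken from the thread and the quoted diff. Everything else is an assumption for illustration: the class `DataViewGenSketch`, the flattened parameter list that replaces `DataViewSpec`, the `reuseDataViewCode` helper, and the caller-supplied `creationExpr`, which stands in for whatever creation code the generator would emit.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature; the real DataViewSpec and code generator live in flink-table.
final class DataViewGenSketch {
    final List<String> reusableMemberStatements = new ArrayList<>();
    final List<String> reusableDataViewStatements = new ArrayList<>();

    // Registers the member declaration and the creation statement for one data view,
    // and returns the member variable term so that other generated code can refer to it.
    String addReusableDataView(
            int aggIndex, String fieldName, String typeTerm, String creationExpr) {
        String term = "acc" + aggIndex + "_" + fieldName + "_dataview";
        reusableMemberStatements.add("transient " + typeTerm + " " + term + " = null;");
        reusableDataViewStatements.add(term + " = " + creationExpr + ";");
        return term;
    }

    // Code to be placed into the generated initialize(ctx) or open(ctx) method.
    String reuseDataViewCode() {
        return String.join("\n", reusableDataViewStatements);
    }
}
```

Returning only the term keeps callers such as the field setters decoupled from how the view is created, while the creation statements are collected in one place and emitted once.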




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134151559
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView provides List functionality for accumulators used by 
user-defined aggregate functions
+  * {{AggregateFunction}}.
+  *
+  * A ListView can be backed by a Java ArrayList or a state backend, 
depending on the context in
+  * which the function is used.
+  *
+  * At runtime `ListView` will be replaced by a 
[[org.apache.flink.table.dataview.StateListView]]
+  * when use state backend..
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public ListView list;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  * MyAccum accum = new MyAccum();
+  * accum.list = new ListView<>(Types.STRING);
+  * accum.count = 0L;
+  * return accum;
+  *   }
+  *
+  *   public void accumulate(MyAccum accumulator, String id) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  * return accumulator.count;
+  *   }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](val elementTypeInfo: TypeInformation[T]) extends 
DataView {
+
+  def this() = this(null)
+
+  val list = new util.ArrayList[T]()
+
+  /**
+* Returns an iterable of the list.
+*
+* @return The iterable of the list or { @code null} if the list is 
empty.
+*/
+  def get: JIterable[T] = {
+if (!list.isEmpty) {
+  list
+} else {
+  null
+}
+  }
+
+  /**
+* Adding the given value to the list.
+*
+* @param value element to be appended to this list
+*/
+  def add(value: T): Unit = list.add(value)
+
+  /**
+* Removes all of the elements from this list.
+*
+* The list will be empty after this call returns.
+*/
+  override def clear(): Unit = list.clear()
+
+  /**
+* Copy from a list instance.
+*
+* @param t List instance.
+* @return A copy of this list instance
+*/
+  def copyFrom(t: util.List[T]): ListView[T] = {
--- End diff --

The `copyFrom` method can be accessed by users. We should avoid this.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134139167
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -1646,4 +1649,86 @@ abstract class CodeGenerator(
 
 fieldTerm
   }
+
+  /**
+* Adds a reusable class to the member area of the generated 
[[Function]].
+*/
+  def addReusableClass(clazz: Class[_], fieldTerm: String): Unit = {
--- End diff --

Do we need this? It is only used to add `RuntimeContext` to the member area, 
but `RuntimeContext` is only used in `initialize`.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134118959
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -162,13 +172,66 @@ class AggregationCodeGenerator(
   }
 }
 
+def genDataViewFieldSetter(accTerm: String, specs: 
Seq[DataViewSpec[_]]): String = {
+  if (accConfig.isDefined && accConfig.get.isStateBackedDataViews) {
+val setters = for (spec <- specs) yield {
+  val field = spec.field
+  val dataViewTerm = s"${accTerm}_${field.getName}_dataview"
+  val fieldSetter = if (Modifier.isPublic(field.getModifiers)) {
+s"$accTerm.${field.getName} = $dataViewTerm;"
+  } else {
+val fieldTerm = 
addReusablePrivateFieldAccess(field.getDeclaringClass, field.getName)
+s"${reflectiveFieldWriteAccess(fieldTerm, field, accTerm, 
dataViewTerm)};"
+  }
+
+  s"""
+ |$fieldSetter
+""".stripMargin
--- End diff --

indent




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134152736
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -307,6 +312,119 @@ object UserDefinedFunctionUtils {
   // 
--
 
   /**
+* get data view type information from accumulator constructor.
+*
+* @param aggFun aggregate function
+* @return the data view specification
+*/
+  def getDataViewTypeInfoFromConstructor(
+aggFun: AggregateFunction[_, _])
+  : mutable.HashMap[String, TypeInformation[_]] = {
+
+val resultMap = new mutable.HashMap[String, TypeInformation[_]]
+val acc = aggFun.createAccumulator()
+val fields: util.List[Field] = 
TypeExtractor.getAllDeclaredFields(acc.getClass, true)
--- End diff --

Yes, I think we can use accumulator type info instead of field information.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134118759
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala
 ---
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.MapViewTypeInfoFactory
+
+/**
+  * MapView provides Map functionality for accumulators used by 
user-defined aggregate functions
+  * [[org.apache.flink.table.functions.AggregateFunction]].
+  *
+  * A MapView can be backed by a Java HashMap or a state backend, 
depending on the context in
+  * which the function is used.
+  *
+  * At runtime `MapView` will be replaced by a 
[[org.apache.flink.table.dataview.StateMapView]]
+  * when use state backend.
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public MapView map;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *@Override
+  *public MyAccum createAccumulator() {
+  *  MyAccum accum = new MyAccum();
+  *  accum.map = new MapView<>(Types.STRING, Types.INT);
+  *  accum.count = 0L;
+  *  return accum;
+  *}
+  *
+  *public void accumulate(MyAccum accumulator, String id) {
+  *  try {
+  *  if (!accumulator.map.contains(id)) {
+  *accumulator.map.put(id, 1);
+  *accumulator.count++;
+  *  }
+  *  } catch (Exception e) {
+  *e.printStackTrace();
+  *  }
+  *}
+  *
+  *@Override
+  *public Long getValue(MyAccum accumulator) {
+  *  return accumulator.count;
+  *}
+  *  }
+  *
+  * }}}
+  *
+  * @param keyTypeInfo key type information
+  * @param valueTypeInfo value type information
+  * @tparam K key type
+  * @tparam V value type
+  */
+@TypeInfo(classOf[MapViewTypeInfoFactory[_, _]])
+class MapView[K, V](
+   private[flink] val keyTypeInfo: TypeInformation[K],
+   private[flink] val valueTypeInfo: TypeInformation[V])
+  extends DataView {
+
+  def this() = this(null, null)
+
+  val map = new util.HashMap[K, V]()
+
+  /**
+* Returns the value to which the specified key is mapped, or { @code 
null } if this map
+* contains no mapping for the key.
+*
+* @param key The key of the mapping.
+* @return The value of the mapping with the given key.
+* @throws Exception Thrown if the system cannot get data.
+*/
+  @throws[Exception]
+  def get(key: K): V = map.get(key)
+
+  /**
+* Put a value with the given key into the map.
+*
+* @param key   The key of the mapping.
+* @param value The new value of the mapping.
+* @throws Exception Thrown if the system cannot put data.
+*/
+  @throws[Exception]
+  def put(key: K, value: V): Unit = map.put(key, value)
+
+  /**
+* Copies all of the mappings from the specified map to this map view.
+*
+* @param map The mappings to be stored in this map.
+* @throws Exception Thrown if the system cannot access the map.
+*/
+  @throws[Exception]
+  def putAll(map: util.Map[K, V]): Unit = this.map.putAll(map)
+
+  /**
+* Deletes the mapping of the given key.
+*
+* @param key The key of the mapping.
+* @throws Exception Thrown if the system cannot access the map.
+*/
+  @throws[Exception]
+  def remove(key: K): Unit = map.remove(key)
+
+  /**
+* Returns whether there exists the given mapping.
+*
+* @param key The key of the 

[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134137525
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -1646,4 +1649,86 @@ abstract class CodeGenerator(
 
 fieldTerm
   }
+
+  /**
+* Adds a reusable class to the member area of the generated [[Function]].
+*/
+  def addReusableClass(clazz: Class[_], fieldTerm: String): Unit = {
+val field =
+  s"""
+ |transient ${clazz.getCanonicalName} $fieldTerm = null;
+ |""".stripMargin
+reusableMemberStatements.add(field)
+  }
+
+  /**
+* Adds a reusable [[DataViewConfig]] to the member area of the generated [[Function]].
+*
+* @param indices indices of aggregate functions.
+* @param ctxTerm field name of runtime context.
+* @param accConfig data view config which contains id, field and StateDescriptors.
+* @return statements to create [[MapView]] or [[ListView]].
+*/
+  def addReusableDataViewConfig(
+  indices: Range,
+  ctxTerm: String,
+  accConfig: Option[DataViewConfig])
+: String = {
+if (accConfig.isDefined && accConfig.get.isStateBackedDataViews) {
+  val initDataViews = new StringBuilder
+  val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.get.accSpecs
+.flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+.toMap[String, StateDescriptor[_, _]]
+
+  for (i <- indices) yield {
+for (spec <- accConfig.get.accSpecs(i)) yield {
+  val dataViewField = spec.field
+  val dataViewTypeTerm = dataViewField.getType.getCanonicalName
+  val desc = descMapping.getOrElse(spec.id,
+throw new CodeGenException(s"Can not find ListView in accumulator by id: ${spec.id}"))
+
+  val serializedData = AggregateUtil.serialize(desc)
+  val dataViewFieldTerm = s"acc${i}_${dataViewField.getName}_dataview"
+  val field =
+s"""
+   |transient $dataViewTypeTerm $dataViewFieldTerm = null;
+   |""".stripMargin
+  reusableMemberStatements.add(field)
+
+  val descFieldTerm = s"${dataViewFieldTerm}_desc"
+  val descClassQualifier = classOf[StateDescriptor[_, _]].getCanonicalName
+  val descDeserialize =
+s"""
+   |$descClassQualifier $descFieldTerm = ($descClassQualifier)
+   |  ${AggregateUtil.getClass.getName.stripSuffix("$")}
+   |  .deserialize("$serializedData");
+ """.stripMargin
+
+  val init = if (dataViewField.getType == classOf[MapView[_, _]]) {
+s"""
+   |$descDeserialize
+   |$dataViewFieldTerm =
+   |  org.apache.flink.table.dataview.StateViewUtils.createMapView($descFieldTerm,
--- End diff --

I think we do not need `StateViewUtils` here. We can create the MapView 
directly in the generated code, because we already have the RuntimeContext 
and the StateDescriptor. 
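
A minimal sketch of what "create the MapView in code gen directly" could look like, written as a Java helper that returns the generated source text. `RuntimeContext#getMapState(MapStateDescriptor)` is existing Flink API; the helper name `genMapViewInit`, the term names, and the assumption that `StateMapView` has a constructor taking a `MapState` are illustrative only and are not confirmed by the diff.

```java
// Sketch only: builds the Java source text that a code generator could emit.
final class MapViewInitCodegen {

    // Hypothetical helper. The StateMapView constructor taking a MapState
    // is an assumption; the diff above does not show it.
    static String genMapViewInit(String viewTerm, String ctxTerm, String descTerm) {
        String mapStateDesc = "org.apache.flink.api.common.state.MapStateDescriptor";
        return viewTerm + " = new org.apache.flink.table.dataview.StateMapView("
            + ctxTerm + ".getMapState((" + mapStateDesc + ") " + descTerm + "));";
    }

    public static void main(String[] args) {
        // Prints the statement that would be placed in the generated open() code.
        System.out.println(
            genMapViewInit("acc0_map_dataview", "ctx", "acc0_map_dataview_desc"));
    }
}
```

The point of the suggestion is that the generated function already holds both inputs, so an extra utility class adds an indirection without adding information.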




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134118408
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView provides List functionality for accumulators used by user-defined aggregate functions
+  * {{AggregateFunction}}.
+  *
+  * A ListView can be backed by a Java ArrayList or a state backend, depending on the context in
+  * which the function is used.
+  *
+  * At runtime `ListView` will be replaced by a [[org.apache.flink.table.dataview.StateListView]]
+  * when a state backend is used.
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public ListView list;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  * MyAccum accum = new MyAccum();
+  * accum.list = new ListView<>(Types.STRING);
+  * accum.count = 0L;
+  * return accum;
+  *   }
+  *
+  *   public void accumulate(MyAccum accumulator, String id) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  * return accumulator.count;
+  *   }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](val elementTypeInfo: TypeInformation[T]) extends DataView {
+
+  def this() = this(null)
+
+  val list = new util.ArrayList[T]()
--- End diff --

Make the list private; otherwise Java users can access it. 
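
For context: a plain Scala `val list` compiles to a public accessor method `list()`, so Java callers can reach the backing collection. The Java sketch below shows the encapsulation being asked for. `PrivateListView` is a hypothetical stand-in, not the class from the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a heap-backed list view.
final class PrivateListView<T> {

    // Private backing list: callers can only go through add() and get().
    private final List<T> list = new ArrayList<>();

    void add(T value) {
        list.add(value);
    }

    // Exposes the elements as an Iterable rather than as a mutable List type.
    Iterable<T> get() {
        return list;
    }
}
```

With the field private, the view can later be swapped for a state-backed implementation without breaking user code that relied on the raw `ArrayList`.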




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134119014
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -162,13 +172,66 @@ class AggregationCodeGenerator(
   }
 }
 
+def genDataViewFieldSetter(accTerm: String, specs: Seq[DataViewSpec[_]]): String = {
+  if (accConfig.isDefined && accConfig.get.isStateBackedDataViews) {
+val setters = for (spec <- specs) yield {
+  val field = spec.field
+  val dataViewTerm = s"${accTerm}_${field.getName}_dataview"
--- End diff --

I find that the dataview term is defined in many places. Can we create a 
method to generate the term name, such as `createDataViewTerm(index: Int, 
fieldName: String)`?
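
A minimal sketch of such a helper, written in Java for illustration. The naming pattern follows the `acc${i}_${fieldName}_dataview` string that appears in the CodeGenerator diff; where the helper would live is an assumption.

```java
// Hypothetical helper that centralizes the data view term naming.
final class DataViewTerms {

    // Builds the member field name for the data view of accumulator `index`.
    static String createDataViewTerm(int index, String fieldName) {
        return "acc" + index + "_" + fieldName + "_dataview";
    }

    public static void main(String[] args) {
        System.out.println(createDataViewTerm(0, "map"));
    }
}
```

Generating the name in one place keeps the field declaration, the initialization code, and the setter code from drifting apart.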




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134118851
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView encapsulates the operation of list.
+  *
+  * None of the methods in this class are implemented; users do not need to care whether it is
+  * backed by a Java ArrayList or a state backend. It will be replaced by a {@link StateListView} or a
+  * {@link HeapListView}.
+  *
+  * 
+  * NOTE: Users are not recommended to extend this class.
+  * 
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public ListView list;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  * MyAccum accum = new MyAccum();
+  * accum.list = new ListView<>(Types.STRING);
+  * accum.count = 0L;
+  * return accum;
+  *   }
+  *
+  *   //Overloaded accumulate method
+  *   public void accumulate(MyAccum accumulator, String id) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  * return accumulator.count;
+  *   }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](val elementTypeInfo: TypeInformation[T]) extends DataView {
--- End diff --

Please mark `elementTypeInfo` as `@transient`. Also, do we want the type 
info to be accessible to users? 
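
For reference, Scala's `@transient` annotation maps to the Java `transient` field modifier, which plain Java serialization skips. The sketch below demonstrates that effect with a hypothetical `View` class in which a `String` plays the role of the type information. Whether Flink's own serializers ever touch this field is not shown in the diff.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class TransientFieldDemo {

    // Hypothetical stand-in for a view class.
    static final class View implements Serializable {
        private static final long serialVersionUID = 1L;
        transient String elementTypeInfo; // skipped by Java serialization
        String payload;                   // serialized as usual

        View(String elementTypeInfo, String payload) {
            this.elementTypeInfo = elementTypeInfo;
            this.payload = payload;
        }
    }

    // Serializes and deserializes the view with plain Java serialization.
    static View roundTrip(View view) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(view);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (View) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Round trip failed", e);
        }
    }
}
```

After a round trip the transient field comes back as `null` while the regular field survives, so any code that reads the type information must do so before the object is shipped.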




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-21 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134118744
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala
 ---
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.MapViewTypeInfoFactory
+
+/**
+  * MapView provides Map functionality for accumulators used by user-defined aggregate functions
+  * [[org.apache.flink.table.functions.AggregateFunction]].
+  *
+  * A MapView can be backed by a Java HashMap or a state backend, depending on the context in
+  * which the function is used.
+  *
+  * At runtime `MapView` will be replaced by a [[org.apache.flink.table.dataview.StateMapView]]
+  * when a state backend is used.
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public MapView map;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *@Override
+  *public MyAccum createAccumulator() {
+  *  MyAccum accum = new MyAccum();
+  *  accum.map = new MapView<>(Types.STRING, Types.INT);
+  *  accum.count = 0L;
+  *  return accum;
+  *}
+  *
+  *public void accumulate(MyAccum accumulator, String id) {
+  *  try {
+  *  if (!accumulator.map.contains(id)) {
+  *accumulator.map.put(id, 1);
+  *accumulator.count++;
+  *  }
+  *  } catch (Exception e) {
+  *e.printStackTrace();
+  *  }
+  *}
+  *
+  *@Override
+  *public Long getValue(MyAccum accumulator) {
+  *  return accumulator.count;
+  *}
+  *  }
+  *
+  * }}}
+  *
+  * @param keyTypeInfo key type information
+  * @param valueTypeInfo value type information
+  * @tparam K key type
+  * @tparam V value type
+  */
+@TypeInfo(classOf[MapViewTypeInfoFactory[_, _]])
+class MapView[K, V](
+   private[flink] val keyTypeInfo: TypeInformation[K],
+   private[flink] val valueTypeInfo: TypeInformation[V])
+  extends DataView {
+
+  def this() = this(null, null)
+
+  val map = new util.HashMap[K, V]()
--- End diff --

Please make the `map` private.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-20 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134118240
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -108,30 +112,38 @@ object AggregateUtil {
   outputArity,
   needRetract = false,
   needMerge = false,
-  needReset = false
+  needReset = false,
+  accConfig = Some(DataViewConfig(accSpecs, isUseState))
 )
 
+val accConfig = accSpecs
+  .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor)))
+  .toMap[String, StateDescriptor[_, _]]
+
 if (isRowTimeType) {
   if (isRowsClause) {
 // ROWS unbounded over process function
 new RowTimeUnboundedRowsOver(
   genFunction,
   aggregationStateType,
   CRowTypeInfo(inputTypeInfo),
-  queryConfig)
+  queryConfig,
+  accConfig)
--- End diff --

+1 to do this. A great improvement! 




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-20 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r134118203
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -307,6 +312,119 @@ object UserDefinedFunctionUtils {
   // 
--
 
   /**
+* get data view type information from accumulator constructor.
+*
+* @param aggFun aggregate function
+* @return the data view specification
+*/
+  def getDataViewTypeInfoFromConstructor(
+aggFun: AggregateFunction[_, _])
+  : mutable.HashMap[String, TypeInformation[_]] = {
+
+val resultMap = new mutable.HashMap[String, TypeInformation[_]]
+val acc = aggFun.createAccumulator()
+val fields: util.List[Field] = TypeExtractor.getAllDeclaredFields(acc.getClass, true)
+for (i <- 0 until fields.size()) {
+  val field = fields.get(i)
+  field.setAccessible(true)
+  if (classOf[DataView].isAssignableFrom(field.getType)) {
+if (field.getType == classOf[MapView[_, _]]) {
+  val mapView = field.get(acc)
+  val keyTypeInfo = getFieldValue(classOf[MapView[_, _]], mapView, "keyTypeInfo")
--- End diff --

Adding `private[flink]` to the key and value type infos will make them public 
in Java. I'm not sure whether that is a good idea. I would prefer not to expose 
them to users (Java users), and the reflection only happens at compile time, 
which I think is fine. 
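
The reflection-based alternative can be sketched in Java as follows. The `getFieldValue(clazz, instance, fieldName)` call shape is taken from the diff above; the implementation and the `Holder` class are assumptions made for illustration.

```java
import java.lang.reflect.Field;

final class PrivateFieldReader {

    // Hypothetical class with a field that is not exposed to users.
    static final class Holder {
        private final String keyTypeInfo;

        Holder(String keyTypeInfo) {
            this.keyTypeInfo = keyTypeInfo;
        }
    }

    // Reads a declared (possibly private) field of `instance` via reflection.
    static Object getFieldValue(Class<?> clazz, Object instance, String fieldName) {
        try {
            Field field = clazz.getDeclaredField(fieldName);
            field.setAccessible(true);
            return field.get(instance);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot read field " + fieldName, e);
        }
    }
}
```

Because the lookup runs once while the query is translated rather than per record, its cost is unlikely to matter, and the fields can stay out of the public Java API.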




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-18 Thread kaibozhou
Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r133925176
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -179,13 +214,19 @@ class AggregationCodeGenerator(
|  ${aggMapping(i)},
|  (${accTypes(i)}) accs.getField($i));""".stripMargin
   } else {
+val setDataView = if (accConfig.isDefined && accConfig.get.isUseState) {
--- End diff --

Yes, it makes sense.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-18 Thread kaibozhou
Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r133925097
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -405,6 +481,17 @@ class AggregationCodeGenerator(
   }
 }
 
+val aggFuncCode = Seq(
+  genSetAggregationResults,
+  genAccumulate,
+  genRetract,
+  genCreateAccumulators,
+  genSetForwardedFields,
+  genSetConstantFlags,
+  genCreateOutputRow,
+  genMergeAccumulatorsPair,
+  genResetAccumulator).mkString("\n")
--- End diff --

That makes sense. I have looked at ProcessFunctionWithCleanupState; 
cleanUp should be called whenever cleanupState is called.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-18 Thread kaibozhou
Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r133924725
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java
 ---
@@ -135,4 +138,172 @@ public void retract(WeightedAvgAccum accumulator, int iValue, int iWeight) {
accumulator.count -= iWeight;
}
}
+
+   /**
+* CountDistinct accumulator.
+*/
+   public static class CountDistinctAccum {
+   public MapView map;
+   public long count;
+   }
+
+   /**
+* CountDistinct aggregate.
+*/
+   public static class CountDistinct extends AggregateFunction {
--- End diff --

Yes, CountDistinct is only used for test cases here.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-01 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130655818
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView encapsulates the operation of list.
+  *
+  * None of the methods in this class are implemented; users do not need to care whether it is
+  * backed by a Java ArrayList or a state backend. It will be replaced by a {@link StateListView} or a
+  * {@link HeapListView}.
+  *
+  * 
+  * NOTE: Users are not recommended to extend this class.
+  * 
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public ListView list;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  * MyAccum accum = new MyAccum();
+  * accum.list = new ListView<>(Types.STRING);
+  * accum.count = 0L;
+  * return accum;
+  *   }
+  *
+  *   //Overloaded accumulate method
+  *   public void accumulate(MyAccum accumulator, String id) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  * return accumulator.count;
+  *   }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](val elementTypeInfo: TypeInformation[T]) extends DataView {
+
+  def this() = this(null)
+
+  /**
+* Returns an iterable of the list.
+*
+* @return The iterable of the list or {@code null} if the list is empty.
+*/
+  def get: JIterable[T] = throw new UnsupportedOperationException("Unsupported operation!")
--- End diff --

We try to implement public interfaces in a way that Java users do not need 
to deal with Scala classes. However, it should be fine in this case because 

1. Scala's `UnsupportedOperationException` is defined as Java's 
`UnsupportedOperationException` 
2. the exception will be removed if we implement the method by default as 
backed by an ArrayList.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-01 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130655010
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView encapsulates the operation of list.
+  *
+  * None of the methods in this class are implemented; users do not need to care whether it is
+  * backed by a Java ArrayList or a state backend. It will be replaced by a {@link StateListView} or a
+  * {@link HeapListView}.
+  *
+  * 
+  * NOTE: Users are not recommended to extend this class.
+  * 
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public ListView list;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  * MyAccum accum = new MyAccum();
+  * accum.list = new ListView<>(Types.STRING);
+  * accum.count = 0L;
+  * return accum;
+  *   }
+  *
+  *   //Overloaded accumulate method
+  *   public void accumulate(MyAccum accumulator, String id) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  * return accumulator.count;
+  *   }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](val elementTypeInfo: TypeInformation[T]) extends DataView {
--- End diff --

Hmm, not sure how many users would actually look into the definition of 
`ListView` and also recognize the interface as a state interface. Such users 
would probably also see the Java/ScalaDocs that explain the purpose of the 
class.

Anyway, I have no strong opinion about this and I'm OK to keep it as it is.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-01 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130653790
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView encapsulates the operation of list.
+  *
+  * None of the methods in this class are implemented; users do not need to care whether it is
+  * backed by a Java ArrayList or a state backend. It will be replaced by a {@link StateListView} or a
+  * {@link HeapListView}.
+  *
+  * 
+  * NOTE: Users are not recommended to extend this class.
+  * 
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public ListView list;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  * MyAccum accum = new MyAccum();
+  * accum.list = new ListView<>(Types.STRING);
+  * accum.count = 0L;
+  * return accum;
+  *   }
+  *
+  *   //Overloaded accumulate method
+  *   public void accumulate(MyAccum accumulator, String id) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  * return accumulator.count;
+  *   }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
--- End diff --

OK, then let's keep it as it is




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-01 Thread kaibozhou
Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130628208
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView encapsulates the operation of list.
+  *
+  * All methods in this class are not implemented, users do not need to care about whether it is
+  * backed by Java ArrayList or state backend. It will be replaced by a {@link StateListView} or a
+  * {@link HeapListView}.
+  *
+  * 
+  * NOTE: Users are not recommended to extends this class.
+  * 
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public ListView list;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  * MyAccum accum = new MyAccum();
+  * accum.list = new ListView<>(Types.STRING);
+  * accum.count = 0L;
+  * return accum;
+  *   }
+  *
+  *   //Overloaded accumulate method
+  *   public void accumulate(MyAccum accumulator, String id) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  * return accumulator.count;
+  *   }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](val elementTypeInfo: TypeInformation[T]) extends DataView {
+
+  def this() = this(null)
+
+  /**
+* Returns an iterable of the list.
+*
+* @return The iterable of the list or { @code null} if the list is empty.
+*/
+  def get: JIterable[T] = throw new UnsupportedOperationException("Unsupported operation!")
+
+  /**
+* Adding the given value to the list.
+*
+* @param value element to be appended to this list
+*/
+  def add(value: T): Unit = throw new UnsupportedOperationException("Unsupported operation!")
--- End diff --

I think it's a good idea, and I will try it.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-01 Thread kaibozhou
Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130625494
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -307,6 +312,119 @@ object UserDefinedFunctionUtils {
   // 
--
 
   /**
+* get data view type information from accumulator constructor.
+*
+* @param aggFun aggregate function
+* @return the data view specification
+*/
+  def getDataViewTypeInfoFromConstructor(
+aggFun: AggregateFunction[_, _])
+  : mutable.HashMap[String, TypeInformation[_]] = {
+
+val resultMap = new mutable.HashMap[String, TypeInformation[_]]
+val acc = aggFun.createAccumulator()
+val fields: util.List[Field] = TypeExtractor.getAllDeclaredFields(acc.getClass, true)
+for (i <- 0 until fields.size()) {
+  val field = fields.get(i)
+  field.setAccessible(true)
+  if (classOf[DataView].isAssignableFrom(field.getType)) {
+if (field.getType == classOf[MapView[_, _]]) {
+  val mapView = field.get(acc)
+  val keyTypeInfo = getFieldValue(classOf[MapView[_, _]], mapView, "keyTypeInfo")
--- End diff --

Yes, it's a good idea.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-01 Thread kaibozhou
Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130612699
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView encapsulates the operation of list.
+  *
+  * All methods in this class are not implemented, users do not need to care about whether it is
+  * backed by Java ArrayList or state backend. It will be replaced by a {@link StateListView} or a
+  * {@link HeapListView}.
+  *
+  * 
+  * NOTE: Users are not recommended to extends this class.
+  * 
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public ListView list;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  * MyAccum accum = new MyAccum();
+  * accum.list = new ListView<>(Types.STRING);
+  * accum.count = 0L;
+  * return accum;
+  *   }
+  *
+  *   //Overloaded accumulate method
+  *   public void accumulate(MyAccum accumulator, String id) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  * return accumulator.count;
+  *   }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](val elementTypeInfo: TypeInformation[T]) extends DataView {
+
+  def this() = this(null)
+
+  /**
+* Returns an iterable of the list.
+*
+* @return The iterable of the list or { @code null} if the list is empty.
+*/
+  def get: JIterable[T] = throw new UnsupportedOperationException("Unsupported operation!")
--- End diff --

There is no import from the java.lang package, so this is Scala's 
UnsupportedOperationException. What is the difference?




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-01 Thread kaibozhou
Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130611417
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView encapsulates the operation of list.
+  *
+  * All methods in this class are not implemented, users do not need to care about whether it is
+  * backed by Java ArrayList or state backend. It will be replaced by a {@link StateListView} or a
+  * {@link HeapListView}.
+  *
+  * 
+  * NOTE: Users are not recommended to extends this class.
+  * 
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public ListView list;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  * MyAccum accum = new MyAccum();
+  * accum.list = new ListView<>(Types.STRING);
+  * accum.count = 0L;
+  * return accum;
+  *   }
+  *
+  *   //Overloaded accumulate method
+  *   public void accumulate(MyAccum accumulator, String id) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  * return accumulator.count;
+  *   }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
+class ListView[T](val elementTypeInfo: TypeInformation[T]) extends DataView {
--- End diff --

IMO, this will make ListView look more like a state interface. Although the 
interface of ListView is the same as that of ListState, we do not want users 
to think that it is state.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-01 Thread kaibozhou
Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130606340
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView encapsulates the operation of list.
+  *
+  * All methods in this class are not implemented, users do not need to care about whether it is
+  * backed by Java ArrayList or state backend. It will be replaced by a {@link StateListView} or a
+  * {@link HeapListView}.
+  *
+  * 
+  * NOTE: Users are not recommended to extends this class.
+  * 
+  *
+  * Example:
+  * {{{
+  *
+  *  public class MyAccum {
+  *public ListView list;
+  *public long count;
+  *  }
+  *
+  *  public class MyAgg extends AggregateFunction {
+  *
+  *   @Override
+  *   public MyAccum createAccumulator() {
+  * MyAccum accum = new MyAccum();
+  * accum.list = new ListView<>(Types.STRING);
+  * accum.count = 0L;
+  * return accum;
+  *   }
+  *
+  *   //Overloaded accumulate method
+  *   public void accumulate(MyAccum accumulator, String id) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  *   }
+  *
+  *   @Override
+  *   public Long getValue(MyAccum accumulator) {
+  * accumulator.list.add(id);
+  * ... ...
+  * accumulator.get()
+  * ... ...
+  * return accumulator.count;
+  *   }
+  * }
+  *
+  * }}}
+  *
+  * @param elementTypeInfo element type information
+  * @tparam T element type
+  */
+@TypeInfo(classOf[ListViewTypeInfoFactory[_]])
--- End diff --

I tried it, and compilation fails with the error "not found: type T".




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-31 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130461621
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -405,6 +481,17 @@ class AggregationCodeGenerator(
   }
 }
 
+val aggFuncCode = Seq(
+  genSetAggregationResults,
+  genAccumulate,
+  genRetract,
+  genCreateAccumulators,
+  genSetForwardedFields,
+  genSetConstantFlags,
+  genCreateOutputRow,
+  genMergeAccumulatorsPair,
+  genResetAccumulator).mkString("\n")
--- End diff --

I think we need an additional `cleanUp()` method that clears all state 
objects for the current key. Otherwise, we will have memory leaks. The 
`cleanUp()` method must be called when the state retention timers trigger.
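
To make the idea concrete, here is a minimal, self-contained Java sketch of what such a hook might look like. `KeyedListStub` and `CleanableAggregations` are hypothetical stand-ins invented for this illustration; they are not Flink classes, and the code the generator actually emits may differ.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for keyed list state: one list per key. Not a Flink class.
class KeyedListStub {
    private final Map<String, List<String>> perKey = new HashMap<>();
    private String currentKey;

    void setCurrentKey(String key) { currentKey = key; }

    void add(String value) {
        perKey.computeIfAbsent(currentKey, k -> new ArrayList<>()).add(value);
    }

    // Drops the state of the current key only.
    void clear() { perKey.remove(currentKey); }

    int keysWithState() { return perKey.size(); }
}

// Sketch of generated aggregations that expose a cleanUp() hook.
class CleanableAggregations {
    final KeyedListStub acc0List = new KeyedListStub();

    void accumulate(String value) { acc0List.add(value); }

    // Assumed to be called when the state retention timer of the current key fires.
    void cleanUp() { acc0List.clear(); }
}
```

Without the `cleanUp()` call, the entry for an expired key would stay in the stand-in map forever, which is the leak described above.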




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-31 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130459674
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -222,14 +271,22 @@ class AggregationCodeGenerator(
 j"""
|  public final void retract(
|org.apache.flink.types.Row accs,
-   |org.apache.flink.types.Row input)""".stripMargin
+   |org.apache.flink.types.Row input) throws Exception """.stripMargin
 
   val retract: String = {
-for (i <- aggs.indices) yield
+for (i <- aggs.indices) yield {
+  val setDataView = if (accConfig.isDefined && accConfig.get.isUseState) {
+genDataViewFieldSetter(s"acc$i", accConfig.get.accSpecs(i))
--- End diff --

Same as for `setAggregationResults()`. I think we should reuse the state 
objects.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-31 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130458506
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -179,13 +214,19 @@ class AggregationCodeGenerator(
|  ${aggMapping(i)},
|  (${accTypes(i)}) accs.getField($i));""".stripMargin
   } else {
+val setDataView = if (accConfig.isDefined && accConfig.get.isUseState) {
--- End diff --

This will create new `MapState` (or `ListState`) objects in every 
invocation of `setAggregationResults()`. I think we can make the state objects 
members of the `GeneratedAggregations` class and just set them into the 
accumulator.
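
As a rough illustration of this suggestion, the following self-contained Java sketch keeps the state handle as a member of the generated class and only assigns it into the accumulator on each call. `ListHandleStub`, `AccSketch`, and `ReusingAggregations` are hypothetical stand-ins invented for this example; they are not Flink classes, and the real generated code may look different.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a state-backed list handle. Not Flink's ListState.
class ListHandleStub {
    final List<String> values = new ArrayList<>();
}

// Hypothetical accumulator with one data view field.
class AccSketch {
    ListHandleStub list;
    long count;
}

// Sketch of a generated aggregations class that reuses a single handle.
class ReusingAggregations {
    // Created once and kept as a member, not once per invocation.
    private final ListHandleStub acc0List = new ListHandleStub();

    void accumulate(AccSketch acc, String input) {
        acc.list = acc0List;          // only a field assignment on the hot path
        acc.list.values.add(input);
        acc.count++;
    }
}
```

The same member could be set into the accumulator in `retract()`, `resetAccumulator()`, and `setAggregationResults()`, so none of them would need to create a new object per call.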




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-31 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130461181
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -386,13 +455,20 @@ class AggregationCodeGenerator(
   val sig: String =
 j"""
|  public final void resetAccumulator(
-   |org.apache.flink.types.Row accs)""".stripMargin
+   |org.apache.flink.types.Row accs) throws Exception """.stripMargin
 
   val reset: String = {
-for (i <- aggs.indices) yield
+for (i <- aggs.indices) yield {
+  val setDataView = if (accConfig.isDefined && accConfig.get.isUseState) {
+genDataViewFieldSetter(s"acc$i", accConfig.get.accSpecs(i))
--- End diff --

Should reuse the state objects.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-31 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130459631
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -201,14 +242,22 @@ class AggregationCodeGenerator(
 j"""
|  public final void accumulate(
|org.apache.flink.types.Row accs,
-   |org.apache.flink.types.Row input)""".stripMargin
+   |org.apache.flink.types.Row input) throws Exception """.stripMargin
 
   val accumulate: String = {
-for (i <- aggs.indices) yield
+for (i <- aggs.indices) yield {
+  val setDataView = if (accConfig.isDefined && accConfig.get.isUseState) {
+genDataViewFieldSetter(s"acc$i", accConfig.get.accSpecs(i))
--- End diff --

Same as for `setAggregationResults()`. I think we should reuse the state 
objects.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130053061
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.dataview
+
+import java.lang.{Iterable => JIterable}
+
+import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInformation}
+import org.apache.flink.table.dataview.ListViewTypeInfoFactory
+
+/**
+  * ListView encapsulates the operation of list.
--- End diff --

```
ListView provides List functionality for accumulators used by user-defined 
aggregate functions {{AggregateFunction}}. 
A ListView can be backed by a Java ArrayList or a state backend, depending 
on the context in which the function is used. 

At runtime `ListView` will be replaced by a {@link StateListView} or a 
{@link HeapListView}. 
Hence, `ListView`'s methods do not need to be implemented.
```




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-07-28 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r130119984
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala
 ---
@@ -19,13 +19,20 @@
 package org.apache.flink.table.runtime.aggregate
 
 import org.apache.flink.api.common.functions.Function
+import org.apache.flink.table.dataview.{DataViewFactory, HeapViewFactory}
 import org.apache.flink.types.Row
 
 /**
   * Base class for code-generated aggregations.
   */
 abstract class GeneratedAggregations extends Function {
 
+  var factory: DataViewFactory = new HeapViewFactory()
+
+  def getDataViewFactory: DataViewFactory = factory
+
+  def setDataViewFactory(factory: DataViewFactory): Unit = this.factory = factory
--- End diff --

I think we should rather add a method `initialize(ctx: RuntimeContext)` and 
generate code to register the state in this method. 

IMO, the `DataViewFactory` is also not required, because 
1. we can code-gen all of that functionality
2. we can make heap the default for `MapView` and `ListView` such that we 
only need to replace it if it needs to be backed by state. So there would be 
only one implementation of `DataViewFactory`
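
The shape of this proposal can be sketched in plain Java as follows. All names here (`ContextStub`, `ListViewSketch`, `AggregationsBase`, `StateBackedAggregations`, and the state name `acc0_list`) are hypothetical stand-ins for illustration only; they are not Flink's `RuntimeContext` or `ListView`, and the sketch merely assumes that a view holds a replaceable backing list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a runtime context that hands out state.
interface ContextStub {
    List<String> getListState(String name);
}

// Heap is the default backing, so no factory is needed for the common case.
class ListViewSketch {
    List<String> backing = new ArrayList<>();

    void add(String value) { backing.add(value); }
}

abstract class AggregationsBase {
    // Default: nothing to register, views stay on the heap.
    void initialize(ContextStub ctx) { }
}

// What the code generator might emit when a view must be backed by state.
class StateBackedAggregations extends AggregationsBase {
    final ListViewSketch acc0List = new ListViewSketch();

    @Override
    void initialize(ContextStub ctx) {
        // Replace the heap default with the registered state.
        acc0List.backing = ctx.getListState("acc0_list");
    }
}
```

In this arrangement only the state-backed case needs generated registration code, which is why a separate factory with two implementations would no longer be necessary.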



