[GitHub] walterddr commented on a change in pull request #7237: [FLINK-7209] [table] Support DataView in Tuple and Case Class as the ACC type of AggregateFunction

2018-12-19 Thread GitBox
walterddr commented on a change in pull request #7237: [FLINK-7209] [table] 
Support DataView in Tuple and Case Class as the ACC type of AggregateFunction
URL: https://github.com/apache/flink/pull/7237#discussion_r243154482
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/NullSerializer.scala
 ##
 @@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+
+class NullSerializer extends TypeSerializerSingleton[Any] {
 
 Review comment:
   Hmm. I think that's a good idea, but it should probably be addressed in a 
separate JIRA. I actually have [PR:6472](https://github.com/apache/flink/pull/6472) 
still pending, and I am also very fond of the idea of adding tests for the 
utility class. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] walterddr commented on a change in pull request #7237: [FLINK-7209] [table] Support DataView in Tuple and Case Class as the ACC type of AggregateFunction

2018-12-19 Thread GitBox
walterddr commented on a change in pull request #7237: [FLINK-7209] [table] 
Support DataView in Tuple and Case Class as the ACC type of AggregateFunction
URL: https://github.com/apache/flink/pull/7237#discussion_r243014619
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ##
 @@ -489,6 +491,88 @@ object UserDefinedFunctionUtils {
   )
 }
 
+def decorateDataViewTypeInfo(
+fieldTypeInfo: TypeInformation[_],
+fieldInstance: AnyRef,
+field: Field): (TypeInformation[_], Option[DataViewSpec[_]]) = fieldTypeInfo match {
+  case ct: CompositeType[_] if includesDataView(ct) =>
+throw new TableException(
+  "MapView and ListView only supported at first level of accumulators of Pojo, Tuple " +
+"and Case Class type.")
+  case map: MapViewTypeInfo[_, _] =>
+val mapView = fieldInstance.asInstanceOf[MapView[_, _]]
+val newTypeInfo = if (mapView != null && mapView.keyTypeInfo != null &&
+  mapView.valueTypeInfo != null) {
+  new MapViewTypeInfo(mapView.keyTypeInfo, mapView.valueTypeInfo)
+} else {
+  map
+}
+
+if (isStateBackedDataViews) {
+  newTypeInfo.nullSerializer = true
+
+  // create map view specs with unique id (used as state name)
+  val fieldName = field.getName
+  var spec = MapViewSpec(
+"agg" + index + "$" + fieldName,
+field,
+newTypeInfo)
+
+  (newTypeInfo, Some(spec))
+} else {
+  (newTypeInfo, None)
+}
+
+  case list: ListViewTypeInfo[_] =>
 
 Review comment:
   Sorry, I did not make that clear. I meant that maybe we can add a `Tuple` test case.




[GitHub] walterddr commented on a change in pull request #7237: [FLINK-7209] [table] Support DataView in Tuple and Case Class as the ACC type of AggregateFunction

2018-12-19 Thread GitBox
walterddr commented on a change in pull request #7237: [FLINK-7209] [table] 
Support DataView in Tuple and Case Class as the ACC type of AggregateFunction
URL: https://github.com/apache/flink/pull/7237#discussion_r243015377
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/NullSerializer.scala
 ##
 @@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+
+class NullSerializer extends TypeSerializerSingleton[Any] {
 
 Review comment:
   Yeah. I think I also traced the same code path for Pojo. However, removing 
the `NullSerializer` still lets the newly modified 
`testGroupAggregateWithStateBackend` pass. This is worrying because there is 
essentially no test safeguarding this piece of code against being removed in 
the future.
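
   To illustrate the kind of safeguard meant here, below is a minimal, self-contained sketch. It does not use Flink's `TypeSerializer` API: `SimpleSerializer`, `SketchNullSerializer`, and `NullSerializerGuard` are hypothetical names made up for this example, and the sketch only assumes that a "null serializer" writes no bytes and reads back `null`. A real test would have to assert the equivalent property against the accumulator serializer that the aggregation actually uses.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical stand-in for a serializer interface; this is NOT Flink's TypeSerializer.
trait SimpleSerializer[T] {
  def serialize(value: T, out: DataOutputStream): Unit
  def deserialize(in: DataInputStream): T
}

// Assumed behaviour of a "null serializer": write nothing, always read back null.
object SketchNullSerializer extends SimpleSerializer[Any] {
  override def serialize(value: Any, out: DataOutputStream): Unit = ()
  override def deserialize(in: DataInputStream): Any = null
}

object NullSerializerGuard {
  // Number of bytes the serializer emits for the given value.
  def serializedSize[T](serializer: SimpleSerializer[T], value: T): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    serializer.serialize(value, out)
    out.flush()
    bytes.size()
  }

  // Serializes the value and immediately deserializes it again.
  def roundTrip[T](serializer: SimpleSerializer[T], value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    serializer.serialize(value, out)
    out.flush()
    serializer.deserialize(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray)))
  }
}
```

   A test built on this idea would fail as soon as the view field starts being serialized with the accumulator again, which is the regression that `testGroupAggregateWithStateBackend` currently does not catch.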




[GitHub] walterddr commented on a change in pull request #7237: [FLINK-7209] [table] Support DataView in Tuple and Case Class as the ACC type of AggregateFunction

2018-12-17 Thread GitBox
walterddr commented on a change in pull request #7237: [FLINK-7209] [table] 
Support DataView in Tuple and Case Class as the ACC type of AggregateFunction
URL: https://github.com/apache/flink/pull/7237#discussion_r242392587
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ##
 @@ -489,6 +491,88 @@ object UserDefinedFunctionUtils {
   )
 }
 
+def decorateDataViewTypeInfo(
+fieldTypeInfo: TypeInformation[_],
+fieldInstance: AnyRef,
+field: Field): (TypeInformation[_], Option[DataViewSpec[_]]) = fieldTypeInfo match {
+  case ct: CompositeType[_] if includesDataView(ct) =>
+throw new TableException(
+  "MapView and ListView only supported at first level of accumulators of Pojo, Tuple " +
+"and Case Class type.")
+  case map: MapViewTypeInfo[_, _] =>
+val mapView = fieldInstance.asInstanceOf[MapView[_, _]]
+val newTypeInfo = if (mapView != null && mapView.keyTypeInfo != null &&
+  mapView.valueTypeInfo != null) {
+  new MapViewTypeInfo(mapView.keyTypeInfo, mapView.valueTypeInfo)
+} else {
+  map
+}
+
+if (isStateBackedDataViews) {
+  newTypeInfo.nullSerializer = true
+
+  // create map view specs with unique id (used as state name)
+  val fieldName = field.getName
+  var spec = MapViewSpec(
+"agg" + index + "$" + fieldName,
+field,
+newTypeInfo)
+
+  (newTypeInfo, Some(spec))
+} else {
+  (newTypeInfo, None)
+}
+
+  case list: ListViewTypeInfo[_] =>
 
 Review comment:
   Maybe add a ListView test case?




[GitHub] walterddr commented on a change in pull request #7237: [FLINK-7209] [table] Support DataView in Tuple and Case Class as the ACC type of AggregateFunction

2018-12-17 Thread GitBox
walterddr commented on a change in pull request #7237: [FLINK-7209] [table] 
Support DataView in Tuple and Case Class as the ACC type of AggregateFunction
URL: https://github.com/apache/flink/pull/7237#discussion_r242392503
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ##
 @@ -489,6 +491,88 @@ object UserDefinedFunctionUtils {
   )
 }
 
+def decorateDataViewTypeInfo(
+fieldTypeInfo: TypeInformation[_],
+fieldInstance: AnyRef,
+field: Field): (TypeInformation[_], Option[DataViewSpec[_]]) = fieldTypeInfo match {
+  case ct: CompositeType[_] if includesDataView(ct) =>
+throw new TableException(
+  "MapView and ListView only supported at first level of accumulators of Pojo, Tuple " +
+"and Case Class type.")
+  case map: MapViewTypeInfo[_, _] =>
+val mapView = fieldInstance.asInstanceOf[MapView[_, _]]
+val newTypeInfo = if (mapView != null && mapView.keyTypeInfo != null &&
+  mapView.valueTypeInfo != null) {
+  new MapViewTypeInfo(mapView.keyTypeInfo, mapView.valueTypeInfo)
+} else {
+  map
+}
+
+if (isStateBackedDataViews) {
 
 Review comment:
   I was wondering whether this is ever needed, since the test case added in 
this PR doesn't cover the else branch of this code path.
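
   As a rough sketch of what covering both branches could look like, here is a simplified model of the branching in the hunk above. `ViewTypeInfo`, `ViewSpec`, and `DecorateSketch` are hypothetical stand-ins invented for this example, not the Flink classes, and the `stateBacked` parameter stands in for `isStateBackedDataViews`. Only the state-name pattern `"agg" + index + "$" + fieldName` is taken from the diff.

```scala
// Hypothetical simplified stand-ins; none of these are Flink classes.
final case class ViewTypeInfo(keyType: String, valueType: String, nullSerializer: Boolean = false)
final case class ViewSpec(stateId: String, fieldName: String, typeInfo: ViewTypeInfo)

object DecorateSketch {
  // Mirrors the two branches in the quoted hunk:
  //  - state-backed: flag the type info and emit a spec with a unique state name
  //  - otherwise:    return the type info unchanged and no spec
  def decorate(
      typeInfo: ViewTypeInfo,
      aggIndex: Int,
      fieldName: String,
      stateBacked: Boolean): (ViewTypeInfo, Option[ViewSpec]) = {
    if (stateBacked) {
      val decorated = typeInfo.copy(nullSerializer = true)
      val spec = ViewSpec("agg" + aggIndex + "$" + fieldName, fieldName, decorated)
      (decorated, Some(spec))
    } else {
      (typeInfo, None)
    }
  }
}
```

   A test pair would call `decorate` once with `stateBacked = true` and once with `stateBacked = false`, so that the else branch is exercised as well.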




[GitHub] walterddr commented on a change in pull request #7237: [FLINK-7209] [table] Support DataView in Tuple and Case Class as the ACC type of AggregateFunction

2018-12-17 Thread GitBox
walterddr commented on a change in pull request #7237: [FLINK-7209] [table] 
Support DataView in Tuple and Case Class as the ACC type of AggregateFunction
URL: https://github.com/apache/flink/pull/7237#discussion_r242393559
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/NullSerializer.scala
 ##
 @@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+
+class NullSerializer extends TypeSerializerSingleton[Any] {
 
 Review comment:
   I am assuming `NullSerializer` is needed because the actual map is handled 
by the StateBackend and because we removed the view fields through 
`removeStateViewFieldsFromAccTypeInfo`. I was wondering why this is needed, 
since it was not introduced in the previous approach. Nor is it reflected in 
the test case (it still passes without the `nullSerializer` setting).

