[GitHub] walterddr commented on a change in pull request #7237: [FLINK-7209] [table] Support DataView in Tuple and Case Class as the ACC type of AggregateFunction
walterddr commented on a change in pull request #7237: [FLINK-7209] [table] Support DataView in Tuple and Case Class as the ACC type of AggregateFunction
URL: https://github.com/apache/flink/pull/7237#discussion_r243154482

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/NullSerializer.scala ##
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+
+class NullSerializer extends TypeSerializerSingleton[Any] {

Review comment:
Hmm. I think that's a good idea, but it should probably be addressed in a separate JIRA. I actually still have [PR:6472](https://github.com/apache/flink/pull/6472) pending, and I am also very fond of the idea of adding tests for the utility class.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
walterddr commented on a change in pull request #7237: [FLINK-7209] [table] Support DataView in Tuple and Case Class as the ACC type of AggregateFunction
URL: https://github.com/apache/flink/pull/7237#discussion_r243014619

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ##
@@ -489,6 +491,88 @@ object UserDefinedFunctionUtils {
       )
     }
+  def decorateDataViewTypeInfo(
+      fieldTypeInfo: TypeInformation[_],
+      fieldInstance: AnyRef,
+      field: Field): (TypeInformation[_], Option[DataViewSpec[_]]) = fieldTypeInfo match {
+    case ct: CompositeType[_] if includesDataView(ct) =>
+      throw new TableException(
+        "MapView and ListView only supported at first level of accumulators of Pojo, Tuple " +
+          "and Case Class type.")
+    case map: MapViewTypeInfo[_, _] =>
+      val mapView = fieldInstance.asInstanceOf[MapView[_, _]]
+      val newTypeInfo = if (mapView != null && mapView.keyTypeInfo != null &&
+        mapView.valueTypeInfo != null) {
+        new MapViewTypeInfo(mapView.keyTypeInfo, mapView.valueTypeInfo)
+      } else {
+        map
+      }
+
+      if (isStateBackedDataViews) {
+        newTypeInfo.nullSerializer = true
+
+        // create map view specs with unique id (used as state name)
+        val fieldName = field.getName
+        var spec = MapViewSpec(
+          "agg" + index + "$" + fieldName,
+          field,
+          newTypeInfo)
+
+        (newTypeInfo, Some(spec))
+      } else {
+        (newTypeInfo, None)
+      }
+
+    case list: ListViewTypeInfo[_] =>

Review comment:
Sorry, I didn't make that clear. I meant that maybe we could add a `Tuple` test case.
walterddr commented on a change in pull request #7237: [FLINK-7209] [table] Support DataView in Tuple and Case Class as the ACC type of AggregateFunction
URL: https://github.com/apache/flink/pull/7237#discussion_r243015377

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/NullSerializer.scala ##
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+
+class NullSerializer extends TypeSerializerSingleton[Any] {

Review comment:
Yeah, I traced the same code path for Pojo too. However, the newly modified `testGroupAggregateWithStateBackend` still passes after removing the `NullSerializer`. This is worrying, because there is essentially no test safeguarding this piece of code against being removed in the future.
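One way to close the gap described above is a test that fails when the null serializer is dropped, for example by asserting that the serialized accumulator does not grow with the contents of a state-backed map view. The sketch below models that property using only the JDK's `DataOutputStream`; `AccSerialization`, `FieldWriter`, `longWriter`, `mapWriter`, and `nullWriter` are hypothetical names, not Flink APIs, and a real test would go through the accumulator's actual `TypeSerializer`.

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}

// Hypothetical model (not Flink's API): an accumulator is written field by
// field, each field with its own writer function.
object AccSerialization {
  type FieldWriter = (Any, DataOutputStream) => Unit

  // Writes a plain Long field (8 bytes).
  val longWriter: FieldWriter = (v, out) => out.writeLong(v.asInstanceOf[Long])

  // Writes every entry of an in-memory String -> Long map view.
  val mapWriter: FieldWriter = (v, out) => {
    val m = v.asInstanceOf[Map[String, Long]]
    out.writeInt(m.size)
    m.foreach { case (k, x) => out.writeUTF(k); out.writeLong(x) }
  }

  // Stand-in for a null serializer: a state-backed view contributes no bytes.
  val nullWriter: FieldWriter = (_, _) => ()

  // Serializes the fields in order, pairing each field with its writer.
  def serialize(fields: Seq[Any], writers: Seq[FieldWriter]): Array[Byte] = {
    require(fields.size == writers.size, "one writer per field")
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    fields.zip(writers).foreach { case (f, w) => w(f, out) }
    out.flush()
    buffer.toByteArray
  }
}
```

With `nullWriter` in place, a one-entry and a two-entry map produce the same 8-byte accumulator; swapping in `mapWriter` makes the sizes diverge, which is the failure such a regression test would report.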
walterddr commented on a change in pull request #7237: [FLINK-7209] [table] Support DataView in Tuple and Case Class as the ACC type of AggregateFunction
URL: https://github.com/apache/flink/pull/7237#discussion_r242392587

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ##
@@ -489,6 +491,88 @@ object UserDefinedFunctionUtils {
       )
     }
+  def decorateDataViewTypeInfo(
+      fieldTypeInfo: TypeInformation[_],
+      fieldInstance: AnyRef,
+      field: Field): (TypeInformation[_], Option[DataViewSpec[_]]) = fieldTypeInfo match {
+    case ct: CompositeType[_] if includesDataView(ct) =>
+      throw new TableException(
+        "MapView and ListView only supported at first level of accumulators of Pojo, Tuple " +
+          "and Case Class type.")
+    case map: MapViewTypeInfo[_, _] =>
+      val mapView = fieldInstance.asInstanceOf[MapView[_, _]]
+      val newTypeInfo = if (mapView != null && mapView.keyTypeInfo != null &&
+        mapView.valueTypeInfo != null) {
+        new MapViewTypeInfo(mapView.keyTypeInfo, mapView.valueTypeInfo)
+      } else {
+        map
+      }
+
+      if (isStateBackedDataViews) {
+        newTypeInfo.nullSerializer = true
+
+        // create map view specs with unique id (used as state name)
+        val fieldName = field.getName
+        var spec = MapViewSpec(
+          "agg" + index + "$" + fieldName,
+          field,
+          newTypeInfo)
+
+        (newTypeInfo, Some(spec))
+      } else {
+        (newTypeInfo, None)
+      }
+
+    case list: ListViewTypeInfo[_] =>

Review comment:
Maybe add a `ListView` test case?
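A `Tuple` accumulator with a `ListView` field would exercise the first-level rule enforced at the top of `decorateDataViewTypeInfo`: a data view is allowed as a direct field, but a composite field that itself contains a data view is rejected. Below is a stdlib-only model of that rule. `TypeNode`, `CompositeNode`, and `FirstLevelCheck` are hypothetical stand-ins for Flink's `TypeInformation`/`CompositeType`, and `IllegalArgumentException` is an assumption used in place of the `TableException` thrown in the diff.

```scala
// Hypothetical model of accumulator type information (not Flink's classes).
sealed trait TypeNode
case object AtomicNode extends TypeNode
final case class ListViewNode(elementType: String) extends TypeNode
final case class MapViewNode(keyType: String, valueType: String) extends TypeNode
// Stands in for a Pojo, Tuple, or case class accumulator type.
final case class CompositeNode(fields: Seq[TypeNode]) extends TypeNode

object FirstLevelCheck {
  // True if the type is, or transitively contains, a data view.
  def includesDataView(t: TypeNode): Boolean = t match {
    case _: ListViewNode | _: MapViewNode => true
    case CompositeNode(fields)            => fields.exists(includesDataView)
    case AtomicNode                       => false
  }

  // Indices of first-level data view fields; a nested data view is rejected.
  def dataViewFieldIndices(acc: CompositeNode): Seq[Int] =
    acc.fields.zipWithIndex.collect {
      case (c: CompositeNode, _) if includesDataView(c) =>
        throw new IllegalArgumentException(
          "MapView and ListView only supported at first level of accumulators")
      case (_: ListViewNode | _: MapViewNode, i) => i
    }
}
```

For example, a `(Long, ListView[String])` tuple accumulator yields `Seq(1)`, while wrapping the `ListView` in a nested composite fails.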
walterddr commented on a change in pull request #7237: [FLINK-7209] [table] Support DataView in Tuple and Case Class as the ACC type of AggregateFunction
URL: https://github.com/apache/flink/pull/7237#discussion_r242392503

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ##
@@ -489,6 +491,88 @@ object UserDefinedFunctionUtils {
       )
     }
+  def decorateDataViewTypeInfo(
+      fieldTypeInfo: TypeInformation[_],
+      fieldInstance: AnyRef,
+      field: Field): (TypeInformation[_], Option[DataViewSpec[_]]) = fieldTypeInfo match {
+    case ct: CompositeType[_] if includesDataView(ct) =>
+      throw new TableException(
+        "MapView and ListView only supported at first level of accumulators of Pojo, Tuple " +
+          "and Case Class type.")
+    case map: MapViewTypeInfo[_, _] =>
+      val mapView = fieldInstance.asInstanceOf[MapView[_, _]]
+      val newTypeInfo = if (mapView != null && mapView.keyTypeInfo != null &&
+        mapView.valueTypeInfo != null) {
+        new MapViewTypeInfo(mapView.keyTypeInfo, mapView.valueTypeInfo)
+      } else {
+        map
+      }
+
+      if (isStateBackedDataViews) {

Review comment:
I was wondering whether this is ever needed, since the test case added in this PR doesn't cover the `else` branch of this code path.
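Covering both sides of `isStateBackedDataViews` would settle this question. The simplified model below mirrors the `MapViewTypeInfo` branch from the diff: when state-backed, it marks the type for a null serializer and builds a spec whose id `"agg" + index + "$" + fieldName` doubles as the state name; otherwise it returns the type unchanged with no spec. That lets each branch be asserted separately. `MapViewTypeModel`, `MapViewSpecModel`, and `decorateMapView` are hypothetical, and `copy` replaces the in-place `nullSerializer = true` mutation used in the diff.

```scala
// Hypothetical, simplified stand-ins for MapViewTypeInfo and MapViewSpec.
final case class MapViewTypeModel(keyType: String, valueType: String, nullSerializer: Boolean = false)
final case class MapViewSpecModel(stateId: String, fieldName: String, typeInfo: MapViewTypeModel)

object DecorateSketch {
  def decorateMapView(
      typeInfo: MapViewTypeModel,
      index: Int,
      fieldName: String,
      isStateBackedDataViews: Boolean): (MapViewTypeModel, Option[MapViewSpecModel]) =
    if (isStateBackedDataViews) {
      // State-backed branch: flag the type for a null serializer and build a
      // spec whose unique id is used as the state name.
      val decorated = typeInfo.copy(nullSerializer = true)
      (decorated, Some(MapViewSpecModel("agg" + index + "$" + fieldName, fieldName, decorated)))
    } else {
      // Heap-backed branch: the type is returned unchanged and no spec is created.
      (typeInfo, None)
    }
}
```

A test for the `else` case then only needs to call `decorateMapView` with `isStateBackedDataViews = false` and check that the type is untouched and the spec is `None`.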
walterddr commented on a change in pull request #7237: [FLINK-7209] [table] Support DataView in Tuple and Case Class as the ACC type of AggregateFunction
URL: https://github.com/apache/flink/pull/7237#discussion_r242393559

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/NullSerializer.scala ##
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+
+class NullSerializer extends TypeSerializerSingleton[Any] {

Review comment:
I am assuming `NullSerializer` is needed because the actual map is handled by the state backend, and because we removed those fields through `removeStateViewFieldsFromAccTypeInfo`. I was wondering why this is needed, since it was not introduced in the previous approach. Nor is it reflected in the test case (which still passes without the `nullSerializer` setting).
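The assumption in this comment can be illustrated with a small model: if the map view's contents are owned by the state backend, the accumulator's own bytes only need the plain fields, so the view field can be written as nothing and read back as `null`. `CountAcc`, `NullFieldSerializer`, and `CountAccSerializer` are hypothetical names built on the JDK's `DataInputStream`/`DataOutputStream`, not Flink's `NullSerializer` or `TypeSerializerSingleton`. Re-attaching the state-backed view after deserialization is assumed, not shown.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical accumulator: a plain count plus a map view whose contents are
// assumed to be owned by the state backend.
final case class CountAcc(count: Long, view: AnyRef)

// Hypothetical stand-in for a null serializer (not Flink's class): writes
// nothing and reads back null for the state-backed field.
object NullFieldSerializer {
  def serialize(value: AnyRef, out: DataOutputStream): Unit = ()
  def deserialize(in: DataInputStream): AnyRef = null
}

object CountAccSerializer {
  def toBytes(acc: CountAcc): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    out.writeLong(acc.count)
    NullFieldSerializer.serialize(acc.view, out) // contributes zero bytes
    out.flush()
    buffer.toByteArray
  }

  def fromBytes(bytes: Array[Byte]): CountAcc = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    val count = in.readLong()
    // The view comes back as null; the runtime is assumed to re-attach the
    // state-backed view before the accumulator is used again.
    CountAcc(count, NullFieldSerializer.deserialize(in))
  }
}
```

In this model the round trip keeps `count` and drops the view's contents, which is only safe because those contents are assumed to live in state rather than in the accumulator bytes.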