[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r105731937

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala ---
@@ -199,10 +200,21 @@ class DataSetJoin(
     val joinOpName = s"where: ($joinConditionToString), join: ($joinSelectionToString)"

+    //consider all fields not which are not keys are forwarded
+    val leftIndices = (0 until left.getRowType.getFieldCount).diff(leftKeys)
--- End diff --

A Calcite join forwards all fields of both sides. If the left input is `(l1, l2, l3)` and the right input is `(r1, r2)`, then the result of the join will be `(l1, l2, l3, r1, r2)` for all pairs of left and right that satisfy the join condition. It does not matter which of the fields is a key field. If the join condition is `l1 == r2`, both fields are forwarded. Since DataSet joins organize the input data sets based on the key attributes (partition and sort), these attributes are especially interesting for forward field annotations.

Actually, I just noticed that we have to distinguish the type of the join (inner, left, right, full). We can only forward the fields of the side that is never padded (both sides for an inner join, the left side for a left join, the right side for a right join, none for a full outer join), because the other side might have been padded with `null` values.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
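The join rule described in the comment above can be sketched roughly as follows. This is a minimal illustration, not the code from the pull request: `JoinKind` and `JoinForwarding` are hypothetical names standing in for Calcite's join type and the planner logic, and the output layout `(l1 .. ln, r1 .. rm)` is taken from the comment.

```scala
// Hypothetical stand-in for the join type; not Calcite's actual enum.
sealed trait JoinKind
case object Inner extends JoinKind
case object LeftOuter extends JoinKind
case object RightOuter extends JoinKind
case object FullOuter extends JoinKind

object JoinForwarding {
  /** Returns the (input index, output index) pairs per side that may be declared as forwarded. */
  def forwardedIndices(
      kind: JoinKind,
      leftArity: Int,
      rightArity: Int): (Seq[(Int, Int)], Seq[(Int, Int)]) = {
    // Output layout is (l1 .. ln, r1 .. rm): left fields keep their position,
    // right fields are shifted by the arity of the left input.
    val left = (0 until leftArity).map(i => (i, i))
    val right = (0 until rightArity).map(i => (i, i + leftArity))
    kind match {
      case Inner      => (left, right)
      case LeftOuter  => (left, Seq.empty)      // right side may be null-padded
      case RightOuter => (Seq.empty, right)     // left side may be null-padded
      case FullOuter  => (Seq.empty, Seq.empty) // both sides may be null-padded
    }
  }
}
```

Key fields get no special treatment here: every field of a non-padded side is forwarded, whether or not it appears in the join condition.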
[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3040#discussion_r105639200 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala --- @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.dataset.forwarding + +import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, TypeInformation => TypeInfo} +import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.java.DataSet +import org.apache.flink.table.api.TableException +import org.apache.flink.types.Row + +object FieldForwardingUtils { + + def compositeTypeField = (fields: Seq[(String, TypeInfo[_])]) => fields + + private def throwMissedWrapperException(customWrapper: TypeInfo[_]) = { +throw new TableException(s"Implementation for $customWrapper wrapper is missing.") + } + + /** +* Wrapper for {@link getForwardedFields} +*/ + def getForwardedInput( + inputType: TypeInfo[_], + outputType: TypeInfo[_], + forwardIndices: Seq[Int], + customWrapper: TypeInfo[_] => +Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String = { + +getForwardedFields(inputType, + outputType, + forwardIndices.zip(forwardIndices), + customWrapper) + } + + /** +* Wraps provided indices with proper names. +* e.g. _1 for Tuple, f0 for Row, fieldName for POJO and named Row +* +* @param inputType information of input data +* @param outputType information of output data +* @param forwardIndices tuple of (input, output) indices of a forwarded field +* @param customWrapper used for figuring out proper type in specific cases, +* e.g. 
{@see DataSetSingleRowJoin} +* @return string with forwarded fields mapped from input to output +*/ + def getForwardedFields( + inputType: TypeInfo[_], + outputType: TypeInfo[_], + forwardIndices: Seq[(Int, Int)], + customWrapper: TypeInfo[_] => +Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String = { + +def chooseWrapper( +typeInformation: TypeInfo[_]): Seq[(String, TypeInfo[_])] = { + + typeInformation match { +case composite: CompositeType[_] => + val fields = extractFields(composite) + compositeTypeField(fields) +case basic: BasicTypeInfo[_] => + Seq((s"*", basic)) +case array: BasicArrayTypeInfo[_, _] => + Seq((s"*", array)) +case _ => + customWrapper(typeInformation) + } +} + +val wrapInput = chooseWrapper(inputType) +val wrapOutput = chooseWrapper(outputType) + +forwardFields(forwardIndices, wrapInput, wrapOutput) + } + + private def extractFields( + composite: CompositeType[_]): Seq[(String, TypeInfo[_])] = { + +val types = for { + i <- 0 until composite.getArity +} yield { composite.getTypeAt(i) } + +composite.getFieldNames.zip(types) + } + + private def forwardFields( + forwardIndices: Seq[(Int, Int)], + wrappedInput: Int => (String, TypeInfo[_]), + wrappedOutput: Int => (String, TypeInfo[_])): String = { + +implicit class Field2ForwardField(left: (String, TypeInfo[_])) { + def ->(right: (String, TypeInfo[_])): String = if (left.equals(right)) { +s"${left._1}" + } else { +if (left._2.equals(right._2)) { + s"${left._1}->${right._1}" +} else { + null --- End diff -- I think this case should never happen if correct mappings are provided. If the types do not match, we have a bug the logic to
[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...
Github user tonycox commented on a diff in the pull request: https://github.com/apache/flink/pull/3040#discussion_r105600606 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala --- @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.dataset.forwarding + +import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, TypeInformation => TypeInfo} +import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.java.DataSet +import org.apache.flink.table.api.TableException +import org.apache.flink.types.Row + +object FieldForwardingUtils { + + def compositeTypeField = (fields: Seq[(String, TypeInfo[_])]) => fields + + private def throwMissedWrapperException(customWrapper: TypeInfo[_]) = { +throw new TableException(s"Implementation for $customWrapper wrapper is missing.") + } + + /** +* Wrapper for {@link getForwardedFields} +*/ + def getForwardedInput( + inputType: TypeInfo[_], + outputType: TypeInfo[_], + forwardIndices: Seq[Int], + customWrapper: TypeInfo[_] => +Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String = { + +getForwardedFields(inputType, + outputType, + forwardIndices.zip(forwardIndices), + customWrapper) + } + + /** +* Wraps provided indices with proper names. +* e.g. _1 for Tuple, f0 for Row, fieldName for POJO and named Row +* +* @param inputType information of input data +* @param outputType information of output data +* @param forwardIndices tuple of (input, output) indices of a forwarded field +* @param customWrapper used for figuring out proper type in specific cases, +* e.g. 
{@see DataSetSingleRowJoin}
+    * @return string with forwarded fields mapped from input to output
+    */
+  def getForwardedFields(
+      inputType: TypeInfo[_],
+      outputType: TypeInfo[_],
+      forwardIndices: Seq[(Int, Int)],
+      customWrapper: TypeInfo[_] =>
+        Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String = {
+
+    def chooseWrapper(
+        typeInformation: TypeInfo[_]): Seq[(String, TypeInfo[_])] = {
+
+      typeInformation match {
+        case composite: CompositeType[_] =>
+          val fields = extractFields(composite)
+          compositeTypeField(fields)
+        case basic: BasicTypeInfo[_] =>
+          Seq((s"*", basic))
+        case array: BasicArrayTypeInfo[_, _] =>
+          Seq((s"*", array))
+        case _ =>
+          customWrapper(typeInformation)
+      }
+    }
+
+    val wrapInput = chooseWrapper(inputType)
+    val wrapOutput = chooseWrapper(outputType)
+
+    forwardFields(forwardIndices, wrapInput, wrapOutput)
+  }
+
+  private def extractFields(
+      composite: CompositeType[_]): Seq[(String, TypeInfo[_])] = {
+
+    val types = for {
+      i <- 0 until composite.getArity
+    } yield { composite.getTypeAt(i) }
+
+    composite.getFieldNames.zip(types)
+  }
+
+  private def forwardFields(
+      forwardIndices: Seq[(Int, Int)],
+      wrappedInput: Int => (String, TypeInfo[_]),
+      wrappedOutput: Int => (String, TypeInfo[_])): String = {
+
+    implicit class Field2ForwardField(left: (String, TypeInfo[_])) {
+      def ->(right: (String, TypeInfo[_])): String = if (left.equals(right)) {
+        s"${left._1}"
+      } else {
+        if (left._2.equals(right._2)) {
+          s"${left._1}->${right._1}"
+        } else {
+          null
--- End diff --

@fhueske I think if the types don't match we can just skip it.
[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r104730718

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala ---
@@ -124,8 +127,26 @@ class DataSetCalc(
       body,
       returnType)

+    def getForwardIndices = {
--- End diff --

Move all code related to forwarded fields into a function `getForwardFields()` to keep the main translation function more concise.
[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r105014912

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala ---
@@ -125,11 +126,25 @@ class DataSetAggregate(
         .name(aggOpName)
     } else {
+      //Forward all fields at conversion
+      val inputInfo = mappedInput.getType
--- End diff --

PR #3472 will remove the preparing mapper but add a `GroupCombineFunction`. For both the combiner and the reducer, we need to forward the fields that are grouped on, because these are not modified but only passed through.
[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r104708548

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala ---
@@ -125,11 +126,25 @@ class DataSetAggregate(
         .name(aggOpName)
     } else {
+      //Forward all fields at conversion
+      val inputInfo = mappedInput.getType
--- End diff --

An aggregation operator returns the grouping keys and the aggregated values. Only the grouping keys are forwarded unmodified. Forwarded fields can therefore only be assigned for grouped aggregates.
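A rough sketch of what forwarding only the grouping keys could look like. It assumes that the output row lists the grouping keys first, followed by the aggregated values, which is not stated in the thread; `AggregateForwarding` and the `f<N>` field names are hypothetical placeholders.

```scala
object AggregateForwarding {
  /**
    * Assumption: the output row is laid out as (grouping keys ..., aggregates ...),
    * so the i-th grouping key ends up at output position i.
    */
  def forwardedIndices(groupingKeys: Seq[Int]): Seq[(Int, Int)] =
    groupingKeys.zipWithIndex

  /** Renders the pairs as "in->out" expressions; f<N> is a placeholder field name. */
  def render(pairs: Seq[(Int, Int)]): String =
    pairs.map { case (in, out) =>
      if (in == out) s"f$in" else s"f$in->f$out"
    }.mkString(";")
}
```

For a non-grouped aggregate the key list is empty, so no forwarded fields are produced at all.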
[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3040#discussion_r105029771 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala --- @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.table.plan.nodes.dataset.forwarding
+
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, TypeInformation => TypeInfo}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.TableException
+import org.apache.flink.types.Row
+
+object FieldForwardingUtils {
+
+  def compositeTypeField = (fields: Seq[(String, TypeInfo[_])]) => fields
+
+  private def throwMissedWrapperException(customWrapper: TypeInfo[_]) = {
+    throw new TableException(s"Implementation for $customWrapper wrapper is missing.")
+  }
+
+  /**
+    * Wrapper for {@link getForwardedFields}
+    */
+  def getForwardedInput(
+      inputType: TypeInfo[_],
+      outputType: TypeInfo[_],
+      forwardIndices: Seq[Int],
+      customWrapper: TypeInfo[_] =>
+        Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String = {
+
+    getForwardedFields(inputType,
+      outputType,
+      forwardIndices.zip(forwardIndices),
+      customWrapper)
+  }
+
+  /**
+    * Wraps provided indices with proper names.
+    * e.g. _1 for Tuple, f0 for Row, fieldName for POJO and named Row
+    *
+    * @param inputType information of input data
+    * @param outputType information of output data
+    * @param forwardIndices tuple of (input, output) indices of a forwarded field
+    * @param customWrapper used for figuring out proper type in specific cases,
+    * e.g. {@see DataSetSingleRowJoin}
+    * @return string with forwarded fields mapped from input to output
+    */
+  def getForwardedFields(
+      inputType: TypeInfo[_],
+      outputType: TypeInfo[_],
+      forwardIndices: Seq[(Int, Int)],
+      customWrapper: TypeInfo[_] =>
--- End diff --

I didn't notice that the `customWrapper` was used. Can it be removed?
[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3040#discussion_r105030757 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala --- @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.dataset.forwarding + +import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, TypeInformation => TypeInfo} +import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.java.DataSet +import org.apache.flink.table.api.TableException +import org.apache.flink.types.Row + +object FieldForwardingUtils { + + def compositeTypeField = (fields: Seq[(String, TypeInfo[_])]) => fields + + private def throwMissedWrapperException(customWrapper: TypeInfo[_]) = { +throw new TableException(s"Implementation for $customWrapper wrapper is missing.") + } + + /** +* Wrapper for {@link getForwardedFields} +*/ + def getForwardedInput( + inputType: TypeInfo[_], + outputType: TypeInfo[_], + forwardIndices: Seq[Int], + customWrapper: TypeInfo[_] => +Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String = { + +getForwardedFields(inputType, + outputType, + forwardIndices.zip(forwardIndices), + customWrapper) + } + + /** +* Wraps provided indices with proper names. +* e.g. _1 for Tuple, f0 for Row, fieldName for POJO and named Row +* +* @param inputType information of input data +* @param outputType information of output data +* @param forwardIndices tuple of (input, output) indices of a forwarded field +* @param customWrapper used for figuring out proper type in specific cases, +* e.g. 
{@see DataSetSingleRowJoin}
+    * @return string with forwarded fields mapped from input to output
+    */
+  def getForwardedFields(
+      inputType: TypeInfo[_],
+      outputType: TypeInfo[_],
+      forwardIndices: Seq[(Int, Int)],
+      customWrapper: TypeInfo[_] =>
+        Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String = {
+
+    def chooseWrapper(
+        typeInformation: TypeInfo[_]): Seq[(String, TypeInfo[_])] = {
+
+      typeInformation match {
+        case composite: CompositeType[_] =>
+          val fields = extractFields(composite)
+          compositeTypeField(fields)
+        case basic: BasicTypeInfo[_] =>
+          Seq((s"*", basic))
+        case array: BasicArrayTypeInfo[_, _] =>
+          Seq((s"*", array))
+        case _ =>
--- End diff --

`AtomicType` can be `"*"` as well.
[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r104685724

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala ---
@@ -46,6 +46,7 @@ trait CommonCorrelate {
       config: TableConfig,
       inputTypeInfo: TypeInformation[Row],
       udtfTypeInfo: TypeInformation[Any],
+      returnType: TypeInformation[Row],
--- End diff --

Why is this change necessary? `returnType` can be computed from `rowType`, which is a parameter as well.
[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3040#discussion_r105029829 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala --- @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.nodes.dataset.forwarding + +import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, TypeInformation => TypeInfo} +import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.java.DataSet +import org.apache.flink.table.api.TableException +import org.apache.flink.types.Row + +object FieldForwardingUtils { + + def compositeTypeField = (fields: Seq[(String, TypeInfo[_])]) => fields + + private def throwMissedWrapperException(customWrapper: TypeInfo[_]) = { +throw new TableException(s"Implementation for $customWrapper wrapper is missing.") + } + + /** +* Wrapper for {@link getForwardedFields} +*/ + def getForwardedInput( + inputType: TypeInfo[_], + outputType: TypeInfo[_], + forwardIndices: Seq[Int], + customWrapper: TypeInfo[_] => --- End diff -- Can the `customWrapper` be removed? 
[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r104723484

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala ---
@@ -97,18 +103,41 @@ class DataSetCorrelate(
     val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
     val pojoFieldMapping = sqlFunction.getPojoFieldMapping
     val udtfTypeInfo = sqlFunction.getRowTypeInfo.asInstanceOf[TypeInformation[Any]]
+    val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)

     val mapFunc = correlateMapFunction(
       config,
       inputDS.getType,
       udtfTypeInfo,
+      returnType,
       getRowType,
       joinType,
       rexCall,
       condition,
       Some(pojoFieldMapping),
       ruleDescription)

-    inputDS.flatMap(mapFunc).name(correlateOpName(rexCall, sqlFunction, relRowType))
+    def getIndices = {
--- End diff --

A correlate forwards all fields from the input and the table function, i.e., it produces `[in1, in2, in3, tf1, tf2]` for an input `[in1, in2, in3]` and a table function `[tf1, tf2]`. So we can do a simple position-based mapping of the fields of the input type against the output type (field names might change). This is basically similar to what you are doing with the single row join. We do not need to look at the table function or the condition.
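The position-based mapping for a correlate suggested above might look roughly like this. The sketch works on plain field names instead of Flink type information; `CorrelateForwarding` and the example field names are hypothetical, and `;` is used as the separator between forwarded field expressions.

```scala
object CorrelateForwarding {
  /**
    * Maps each input field to the output field at the same position.
    * Only the first inputNames.length output fields stem from the input;
    * the remaining ones are produced by the table function and are ignored.
    */
  def forwardedFields(inputNames: Seq[String], outputNames: Seq[String]): String = {
    require(outputNames.length >= inputNames.length,
      "output must contain at least all input fields")
    inputNames.zip(outputNames).map {
      case (in, out) if in == out => in            // same name on both sides
      case (in, out)              => s"$in->$out"  // field was renamed
    }.mkString(";")
  }
}
```

Neither the table function nor the join condition is inspected, which matches the point made in the comment.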
[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3040#discussion_r104706099 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala --- @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.dataset.forwarding + +import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, TypeInformation => TypeInfo} +import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.java.DataSet +import org.apache.flink.table.api.TableException +import org.apache.flink.types.Row + +object FieldForwardingUtils { + + def compositeTypeField = (fields: Seq[(String, TypeInfo[_])]) => fields + + private def throwMissedWrapperException(customWrapper: TypeInfo[_]) = { +throw new TableException(s"Implementation for $customWrapper wrapper is missing.") + } + + /** +* Wrapper for {@link getForwardedFields} +*/ + def getForwardedInput( + inputType: TypeInfo[_], + outputType: TypeInfo[_], + forwardIndices: Seq[Int], + customWrapper: TypeInfo[_] => +Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String = { + +getForwardedFields(inputType, + outputType, + forwardIndices.zip(forwardIndices), + customWrapper) + } + + /** +* Wraps provided indices with proper names. +* e.g. _1 for Tuple, f0 for Row, fieldName for POJO and named Row +* +* @param inputType information of input data +* @param outputType information of output data +* @param forwardIndices tuple of (input, output) indices of a forwarded field +* @param customWrapper used for figuring out proper type in specific cases, +* e.g. 
{@see DataSetSingleRowJoin} +* @return string with forwarded fields mapped from input to output +*/ + def getForwardedFields( + inputType: TypeInfo[_], + outputType: TypeInfo[_], + forwardIndices: Seq[(Int, Int)], + customWrapper: TypeInfo[_] => +Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String = { + +def chooseWrapper( +typeInformation: TypeInfo[_]): Seq[(String, TypeInfo[_])] = { + + typeInformation match { +case composite: CompositeType[_] => + val fields = extractFields(composite) + compositeTypeField(fields) +case basic: BasicTypeInfo[_] => + Seq((s"*", basic)) +case array: BasicArrayTypeInfo[_, _] => + Seq((s"*", array)) +case _ => + customWrapper(typeInformation) + } +} + +val wrapInput = chooseWrapper(inputType) +val wrapOutput = chooseWrapper(outputType) + +forwardFields(forwardIndices, wrapInput, wrapOutput) + } + + private def extractFields( + composite: CompositeType[_]): Seq[(String, TypeInfo[_])] = { + +val types = for { + i <- 0 until composite.getArity +} yield { composite.getTypeAt(i) } + +composite.getFieldNames.zip(types) + } + + private def forwardFields( + forwardIndices: Seq[(Int, Int)], + wrappedInput: Int => (String, TypeInfo[_]), + wrappedOutput: Int => (String, TypeInfo[_])): String = { + +implicit class Field2ForwardField(left: (String, TypeInfo[_])) { + def ->(right: (String, TypeInfo[_])): String = if (left.equals(right)) { +s"${left._1}" + } else { +if (left._2.equals(right._2)) { + s"${left._1}->${right._1}" +} else { + null --- End diff -- We should throw an exception here. If the types do not match, the logic to identify the mapping of forwarded fields is broken.
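One way to fail loudly on a type mismatch, as requested in the comment above, is sketched below. It is only an illustration: a plain `String` stands in for `TypeInformation`, `ForwardFieldMapping` is a hypothetical name, and `IllegalStateException` is used where the real code would presumably throw a `TableException`.

```scala
object ForwardFieldMapping {
  /** A field described by its name and a type tag (a String stands in for TypeInformation). */
  type Field = (String, String)

  def forward(left: Field, right: Field): String =
    if (left._2 != right._2) {
      // A mismatch means the computed index mapping is wrong, so do not hide it.
      throw new IllegalStateException(
        s"Cannot forward field '${left._1}' of type ${left._2} " +
          s"to field '${right._1}' of type ${right._2}.")
    } else if (left._1 == right._1) {
      left._1
    } else {
      s"${left._1}->${right._1}"
    }
}
```

Compared with returning `null`, the exception surfaces a broken mapping at plan translation time instead of silently dropping the annotation.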
[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r105030435

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala ---
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset.forwarding
+
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, TypeInformation => TypeInfo}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.TableException
+import org.apache.flink.types.Row
+
+object FieldForwardingUtils {
+
+  def compositeTypeField = (fields: Seq[(String, TypeInfo[_])]) => fields
+
+  private def throwMissedWrapperException(customWrapper: TypeInfo[_]) = {
+    throw new TableException(s"Implementation for $customWrapper wrapper is missing.")
+  }
+
+  /**
+    * Wrapper for {@link getForwardedFields}
+    */
+  def getForwardedInput(
+      inputType: TypeInfo[_],
--- End diff --

Please document the parameters.
[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r105023568

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala ---
@@ -124,8 +127,26 @@ class DataSetCalc(
       body,
       returnType)

+    def getForwardIndices = {
+      // get (input, output) indices of operands,
+      // filter modified operands and specify forwarding
+      val inputFields = extractRefInputFields(calcProgram)
+      calcProgram.getProjectList
+        .map(_.getIndex)
+        .zipWithIndex
+        .filter(tup => inputFields.contains(tup._1))
--- End diff --

This position-based mapping makes assumptions about the internal organization of a `RexProgram`, i.e., that the first `n` entries of the expression list are filled in order by the `n` fields of the input. Can we change this to iterate over the projection list, look up each expression, and check whether it is a `RexInputRef`? Basically something like:

```
calcProgram.getProjectList.zipWithIndex.map { case (p, out) =>
  val expr = calcProgram.getExprList.get(p.getIndex)
  expr match {
    case i: RexInputRef => Some((i.getIndex, out))
    case _ => None
  }
}.flatten
```
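To try the lookup suggested above without a Calcite dependency, the idea can be modelled on a toy program. Everything here is a hypothetical miniature: `Expr`, `InputRef`, `Call` and `CalcForwarding` only imitate the roles of the expression list, the projection list and `RexInputRef`.

```scala
// Toy model: an expression list plus a projection list whose entries
// are indices into that expression list.
sealed trait Expr
case class InputRef(index: Int) extends Expr // direct reference to an input field
case class Call(op: String) extends Expr     // any computed expression

object CalcForwarding {
  /** Returns (input index, output index) for every projection that is a plain input reference. */
  def forwardedIndices(exprs: Seq[Expr], projects: Seq[Int]): Seq[(Int, Int)] =
    projects.zipWithIndex.flatMap { case (exprIdx, out) =>
      exprs(exprIdx) match {
        case InputRef(in) => Some((in, out)) // unmodified input field
        case _            => None            // computed value, not forwarded
      }
    }
}
```

Because each projection is resolved through the expression it points to, the result does not depend on where input references happen to sit in the expression list.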
[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3040#discussion_r104729449 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala --- @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.dataset.forwarding + +import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, TypeInformation => TypeInfo} +import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.java.DataSet +import org.apache.flink.table.api.TableException +import org.apache.flink.types.Row + +object FieldForwardingUtils { + + def compositeTypeField = (fields: Seq[(String, TypeInfo[_])]) => fields + + private def throwMissedWrapperException(customWrapper: TypeInfo[_]) = { +throw new TableException(s"Implementation for $customWrapper wrapper is missing.") + } + + /** +* Wrapper for {@link getForwardedFields} +*/ + def getForwardedInput( + inputType: TypeInfo[_], + outputType: TypeInfo[_], + forwardIndices: Seq[Int], + customWrapper: TypeInfo[_] => +Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String = { + +getForwardedFields(inputType, + outputType, + forwardIndices.zip(forwardIndices), + customWrapper) + } + + /** +* Wraps provided indices with proper names. +* e.g. _1 for Tuple, f0 for Row, fieldName for POJO and named Row +* +* @param inputType information of input data +* @param outputType information of output data +* @param forwardIndices tuple of (input, output) indices of a forwarded field +* @param customWrapper used for figuring out proper type in specific cases, +* e.g. 
{@see DataSetSingleRowJoin} +* @return string with forwarded fields mapped from input to output +*/ + def getForwardedFields( + inputType: TypeInfo[_], + outputType: TypeInfo[_], + forwardIndices: Seq[(Int, Int)], + customWrapper: TypeInfo[_] => +Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String = { + +def chooseWrapper( +typeInformation: TypeInfo[_]): Seq[(String, TypeInfo[_])] = { + + typeInformation match { +case composite: CompositeType[_] => + val fields = extractFields(composite) + compositeTypeField(fields) +case basic: BasicTypeInfo[_] => + Seq((s"*", basic)) +case array: BasicArrayTypeInfo[_, _] => + Seq((s"*", array)) +case _ => + customWrapper(typeInformation) + } +} + +val wrapInput = chooseWrapper(inputType) +val wrapOutput = chooseWrapper(outputType) + +forwardFields(forwardIndices, wrapInput, wrapOutput) + } + + private def extractFields( + composite: CompositeType[_]): Seq[(String, TypeInfo[_])] = { + +val types = for { + i <- 0 until composite.getArity +} yield { composite.getTypeAt(i) } + +composite.getFieldNames.zip(types) + } + + private def forwardFields( + forwardIndices: Seq[(Int, Int)], + wrappedInput: Int => (String, TypeInfo[_]), + wrappedOutput: Int => (String, TypeInfo[_])): String = { + +implicit class Field2ForwardField(left: (String, TypeInfo[_])) { + def ->(right: (String, TypeInfo[_])): String = if (left.equals(right)) { +s"${left._1}" + } else { +if (left._2.equals(right._2)) { + s"${left._1}->${right._1}" +} else { + null +} + } +} + +forwardIndices map { + case (in, out) => +wrappedInput(in) -> wrappedOutput(out)
[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3040#discussion_r105032956 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala --- @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.nodes.dataset.forwarding + +import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, TypeInformation => TypeInfo} +import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.java.DataSet +import org.apache.flink.table.api.TableException +import org.apache.flink.types.Row + +object FieldForwardingUtils { + + def compositeTypeField = (fields: Seq[(String, TypeInfo[_])]) => fields --- End diff -- This function can be removed, right?
[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3040#discussion_r104730046 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala --- @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.dataset.forwarding + +import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, TypeInformation => TypeInfo} +import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.java.DataSet +import org.apache.flink.table.api.TableException +import org.apache.flink.types.Row + +object FieldForwardingUtils { + + def compositeTypeField = (fields: Seq[(String, TypeInfo[_])]) => fields + + private def throwMissedWrapperException(customWrapper: TypeInfo[_]) = { +throw new TableException(s"Implementation for $customWrapper wrapper is missing.") + } + + /** +* Wrapper for {@link getForwardedFields} +*/ + def getForwardedInput( + inputType: TypeInfo[_], + outputType: TypeInfo[_], + forwardIndices: Seq[Int], + customWrapper: TypeInfo[_] => +Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String = { + +getForwardedFields(inputType, + outputType, + forwardIndices.zip(forwardIndices), + customWrapper) + } + + /** +* Wraps provided indices with proper names. +* e.g. _1 for Tuple, f0 for Row, fieldName for POJO and named Row +* +* @param inputType information of input data +* @param outputType information of output data +* @param forwardIndices tuple of (input, output) indices of a forwarded field +* @param customWrapper used for figuring out proper type in specific cases, +* e.g. 
{@see DataSetSingleRowJoin} +* @return string with forwarded fields mapped from input to output +*/ + def getForwardedFields( + inputType: TypeInfo[_], + outputType: TypeInfo[_], + forwardIndices: Seq[(Int, Int)], + customWrapper: TypeInfo[_] => +Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String = { + +def chooseWrapper( +typeInformation: TypeInfo[_]): Seq[(String, TypeInfo[_])] = { + + typeInformation match { +case composite: CompositeType[_] => + val fields = extractFields(composite) + compositeTypeField(fields) +case basic: BasicTypeInfo[_] => + Seq((s"*", basic)) +case array: BasicArrayTypeInfo[_, _] => + Seq((s"*", array)) +case _ => + customWrapper(typeInformation) + } +} + +val wrapInput = chooseWrapper(inputType) +val wrapOutput = chooseWrapper(outputType) + +forwardFields(forwardIndices, wrapInput, wrapOutput) + } + + private def extractFields( + composite: CompositeType[_]): Seq[(String, TypeInfo[_])] = { + +val types = for { + i <- 0 until composite.getArity +} yield { composite.getTypeAt(i) } + +composite.getFieldNames.zip(types) + } + + private def forwardFields( + forwardIndices: Seq[(Int, Int)], + wrappedInput: Int => (String, TypeInfo[_]), + wrappedOutput: Int => (String, TypeInfo[_])): String = { + +implicit class Field2ForwardField(left: (String, TypeInfo[_])) { + def ->(right: (String, TypeInfo[_])): String = if (left.equals(right)) { +s"${left._1}" + } else { +if (left._2.equals(right._2)) { + s"${left._1}->${right._1}" +} else { + null +} + } +} + +forwardIndices map { + case (in, out) => +wrappedInput(in) -> wrappedOutput(out)
[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3040#discussion_r104727584 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala --- @@ -199,10 +200,21 @@ class DataSetJoin( val joinOpName = s"where: ($joinConditionToString), join: ($joinSelectionToString)" +//consider all fields not which are not keys are forwarded +val leftIndices = (0 until left.getRowType.getFieldCount).diff(leftKeys) --- End diff -- The keys can also be forwarded. Actually, these are the most interesting fields because the DataSet will be partitioned and maybe also sorted on these keys.
[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3040#discussion_r104727744 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala --- @@ -199,10 +200,21 @@ class DataSetJoin( val joinOpName = s"where: ($joinConditionToString), join: ($joinSelectionToString)" +//consider all fields not which are not keys are forwarded +val leftIndices = (0 until left.getRowType.getFieldCount).diff(leftKeys) +val fieldsLeft = getForwardedInput(leftDataSet.getType, returnType, leftIndices) + +val rightIndices = (0 until right.getRowType.getFieldCount) + .diff(rightKeys) --- End diff -- Keys can be forwarded
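As a rough illustration of what forwarding the keys implies for the index computation, the sketch below derives the (input, output) index pairs for both join inputs without subtracting the key positions. It assumes, as described elsewhere in this review thread, that the join output is the left fields followed by the right fields and that only the side which cannot be padded with `null` values is forwarded. The object `JoinForwarding`, its `JoinType` cases and `forwardedPairs` are hypothetical names used only for this example.

```scala
object JoinForwarding {
  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object FullOuter extends JoinType

  // Returns (left pairs, right pairs) of (input index, output index).
  // Assumption: the output row is all left fields followed by all right fields.
  def forwardedPairs(
      leftCount: Int,
      rightCount: Int,
      joinType: JoinType): (Seq[(Int, Int)], Seq[(Int, Int)]) = {

    // Every left field keeps its position; keys are included on purpose.
    val left = (0 until leftCount).map(i => (i, i))
    // Every right field is shifted by the number of left fields.
    val right = (0 until rightCount).map(i => (i, leftCount + i))

    joinType match {
      case Inner      => (left, right)
      case LeftOuter  => (left, Seq.empty)      // right side may be null-padded
      case RightOuter => (Seq.empty, right)     // left side may be null-padded
      case FullOuter  => (Seq.empty, Seq.empty) // both sides may be null-padded
    }
  }
}
```

With a left input of three fields and a right input of two, an inner join forwards left fields 0 to 2 to the same positions and right fields 0 and 1 to output positions 3 and 4.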
[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3040#discussion_r104730153 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala --- @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.dataset.forwarding + +import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, TypeInformation => TypeInfo} +import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.java.DataSet +import org.apache.flink.table.api.TableException +import org.apache.flink.types.Row + +object FieldForwardingUtils { + + def compositeTypeField = (fields: Seq[(String, TypeInfo[_])]) => fields + + private def throwMissedWrapperException(customWrapper: TypeInfo[_]) = { +throw new TableException(s"Implementation for $customWrapper wrapper is missing.") + } + + /** +* Wrapper for {@link getForwardedFields} +*/ + def getForwardedInput( + inputType: TypeInfo[_], + outputType: TypeInfo[_], + forwardIndices: Seq[Int], + customWrapper: TypeInfo[_] => +Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String = { + +getForwardedFields(inputType, + outputType, + forwardIndices.zip(forwardIndices), + customWrapper) + } + + /** +* Wraps provided indices with proper names. +* e.g. _1 for Tuple, f0 for Row, fieldName for POJO and named Row +* +* @param inputType information of input data +* @param outputType information of output data +* @param forwardIndices tuple of (input, output) indices of a forwarded field +* @param customWrapper used for figuring out proper type in specific cases, +* e.g. 
{@see DataSetSingleRowJoin} +* @return string with forwarded fields mapped from input to output +*/ + def getForwardedFields( + inputType: TypeInfo[_], + outputType: TypeInfo[_], + forwardIndices: Seq[(Int, Int)], + customWrapper: TypeInfo[_] => +Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String = { + +def chooseWrapper( +typeInformation: TypeInfo[_]): Seq[(String, TypeInfo[_])] = { + + typeInformation match { +case composite: CompositeType[_] => + val fields = extractFields(composite) + compositeTypeField(fields) +case basic: BasicTypeInfo[_] => + Seq((s"*", basic)) +case array: BasicArrayTypeInfo[_, _] => + Seq((s"*", array)) +case _ => + customWrapper(typeInformation) + } +} + +val wrapInput = chooseWrapper(inputType) +val wrapOutput = chooseWrapper(outputType) + +forwardFields(forwardIndices, wrapInput, wrapOutput) + } + + private def extractFields( + composite: CompositeType[_]): Seq[(String, TypeInfo[_])] = { + +val types = for { + i <- 0 until composite.getArity +} yield { composite.getTypeAt(i) } + +composite.getFieldNames.zip(types) + } + + private def forwardFields( + forwardIndices: Seq[(Int, Int)], + wrappedInput: Int => (String, TypeInfo[_]), + wrappedOutput: Int => (String, TypeInfo[_])): String = { + +implicit class Field2ForwardField(left: (String, TypeInfo[_])) { + def ->(right: (String, TypeInfo[_])): String = if (left.equals(right)) { +s"${left._1}" + } else { +if (left._2.equals(right._2)) { + s"${left._1}->${right._1}" +} else { + null +} + } +} + +forwardIndices map { + case (in, out) => +wrappedInput(in) -> wrappedOutput(out)
[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3040#discussion_r104730024 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala --- @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.dataset.forwarding + +import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, TypeInformation => TypeInfo} +import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.java.DataSet +import org.apache.flink.table.api.TableException +import org.apache.flink.types.Row + +object FieldForwardingUtils { + + def compositeTypeField = (fields: Seq[(String, TypeInfo[_])]) => fields + + private def throwMissedWrapperException(customWrapper: TypeInfo[_]) = { +throw new TableException(s"Implementation for $customWrapper wrapper is missing.") + } + + /** +* Wrapper for {@link getForwardedFields} +*/ + def getForwardedInput( + inputType: TypeInfo[_], + outputType: TypeInfo[_], + forwardIndices: Seq[Int], + customWrapper: TypeInfo[_] => +Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String = { + +getForwardedFields(inputType, + outputType, + forwardIndices.zip(forwardIndices), + customWrapper) + } + + /** +* Wraps provided indices with proper names. +* e.g. _1 for Tuple, f0 for Row, fieldName for POJO and named Row +* +* @param inputType information of input data +* @param outputType information of output data +* @param forwardIndices tuple of (input, output) indices of a forwarded field +* @param customWrapper used for figuring out proper type in specific cases, +* e.g. 
{@see DataSetSingleRowJoin} +* @return string with forwarded fields mapped from input to output +*/ + def getForwardedFields( + inputType: TypeInfo[_], + outputType: TypeInfo[_], + forwardIndices: Seq[(Int, Int)], + customWrapper: TypeInfo[_] => +Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String = { + +def chooseWrapper( +typeInformation: TypeInfo[_]): Seq[(String, TypeInfo[_])] = { + + typeInformation match { +case composite: CompositeType[_] => + val fields = extractFields(composite) + compositeTypeField(fields) +case basic: BasicTypeInfo[_] => + Seq((s"*", basic)) +case array: BasicArrayTypeInfo[_, _] => + Seq((s"*", array)) +case _ => + customWrapper(typeInformation) + } +} + +val wrapInput = chooseWrapper(inputType) +val wrapOutput = chooseWrapper(outputType) + +forwardFields(forwardIndices, wrapInput, wrapOutput) + } + + private def extractFields( + composite: CompositeType[_]): Seq[(String, TypeInfo[_])] = { + +val types = for { + i <- 0 until composite.getArity +} yield { composite.getTypeAt(i) } + +composite.getFieldNames.zip(types) + } + + private def forwardFields( + forwardIndices: Seq[(Int, Int)], + wrappedInput: Int => (String, TypeInfo[_]), + wrappedOutput: Int => (String, TypeInfo[_])): String = { + +implicit class Field2ForwardField(left: (String, TypeInfo[_])) { + def ->(right: (String, TypeInfo[_])): String = if (left.equals(right)) { +s"${left._1}" + } else { +if (left._2.equals(right._2)) { + s"${left._1}->${right._1}" +} else { + null +} + } +} + +forwardIndices map { + case (in, out) => +wrappedInput(in) -> wrappedOutput(out)
[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3040#discussion_r95975700 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala --- @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.dataset.forwarding + +import org.apache.calcite.rex.RexProgram +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.table.api.TableException + +import scala.collection.JavaConversions._ + +object FieldForwardingUtils { + + def compositeTypeField = (fields: Seq[String]) => (v: Int) => fields(v) + + private def throwMissedWrapperException(wrapperCustomCase: TypeInformation[_]) = { +throw new TableException(s"Implementation for $wrapperCustomCase index wrapper is missing.") + } + + /** +* Wrapper for {@link getForwardedFields} +* @param inputType +* @param outputType +* @param forwardIndices +* @param wrapperCustomCase +* @param calcProgram +* @return +*/ + def getForwardedInput( + inputType: TypeInformation[_], + outputType: TypeInformation[_], + forwardIndices: Seq[Int], + wrapperCustomCase: TypeInformation[_] => (Int) => String = throwMissedWrapperException, + calcProgram: Option[RexProgram] = None) = { + +getForwardedFields(inputType, + outputType, + forwardIndices.zip(forwardIndices), + wrapperCustomCase, + calcProgram) + } + + /** +* Wraps provided indices with proper names, e.g. _1 for tuple, f0 for row, fieldName for POJO. +* @param inputType +* @param outputType +* @param forwardIndices - tuple of input-output indices of a forwarded field +* @param wrapperCustomCase - used for figuring out proper type in specific cases, +* e.g. {@see DataSetSingleRowJoin} +* @param calcProgram - used for figuring out proper type in specific cases, +*e.g. 
{@see DataSetCalc} +* @return - string with forwarded fields mapped from input to output +*/ + def getForwardedFields( + inputType: TypeInformation[_], + outputType: TypeInformation[_], + forwardIndices: Seq[(Int, Int)], + wrapperCustomCase: TypeInformation[_] => (Int) => String = throwMissedWrapperException, + calcProgram: Option[RexProgram] = None): String = { + +def chooseWrapper(typeInformation: TypeInformation[_]): (Int) => String = { + typeInformation match { +case composite: CompositeType[_] => + // POJOs' fields are sorted, so we can not access them by their positional index. + // So we collect field names from + // outputRowType. For all other types we get field names from inputDS. + val typeFieldList = composite.getFieldNames + var fieldWrapper: (Int) => String = compositeTypeField(typeFieldList) + if (calcProgram.isDefined) { +val projectFieldList = calcProgram.get.getOutputRowType.getFieldNames --- End diff -- I think this would break the order of the fields.
[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3040#discussion_r95975427 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala --- @@ -129,8 +131,35 @@ class DataSetCalc( body, returnType) +def getForwardIndices = { + //get indices of all modified operands + val modifiedOperands = calcProgram. +getExprList +.filter(_.isInstanceOf[RexCall]) +.flatMap(_.asInstanceOf[RexCall].operands) +.map(_.asInstanceOf[RexLocalRef].getIndex) --- End diff -- Shouldn't the modified fields refer to the input fields? In that case this should be `RexInputRef`, shouldn't it?
[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...
GitHub user NickolayVasilishin opened a pull request: https://github.com/apache/flink/pull/3040

[FLINK-3850] Add forward field annotations to DataSet

Add forward field annotations to DataSet operators generated by the Table API

- Added field forwarding to most `DataSetRel` implementations.
- The string with forwarded fields is now allowed to be empty in `SemanticPropUtil.java`.
- The type-based wrapper for indices was moved to the object `FieldForwardingUtils`.
- In most cases forwarding is done only for the conversion.

`BatchScan`: forwarding at conversion
`DataSetAggregate`: forwarding at conversion
`DataSetCalc`: forwarding based on operands that are not modified by `RexCall`s
`DataSetCorrelate`: forwarding based on operands that are not modified by `RexCall`s
`DataSetIntersect`: forwarding at conversion
`DataSetJoin`: forwarding based on fields which are not keys
`DataSetMinus`: forwarding at conversion
`DataSetSingleRowJoin`: all fields of the multi-row data set are forwarded, the single row is used via broadcast
`DataSetSort`: all fields forwarded + conversion

I hope I've understood the meaning of forward fields correctly: fields that are not used for computations. So I assumed that these fields are not used in `RexCall`s or as join keys. I also forwarded fields in type conversions. The hardest part was determining the correct input and output field names.

You can merge this pull request into a Git repository by running:
$ git pull https://github.com/NickolayVasilishin/flink FLINK-3850

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3040.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3040

commit 25cc1f022eb399bade37ef7b0fd0b87a9e509d67
Author: nikolay_vasilishin
Date: 2016-12-23T10:50:46Z

[FLINK-3850] Add forward field annotations to DataSet operators generated by the Table API

- Added field forwarding at most of DataSetRel implementations.
- String with forwarded fields allowed to be empty at SemanticPropUtil.java
- Wrapper for indices based on types moved to object class FieldForwardingUtils
- In most cases forwarding done only for conversion

BatchScan: forwarding at conversion
DataSetAggregate: forwarding at conversion
DataSetCalc: forwarding based on unmodified at RexCalls operands
DataSetCorrelate: forwarding based on unmodified at RexCalls operands
DataSetIntersect: forwarding at conversion
DataSetJoin: forwarding based on fields which are not keys
DataSetMinus: forwarding at conversion
DataSetSingleRowJoin: forwarded all fields from multi row dataset, single row used via broadcast
DataSetSort: all fields forwarded + conversion

Conflicts:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetIntersect.scala
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetMinus.scala
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala
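Since the description singles out the mapping from indices to input and output field names as the hardest part, a small sketch of that step may help. It turns (input, output) index pairs into a forwarded-fields expression in the semicolon-separated `name` / `from->to` form that Flink's forwarded-field annotations accept. The object `ForwardedFieldString` and its `build` method are hypothetical; unlike the `FieldForwardingUtils` code quoted in this thread, the sketch ignores field types and assumes the caller already knows the field names of both sides.

```scala
object ForwardedFieldString {
  // Builds an expression such as "f0;f1->f2" from (input index, output index) pairs.
  // A field that keeps its name is written once; a renamed or moved field as "from->to".
  def build(
      inputNames: Seq[String],
      outputNames: Seq[String],
      pairs: Seq[(Int, Int)]): String =
    pairs.map { case (in, out) =>
      val from = inputNames(in)
      val to = outputNames(out)
      if (from == to) from else s"$from->$to"
    }.mkString(";")
}
```

For example, with Row-style names `f0`, `f1`, `f2` on both sides, the pairs `(0, 0)` and `(1, 2)` yield `f0;f1->f2`, and an empty list of pairs yields the empty string.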