[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...

2017-03-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r105731937
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
 ---
@@ -199,10 +200,21 @@ class DataSetJoin(
 
 val joinOpName = s"where: ($joinConditionToString), join: ($joinSelectionToString)"
 
+//consider all fields not which are not keys are forwarded
+val leftIndices = (0 until left.getRowType.getFieldCount).diff(leftKeys)
--- End diff --

A Calcite join forwards all fields of both sides. If the left input is 
`(l1, l2, l3)` and the right input is `(r1, r2)`, then the result of the join 
will be `(l1, l2, l3, r1, r2)` for all pairs of left and right rows that satisfy 
the join condition. It does not matter which of the fields is a key field: if the 
join condition is `l1 == r2`, both fields are forwarded. Since DataSet joins 
organize the input data sets based on the key attributes (by partitioning and 
sorting), these attributes are especially interesting for forward field annotations.

Actually, I just noticed that we have to distinguish the type of the join 
(inner, left, right, full). We can only forward the fields of a side whose rows 
are preserved unchanged (both sides for an inner join, the left side for a left 
outer join, the right side for a right outer join, and neither side for a full 
outer join), because the other side might have been padded with `null` values.
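A minimal sketch of the join-type rule above, assuming the join output is all left fields followed by all right fields (as in the `(l1, l2, l3, r1, r2)` example). `JoinKind` and `JoinForwarding.forwardedFields` are hypothetical names for illustration only; in the actual code the join type would come from Calcite's `JoinRelType`.

```scala
// Hypothetical stand-in for the four join types discussed above.
sealed trait JoinKind
case object InnerJoin extends JoinKind
case object LeftOuterJoin extends JoinKind
case object RightOuterJoin extends JoinKind
case object FullOuterJoin extends JoinKind

object JoinForwarding {

  /**
    * Returns the (input index, output index) pairs that may be declared as
    * forwarded for the left and the right input of a join whose output row
    * consists of all left fields followed by all right fields.
    */
  def forwardedFields(
      kind: JoinKind,
      leftArity: Int,
      rightArity: Int): (Seq[(Int, Int)], Seq[(Int, Int)]) = {

    // left field i lands at output position i
    val left: Seq[(Int, Int)] = (0 until leftArity).map(i => (i, i))
    // right field i lands at output position leftArity + i
    val right: Seq[(Int, Int)] = (0 until rightArity).map(i => (i, leftArity + i))

    kind match {
      case InnerJoin      => (left, right)          // no side is null-padded
      case LeftOuterJoin  => (left, Seq.empty)      // right side may be null-padded
      case RightOuterJoin => (Seq.empty, right)     // left side may be null-padded
      case FullOuterJoin  => (Seq.empty, Seq.empty) // both sides may be null-padded
    }
  }
}
```

For `(l1, l2, l3)` left-outer-joined with `(r1, r2)`, only the left pairs `(0,0), (1,1), (2,2)` would be declared; the right fields at output positions 3 and 4 are left out because they may be `null`.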


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...

2017-03-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r105639200
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala
 ---
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset.forwarding
+
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, TypeInformation => TypeInfo}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.TableException
+import org.apache.flink.types.Row
+
+object FieldForwardingUtils {
+
+  def compositeTypeField = (fields: Seq[(String, TypeInfo[_])]) => fields
+
+  private def throwMissedWrapperException(customWrapper: TypeInfo[_]) = {
+throw new TableException(s"Implementation for $customWrapper wrapper is missing.")
+  }
+
+  /**
+* Wrapper for {@link getForwardedFields}
+*/
+  def getForwardedInput(
+  inputType: TypeInfo[_],
+  outputType: TypeInfo[_],
+  forwardIndices: Seq[Int],
+  customWrapper: TypeInfo[_] =>
+Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String = {
+
+getForwardedFields(inputType,
+  outputType,
+  forwardIndices.zip(forwardIndices),
+  customWrapper)
+  }
+
+  /**
+* Wraps provided indices with proper names.
+* e.g. _1 for Tuple, f0 for Row, fieldName for POJO and named Row
+*
+* @param inputType  information of input data
+* @param outputType information of output data
+* @param forwardIndices tuple of (input, output) indices of a forwarded field
+* @param customWrapper  used for figuring out proper type in specific cases,
+*   e.g. {@see DataSetSingleRowJoin}
+* @return string with forwarded fields mapped from input to output
+*/
+  def getForwardedFields(
+  inputType: TypeInfo[_],
+  outputType: TypeInfo[_],
+  forwardIndices: Seq[(Int, Int)],
+  customWrapper: TypeInfo[_] =>
+Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String = {
+
+def chooseWrapper(
+typeInformation: TypeInfo[_]): Seq[(String, TypeInfo[_])] = {
+
+  typeInformation match {
+case composite: CompositeType[_] =>
+  val fields = extractFields(composite)
+  compositeTypeField(fields)
+case basic: BasicTypeInfo[_] =>
+  Seq((s"*", basic))
+case array: BasicArrayTypeInfo[_, _] =>
+  Seq((s"*", array))
+case _ =>
+  customWrapper(typeInformation)
+  }
+}
+
+val wrapInput = chooseWrapper(inputType)
+val wrapOutput = chooseWrapper(outputType)
+
+forwardFields(forwardIndices, wrapInput, wrapOutput)
+  }
+
+  private def extractFields(
+  composite: CompositeType[_]): Seq[(String, TypeInfo[_])] = {
+
+val types = for {
+  i <- 0 until composite.getArity
+} yield { composite.getTypeAt(i) }
+
+composite.getFieldNames.zip(types)
+  }
+
+  private def forwardFields(
+  forwardIndices: Seq[(Int, Int)],
+  wrappedInput: Int => (String, TypeInfo[_]),
+  wrappedOutput: Int => (String, TypeInfo[_])): String = {
+
+implicit class Field2ForwardField(left: (String, TypeInfo[_])) {
+  def ->(right: (String, TypeInfo[_])): String = if (left.equals(right)) {
+s"${left._1}"
+  } else {
+if (left._2.equals(right._2)) {
+  s"${left._1}->${right._1}"
+} else {
+  null
--- End diff --

I think this case should never happen if correct mappings are provided. If 
the types do not match, we have a bug in the logic to 

[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...

2017-03-13 Thread tonycox
Github user tonycox commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r105600606
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala
 ---
@@ -0,0 +1,143 @@
+
+package org.apache.flink.table.plan.nodes.dataset.forwarding
+
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, TypeInformation => TypeInfo}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.TableException
+import org.apache.flink.types.Row
+
+object FieldForwardingUtils {
+
+  def compositeTypeField = (fields: Seq[(String, TypeInfo[_])]) => fields
+
+  private def throwMissedWrapperException(customWrapper: TypeInfo[_]) = {
+throw new TableException(s"Implementation for $customWrapper wrapper is missing.")
+  }
+
+  /**
+* Wrapper for {@link getForwardedFields}
+*/
+  def getForwardedInput(
+  inputType: TypeInfo[_],
+  outputType: TypeInfo[_],
+  forwardIndices: Seq[Int],
+  customWrapper: TypeInfo[_] =>
+Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String = {
+
+getForwardedFields(inputType,
+  outputType,
+  forwardIndices.zip(forwardIndices),
+  customWrapper)
+  }
+
+  /**
+* Wraps provided indices with proper names.
+* e.g. _1 for Tuple, f0 for Row, fieldName for POJO and named Row
+*
+* @param inputType  information of input data
+* @param outputType information of output data
+* @param forwardIndices tuple of (input, output) indices of a forwarded field
+* @param customWrapper  used for figuring out proper type in specific cases,
+*   e.g. {@see DataSetSingleRowJoin}
+* @return string with forwarded fields mapped from input to output
+*/
+  def getForwardedFields(
+  inputType: TypeInfo[_],
+  outputType: TypeInfo[_],
+  forwardIndices: Seq[(Int, Int)],
+  customWrapper: TypeInfo[_] =>
+Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String = {
+
+def chooseWrapper(
+typeInformation: TypeInfo[_]): Seq[(String, TypeInfo[_])] = {
+
+  typeInformation match {
+case composite: CompositeType[_] =>
+  val fields = extractFields(composite)
+  compositeTypeField(fields)
+case basic: BasicTypeInfo[_] =>
+  Seq((s"*", basic))
+case array: BasicArrayTypeInfo[_, _] =>
+  Seq((s"*", array))
+case _ =>
+  customWrapper(typeInformation)
+  }
+}
+
+val wrapInput = chooseWrapper(inputType)
+val wrapOutput = chooseWrapper(outputType)
+
+forwardFields(forwardIndices, wrapInput, wrapOutput)
+  }
+
+  private def extractFields(
+  composite: CompositeType[_]): Seq[(String, TypeInfo[_])] = {
+
+val types = for {
+  i <- 0 until composite.getArity
+} yield { composite.getTypeAt(i) }
+
+composite.getFieldNames.zip(types)
+  }
+
+  private def forwardFields(
+  forwardIndices: Seq[(Int, Int)],
+  wrappedInput: Int => (String, TypeInfo[_]),
+  wrappedOutput: Int => (String, TypeInfo[_])): String = {
+
+implicit class Field2ForwardField(left: (String, TypeInfo[_])) {
+  def ->(right: (String, TypeInfo[_])): String = if (left.equals(right)) {
+s"${left._1}"
+  } else {
+if (left._2.equals(right._2)) {
+  s"${left._1}->${right._1}"
+} else {
+  null
--- End diff --

@fhueske I think if types don't match we can just skip it
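The two options discussed for the type-mismatch branch (throwing an exception, as fhueske suggests, or skipping the field, as suggested here) can be contrasted in a small self-contained sketch. It models a field as a `(name, typeName)` pair of strings instead of Flink's `TypeInformation`, uses `IllegalStateException` in place of `TableException`, and `ForwardedFieldsSketch`, `forwardedFieldsString` and `strict` are hypothetical names. Fields are joined with `;`, the separator used in Flink's forwarded-fields expressions.

```scala
object ForwardedFieldsSketch {

  // Simplified field: (field name, type name). The PR code uses TypeInformation instead.
  type Field = (String, String)

  /**
    * Builds a forwarded-fields expression such as "f0;f1->f3" from
    * (input index, output index) pairs.
    *
    * @param strict if true, a type mismatch is treated as a bug and throws;
    *               if false, the mismatching pair is silently skipped
    */
  def forwardedFieldsString(
      forwardIndices: Seq[(Int, Int)],
      input: Int => Field,
      output: Int => Field,
      strict: Boolean): String = {

    val mapped: Seq[Option[String]] = forwardIndices.map { case (in, out) =>
      val (inName, inType) = input(in)
      val (outName, outType) = output(out)
      if (inType != outType) {
        if (strict) {
          // the mapping logic produced an impossible pair: fail loudly
          throw new IllegalStateException(
            s"Type mismatch for forwarded field $inName -> $outName: $inType vs $outType")
        } else {
          None // drop the pair instead of emitting null
        }
      } else if (inName == outName) {
        Some(inName)               // same name and type: "f0"
      } else {
        Some(s"$inName->$outName") // same type, different name: "f1->f3"
      }
    }
    mapped.flatten.mkString(";")
  }
}
```

Either way the sketch never produces a `null` entry; the difference is only whether an inconsistent mapping surfaces as an error or is hidden.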



[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...

2017-03-08 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r104730718
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
 ---
@@ -124,8 +127,26 @@ class DataSetCalc(
   body,
   returnType)
 
+def getForwardIndices = {
--- End diff --

Move all code related to forwarded fields into a function `getForwardFields()` 
to keep the main translation function more concise.




[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...

2017-03-08 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r105014912
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
 ---
@@ -125,11 +126,25 @@ class DataSetAggregate(
 .name(aggOpName)
 }
 else {
+  //Forward all fields at conversion
+  val inputInfo = mappedInput.getType
--- End diff --

PR #3472 will remove the preparing mapper but add a `GroupCombineFunction`. 
For both the combiner and the reducer, we need to forward the fields on which 
the data is grouped, because these are not modified but just forwarded.




[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...

2017-03-08 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r104708548
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
 ---
@@ -125,11 +126,25 @@ class DataSetAggregate(
 .name(aggOpName)
 }
 else {
+  //Forward all fields at conversion
+  val inputInfo = mappedInput.getType
--- End diff --

An aggregation operator returns the grouping keys and the aggregated values. 
Only the grouping keys are forwarded without modification, so forwarded fields 
can only be assigned for grouped aggregates.
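A minimal sketch of that rule, assuming (hypothetically, for illustration) that the aggregate's output row starts with the grouping keys in grouping order, followed by the aggregated values, and that fields are named by position (`f0`, `f1`, ... as for `Row`). `AggregateForwarding.groupKeyForwardedFields` is a hypothetical helper; for a non-grouped aggregate it yields an empty expression, i.e. nothing is forwarded.

```scala
object AggregateForwarding {

  /**
    * Forwarded-fields expression for a grouped aggregation.
    *
    * @param groupingKeys input positions of the grouping keys, in grouping order
    * @return e.g. "f2->f0;f0->f1" for grouping keys Seq(2, 0); "" if not grouped
    */
  def groupKeyForwardedFields(groupingKeys: Seq[Int]): String =
    groupingKeys.zipWithIndex
      .map { case (in, out) =>
        // grouping key at input position `in` is copied unchanged to output position `out`
        if (in == out) s"f$in" else s"f$in->f$out"
      }
      .mkString(";")
}
```

In the DataSet API, such a string would be passed to `withForwardedFields(...)` on the combine or reduce operator; the aggregated value fields are never listed because they are computed.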




[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...

2017-03-08 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r105029771
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala
 ---
@@ -0,0 +1,143 @@
+
+package org.apache.flink.table.plan.nodes.dataset.forwarding
+
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, TypeInformation => TypeInfo}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.TableException
+import org.apache.flink.types.Row
+
+object FieldForwardingUtils {
+
+  def compositeTypeField = (fields: Seq[(String, TypeInfo[_])]) => fields
+
+  private def throwMissedWrapperException(customWrapper: TypeInfo[_]) = {
+throw new TableException(s"Implementation for $customWrapper wrapper is missing.")
+  }
+
+  /**
+* Wrapper for {@link getForwardedFields}
+*/
+  def getForwardedInput(
+  inputType: TypeInfo[_],
+  outputType: TypeInfo[_],
+  forwardIndices: Seq[Int],
+  customWrapper: TypeInfo[_] =>
+Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String = {
+
+getForwardedFields(inputType,
+  outputType,
+  forwardIndices.zip(forwardIndices),
+  customWrapper)
+  }
+
+  /**
+* Wraps provided indices with proper names.
+* e.g. _1 for Tuple, f0 for Row, fieldName for POJO and named Row
+*
+* @param inputType  information of input data
+* @param outputType information of output data
+* @param forwardIndices tuple of (input, output) indices of a forwarded field
+* @param customWrapper  used for figuring out proper type in specific cases,
+*   e.g. {@see DataSetSingleRowJoin}
+* @return string with forwarded fields mapped from input to output
+*/
+  def getForwardedFields(
+  inputType: TypeInfo[_],
+  outputType: TypeInfo[_],
+  forwardIndices: Seq[(Int, Int)],
+  customWrapper: TypeInfo[_] =>
--- End diff --

I didn't notice the `customWrapper` being used anywhere. Can it be removed?




[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...

2017-03-08 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r105030757
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala
 ---
@@ -0,0 +1,143 @@
+
+package org.apache.flink.table.plan.nodes.dataset.forwarding
+
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, TypeInformation => TypeInfo}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.TableException
+import org.apache.flink.types.Row
+
+object FieldForwardingUtils {
+
+  def compositeTypeField = (fields: Seq[(String, TypeInfo[_])]) => fields
+
+  private def throwMissedWrapperException(customWrapper: TypeInfo[_]) = {
+throw new TableException(s"Implementation for $customWrapper wrapper is missing.")
+  }
+
+  /**
+* Wrapper for {@link getForwardedFields}
+*/
+  def getForwardedInput(
+  inputType: TypeInfo[_],
+  outputType: TypeInfo[_],
+  forwardIndices: Seq[Int],
+  customWrapper: TypeInfo[_] =>
+Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String = {
+
+getForwardedFields(inputType,
+  outputType,
+  forwardIndices.zip(forwardIndices),
+  customWrapper)
+  }
+
+  /**
+* Wraps provided indices with proper names.
+* e.g. _1 for Tuple, f0 for Row, fieldName for POJO and named Row
+*
+* @param inputType  information of input data
+* @param outputType information of output data
+* @param forwardIndices tuple of (input, output) indices of a forwarded field
+* @param customWrapper  used for figuring out proper type in specific cases,
+*   e.g. {@see DataSetSingleRowJoin}
+* @return string with forwarded fields mapped from input to output
+*/
+  def getForwardedFields(
+  inputType: TypeInfo[_],
+  outputType: TypeInfo[_],
+  forwardIndices: Seq[(Int, Int)],
+  customWrapper: TypeInfo[_] =>
+Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String = {
+
+def chooseWrapper(
+typeInformation: TypeInfo[_]): Seq[(String, TypeInfo[_])] = {
+
+  typeInformation match {
+case composite: CompositeType[_] =>
+  val fields = extractFields(composite)
+  compositeTypeField(fields)
+case basic: BasicTypeInfo[_] =>
+  Seq((s"*", basic))
+case array: BasicArrayTypeInfo[_, _] =>
+  Seq((s"*", array))
+case _ =>
--- End diff --

`AtomicType` can be `"*"` as well.
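A sketch of the type dispatch with an `AtomicType` case, assuming flink-core is on the classpath. Since `BasicTypeInfo` implements `AtomicType`, a separate `BasicTypeInfo` case becomes redundant; `BasicArrayTypeInfo` is not an `AtomicType` and keeps its own case. Unsupported types throw directly instead of going through a `customWrapper` fallback (whose removal is asked about in this thread). `FieldNames.fieldsOf` is a hypothetical name, and `IllegalArgumentException` stands in for `TableException`.

```scala
import org.apache.flink.api.common.typeinfo.{AtomicType, BasicArrayTypeInfo, TypeInformation}
import org.apache.flink.api.common.typeutils.CompositeType

object FieldNames {

  /** Maps a type to (field expression, field type) pairs usable in forwarded-field strings. */
  def fieldsOf(typeInfo: TypeInformation[_]): Seq[(String, TypeInformation[_])] =
    typeInfo match {
      case composite: CompositeType[_] =>
        // Tuple: _1/f0..., Row: f0..., POJO and named Row: field names
        val names = composite.getFieldNames
        (0 until composite.getArity).map { i =>
          (names(i), composite.getTypeAt[Any](i): TypeInformation[_])
        }
      case _: AtomicType[_] =>
        // covers BasicTypeInfo and other atomic types: the whole value is the field
        Seq(("*", typeInfo))
      case _: BasicArrayTypeInfo[_, _] =>
        // not an AtomicType, but still forwarded as a whole
        Seq(("*", typeInfo))
      case other =>
        throw new IllegalArgumentException(s"Unsupported type for forwarded fields: $other")
    }
}
```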




[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...

2017-03-08 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r104685724
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
 ---
@@ -46,6 +46,7 @@ trait CommonCorrelate {
   config: TableConfig,
   inputTypeInfo: TypeInformation[Row],
   udtfTypeInfo: TypeInformation[Any],
+  returnType: TypeInformation[Row],
--- End diff --

Why is this change necessary? `returnType` can be computed from `rowType`, 
which is a parameter as well.




[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...

2017-03-08 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r105029829
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala
 ---
@@ -0,0 +1,143 @@
+
+package org.apache.flink.table.plan.nodes.dataset.forwarding
+
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, TypeInformation => TypeInfo}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.TableException
+import org.apache.flink.types.Row
+
+object FieldForwardingUtils {
+
+  def compositeTypeField = (fields: Seq[(String, TypeInfo[_])]) => fields
+
+  private def throwMissedWrapperException(customWrapper: TypeInfo[_]) = {
+throw new TableException(s"Implementation for $customWrapper wrapper is missing.")
+  }
+
+  /**
+* Wrapper for {@link getForwardedFields}
+*/
+  def getForwardedInput(
+  inputType: TypeInfo[_],
+  outputType: TypeInfo[_],
+  forwardIndices: Seq[Int],
+  customWrapper: TypeInfo[_] =>
--- End diff --

Can the `customWrapper` be removed?




[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...

2017-03-08 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r104723484
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
 ---
@@ -97,18 +103,41 @@ class DataSetCorrelate(
 val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
 val pojoFieldMapping = sqlFunction.getPojoFieldMapping
 val udtfTypeInfo = sqlFunction.getRowTypeInfo.asInstanceOf[TypeInformation[Any]]
+val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
 
 val mapFunc = correlateMapFunction(
   config,
   inputDS.getType,
   udtfTypeInfo,
+  returnType,
   getRowType,
   joinType,
   rexCall,
   condition,
   Some(pojoFieldMapping),
   ruleDescription)
 
-inputDS.flatMap(mapFunc).name(correlateOpName(rexCall, sqlFunction, relRowType))
+def getIndices = {
--- End diff --

A correlate forwards all fields of the input and the table function: for an 
input `[in1, in2, in3]` and a table function `[tf1, tf2]`, the output is 
`[in1, in2, in3, tf1, tf2]`. So we can do a simple position-based mapping of the 
fields of the input type against the output type (field names might change). 
This is basically similar to what you are doing with the single row join.

We do not need to look at the table function or the condition.
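A sketch of that position-based mapping, assuming only the field names of the input and output rows are needed (the types of copied fields are identical by construction). `CorrelateForwarding.correlateForwardedFields` is a hypothetical helper; as suggested above, it ignores the table-function fields and the condition entirely.

```scala
object CorrelateForwarding {

  /**
    * Input field i is copied to output field i; the table-function fields
    * follow at the end of the output row and are not forwarded from the input.
    */
  def correlateForwardedFields(inputNames: Seq[String], outputNames: Seq[String]): String = {
    require(outputNames.size >= inputNames.size, "output must contain all input fields")
    inputNames.indices
      .map { i =>
        val (in, out) = (inputNames(i), outputNames(i))
        // names may change between input and output, positions do not
        if (in == out) in else s"$in->$out"
      }
      .mkString(";")
  }
}
```

For example, an input `[a, b, c]` and an output `[a, b2, c, t1, t2]` would give `a;b->b2;c`.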




[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...

2017-03-08 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r104706099
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala
 ---
@@ -0,0 +1,143 @@
+
+package org.apache.flink.table.plan.nodes.dataset.forwarding
+
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, TypeInformation => TypeInfo}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.TableException
+import org.apache.flink.types.Row
+
+object FieldForwardingUtils {
+
+  def compositeTypeField = (fields: Seq[(String, TypeInfo[_])]) => fields
+
+  private def throwMissedWrapperException(customWrapper: TypeInfo[_]) = {
+throw new TableException(s"Implementation for $customWrapper wrapper is missing.")
+  }
+
+  /**
+* Wrapper for {@link getForwardedFields}
+*/
+  def getForwardedInput(
+  inputType: TypeInfo[_],
+  outputType: TypeInfo[_],
+  forwardIndices: Seq[Int],
+  customWrapper: TypeInfo[_] =>
+Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String = {
+
+getForwardedFields(inputType,
+  outputType,
+  forwardIndices.zip(forwardIndices),
+  customWrapper)
+  }
+
+  /**
+* Wraps provided indices with proper names.
+* e.g. _1 for Tuple, f0 for Row, fieldName for POJO and named Row
+*
+* @param inputType  information of input data
+* @param outputType information of output data
+* @param forwardIndices tuple of (input, output) indices of a forwarded field
+* @param customWrapper  used for figuring out proper type in specific cases,
+*   e.g. {@see DataSetSingleRowJoin}
+* @return string with forwarded fields mapped from input to output
+*/
+  def getForwardedFields(
+  inputType: TypeInfo[_],
+  outputType: TypeInfo[_],
+  forwardIndices: Seq[(Int, Int)],
+  customWrapper: TypeInfo[_] =>
+Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String = {
+
+def chooseWrapper(
+typeInformation: TypeInfo[_]): Seq[(String, TypeInfo[_])] = {
+
+  typeInformation match {
+case composite: CompositeType[_] =>
+  val fields = extractFields(composite)
+  compositeTypeField(fields)
+case basic: BasicTypeInfo[_] =>
+  Seq((s"*", basic))
+case array: BasicArrayTypeInfo[_, _] =>
+  Seq((s"*", array))
+case _ =>
+  customWrapper(typeInformation)
+  }
+}
+
+val wrapInput = chooseWrapper(inputType)
+val wrapOutput = chooseWrapper(outputType)
+
+forwardFields(forwardIndices, wrapInput, wrapOutput)
+  }
+
+  private def extractFields(
+  composite: CompositeType[_]): Seq[(String, TypeInfo[_])] = {
+
+val types = for {
+  i <- 0 until composite.getArity
+} yield { composite.getTypeAt(i) }
+
+composite.getFieldNames.zip(types)
+  }
+
+  private def forwardFields(
+  forwardIndices: Seq[(Int, Int)],
+  wrappedInput: Int => (String, TypeInfo[_]),
+  wrappedOutput: Int => (String, TypeInfo[_])): String = {
+
+implicit class Field2ForwardField(left: (String, TypeInfo[_])) {
+  def ->(right: (String, TypeInfo[_])): String = if (left.equals(right)) {
+s"${left._1}"
+  } else {
+if (left._2.equals(right._2)) {
+  s"${left._1}->${right._1}"
+} else {
+  null
--- End diff --

We should throw an exception here. If the types do not match, the logic to 
identify the mapping of forwarded fields is broken. 

[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...

2017-03-08 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r105030435
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala
 ---
@@ -0,0 +1,143 @@
+
+package org.apache.flink.table.plan.nodes.dataset.forwarding
+
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, TypeInformation => TypeInfo}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.TableException
+import org.apache.flink.types.Row
+
+object FieldForwardingUtils {
+
+  def compositeTypeField = (fields: Seq[(String, TypeInfo[_])]) => fields
+
+  private def throwMissedWrapperException(customWrapper: TypeInfo[_]) = {
+throw new TableException(s"Implementation for $customWrapper wrapper is missing.")
+  }
+
+  /**
+* Wrapper for {@link getForwardedFields}
+*/
+  def getForwardedInput(
+  inputType: TypeInfo[_],
--- End diff --

Please document the parameters. 




[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...

2017-03-08 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r105023568
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
 ---
@@ -124,8 +127,26 @@ class DataSetCalc(
   body,
   returnType)
 
+def getForwardIndices = {
+  // get (input, output) indices of operands,
+  // filter modified operands and specify forwarding
+  val inputFields = extractRefInputFields(calcProgram)
+  calcProgram.getProjectList
+    .map(_.getIndex)
+    .zipWithIndex
+    .filter(tup => inputFields.contains(tup._1))
--- End diff --

This position-based mapping makes some assumptions about the internal 
organization of a `RexProgram`, i.e., that the first `n` fields of the 
expressions list are filled in order by the `n` fields of the input.
Can we change this to iterate over the projection list, look up the 
expression, and check whether it is a `RexInputRef`? Basically something like:
```
calcProgram.getProjectList.zipWithIndex.map { case (p, out) =>
  val expr = calcProgram.getExprList.get(p.getIndex)
  expr match {
    case i: RexInputRef => Some((i.getIndex, out))
    case _ => None
  }
}.flatten
```
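A minimal, self-contained sketch of the lookup suggested above. Calcite is not assumed to be on the classpath, so the hypothetical case classes `InputRef`, `LocalRef`, `Call` and `Program` stand in for `RexInputRef`, `RexLocalRef`, `RexCall` and `RexProgram`; only the shape of the lookup is meant to carry over. Each projection is resolved through the expression list, and only outputs that turn out to be plain input references are reported as forwarded.

```scala
// Hypothetical, simplified stand-ins for Calcite's RexInputRef, RexLocalRef, RexCall and
// RexProgram. They model only what the lookup needs; this is not the Calcite API.
object CalcForwarding {
  sealed trait Expr
  final case class InputRef(index: Int) extends Expr                      // like RexInputRef
  final case class LocalRef(index: Int) extends Expr                      // like RexLocalRef
  final case class Call(op: String, operands: Seq[LocalRef]) extends Expr // like RexCall

  // Like a RexProgram: a shared expression list plus a projection list of local refs into it.
  final case class Program(exprs: IndexedSeq[Expr], projects: Seq[LocalRef])

  /** (inputIndex, outputIndex) pairs for every output that is a plain input reference. */
  def forwardIndices(p: Program): Seq[(Int, Int)] =
    p.projects.zipWithIndex.flatMap { case (ref, out) =>
      p.exprs(ref.index) match {
        case InputRef(in) => Some((in, out))
        case _            => None // computed expression: not forwarded
      }
    }

  // Input (a, b, c), output (c, a + b, a).
  val example: Program = Program(
    exprs = Vector(InputRef(0), InputRef(1), InputRef(2), Call("+", Seq(LocalRef(0), LocalRef(1)))),
    projects = Seq(LocalRef(2), LocalRef(3), LocalRef(0))
  )
}
```

Because every projection is dereferenced through the expression list, the result does not depend on where the input references sit in that list, which is the assumption the comment objects to. `flatMap` over an `Option` is equivalent to the `map { ... }.flatten` form in the comment.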




[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...

2017-03-08 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r104729449
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala
 ---
@@ -0,0 +1,143 @@
+
+package org.apache.flink.table.plan.nodes.dataset.forwarding
+
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, TypeInformation => TypeInfo}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.TableException
+import org.apache.flink.types.Row
+
+object FieldForwardingUtils {
+
+  def compositeTypeField = (fields: Seq[(String, TypeInfo[_])]) => fields
+
+  private def throwMissedWrapperException(customWrapper: TypeInfo[_]) = {
+    throw new TableException(s"Implementation for $customWrapper wrapper is missing.")
+  }
+
+  /**
+    * Wrapper for {@link getForwardedFields}
+    */
+  def getForwardedInput(
+      inputType: TypeInfo[_],
+      outputType: TypeInfo[_],
+      forwardIndices: Seq[Int],
+      customWrapper: TypeInfo[_] =>
+        Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String = {
+
+    getForwardedFields(inputType,
+      outputType,
+      forwardIndices.zip(forwardIndices),
+      customWrapper)
+  }
+
+  /**
+    * Wraps provided indices with proper names.
+    * e.g. _1 for Tuple, f0 for Row, fieldName for POJO and named Row
+    *
+    * @param inputType  information of input data
+    * @param outputType information of output data
+    * @param forwardIndices tuple of (input, output) indices of a forwarded field
+    * @param customWrapper  used for figuring out proper type in specific cases,
+    *   e.g. {@see DataSetSingleRowJoin}
+    * @return string with forwarded fields mapped from input to output
+    */
+  def getForwardedFields(
+      inputType: TypeInfo[_],
+      outputType: TypeInfo[_],
+      forwardIndices: Seq[(Int, Int)],
+      customWrapper: TypeInfo[_] =>
+        Seq[(String, TypeInfo[_])] = throwMissedWrapperException): String = {
+
+    def chooseWrapper(
+        typeInformation: TypeInfo[_]): Seq[(String, TypeInfo[_])] = {
+
+      typeInformation match {
+        case composite: CompositeType[_] =>
+          val fields = extractFields(composite)
+          compositeTypeField(fields)
+        case basic: BasicTypeInfo[_] =>
+          Seq((s"*", basic))
+        case array: BasicArrayTypeInfo[_, _] =>
+          Seq((s"*", array))
+        case _ =>
+          customWrapper(typeInformation)
+      }
+    }
+
+    val wrapInput = chooseWrapper(inputType)
+    val wrapOutput = chooseWrapper(outputType)
+
+    forwardFields(forwardIndices, wrapInput, wrapOutput)
+  }
+
+  private def extractFields(
+      composite: CompositeType[_]): Seq[(String, TypeInfo[_])] = {
+
+    val types = for {
+      i <- 0 until composite.getArity
+    } yield { composite.getTypeAt(i) }
+
+    composite.getFieldNames.zip(types)
+  }
+
+  private def forwardFields(
+      forwardIndices: Seq[(Int, Int)],
+      wrappedInput: Int => (String, TypeInfo[_]),
+      wrappedOutput: Int => (String, TypeInfo[_])): String = {
+
+    implicit class Field2ForwardField(left: (String, TypeInfo[_])) {
+      def ->(right: (String, TypeInfo[_])): String = if (left.equals(right)) {
+        s"${left._1}"
+      } else {
+        if (left._2.equals(right._2)) {
+          s"${left._1}->${right._1}"
+        } else {
+          null
+        }
+      }
+    }
+
+    forwardIndices map {
+      case (in, out) =>
+        wrappedInput(in) -> wrappedOutput(out)
 
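A hedged sketch of what the quoted `forwardFields` helper computes per index pair, written against a hypothetical `Field(name, typeName)` in place of `(String, TypeInformation[_])`. Identical fields are emitted by name, fields that keep their type but change position become `in->out`, and fields whose type changes are dropped. The quoted diff is cut off before the entries are combined, so joining them with `;` here follows the semicolon-separated syntax of Flink's forwarded-fields expressions, not the PR's code.

```scala
// A minimal sketch of turning (input, output) index pairs into a forwarded-fields string.
object ForwardedFieldsString {
  // A field as (name, type name); the type name is a hypothetical stand-in for TypeInformation.
  final case class Field(name: String, typeName: String)

  def build(in: IndexedSeq[Field], out: IndexedSeq[Field], pairs: Seq[(Int, Int)]): String =
    pairs.flatMap { case (i, o) =>
      val (l, r) = (in(i), out(o))
      if (l == r) Some(l.name)                                         // same name and type: "f0"
      else if (l.typeName == r.typeName) Some(s"${l.name}->${r.name}") // moved field: "f1->f2"
      else None                                                        // type changed: not forwarded
    }.mkString(";")

  // Example types: input (f0: Int, f1: String, f2: Long), output (f0: Int, f1: Long, f2: String).
  val exampleIn: IndexedSeq[Field] = Vector(Field("f0", "Int"), Field("f1", "String"), Field("f2", "Long"))
  val exampleOut: IndexedSeq[Field] = Vector(Field("f0", "Int"), Field("f1", "Long"), Field("f2", "String"))
}
```

Returning `Option` instead of `null` lets mismatched pairs be filtered out before the string is built, so no `null` entry can end up in the joined expression.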

[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...

2017-03-08 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r105032956
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala
 ---
@@ -0,0 +1,143 @@
+
+package org.apache.flink.table.plan.nodes.dataset.forwarding
+
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, TypeInformation => TypeInfo}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.TableException
+import org.apache.flink.types.Row
+
+object FieldForwardingUtils {
+
+  def compositeTypeField = (fields: Seq[(String, TypeInfo[_])]) => fields
--- End diff --

This function can be removed, right?




[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...

2017-03-08 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r104730046
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala
 ---

[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...

2017-03-08 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r104727584
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
 ---
@@ -199,10 +200,21 @@ class DataSetJoin(
 
 val joinOpName = s"where: ($joinConditionToString), join: ($joinSelectionToString)"
 
+//consider all fields not which are not keys are forwarded
+val leftIndices = (0 until left.getRowType.getFieldCount).diff(leftKeys)
--- End diff --

The keys can also be forwarded. 
Actually, these are the most interesting fields because the DataSet will be 
partitioned and maybe also sorted on these keys.
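To illustrate that key fields need no special treatment, here is a minimal sketch that computes (input, output) index pairs for both join inputs. It assumes the join output row is the left fields followed by the right fields (the row type of a Calcite join) and an inner join; with outer joins, the side that may be padded with nulls could not be declared as forwarded. The object name `JoinForwarding` is hypothetical.

```scala
// A minimal sketch: (input, output) index pairs for both sides of an inner join whose
// output row is the left fields followed by the right fields. Key fields are included,
// because they are copied into the output unchanged like every other field.
object JoinForwarding {
  /** Left field i stays at output position i. */
  def leftPairs(leftArity: Int): Seq[(Int, Int)] =
    (0 until leftArity).map(i => (i, i))

  /** Right field j moves to output position leftArity + j. */
  def rightPairs(leftArity: Int, rightArity: Int): Seq[(Int, Int)] =
    (0 until rightArity).map(j => (j, leftArity + j))
}
```

Since right-side fields shift by `leftArity`, the right input cannot use an identity mapping such as `forwardIndices.zip(forwardIndices)`; its pairs carry the offset explicitly.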




[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...

2017-03-08 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r104727744
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
 ---
@@ -199,10 +200,21 @@ class DataSetJoin(
 
 val joinOpName = s"where: ($joinConditionToString), join: ($joinSelectionToString)"
 
+//consider all fields not which are not keys are forwarded
+val leftIndices = (0 until left.getRowType.getFieldCount).diff(leftKeys)
+val fieldsLeft = getForwardedInput(leftDataSet.getType, returnType, leftIndices)
+
+val rightIndices = (0 until right.getRowType.getFieldCount)
+  .diff(rightKeys)
--- End diff --

Keys can be forwarded, too.




[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...

2017-03-08 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r104730153
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala
 ---

[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...

2017-03-08 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r104730024
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala
 ---

[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...

2017-01-13 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r95975700
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/forwarding/FieldForwardingUtils.scala
 ---
@@ -0,0 +1,115 @@
+
+package org.apache.flink.table.plan.nodes.dataset.forwarding
+
+import org.apache.calcite.rex.RexProgram
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.table.api.TableException
+
+import scala.collection.JavaConversions._
+
+object FieldForwardingUtils {
+
+  def compositeTypeField = (fields: Seq[String]) => (v: Int) => fields(v)
+
+  private def throwMissedWrapperException(wrapperCustomCase: TypeInformation[_]) = {
+    throw new TableException(s"Implementation for $wrapperCustomCase index wrapper is missing.")
+  }
+
+  /**
+    * Wrapper for {@link getForwardedFields}
+    * @param inputType
+    * @param outputType
+    * @param forwardIndices
+    * @param wrapperCustomCase
+    * @param calcProgram
+    * @return
+    */
+  def getForwardedInput(
+      inputType: TypeInformation[_],
+      outputType: TypeInformation[_],
+      forwardIndices: Seq[Int],
+      wrapperCustomCase: TypeInformation[_] => (Int) => String = throwMissedWrapperException,
+      calcProgram: Option[RexProgram] = None) = {
+
+    getForwardedFields(inputType,
+      outputType,
+      forwardIndices.zip(forwardIndices),
+      wrapperCustomCase,
+      calcProgram)
+  }
+
+  /**
+    * Wraps provided indices with proper names, e.g. _1 for tuple, f0 for row, fieldName for POJO.
+    * @param inputType
+    * @param outputType
+    * @param forwardIndices - tuple of input-output indices of a forwarded field
+    * @param wrapperCustomCase - used for  figuring out proper type in specific cases,
+    *   e.g. {@see DataSetSingleRowJoin}
+    * @param calcProgram - used for  figuring out proper type in specific cases,
+    *   e.g. {@see DataSetCalc}
+    * @return - string with forwarded fields mapped from input to output
+    */
+  def getForwardedFields(
+      inputType: TypeInformation[_],
+      outputType: TypeInformation[_],
+      forwardIndices: Seq[(Int, Int)],
+      wrapperCustomCase: TypeInformation[_] => (Int) => String = throwMissedWrapperException,
+      calcProgram: Option[RexProgram] = None): String = {
+
+    def chooseWrapper(typeInformation: TypeInformation[_]): (Int) => String = {
+      typeInformation match {
+        case composite: CompositeType[_] =>
+          // POJOs' fields are sorted, so we can not access them by their positional index.
+          // So we collect field names from
+          // outputRowType. For all other types we get field names from inputDS.
+          val typeFieldList = composite.getFieldNames
+          var fieldWrapper: (Int) => String = compositeTypeField(typeFieldList)
+          if (calcProgram.isDefined) {
+            val projectFieldList = calcProgram.get.getOutputRowType.getFieldNames
--- End diff --

I think this would break the order of the fields.




[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...

2017-01-13 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3040#discussion_r95975427
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
 ---
@@ -129,8 +131,35 @@ class DataSetCalc(
   body,
   returnType)
 
+def getForwardIndices = {
+  //get indices of all modified operands
+  val modifiedOperands = calcProgram.
+    getExprList
+    .filter(_.isInstanceOf[RexCall])
+    .flatMap(_.asInstanceOf[RexCall].operands)
+    .map(_.asInstanceOf[RexLocalRef].getIndex)
--- End diff --

Shouldn't the modified fields refer to the input fields? So shouldn't it 
be `RexInputRef`?
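The point can be sketched with hypothetical stand-ins (`InputRef`, `LocalRef`, `Call`; not the Calcite classes). In a `RexProgram`, call operands are `RexLocalRef`s into the expression list (which is what the quoted cast to `RexLocalRef` relies on), so a local-ref index says nothing about input fields until it is dereferenced. The sketch below collects the input-field indices that some call actually reads.

```scala
// Hypothetical, simplified stand-ins for Calcite's RexInputRef, RexLocalRef and RexCall
// (not the Calcite API), used to show how call operands resolve to input fields.
object ModifiedInputs {
  sealed trait Expr
  final case class InputRef(index: Int) extends Expr
  final case class LocalRef(index: Int) extends Expr
  final case class Call(op: String, operands: Seq[LocalRef]) extends Expr

  /** Indices of input fields that appear as operands of some call. */
  def usedInputs(exprs: IndexedSeq[Expr]): Set[Int] =
    exprs
      .collect { case Call(_, operands) => operands }
      .flatten
      .flatMap { ref =>
        // A local ref points into the expression list; only InputRef entries are input fields.
        // Nested calls are entries of exprs themselves, so their operands are visited as well.
        exprs(ref.index) match {
          case InputRef(in) => Some(in)
          case _            => None
        }
      }
      .toSet

  // Input (a, b, c); expressions a + b and (a + b) * a; c is never an operand.
  val example: IndexedSeq[Expr] = Vector(
    InputRef(0), InputRef(1), InputRef(2),
    Call("+", Seq(LocalRef(0), LocalRef(1))),
    Call("*", Seq(LocalRef(3), LocalRef(0)))
  )
}
```

Using the raw local-ref indices instead would report index 3 (the `a + b` expression) as if it were an input field, which is the mix-up the question points at.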




[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...

2016-12-23 Thread NickolayVasilishin
GitHub user NickolayVasilishin opened a pull request:

https://github.com/apache/flink/pull/3040

[FLINK-3850] Add forward field annotations to DataSet 

Add forward field annotations to DataSet operators generated by the Table API

  - Added field forwarding to most `DataSetRel` implementations.
  - Allowed the string with forwarded fields to be empty in `SemanticPropUtil.java`.
  - Moved the type-based wrapper for field indices to the `FieldForwardingUtils` object.
  - In most cases, forwarding is done only for conversions.

   `BatchScan`: forwarding at conversion
   `DataSetAggregate`: forwarding at conversion
   `DataSetCalc`: forwarding based on operands left unmodified by `RexCall`s
   `DataSetCorrelate`: forwarding based on operands left unmodified by `RexCall`s
   `DataSetIntersect`: forwarding at conversion
   `DataSetJoin`: forwarding based on fields that are not keys
   `DataSetMinus`: forwarding at conversion
   `DataSetSingleRowJoin`: all fields of the multi-row DataSet are forwarded; the single row is used via broadcast
   `DataSetSort`: all fields forwarded, plus conversion

I hope I've understood the meaning of forwarded fields correctly: fields that 
are not used in computations. So I assumed that these fields are not used in 
`RexCall`s or as join keys. I also forwarded fields in type conversions.
The most complex part was determining the correct input and output field 
names.
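The definition above differs slightly from the one Flink's semantic annotations use: a field is forwarded when its value is copied unchanged to the same or another position of the output, even if it is also read by a computation. That is why the review comments on `DataSetJoin` note that join keys can be forwarded. A hedged, sample-based check makes the definition concrete; `ForwardedFieldCheck` and the `Vector[Any]` record model are hypothetical and not part of Flink.

```scala
// A sample-based sanity check of a forwarded-fields declaration. Records are modelled as
// Vector[Any], a hypothetical stand-in for Flink's Row or Tuple types.
object ForwardedFieldCheck {
  type Record = Vector[Any]

  /** True if every declared (in, out) pair copies the input value unchanged for all samples. */
  def holds(f: Record => Record, forwarded: Seq[(Int, Int)], samples: Seq[Record]): Boolean =
    samples.forall { r =>
      val o = f(r)
      forwarded.forall { case (in, out) => r(in) == o(out) }
    }

  // Field 0 is used in a computation AND copied unchanged to output positions 0 and 2.
  val f: Record => Record =
    r => Vector(r(0), r(0).asInstanceOf[Int] + r(1).asInstanceOf[Int], r(0))

  val samples: Seq[Record] = Seq(Vector(1, 2), Vector(5, 7))
}
```

A check like this can only refute a declaration on the given samples; it cannot prove one correct.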


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NickolayVasilishin/flink FLINK-3850

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3040.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3040


commit 25cc1f022eb399bade37ef7b0fd0b87a9e509d67
Author: nikolay_vasilishin 
Date:   2016-12-23T10:50:46Z

[FLINK-3850] Add forward field annotations to DataSet operators generated by the Table API

  - Added field forwarding at most of DataSetRel implementations.
  - String with forwarded fields allowed to be empty at SemanticPropUtil.java
  - Wrapper for indices based on types moved to object class FieldForwardingUtils
  - In most cases forwarding done only for conversion

   BatchScan: forwarding at conversion
   DataSetAggregate: forwarding at conversion
   DataSetCalc: forwarding based on unmodified at RexCalls operands
   DataSetCorrelate:  forwarding based on unmodified at RexCalls operands
   DataSetIntersect:  forwarding at conversion
   DataSetJoin: forwarding based on fields which are not keys
   DataSetMinus: forwarding at conversion
   DataSetSingleRowJoin: forwarded all fields from multi row dataset, single row used via broadcast
   DataSetSort: all fields forwarded + conversion

Conflicts:

flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala

flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala

flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala

flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala

flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetIntersect.scala

flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala

flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetMinus.scala

flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala

flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala



