[jira] [Commented] (FLINK-4294) Allow access of composite type fields
[ https://issues.apache.org/jira/browse/FLINK-4294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15686045#comment-15686045 ]

ASF GitHub Bot commented on FLINK-4294:
---
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/2319

> Allow access of composite type fields
> -
>
> Key: FLINK-4294
> URL: https://issues.apache.org/jira/browse/FLINK-4294
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: Timo Walther
> Fix For: 1.2.0
>
> Currently all Flink CompositeTypes are treated as GenericRelDataTypes. It
> would be better to access individual fields of composite types, too. e.g.
> {code}
> SELECT composite.name FROM composites
> SELECT tuple.f0 FROM tuples
> 'f0.getField(0)
> {code}

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4294) Allow access of composite type fields
[ https://issues.apache.org/jira/browse/FLINK-4294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15686033#comment-15686033 ]

ASF GitHub Bot commented on FLINK-4294:
---
Github user twalthr commented on the issue:
https://github.com/apache/flink/pull/2319

Merging...
[jira] [Commented] (FLINK-4294) Allow access of composite type fields
[ https://issues.apache.org/jira/browse/FLINK-4294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15684650#comment-15684650 ]

ASF GitHub Bot commented on FLINK-4294:
---
Github user fhueske commented on the issue:
https://github.com/apache/flink/pull/2319

+1 to merge
[jira] [Commented] (FLINK-4294) Allow access of composite type fields
[ https://issues.apache.org/jira/browse/FLINK-4294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15684123#comment-15684123 ]

ASF GitHub Bot commented on FLINK-4294:
---
Github user twalthr commented on the issue:
https://github.com/apache/flink/pull/2319

@fhueske I reworked the PR again. I will rebase it now.
[jira] [Commented] (FLINK-4294) Allow access of composite type fields
[ https://issues.apache.org/jira/browse/FLINK-4294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683492#comment-15683492 ]

ASF GitHub Bot commented on FLINK-4294:
---
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/2319#discussion_r6877

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/composite.scala ---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.UnresolvedException
+import org.apache.flink.api.table.validate.{ExprValidationResult, ValidationFailure, ValidationSuccess}
+
+
+case class Flattening(child: Expression) extends UnaryExpression {
+
+  override def toString = s"$child.flatten()"
+
+  override private[flink] def resultType: TypeInformation[_] =
+    throw UnresolvedException(s"Invalid call to on ${this.getClass}.")
+
+  override private[flink] def validateInput(): ExprValidationResult =
+    ValidationFailure(s"Unresolved flattening of $child")
+}
+
+case class GetCompositeField(child: Expression, key: Any) extends UnaryExpression {
+
+  private var fieldIndex: Option[Int] = None
+
+  override def toString = s"$child.get($key)"
+
+  override private[flink] def validateInput(): ExprValidationResult = {
--- End diff --

No, the framework only calls this method if the children are valid, so we can directly check the result type.
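The validation order described in this reply can be sketched roughly as follows. This is a minimal, self-contained illustration and not Flink's actual validation code: `Expr`, `Field`, `Get`, `Result` and `validate` are hypothetical names chosen for the example. It only shows why a node-local `validateInput()` may assume valid children when the framework validates bottom-up.

```scala
object ValidationOrderSketch {
  // Hypothetical stand-ins for a validation result type.
  sealed trait Result
  case object Success extends Result
  final case class Failure(message: String) extends Result

  trait Expr {
    def children: Seq[Expr]
    // Node-local check; it may assume that all children are already valid.
    def validateInput(): Result
  }

  // A leaf that is valid only if the referenced field is known.
  final case class Field(name: String, known: Boolean) extends Expr {
    val children: Seq[Expr] = Nil
    def validateInput(): Result =
      if (known) Success else Failure(s"Unknown field: $name")
  }

  // A field access on some child expression.
  final case class Get(child: Expr, key: String) extends Expr {
    val children: Seq[Expr] = Seq(child)
    def validateInput(): Result = Success
  }

  // Bottom-up driver: the first failing child wins, and the node's own
  // validateInput() runs only when every child has passed.
  def validate(e: Expr): Result =
    e.children.iterator
      .map(validate)
      .collectFirst { case f: Failure => f }
      .getOrElse(e.validateInput())
}
```

With such a driver, `validate(Get(Field("missing", known = false), "name"))` reports the failure of the child and never reaches the check of `Get` itself.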
[jira] [Commented] (FLINK-4294) Allow access of composite type fields
[ https://issues.apache.org/jira/browse/FLINK-4294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683462#comment-15683462 ]

ASF GitHub Bot commented on FLINK-4294:
---
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/2319#discussion_r4877

--- Diff: docs/dev/table_api.md ---
@@ -1656,6 +1656,29 @@ temporalOverlaps(TIMEPOINT, TEMPORAL, TIMEPOINT, TEMPORAL)
+
+
+{% highlight java %}
+ANY.flatten()
+{% endhighlight %}
+
+
+Converts a Flink composite type (such as Tuple, POJO, etc.) and all of its subtypes into a flat representation where every subtype is a separate field.
--- End diff --

Currently, they are named "_c0", "_c1", etc., but after thinking about it, this is not very nice because it is difficult to identify POJO fields that don't have a field order. I will try to improve that.
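One conceivable alternative to positional names such as "_c0" is to derive each flattened name from the path to the field. The sketch below illustrates that idea only; the types `FieldType`, `Atomic`, `Composite`, the `flatten` helper and the "$" separator are assumptions made for the example and are not taken from the pull request.

```scala
object FlattenNamingSketch {
  // Hypothetical, simplified model of a (possibly nested) type.
  sealed trait FieldType
  case object Atomic extends FieldType
  final case class Composite(fields: Seq[(String, FieldType)]) extends FieldType

  // Returns one name per atomic leaf, built from the path that leads to it.
  // The separator is an assumption of this sketch.
  def flatten(prefix: String, tpe: FieldType, sep: String = "$"): Seq[String] =
    tpe match {
      case Atomic => Seq(prefix)
      case Composite(fields) =>
        fields.flatMap { case (name, sub) => flatten(prefix + sep + name, sub, sep) }
    }
}
```

For a composite `p` with an atomic `name` and a nested `address` holding `city` and `zip`, this yields `p$name`, `p$address$city` and `p$address$zip`, which stays readable even when the type has no natural field order.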
[jira] [Commented] (FLINK-4294) Allow access of composite type fields
[ https://issues.apache.org/jira/browse/FLINK-4294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15677686#comment-15677686 ]

ASF GitHub Bot commented on FLINK-4294:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/2319#discussion_r88683102

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/composite.scala ---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.UnresolvedException
+import org.apache.flink.api.table.validate.{ExprValidationResult, ValidationFailure, ValidationSuccess}
+
+
+case class Flattening(child: Expression) extends UnaryExpression {
+
+  override def toString = s"$child.flatten()"
+
+  override private[flink] def resultType: TypeInformation[_] =
+    throw UnresolvedException(s"Invalid call to on ${this.getClass}.")
--- End diff --

Please check exception message.
[jira] [Commented] (FLINK-4294) Allow access of composite type fields
[ https://issues.apache.org/jira/browse/FLINK-4294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15677690#comment-15677690 ]

ASF GitHub Bot commented on FLINK-4294:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/2319#discussion_r88682935

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala ---
@@ -324,10 +335,19 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
       case _ ~ _ ~ operand ~ _ ~ unit ~ _ => TemporalCeil(unit, operand)
   }
+  lazy val prefixGet: PackratParser[Expression] =
+    GET ~ "(" ~ composite ~ "," ~ literalExpr ~ ")" ^^ {
+      case _ ~ _ ~ e ~ _ ~ index ~ _ =>
+        GetCompositeField(e, index.asInstanceOf[Literal].value)
+    }
+
+  lazy val prefixFlattening: PackratParser[Expression] =
+    GET ~ "(" ~> composite <~ ")" ^^ { e => Flattening(e) }
--- End diff --

`GET` -> `FLATTEN`? Please add a test case for flatten in prefix notation.
[jira] [Commented] (FLINK-4294) Allow access of composite type fields
[ https://issues.apache.org/jira/browse/FLINK-4294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15677684#comment-15677684 ]

ASF GitHub Bot commented on FLINK-4294:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/2319#discussion_r88682137

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala ---
@@ -79,14 +83,27 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
   }
   private def createAdvancedType(typeInfo: TypeInformation[_]): RelDataType = typeInfo match {
-    // TODO add specific RelDataTypes
-    // for PrimitiveArrayTypeInfo, ObjectArrayTypeInfo, CompositeType
+    case ct: CompositeType[_] =>
+      new CompositeRelDataType(ct, this)
+
+    // TODO add specific RelDataTypes for PrimitiveArrayTypeInfo, ObjectArrayTypeInfo
     case ti: TypeInformation[_] =>
       new GenericRelDataType(typeInfo, getTypeSystem.asInstanceOf[FlinkTypeSystem])
     case ti@_ =>
       throw TableException(s"Unsupported type information: $ti")
   }
+
+  override def createTypeWithNullability(
+      relDataType: RelDataType,
+      nullable: Boolean)
+    : RelDataType = relDataType match {
+    case composite: CompositeRelDataType =>
+      // at the moment we do not care about nullability
--- End diff --

I see, thanks!
[jira] [Commented] (FLINK-4294) Allow access of composite type fields
[ https://issues.apache.org/jira/browse/FLINK-4294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15677687#comment-15677687 ]

ASF GitHub Bot commented on FLINK-4294:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/2319#discussion_r88679280

--- Diff: docs/dev/table_api.md ---
@@ -1656,6 +1656,29 @@ temporalOverlaps(TIMEPOINT, TEMPORAL, TIMEPOINT, TEMPORAL)
+
+
+{% highlight java %}
+ANY.flatten()
+{% endhighlight %}
+
+
+Converts a Flink composite type (such as Tuple, POJO, etc.) and all of its subtypes into a flat representation where every subtype is a separate field.
--- End diff --

How are the fields named?
[jira] [Commented] (FLINK-4294) Allow access of composite type fields
[ https://issues.apache.org/jira/browse/FLINK-4294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15677688#comment-15677688 ]

ASF GitHub Bot commented on FLINK-4294:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/2319#discussion_r88699109

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala ---
@@ -74,15 +75,52 @@ object RexNodeTranslator {
   /**
     * Parses all input expressions to [[UnresolvedAlias]].
-    * And expands star to parent's full project list.
+    * And expands star to parent's full project list and flattens composite types.
     */
-  def expandProjectList(exprs: Seq[Expression], parent: LogicalNode): Seq[NamedExpression] = {
+  def expandProjectList(
+      exprs: Seq[Expression],
+      parent: LogicalNode,
+      tableEnv: TableEnvironment)
+    : Seq[NamedExpression] = {
+
     val projectList = new ListBuffer[NamedExpression]
     exprs.foreach {
+
       case n: UnresolvedFieldReference if n.name == "*" =>
         projectList ++= parent.output.map(UnresolvedAlias(_))
+
+      // flattening can only applied on field references
+      case Flattening(composite) if
--- End diff --

Add a case for `Flattening(_)` that catches Flattenings on non-field expressions and throws a `ValidationException`?
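The suggested catch-all case could look roughly like the sketch below. It is a simplified, self-contained model rather than the code of `RexNodeTranslator`: the expression classes, the local `ValidationException`, the `expand` function and the `arity` lookup are all hypothetical stand-ins introduced for the example.

```scala
object ExpandProjectListSketch {
  // Hypothetical, minimal expression model.
  sealed trait Expression
  final case class FieldRef(name: String) extends Expression
  final case class Plus(left: Expression, right: Expression) extends Expression
  final case class Flattening(child: Expression) extends Expression
  final case class GetField(child: Expression, index: Int) extends Expression

  // Local stand-in for a validation error type.
  final class ValidationException(message: String) extends RuntimeException(message)

  // `arity` is an assumed lookup that returns the number of fields of a
  // composite-typed field reference.
  def expand(exprs: Seq[Expression], arity: String => Int): Seq[Expression] =
    exprs.flatMap {
      // Flattening of a plain field reference expands into one access per field.
      case Flattening(ref: FieldRef) =>
        (0 until arity(ref.name)).map(i => GetField(ref, i))
      // Any other flattening is rejected early with a clear message instead
      // of falling through to a later, less specific failure.
      case Flattening(other) =>
        throw new ValidationException(
          s"Flattening is only supported on field references, but got: $other")
      case e => Seq(e)
    }
}
```

The order of the cases matters here: the specific `FieldRef` pattern has to come before the generic `Flattening(other)` pattern, otherwise every flattening would be rejected.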
[jira] [Commented] (FLINK-4294) Allow access of composite type fields
[ https://issues.apache.org/jira/browse/FLINK-4294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15677685#comment-15677685 ]

ASF GitHub Bot commented on FLINK-4294:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/2319#discussion_r88679626

--- Diff: docs/dev/table_api.md ---
@@ -1656,6 +1656,29 @@ temporalOverlaps(TIMEPOINT, TEMPORAL, TIMEPOINT, TEMPORAL)
+
+
+{% highlight java %}
+ANY.flatten()
+{% endhighlight %}
+
+
+Converts a Flink composite type (such as Tuple, POJO, etc.) and all of its subtypes into a flat representation where every subtype is a separate field.
+
+
+
+
+
+{% highlight java %}
+COMPOSITE.get(STRING)
--- End diff --

OK, I agree `.get()` has benefits too.
[jira] [Commented] (FLINK-4294) Allow access of composite type fields
[ https://issues.apache.org/jira/browse/FLINK-4294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15677689#comment-15677689 ]

ASF GitHub Bot commented on FLINK-4294:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/2319#discussion_r88684517

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/composite.scala ---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.UnresolvedException
+import org.apache.flink.api.table.validate.{ExprValidationResult, ValidationFailure, ValidationSuccess}
+
+
+case class Flattening(child: Expression) extends UnaryExpression {
+
+  override def toString = s"$child.flatten()"
+
+  override private[flink] def resultType: TypeInformation[_] =
+    throw UnresolvedException(s"Invalid call to on ${this.getClass}.")
+
+  override private[flink] def validateInput(): ExprValidationResult =
+    ValidationFailure(s"Unresolved flattening of $child")
+}
+
+case class GetCompositeField(child: Expression, key: Any) extends UnaryExpression {
+
+  private var fieldIndex: Option[Int] = None
+
+  override def toString = s"$child.get($key)"
+
+  override private[flink] def validateInput(): ExprValidationResult = {
--- End diff --

Don't we need to recursively check if `child` is valid as well? Could be the result of an expression, right?
[jira] [Commented] (FLINK-4294) Allow access of composite type fields
[ https://issues.apache.org/jira/browse/FLINK-4294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15677691#comment-15677691 ]

ASF GitHub Bot commented on FLINK-4294:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/2319#discussion_r88699648

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/composite.scala ---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.UnresolvedException
+import org.apache.flink.api.table.validate.{ExprValidationResult, ValidationFailure, ValidationSuccess}
+
+
+case class Flattening(child: Expression) extends UnaryExpression {
+
+  override def toString = s"$child.flatten()"
+
+  override private[flink] def resultType: TypeInformation[_] =
+    throw UnresolvedException(s"Invalid call to on ${this.getClass}.")
+
+  override private[flink] def validateInput(): ExprValidationResult =
+    ValidationFailure(s"Unresolved flattening of $child")
--- End diff --

Add a comment that `Flattening` is converted into `GetCompositeFields` before validation?
[jira] [Commented] (FLINK-4294) Allow access of composite type fields
[ https://issues.apache.org/jira/browse/FLINK-4294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15580345#comment-15580345 ]

ASF GitHub Bot commented on FLINK-4294:
---
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/2319#discussion_r83556947

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala ---
@@ -79,14 +83,27 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
   }
   private def createAdvancedType(typeInfo: TypeInformation[_]): RelDataType = typeInfo match {
-    // TODO add specific RelDataTypes
-    // for PrimitiveArrayTypeInfo, ObjectArrayTypeInfo, CompositeType
+    case ct: CompositeType[_] =>
+      new CompositeRelDataType(ct, this)
+
+    // TODO add specific RelDataTypes for PrimitiveArrayTypeInfo, ObjectArrayTypeInfo
     case ti: TypeInformation[_] =>
       new GenericRelDataType(typeInfo, getTypeSystem.asInstanceOf[FlinkTypeSystem])
     case ti@_ =>
       throw TableException(s"Unsupported type information: $ti")
   }
+
+  override def createTypeWithNullability(
+      relDataType: RelDataType,
+      nullable: Boolean)
+    : RelDataType = relDataType match {
+    case composite: CompositeRelDataType =>
+      // at the moment we do not care about nullability
--- End diff --

`CompositeRelDataType` is the first custom type and is thus not supported by the superclass (`JavaTypeFactory`), which is why I had to override this method. The super method does some copying logic and sets the nullability of every field of the record. At the moment every field in the Table API can be null anyway, so I skipped this logic here until we provide full nullability support through the entire API.
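The shape of the override explained in this reply can be illustrated with a small stand-alone model. It does not use Calcite's `RelDataType` or Flink's `CompositeRelDataType`; `RelType`, `BasicType`, `CompositeType` and `withNullability` are hypothetical names, and the sketch assumes, as the reply states, that composite fields are treated as nullable anyway.

```scala
object NullabilitySketch {
  // Hypothetical, simplified type model.
  sealed trait RelType { def nullable: Boolean }
  final case class BasicType(name: String, nullable: Boolean) extends RelType
  final case class CompositeType(fieldNames: Seq[String]) extends RelType {
    // Assumption of this sketch: composite values are always nullable.
    val nullable: Boolean = true
  }

  // Mirrors the idea of the override: a composite type is returned unchanged,
  // every other type gets a copy carrying the requested nullability.
  def withNullability(tpe: RelType, nullable: Boolean): RelType = tpe match {
    case composite: CompositeType => composite
    case basic: BasicType => basic.copy(nullable = nullable)
  }
}
```

Returning the same instance for composite types avoids the per-field copying that a generic implementation would perform, at the price of ignoring the `nullable` argument for them.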
[jira] [Commented] (FLINK-4294) Allow access of composite type fields
[ https://issues.apache.org/jira/browse/FLINK-4294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15580335#comment-15580335 ]

ASF GitHub Bot commented on FLINK-4294:
---
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/2319#discussion_r83556835

--- Diff: docs/dev/table_api.md ---
@@ -1656,6 +1656,29 @@ temporalOverlaps(TIMEPOINT, TEMPORAL, TIMEPOINT, TEMPORAL)
+
+
+{% highlight java %}
+ANY.flatten()
+{% endhighlight %}
+
+
+Converts a Flink composite type (such as Tuple, POJO, etc.) and all of its subtypes into a flat representation where every subtype is a separate field.
+
+
+
+
+
+{% highlight java %}
+COMPOSITE.get(STRING)
--- End diff --

I thought we only agreed that `getField` is too long. Personally, I like this approach more because it can be found in the expression DSL and has a Scala doc, and it doesn't add further case distinction logic to the validation layer. Since "$" is a valid part of a Java identifier, we would also restrict the field naming of POJOs. If there is a consensus, I can change it to "field$substring".
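The concern about "$" can be made concrete with a short sketch. `splitPath` is a hypothetical helper written for this illustration and is not part of the Table API; the only library fact relied on is that `Character.isJavaIdentifierPart('$')` is true.

```scala
object DollarSeparatorSketch {
  // Hypothetical helper: interprets every "$" in a symbol as a path separator.
  def splitPath(symbol: String): Seq[String] = symbol.split('$').toSeq

  // "$" is a legal character inside Java identifiers, so a POJO field could
  // itself contain it.
  val dollarIsIdentifierPart: Boolean = Character.isJavaIdentifierPart('$')
}
```

Under this scheme a POJO field that is literally named `price$usd` can no longer be addressed as one field, because `splitPath("price$usd")` yields `price` and `usd`. That is the naming restriction mentioned in the comment above.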
[jira] [Commented] (FLINK-4294) Allow access of composite type fields
[ https://issues.apache.org/jira/browse/FLINK-4294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15576429#comment-15576429 ]

ASF GitHub Bot commented on FLINK-4294:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/2319#discussion_r83492774

--- Diff: docs/dev/table_api.md ---
@@ -1656,6 +1656,29 @@ temporalOverlaps(TIMEPOINT, TEMPORAL, TIMEPOINT, TEMPORAL)
+
+
+{% highlight java %}
+ANY.flatten()
+{% endhighlight %}
+
+
+Converts a Flink composite type (such as Tuple, POJO, etc.) and all of its subtypes into a flat representation where every subtype is a separate field.
+
+
+
+
+
+{% highlight java %}
+COMPOSITE.get(STRING)
--- End diff --

Didn't we agree on the symbol parsing approach like `'field$subfield`? Would that be more difficult to implement or have any other shortcomings?
[jira] [Commented] (FLINK-4294) Allow access of composite type fields
[ https://issues.apache.org/jira/browse/FLINK-4294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15576430#comment-15576430 ]

ASF GitHub Bot commented on FLINK-4294:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/2319#discussion_r83492573

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala ---
@@ -79,14 +83,27 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
   }
   private def createAdvancedType(typeInfo: TypeInformation[_]): RelDataType = typeInfo match {
-    // TODO add specific RelDataTypes
-    // for PrimitiveArrayTypeInfo, ObjectArrayTypeInfo, CompositeType
+    case ct: CompositeType[_] =>
+      new CompositeRelDataType(ct, this)
+
+    // TODO add specific RelDataTypes for PrimitiveArrayTypeInfo, ObjectArrayTypeInfo
     case ti: TypeInformation[_] =>
       new GenericRelDataType(typeInfo, getTypeSystem.asInstanceOf[FlinkTypeSystem])
     case ti@_ =>
       throw TableException(s"Unsupported type information: $ti")
   }
+
+  override def createTypeWithNullability(
+      relDataType: RelDataType,
+      nullable: Boolean)
+    : RelDataType = relDataType match {
+    case composite: CompositeRelDataType =>
+      // at the moment we do not care about nullability
--- End diff --

Can you explain what nullability is and why it is not important here?
[jira] [Commented] (FLINK-4294) Allow access of composite type fields
[ https://issues.apache.org/jira/browse/FLINK-4294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15575154#comment-15575154 ] ASF GitHub Bot commented on FLINK-4294: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/2319 @fhueske I updated the PR. It is now in a mergeable state. I had to disable automatic flattening because it flattens types but does no unflattening, so a composite type that enters the Table API can never leave it untouched. The SQL value construction `ROW(value, value)` no longer works without flattening; we have to create an optimizer rule or something similar to flatten this specific type. I will create a follow-up issue once this is merged.
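To illustrate why flattening without a matching unflattening step loses the composite type, here is a minimal sketch. The `NestedValue`, `Leaf`, and `Composite` types, the `flatten` helper, and the `$`-joined column names are all hypothetical and are not taken from Flink or Calcite.

```scala
// Hypothetical nested value model: either a single leaf or a named group of children.
sealed trait NestedValue
final case class Leaf(value: Any) extends NestedValue
final case class Composite(fields: List[(String, NestedValue)]) extends NestedValue

object FlattenSketch {
  /** Flattens nested fields into (path, value) columns; nested names are joined with '$'. */
  def flatten(prefix: String, value: NestedValue): List[(String, Any)] = value match {
    case Leaf(v) => List(prefix -> v)
    case Composite(fields) =>
      fields.flatMap { case (name, child) =>
        // Build the column name for the child, e.g. "addr" + "$" + "zip"
        flatten(if (prefix.isEmpty) name else prefix + "$" + name, child)
      }
  }
}
```

After `flatten`, only leaf columns remain. Without an inverse operation that regroups them, the original composite value cannot be handed back to the user unchanged, which is the limitation described above.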
[jira] [Commented] (FLINK-4294) Allow access of composite type fields
[ https://issues.apache.org/jira/browse/FLINK-4294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15523361#comment-15523361 ] ASF GitHub Bot commented on FLINK-4294: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2319#discussion_r80500304 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala --- @@ -223,10 +223,28 @@ trait ImplicitExpressionOperations { */ def toTime = Cast(expr, SqlTimeTypeInfo.TIME) -/** + /** * Parses a timestamp String in the form "yy-mm-dd hh:mm:ss.fff" to a SQL Timestamp. */ def toTimestamp = Cast(expr, SqlTimeTypeInfo.TIMESTAMP) + + /** +* Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by name and +* returns it's value. +* +* @param name name of the field (similar to Flink's field expressions) +* @return value of the field +*/ + def getField(name: String) = GetCompositeField(expr, name) --- End diff -- I think we don't need a wildcard. I will change this PR to support `field$subfield`.
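As a rough illustration of what parsing a `field$subfield` name could look like, here is a minimal sketch. The `Expr`, `FieldRef`, and `GetField` types and the `FieldPath.parse` helper are hypothetical stand-ins and do not reflect the classes actually used in the PR.

```scala
// Hypothetical, simplified expression tree; not Flink's actual expression classes.
sealed trait Expr
final case class FieldRef(name: String) extends Expr
final case class GetField(child: Expr, name: String) extends Expr

object FieldPath {
  /** Turns "address$zip" into GetField(FieldRef("address"), "zip"). */
  def parse(path: String): Expr = {
    // A limit of -1 keeps trailing empty segments, so inputs like "a$" are rejected below.
    val parts = path.split("\\$", -1).toList
    require(parts.forall(_.nonEmpty), s"Invalid field path: '$path'")
    // The first segment is the top-level field; each further segment wraps it in an accessor.
    parts.tail.foldLeft[Expr](FieldRef(parts.head))((child, name) => GetField(child, name))
  }
}
```

Each additional `$` segment simply wraps the previous expression, so deeper nesting needs no special handling.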
[jira] [Commented] (FLINK-4294) Allow access of composite type fields
[ https://issues.apache.org/jira/browse/FLINK-4294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15435022#comment-15435022 ] ASF GitHub Bot commented on FLINK-4294: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2319#discussion_r76068132 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala --- @@ -223,10 +223,28 @@ trait ImplicitExpressionOperations { */ def toTime = Cast(expr, SqlTimeTypeInfo.TIME) -/** + /** * Parses a timestamp String in the form "yy-mm-dd hh:mm:ss.fff" to a SQL Timestamp. */ def toTimestamp = Cast(expr, SqlTimeTypeInfo.TIMESTAMP) + + /** +* Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by name and +* returns it's value. +* +* @param name name of the field (similar to Flink's field expressions) +* @return value of the field +*/ + def getField(name: String) = GetCompositeField(expr, name) --- End diff -- At the moment we specify the field name as a String anyway, so `'*` is not necessary.
[jira] [Commented] (FLINK-4294) Allow access of composite type fields
[ https://issues.apache.org/jira/browse/FLINK-4294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15434709#comment-15434709 ] ASF GitHub Bot commented on FLINK-4294: --- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/2319#discussion_r76036947 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala --- @@ -223,10 +223,28 @@ trait ImplicitExpressionOperations { */ def toTime = Cast(expr, SqlTimeTypeInfo.TIME) -/** + /** * Parses a timestamp String in the form "yy-mm-dd hh:mm:ss.fff" to a SQL Timestamp. */ def toTimestamp = Cast(expr, SqlTimeTypeInfo.TIMESTAMP) + + /** +* Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by name and +* returns it's value. +* +* @param name name of the field (similar to Flink's field expressions) +* @return value of the field +*/ + def getField(name: String) = GetCompositeField(expr, name) --- End diff -- 😞 I find that `'*` can work. So if we want to support a wildcard, `address.get('*)` may be a good choice. Or we can use `_` in place of `*`, such as `address.$_`.
[jira] [Commented] (FLINK-4294) Allow access of composite type fields
[ https://issues.apache.org/jira/browse/FLINK-4294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15434592#comment-15434592 ] ASF GitHub Bot commented on FLINK-4294: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2319#discussion_r76024507 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala --- @@ -223,10 +223,28 @@ trait ImplicitExpressionOperations { */ def toTime = Cast(expr, SqlTimeTypeInfo.TIME) -/** + /** * Parses a timestamp String in the form "yy-mm-dd hh:mm:ss.fff" to a SQL Timestamp. */ def toTimestamp = Cast(expr, SqlTimeTypeInfo.TIMESTAMP) + + /** +* Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by name and +* returns it's value. +* +* @param name name of the field (similar to Flink's field expressions) +* @return value of the field +*/ + def getField(name: String) = GetCompositeField(expr, name) --- End diff -- Unfortunately, `'address$*` does not work because `*` is not a valid character for Scala symbols :-(
[jira] [Commented] (FLINK-4294) Allow access of composite type fields
[ https://issues.apache.org/jira/browse/FLINK-4294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15434228#comment-15434228 ] ASF GitHub Bot commented on FLINK-4294: --- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/2319#discussion_r75998992 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala --- @@ -223,10 +223,28 @@ trait ImplicitExpressionOperations { */ def toTime = Cast(expr, SqlTimeTypeInfo.TIME) -/** + /** * Parses a timestamp String in the form "yy-mm-dd hh:mm:ss.fff" to a SQL Timestamp. */ def toTimestamp = Cast(expr, SqlTimeTypeInfo.TIMESTAMP) + + /** +* Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by name and +* returns it's value. +* +* @param name name of the field (similar to Flink's field expressions) +* @return value of the field +*/ + def getField(name: String) = GetCompositeField(expr, name) --- End diff -- I'm in favor of parsing. Compare `'address$zip` with `'address.get('zip)`: the former is more straightforward and shorter. And if we add wildcard support, `'address$*` will be more concise.
[jira] [Commented] (FLINK-4294) Allow access of composite type fields
[ https://issues.apache.org/jira/browse/FLINK-4294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15433128#comment-15433128 ] ASF GitHub Bot commented on FLINK-4294: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2319#discussion_r75900712 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/CompositeAccessTest.scala --- @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.expressions + +import org.apache.calcite.tools.ValidationException +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor} +import org.apache.flink.api.scala.createTypeInformation +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.table +import org.apache.flink.api.table.{Row, Types} +import org.apache.flink.api.table.expressions.CompositeAccessTest.{MyCaseClass, MyCaseClass2, MyPojo} +import org.apache.flink.api.table.expressions.utils.ExpressionTestBase +import org.apache.flink.api.table.typeutils.RowTypeInfo +import org.junit.Test + + +class CompositeAccessTest extends ExpressionTestBase { + + @Test + def testGetField(): Unit = { + +testAllApis( + 'f0.getField("intField"), + "f0.getField('intField')", + "testTable.f0.intField", + "42") + +testTableApi( + 'f0.getField(0), + "f0.getField(0)", + "42") + +testSqlApi("testTable.f0.stringField", "Bob") + +testSqlApi("testTable.f0.booleanField", "true") + +testAllApis( + 'f1.getField("objectField").getField("intField"), + "f1.getField('objectField').getField('intField')", + "testTable.f1.objectField.intField", + "25") + +testSqlApi("testTable.f1.objectField.stringField", "Timo") + +testSqlApi("testTable.f1.objectField.booleanField", "false") + +testAllApis( + 'f2.getField(0), + "f2.getField(0)", + "testTable.f2._1", + "a") + +testSqlApi("testTable.f3.f1", "b") + +testSqlApi("testTable.f4.myString", "Hello") + +testSqlApi("testTable.f5", "13") + +testSqlApi("testTable.f6", "null") + +testAllApis( + 'f7.getField("_1"), + "getField(f7, '_1')", + "testTable.f7._1", + "true") + +// TODO Not supported by Calcite +// you cannot get a composite type +// testSqlApi("testTable.f1.objectField", "25") --- End diff -- Ah, sorry. I did not read the PR description 😊 . I think it would make sense to improve the error message for this case and return a `TableException`. 
[jira] [Commented] (FLINK-4294) Allow access of composite type fields
[ https://issues.apache.org/jira/browse/FLINK-4294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15433122#comment-15433122 ] ASF GitHub Bot commented on FLINK-4294: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2319#discussion_r75900345 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala --- @@ -223,10 +223,28 @@ trait ImplicitExpressionOperations { */ def toTime = Cast(expr, SqlTimeTypeInfo.TIME) -/** + /** * Parses a timestamp String in the form "yy-mm-dd hh:mm:ss.fff" to a SQL Timestamp. */ def toTimestamp = Cast(expr, SqlTimeTypeInfo.TIMESTAMP) + + /** +* Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by name and +* returns it's value. +* +* @param name name of the field (similar to Flink's field expressions) +* @return value of the field +*/ + def getField(name: String) = GetCompositeField(expr, name) --- End diff -- Let's see if we can get a few more opinions on this. @aljoscha, @StephanEwen, @wuchong opinions on how to address nested fields in the Table API?
[jira] [Commented] (FLINK-4294) Allow access of composite type fields
[ https://issues.apache.org/jira/browse/FLINK-4294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15433113#comment-15433113 ] ASF GitHub Bot commented on FLINK-4294: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2319#discussion_r75899092 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala --- @@ -223,10 +223,28 @@ trait ImplicitExpressionOperations { */ def toTime = Cast(expr, SqlTimeTypeInfo.TIME) -/** + /** * Parses a timestamp String in the form "yy-mm-dd hh:mm:ss.fff" to a SQL Timestamp. */ def toTimestamp = Cast(expr, SqlTimeTypeInfo.TIMESTAMP) + + /** +* Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by name and +* returns it's value. +* +* @param name name of the field (similar to Flink's field expressions) +* @return value of the field +*/ + def getField(name: String) = GetCompositeField(expr, name) --- End diff -- I like `get()`. But I'm also fine with the parsing. You decide ;-)
[jira] [Commented] (FLINK-4294) Allow access of composite type fields
[ https://issues.apache.org/jira/browse/FLINK-4294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15433102#comment-15433102 ] ASF GitHub Bot commented on FLINK-4294: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2319#discussion_r75898467 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/CompositeAccessTest.scala --- @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.expressions + +import org.apache.calcite.tools.ValidationException +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor} +import org.apache.flink.api.scala.createTypeInformation +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.table +import org.apache.flink.api.table.{Row, Types} +import org.apache.flink.api.table.expressions.CompositeAccessTest.{MyCaseClass, MyCaseClass2, MyPojo} +import org.apache.flink.api.table.expressions.utils.ExpressionTestBase +import org.apache.flink.api.table.typeutils.RowTypeInfo +import org.junit.Test + + +class CompositeAccessTest extends ExpressionTestBase { + + @Test + def testGetField(): Unit = { + +testAllApis( + 'f0.getField("intField"), + "f0.getField('intField')", + "testTable.f0.intField", + "42") + +testTableApi( + 'f0.getField(0), + "f0.getField(0)", + "42") + +testSqlApi("testTable.f0.stringField", "Bob") + +testSqlApi("testTable.f0.booleanField", "true") + +testAllApis( + 'f1.getField("objectField").getField("intField"), + "f1.getField('objectField').getField('intField')", + "testTable.f1.objectField.intField", + "25") + +testSqlApi("testTable.f1.objectField.stringField", "Timo") + +testSqlApi("testTable.f1.objectField.booleanField", "false") + +testAllApis( + 'f2.getField(0), + "f2.getField(0)", + "testTable.f2._1", + "a") + +testSqlApi("testTable.f3.f1", "b") + +testSqlApi("testTable.f4.myString", "Hello") + +testSqlApi("testTable.f5", "13") + +testSqlApi("testTable.f6", "null") + +testAllApis( + 'f7.getField("_1"), + "getField(f7, '_1')", + "testTable.f7._1", + "true") + +// TODO Not supported by Calcite +// you cannot get a composite type +// testSqlApi("testTable.f1.objectField", "25") --- End diff -- As I wrote in the PR description, the Calcite guys improved and still improving their "two-level column structure handling" (see CALCITE-1208). 
Once we use Calcite 1.9 we can test those cases again and merge this PR.
[jira] [Commented] (FLINK-4294) Allow access of composite type fields
[ https://issues.apache.org/jira/browse/FLINK-4294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15428179#comment-15428179 ] ASF GitHub Bot commented on FLINK-4294: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2319 Hi @twalthr, thanks for the PR. I added a few comments / suggestions. Also, the documentation should be extended. Thanks, Fabian
[jira] [Commented] (FLINK-4294) Allow access of composite type fields
[ https://issues.apache.org/jira/browse/FLINK-4294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15428176#comment-15428176 ] ASF GitHub Bot commented on FLINK-4294: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2319#discussion_r75479712 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala --- @@ -223,10 +223,28 @@ trait ImplicitExpressionOperations { */ def toTime = Cast(expr, SqlTimeTypeInfo.TIME) -/** + /** * Parses a timestamp String in the form "yy-mm-dd hh:mm:ss.fff" to a SQL Timestamp. */ def toTimestamp = Cast(expr, SqlTimeTypeInfo.TIMESTAMP) + + /** +* Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by name and +* returns it's value. +* +* @param name name of the field (similar to Flink's field expressions) +* @return value of the field +*/ + def getField(name: String) = GetCompositeField(expr, name) --- End diff -- I find `getField` a bit too long. I see two options: - A shorter method name, e.g., `get()` or `$()` - Or allow symbols like `'address$zip` and parse the symbol names What do you think, @twalthr?
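The first option, a shorter accessor name, could be sketched roughly as follows. The `Expression` and `Field` types, the `ShortAccessors` object, and the `key: Any` parameter of this `GetCompositeField` are assumptions made for the example; only the name `GetCompositeField` appears in the diff, and its real signature may differ.

```scala
// Hypothetical stand-ins for the Table API expression classes.
sealed trait Expression
final case class Field(name: String) extends Expression
// 'key' is either a field name (String) or a field position (Int) in this sketch.
final case class GetCompositeField(child: Expression, key: Any) extends Expression

object ShortAccessors {
  // Adds a short `get` accessor to every expression via an implicit value class.
  implicit class CompositeOps(private val expr: Expression) extends AnyVal {
    def get(name: String): Expression = GetCompositeField(expr, name)
    def get(index: Int): Expression = GetCompositeField(expr, index)
  }
}
```

With `import ShortAccessors._` in scope, `Field("address").get("zip")` builds the same tree that a longer `getField("zip")` call would, and calls can be chained for nested fields.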
[jira] [Commented] (FLINK-4294) Allow access of composite type fields
[ https://issues.apache.org/jira/browse/FLINK-4294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15428175#comment-15428175 ] ASF GitHub Bot commented on FLINK-4294: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2319#discussion_r75479361 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SelectITCase.scala --- @@ -146,4 +146,22 @@ class SelectITCase( tEnv.sql(sqlQuery) } + @Test + def testSelectWithCompositeType(): Unit = { +val env = ExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env, config) + +val sqlQuery = "SELECT MyTable.a2, MyTable.a1._2 FROM MyTable" --- End diff -- I found that leaving out the table name (or table alias) leads to a Calcite exception that is hard to understand. Can we throw a more helpful `TableException` instead?
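One way to surface a clearer error is to catch the planner's exception and rethrow it with context. The sketch below uses locally defined stand-in exception classes and a hypothetical `Validation.validate` helper; it does not call the real Calcite or Flink APIs, and the wording of the hint is only an example.

```scala
// Local stand-ins for illustration only; these are not the real Calcite or Flink classes.
final class PlannerValidationException(msg: String) extends RuntimeException(msg)
final class TableException(msg: String, cause: Throwable) extends RuntimeException(msg, cause)

object Validation {
  /** Runs the given planner function and rewraps its validation errors with a hint. */
  def validate(sql: String)(planner: String => String): String =
    try planner(sql)
    catch {
      case e: PlannerValidationException =>
        // Keep the original exception as the cause so no detail is lost.
        throw new TableException(
          s"SQL validation failed for query '$sql': ${e.getMessage}. " +
            "Composite fields may need to be qualified with a table name or alias, " +
            "e.g. MyTable.a1._2.",
          e)
    }
}
```

Keeping the original exception as the cause preserves the planner's details for debugging while the outer message tells the user what to change.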
[jira] [Commented] (FLINK-4294) Allow access of composite type fields
[ https://issues.apache.org/jira/browse/FLINK-4294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15428173#comment-15428173 ] ASF GitHub Bot commented on FLINK-4294: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2319#discussion_r75479103 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/CompositeAccessTest.scala --- @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.expressions + +import org.apache.calcite.tools.ValidationException +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor} +import org.apache.flink.api.scala.createTypeInformation +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.table +import org.apache.flink.api.table.{Row, Types} +import org.apache.flink.api.table.expressions.CompositeAccessTest.{MyCaseClass, MyCaseClass2, MyPojo} +import org.apache.flink.api.table.expressions.utils.ExpressionTestBase +import org.apache.flink.api.table.typeutils.RowTypeInfo +import org.junit.Test + + +class CompositeAccessTest extends ExpressionTestBase { + + @Test + def testGetField(): Unit = { + +testAllApis( + 'f0.getField("intField"), + "f0.getField('intField')", + "testTable.f0.intField", + "42") + +testTableApi( + 'f0.getField(0), + "f0.getField(0)", + "42") + +testSqlApi("testTable.f0.stringField", "Bob") + +testSqlApi("testTable.f0.booleanField", "true") + +testAllApis( + 'f1.getField("objectField").getField("intField"), + "f1.getField('objectField').getField('intField')", + "testTable.f1.objectField.intField", + "25") + +testSqlApi("testTable.f1.objectField.stringField", "Timo") + +testSqlApi("testTable.f1.objectField.booleanField", "false") + +testAllApis( + 'f2.getField(0), + "f2.getField(0)", + "testTable.f2._1", + "a") + +testSqlApi("testTable.f3.f1", "b") + +testSqlApi("testTable.f4.myString", "Hello") + +testSqlApi("testTable.f5", "13") + +testSqlApi("testTable.f6", "null") + +testAllApis( + 'f7.getField("_1"), + "getField(f7, '_1')", + "testTable.f7._1", + "true") + +// TODO Not supported by Calcite +// you cannot get a composite type +// testSqlApi("testTable.f1.objectField", "25") --- End diff -- I tried this case and got a Calcite exception which was not very helpful. Can we improve the exception for this error? 
Do you think we should open an issue in the Calcite JIRA to support this case, @twalthr?
[jira] [Commented] (FLINK-4294) Allow access of composite type fields
[ https://issues.apache.org/jira/browse/FLINK-4294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15401935#comment-15401935 ] ASF GitHub Bot commented on FLINK-4294: --- GitHub user twalthr opened a pull request: https://github.com/apache/flink/pull/2319
[FLINK-4294] [table] Allow access of composite type fields
Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes.
- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)
- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added
- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed
This PR implements composite types for the Table API & SQL. Fields of Flink's composite types such as Tuples, POJOs, etc. can be accessed from within expressions/SQL (see also CompositeAccessTest for examples).
This PR is not mergeable at the moment as Calcite does not allow for selecting entire composite types. I will write to the Calcite mailing list about this.
You can merge this pull request into a Git repository by running:
    $ git pull https://github.com/twalthr/flink FLINK-4294
Alternatively you can review and apply these changes as the patch at:
    https://github.com/apache/flink/pull/2319.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
    This closes #2319
commit 9ffc30416cb0873381b0b0bf484119bdd4459bdd
Author: twalthr
Date: 2016-08-01T12:15:49Z
    [FLINK-4294] [table] Allow access of composite type fields