[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15288057#comment-15288057 ]

ASF GitHub Bot commented on FLINK-3754:
---------------------------------------

Github user yjshen commented on the pull request:

    https://github.com/apache/flink/pull/1958#issuecomment-219902602

    @fhueske @twalthr thank you for your reviews and detailed comments on this issue. I'm quite excited to have it merged :)

> Add a validation phase before construct RelNode using TableAPI
> --------------------------------------------------------------
>
>                 Key: FLINK-3754
>                 URL: https://issues.apache.org/jira/browse/FLINK-3754
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API
>    Affects Versions: 1.0.0
>            Reporter: Yijie Shen
>            Assignee: Yijie Shen
>             Fix For: 1.1.0
>
> Unlike the execution of SQL strings, which has a separate validation phase before
> RelNode construction, the Table API lacks a counterpart, and validation is
> scattered across many places.
> I suggest adding a single validation phase to detect problems as early as
> possible.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15287804#comment-15287804 ]

ASF GitHub Bot commented on FLINK-3754:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1958
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15287227#comment-15287227 ]

ASF GitHub Bot commented on FLINK-3754:
---------------------------------------

Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1958#issuecomment-219812166

    Merging this PR.
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15286211#comment-15286211 ]

ASF GitHub Bot commented on FLINK-3754:
---------------------------------------

Github user twalthr commented on the pull request:

    https://github.com/apache/flink/pull/1958#issuecomment-219650388

    @fhueske I looked through the changes. You have my +1 to merge.
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280204#comment-15280204 ]

ASF GitHub Bot commented on FLINK-3754:
---------------------------------------

Github user twalthr commented on the pull request:

    https://github.com/apache/flink/pull/1958#issuecomment-218475561

    @yjshen great work! PR looks very good. I had only some minor refactoring comments.
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280218#comment-15280218 ]

ASF GitHub Bot commented on FLINK-3754:
---------------------------------------

Github user yjshen commented on the pull request:

    https://github.com/apache/flink/pull/1958#issuecomment-218477620

    Hi @twalthr, thanks very much for the review work! I'll resolve your comments shortly :)
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280187#comment-15280187 ]

ASF GitHub Bot commented on FLINK-3754:
---------------------------------------

Github user yjshen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1958#discussion_r62856490

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/UnresolvedException.scala ---
    @@ -15,9 +15,6 @@
      * See the License for the specific language governing permissions and
      * limitations under the License.
      */
    -package org.apache.flink.api.table
    +package org.apache.flink.api.table.expressions
     
    -/**
    - * General Exception for all errors during table handling.
    - */
    -class TableException(msg: String) extends RuntimeException(msg)
    +case class UnresolvedException(msg: String) extends RuntimeException(msg)
    --- End diff --

    OK, will do this
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280178#comment-15280178 ]

ASF GitHub Bot commented on FLINK-3754:
---------------------------------------

Github user yjshen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1958#discussion_r62855247

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala ---
    @@ -18,85 +18,26 @@ package org.apache.flink.api.table.expressions
     
     import org.apache.calcite.rex.RexNode
    -import org.apache.calcite.sql.SqlOperator
    -import org.apache.calcite.sql.fun.SqlStdOperatorTable
     import org.apache.calcite.tools.RelBuilder
    +import org.apache.flink.api.table.validate.{ExprValidationResult, ValidationFailure}
     
     /**
      * General expression for unresolved function calls. The function can be a built-in
      * scalar function or a user-defined scalar function.
      */
    -case class Call(functionName: String, args: Expression*) extends Expression {
    +case class Call(functionName: String, args: Seq[Expression]) extends Expression {
    --- End diff --

    When resolving `Call` into concrete expressions, `FunctionCatalog` is used, and its `withChildren` method looks up constructors to create the new expression as follows:

    1. Look up a constructor that takes `Seq[Expression]` as its argument.
    2. If step 1 finds nothing, look up a constructor whose parameter count exactly matches the number of arguments, as in the `MyFunc` example you provided.

    Does this make sense?
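The two-step constructor lookup described in the comment above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the code from the pull request: `Expr`, `Lit`, `Concat`, `MyFunc`, and `Resolver.instantiate` are hypothetical stand-ins, and only standard Java reflection calls (`getConstructors`, `getParameterTypes`, `newInstance`) are used.

```scala
// Hypothetical stand-ins for the expression tree; NOT Flink's actual classes.
sealed trait Expr
case class Lit(value: Int) extends Expr
case class Concat(args: Seq[Expr]) extends Expr   // constructor takes a Seq
case class MyFunc(a: Expr, b: Expr) extends Expr  // fixed-arity constructor

object Resolver {
  // Hypothetical helper: tries to build an instance of `clazz` from `children`.
  def instantiate(clazz: Class[_ <: Expr], children: Seq[Expr]): Option[Expr] = {
    val ctors = clazz.getConstructors

    // Step 1: a constructor with a single Seq parameter.
    val seqCtor = ctors.find { c =>
      val params = c.getParameterTypes
      params.length == 1 && params(0) == classOf[Seq[_]]
    }

    seqCtor match {
      case Some(c) =>
        // The whole Seq is passed as one argument.
        Some(c.newInstance(children).asInstanceOf[Expr])
      case None =>
        // Step 2: a constructor whose arity matches the number of children
        // and whose parameter types accept each child.
        ctors.find { c =>
          val params = c.getParameterTypes
          params.length == children.length &&
            params.zip(children).forall { case (t, child) => t.isInstance(child) }
        }.map { c =>
          c.newInstance(children.map(_.asInstanceOf[AnyRef]): _*).asInstanceOf[Expr]
        }
    }
  }
}
```

With this shape, a var-args style call site such as `Call("MYFUNC", 'f1, 'f2)` could still be resolved to a fixed-arity expression class, because the arity match in step 2 does not depend on how the arguments were collected.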
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280165#comment-15280165 ]

ASF GitHub Bot commented on FLINK-3754:
---------------------------------------

Github user yjshen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1958#discussion_r62853599

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala ---
    @@ -130,19 +130,17 @@ trait ImplicitExpressionOperations {
        * @param endIndex last character of the substring (starting at 1, inclusive)
        * @return substring
        */
    -  def substring(beginIndex: Expression, endIndex: Expression) = {
    -    Call(BuiltInFunctionNames.SUBSTRING, expr, beginIndex, endIndex)
    -  }
    +  def substring(beginIndex: Expression, endIndex: Expression) =
    +    SubString(expr, beginIndex, endIndex)
     
       /**
        * Creates a substring of the given string beginning at the given index to the end.
        *
        * @param beginIndex first character of the substring (starting at 1, inclusive)
        * @return substring
        */
    -  def substring(beginIndex: Expression) = {
    -    Call(BuiltInFunctionNames.SUBSTRING, expr, beginIndex)
    -  }
    +  def substring(beginIndex: Expression) =
    +    new SubString(expr, beginIndex)
    --- End diff --

    `SubString` is defined as `case class SubString(str: Expression, begin: Expression, end: Expression)`, and the generated `apply` method has the same three-argument signature, so the `new` here cannot be removed. Without it, the compiler complains: `cannot resolve subString with such signature`.
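The point about `new` follows from how Scala 2 generates companion `apply` methods: only the primary constructor of a case class gets one, so an auxiliary constructor has to be invoked with `new`. A minimal sketch, assuming hypothetical `Expr`, `Lit`, and `SubStr` types and a made-up default for the end index (the real `SubString` may define its two-argument constructor differently):

```scala
// Hypothetical stand-ins; NOT the classes from the pull request.
sealed trait Expr
case class Lit(value: Int) extends Expr

case class SubStr(str: Expr, begin: Expr, end: Expr) extends Expr {
  // Auxiliary constructor. The default end index is an assumption made
  // purely for illustration.
  def this(str: Expr, begin: Expr) = this(str, begin, Lit(Int.MaxValue))
}

object Demo {
  // Uses the compiler-generated SubStr.apply(str, begin, end).
  val full: SubStr = SubStr(Lit(0), Lit(1), Lit(5))

  // No two-argument apply is generated in Scala 2, so `new` is required.
  val toEnd: SubStr = new SubStr(Lit(0), Lit(1))
}
```

An alternative that would allow dropping `new` is to define an explicit two-argument `apply` in the companion object; whether that is preferable is a style decision for the code base.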
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280168#comment-15280168 ]

ASF GitHub Bot commented on FLINK-3754:
---------------------------------------

Github user yjshen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1958#discussion_r62853748

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/TypeCoercion.scala ---
    @@ -0,0 +1,64 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.expressions
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +
    +object TypeCoercion {
    --- End diff --

    You are right, will fix this.
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280151#comment-15280151 ]

ASF GitHub Bot commented on FLINK-3754:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1958#discussion_r62852367

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala ---
    @@ -20,19 +20,55 @@ package org.apache.flink.api.table.expressions
     
     import org.apache.calcite.rex.RexNode
     import org.apache.calcite.tools.RelBuilder
    -import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
    +import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, TypeInformation}
     import org.apache.flink.api.table.typeutils.TypeConverter
    +import org.apache.flink.api.table.validate._
     
    -case class Cast(child: Expression, tpe: TypeInformation[_]) extends UnaryExpression {
    +case class Cast(child: Expression, resultType: TypeInformation[_]) extends UnaryExpression {
     
    -  override def toString = s"$child.cast($tpe)"
    +  override def toString = s"$child.cast($resultType)"
     
       override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
    -    relBuilder.cast(child.toRexNode, TypeConverter.typeInfoToSqlType(tpe))
    +    relBuilder.cast(child.toRexNode, TypeConverter.typeInfoToSqlType(resultType))
       }
     
    -  override def makeCopy(anyRefs: Seq[AnyRef]): this.type = {
    +  override def makeCopy(anyRefs: Array[AnyRef]): this.type = {
         val child: Expression = anyRefs.head.asInstanceOf[Expression]
    -    copy(child, tpe).asInstanceOf[this.type]
    +    copy(child, resultType).asInstanceOf[this.type]
    +  }
    +
    +  override def validateInput(): ExprValidationResult = {
    +    if (Cast.canCast(child.resultType, resultType)) {
    +      ValidationSuccess
    +    } else {
    +      ValidationFailure(s"Unsupported cast from ${child.resultType} to $resultType")
    +    }
    +  }
    +}
    +
    +object Cast {
    --- End diff --

    I would also move this into `typeutils`. We may need it somewhere else too.
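The `validateInput` pattern in the diff above (return a success or failure value instead of throwing during construction) can be sketched in isolation as follows. The type names and the cast rules below are assumptions chosen only to make the example self-contained; the rules actually implemented in `Cast.canCast` are not shown in this thread.

```scala
// Hypothetical validation result ADT, modelled on the names in the diff.
sealed trait ValidationResult
case object Success extends ValidationResult
final case class Failure(message: String) extends ValidationResult

// Hypothetical, simplified type universe (stands in for TypeInformation).
sealed trait Ty
case object IntT extends Ty
case object LongT extends Ty
case object DoubleT extends Ty
case object StringT extends Ty
case object BoolT extends Ty

object CastRules {
  private val numeric: Set[Ty] = Set(IntT, LongT, DoubleT)

  // Assumed rules for illustration only: identity casts, anything to string,
  // numeric to numeric, and string to numeric are allowed.
  def canCast(from: Ty, to: Ty): Boolean =
    from == to ||
      to == StringT ||
      (numeric(from) && numeric(to)) ||
      (from == StringT && numeric(to))

  // Mirrors the shape of validateInput(): report the problem as a value.
  def validateCast(from: Ty, to: Ty): ValidationResult =
    if (canCast(from, to)) Success
    else Failure(s"Unsupported cast from $from to $to")
}
```

Keeping the rule table in a standalone object, as the review comment suggests for `typeutils`, lets other components reuse `canCast` without depending on the `Cast` expression itself.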
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280145#comment-15280145 ]

ASF GitHub Bot commented on FLINK-3754:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1958#discussion_r62851933

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala ---
    @@ -18,85 +18,26 @@ package org.apache.flink.api.table.expressions
     
     import org.apache.calcite.rex.RexNode
    -import org.apache.calcite.sql.SqlOperator
    -import org.apache.calcite.sql.fun.SqlStdOperatorTable
     import org.apache.calcite.tools.RelBuilder
    +import org.apache.flink.api.table.validate.{ExprValidationResult, ValidationFailure}
     
     /**
      * General expression for unresolved function calls. The function can be a built-in
      * scalar function or a user-defined scalar function.
      */
    -case class Call(functionName: String, args: Expression*) extends Expression {
    +case class Call(functionName: String, args: Seq[Expression]) extends Expression {
    --- End diff --

    I used var-args here because they are easier to use in the API once custom functions are possible: `.filter(Call("MYFUNC", 'f1, 'f2))`. But we can also find another way later.
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280140#comment-15280140 ]

ASF GitHub Bot commented on FLINK-3754:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1958#discussion_r62851129

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala ---
    @@ -18,85 +18,26 @@ package org.apache.flink.api.table.expressions
     
     import org.apache.calcite.rex.RexNode
    -import org.apache.calcite.sql.SqlOperator
    -import org.apache.calcite.sql.fun.SqlStdOperatorTable
     import org.apache.calcite.tools.RelBuilder
    +import org.apache.flink.api.table.validate.{ExprValidationResult, ValidationFailure}
     
     /**
      * General expression for unresolved function calls. The function can be a built-in
      * scalar function or a user-defined scalar function.
      */
    -case class Call(functionName: String, args: Expression*) extends Expression {
    +case class Call(functionName: String, args: Seq[Expression]) extends Expression {
     
       override def children: Seq[Expression] = args
     
       override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
    -    relBuilder.call(
    -      BuiltInFunctionNames.toSqlOperator(functionName),
    -      args.map(_.toRexNode): _*)
    +    throw new UnresolvedException(s"trying to convert UnresolvedFunction $functionName to RexNode")
       }
     
       override def toString = s"\\$functionName(${args.mkString(", ")})"
     
    -  override def makeCopy(newArgs: Seq[AnyRef]): this.type = {
    -    val copy = Call(
    -      newArgs.head.asInstanceOf[String],
    -      newArgs.drop(1).asInstanceOf[Seq[Expression]]: _*)
    +  override def resultType =
    +    throw new UnresolvedException(s"calling dataType on Unresolved Function $functionName")
    --- End diff --

    resultType instead of dataType. Space between Unresolved Function.
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280129#comment-15280129 ]

ASF GitHub Bot commented on FLINK-3754:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1958#discussion_r62849325

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/UnresolvedException.scala ---
    @@ -15,9 +15,6 @@
      * See the License for the specific language governing permissions and
      * limitations under the License.
      */
    -package org.apache.flink.api.table
    +package org.apache.flink.api.table.expressions
     
    -/**
    - * General Exception for all errors during table handling.
    - */
    -class TableException(msg: String) extends RuntimeException(msg)
    +case class UnresolvedException(msg: String) extends RuntimeException(msg)
    --- End diff --

    Could you put this into `exceptions.scala` and document it?
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280120#comment-15280120 ]

ASF GitHub Bot commented on FLINK-3754:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1958#discussion_r62848946

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/TypeCoercion.scala ---
    @@ -0,0 +1,64 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.expressions
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +
    +object TypeCoercion {
    --- End diff --

    I would put this into the `typeutils` package. The `expressions` package should only contain expressions.
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280074#comment-15280074 ]

ASF GitHub Bot commented on FLINK-3754:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1958#discussion_r62843680

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala ---
    @@ -130,19 +130,17 @@ trait ImplicitExpressionOperations {
        * @param endIndex last character of the substring (starting at 1, inclusive)
        * @return substring
        */
    -  def substring(beginIndex: Expression, endIndex: Expression) = {
    -    Call(BuiltInFunctionNames.SUBSTRING, expr, beginIndex, endIndex)
    -  }
    +  def substring(beginIndex: Expression, endIndex: Expression) =
    +    SubString(expr, beginIndex, endIndex)
     
       /**
        * Creates a substring of the given string beginning at the given index to the end.
        *
        * @param beginIndex first character of the substring (starting at 1, inclusive)
        * @return substring
        */
    -  def substring(beginIndex: Expression) = {
    -    Call(BuiltInFunctionNames.SUBSTRING, expr, beginIndex)
    -  }
    +  def substring(beginIndex: Expression) =
    +    new SubString(expr, beginIndex)
    --- End diff --

    `new` can be removed.
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280049#comment-15280049 ]

ASF GitHub Bot commented on FLINK-3754:
---------------------------------------

Github user yjshen commented on the pull request:

    https://github.com/apache/flink/pull/1958#issuecomment-218447948

    Hi @fhueske, I've just finished my work. Could you give it another review pass? Thanks!
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278805#comment-15278805 ]

ASF GitHub Bot commented on FLINK-3754:
---------------------------------------

Github user yjshen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1958#discussion_r62745064

    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/expression/ScalarFunctionsTest.scala ---
    @@ -215,7 +215,7 @@ class ScalarFunctionsTest {
       }
     
    -  @Test
    +  @Ignore
    --- End diff --

    This will be enabled again once type coercion rules are implemented.
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278792#comment-15278792 ]

ASF GitHub Bot commented on FLINK-3754:
---------------------------------------

Github user yjshen commented on the pull request:

    https://github.com/apache/flink/pull/1958#issuecomment-218278581

    Hi @fhueske, some of the comments are resolved and the PR is updated. Two TODOs remain:

    - [ ] Type coercion for expressions, for example:
      - `Add(int, long)` should be automatically coerced to `Add(cast(long), long)`
      - expressions that expect `double` input but receive `int`, such as `ln(int)`, should be automatically cast to `ln(cast(double))`
    - [ ] Rewrite of `TreeNode`, `FunctionCatalog`, etc.
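The coercion TODO above (widen `Add(int, long)` and insert a cast for `ln(int)`) could look roughly like the following rewrite pass. This is only a sketch under stated assumptions: the `Ty` and `Expr` hierarchies, the `rank` ordering, and `Coercion.coerce` are hypothetical names introduced for illustration, not the rules later implemented in Flink's `TypeCoercion`.

```scala
// Hypothetical, simplified type universe (stands in for TypeInformation).
sealed trait Ty
case object IntT extends Ty
case object LongT extends Ty
case object DoubleT extends Ty

// Hypothetical expression tree; NOT Flink's actual classes.
sealed trait Expr { def ty: Ty }
case class Field(name: String, ty: Ty) extends Expr
case class Cast(child: Expr, ty: Ty) extends Expr
case class Add(left: Expr, right: Expr) extends Expr { def ty: Ty = left.ty }
case class Ln(child: Expr) extends Expr { def ty: Ty = DoubleT }

object Coercion {
  // Assumed widening order: int < long < double.
  private val rank: Map[Ty, Int] = Map(IntT -> 0, LongT -> 1, DoubleT -> 2)

  // Wraps the expression in a Cast only when its type differs from the target.
  private def widen(e: Expr, target: Ty): Expr =
    if (e.ty == target) e else Cast(e, target)

  // Bottom-up rewrite that inserts the casts described in the TODO list.
  def coerce(e: Expr): Expr = e match {
    case Add(l, r) =>
      val (cl, cr) = (coerce(l), coerce(r))
      val target = if (rank(cl.ty) >= rank(cr.ty)) cl.ty else cr.ty
      Add(widen(cl, target), widen(cr, target))
    case Ln(c)      => Ln(widen(coerce(c), DoubleT))
    case Cast(c, t) => Cast(coerce(c), t)
    case other      => other
  }
}
```

For example, `Coercion.coerce(Add(Field("a", IntT), Field("b", LongT)))` wraps only the `int` operand in a `Cast` to `LongT`, which corresponds to the `Add(cast(long), long)` form mentioned in the comment.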
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278778#comment-15278778 ]

ASF GitHub Bot commented on FLINK-3754:
---------------------------------------

Github user yjshen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1958#discussion_r62742492

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala ---
    @@ -0,0 +1,309 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.plan.logical
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.calcite.rel.RelNode
    +import org.apache.calcite.rel.`type`.RelDataTypeFactory
    +import org.apache.calcite.rel.core.JoinRelType
    +import org.apache.calcite.rel.logical.LogicalProject
    +import org.apache.calcite.schema.{Table => CTable}
    +import org.apache.calcite.tools.RelBuilder
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
    +import org.apache.flink.api.java.operators.join.JoinType
    +import org.apache.flink.api.table.{StreamTableEnvironment, TableEnvironment}
    +import org.apache.flink.api.table.expressions._
    +import org.apache.flink.api.table.typeutils.TypeConverter
    +import org.apache.flink.api.table.validate.ValidationException
    +
    +case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extends UnaryNode {
    +  override def output: Seq[Attribute] = projectList.map(_.toAttribute)
    +
    +  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = {
    +    val afterResolve = super.resolveExpressions(tableEnv).asInstanceOf[Project]
    +    val newProjectList =
    +      afterResolve.projectList.zipWithIndex.map { case (e, i) =>
    +        e match {
    +          case u @ UnresolvedAlias(child, optionalAliasName) => child match {
    +            case ne: NamedExpression => ne
    +            case e if !e.valid => u
    +            case c @ Cast(ne: NamedExpression, _) => Alias(c, ne.name)
    +            case other => Alias(other, optionalAliasName.getOrElse(s"_c$i"))
    +          }
    +          case _ => throw new IllegalArgumentException
    +        }
    +    }
    +    Project(newProjectList, child)
    +  }
    +
    +  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
    +    val allAlias = projectList.forall(_.isInstanceOf[Alias])
    +    child.toRelNode(relBuilder)
    +    if (allAlias) {
    +      // Calcite's RelBuilder does not translate identity projects even if they rename fields.
    +      // Add a projection ourselves (will be automatically removed by translation rules).
    +      relBuilder.push(
    +        LogicalProject.create(relBuilder.peek(),
    +          projectList.map(_.toRexNode(relBuilder)).asJava,
    +          projectList.map(_.name).asJava))
    +    } else {
    +      relBuilder.project(projectList.map(_.toRexNode(relBuilder)): _*)
    +    }
    +  }
    +}
    +
    +case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) extends UnaryNode {
    +  override def output: Seq[Attribute] =
    +    throw new UnresolvedException("Invalid call to output on AliasNode")
    +
    +  override def toRelNode(relBuilder: RelBuilder): RelBuilder =
    +    throw new UnresolvedException("Invalid call to toRelNode on AliasNode")
    +
    +  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = {
    +    if (aliasList.length > child.output.length) {
    +      failValidation("Aliasing more fields than we actually have")
    +    } else if (!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) {
    +      failValidation("`as` only allow string arguments")
    +    } else {
    +      val names = aliasList.map(_.asInstanceOf[UnresolvedFieldReference].name)
    +      val input = child.output
    +      Project(
    +        names.zip(input).map { case (name, attr) =>
    +          Alias(attr, name)} ++ input.drop(names.length), child)
    +    }
    +  }
    +}
    +
    +case class
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278777#comment-15278777 ] ASF GitHub Bot commented on FLINK-3754: --- Github user yjshen commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62742390 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala --- @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.table.plan.logical + +import scala.collection.JavaConverters._ + +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataTypeFactory +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rel.logical.LogicalProject +import org.apache.calcite.schema.{Table => CTable} +import org.apache.calcite.tools.RelBuilder + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import org.apache.flink.api.java.operators.join.JoinType +import org.apache.flink.api.table.{StreamTableEnvironment, TableEnvironment} +import org.apache.flink.api.table.expressions._ +import org.apache.flink.api.table.typeutils.TypeConverter +import org.apache.flink.api.table.validate.ValidationException + +case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extends UnaryNode { + override def output: Seq[Attribute] = projectList.map(_.toAttribute) + + override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = { --- End diff -- Moved > Add a validation phase before construct RelNode using TableAPI > -- > > Key: FLINK-3754 > URL: https://issues.apache.org/jira/browse/FLINK-3754 > Project: Flink > Issue Type: Improvement > Components: Table API >Affects Versions: 1.0.0 >Reporter: Yijie Shen >Assignee: Yijie Shen > > Unlike sql string's execution, which have a separate validation phase before > RelNode construction, Table API lacks the counterparts and the validation is > scattered in many places. > I suggest to add a single validation phase and detect problems as early as > possible. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278682#comment-15278682 ]

ASF GitHub Bot commented on FLINK-3754:
---
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62731427

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala ---
@@ -92,8 +93,7 @@ abstract class StreamTableEnvironment(
   def ingest(tableName: String): Table = {
     if (isRegistered(tableName)) {
-      relBuilder.scan(tableName)
-      new Table(relBuilder.build(), this)
+      new Table(this, CatalogNode(tableName, getTable(tableName), getTypeFactory))
     } else {
       throw new TableException(s"Table \'$tableName\' was not found in the registry.")
--- End diff --

I think a ValidationException would be more appropriate, because `ingest()` belongs to a query IMO.
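For illustration only, the suggestion above could look roughly like the following sketch. `Registry`, `register` and the two exception classes are toy stand-ins defined here, not Flink's `TableEnvironment` or its real exception types; only the error message text is taken from the quoted diff.

```scala
object IngestSketch {
  // Hypothetical stand-ins for the two exception types discussed in the thread.
  class TableException(msg: String) extends RuntimeException(msg)
  class ValidationException(msg: String) extends RuntimeException(msg)

  // Toy registry mapping a table name to its field names.
  class Registry {
    private val tables = scala.collection.mutable.Map.empty[String, Seq[String]]

    def isRegistered(name: String): Boolean = tables.contains(name)

    def register(name: String, fields: Seq[String]): Unit =
      tables.update(name, fields)

    // Looking a table up is treated as part of building a query, so a missing
    // table is reported as a validation failure (the alternative under
    // discussion is to keep throwing TableException).
    def ingest(name: String): Seq[String] =
      tables.getOrElse(
        name,
        throw new ValidationException(s"Table '$name' was not found in the registry."))
  }
}
```

The only design point the sketch is meant to show is that the lookup failure surfaces as a query-validation error rather than as an environment error.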
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278516#comment-15278516 ] ASF GitHub Bot commented on FLINK-3754: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62715643 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala --- @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.table.plan.logical + +import scala.collection.JavaConverters._ + +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataTypeFactory +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rel.logical.LogicalProject +import org.apache.calcite.schema.{Table => CTable} +import org.apache.calcite.tools.RelBuilder + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import org.apache.flink.api.java.operators.join.JoinType +import org.apache.flink.api.table.{StreamTableEnvironment, TableEnvironment} +import org.apache.flink.api.table.expressions._ +import org.apache.flink.api.table.typeutils.TypeConverter +import org.apache.flink.api.table.validate.ValidationException + +case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extends UnaryNode { + override def output: Seq[Attribute] = projectList.map(_.toAttribute) + + override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = { +val afterResolve = super.resolveExpressions(tableEnv).asInstanceOf[Project] +val newProjectList = + afterResolve.projectList.zipWithIndex.map { case (e, i) => +e match { + case u @ UnresolvedAlias(child, optionalAliasName) => child match { +case ne: NamedExpression => ne +case e if !e.valid => u +case c @ Cast(ne: NamedExpression, _) => Alias(c, ne.name) +case other => Alias(other, optionalAliasName.getOrElse(s"_c$i")) + } + case _ => throw new IllegalArgumentException +} +} +Project(newProjectList, child) + } + + override def toRelNode(relBuilder: RelBuilder): RelBuilder = { +val allAlias = projectList.forall(_.isInstanceOf[Alias]) +child.toRelNode(relBuilder) +if (allAlias) { + // Calcite's RelBuilder does not translate identity projects even if they rename fields. + // Add a projection ourselves (will be automatically removed by translation rules). 
+ relBuilder.push( +LogicalProject.create(relBuilder.peek(), + projectList.map(_.toRexNode(relBuilder)).asJava, + projectList.map(_.name).asJava)) +} else { + relBuilder.project(projectList.map(_.toRexNode(relBuilder)): _*) +} + } +} + +case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) extends UnaryNode { + override def output: Seq[Attribute] = +throw new UnresolvedException("Invalid call to output on AliasNode") + + override def toRelNode(relBuilder: RelBuilder): RelBuilder = +throw new UnresolvedException("Invalid call to toRelNode on AliasNode") + + override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = { +if (aliasList.length > child.output.length) { + failValidation("Aliasing more fields than we actually have") +} else if (!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) { + failValidation("`as` only allow string arguments") --- End diff -- > Add a validation phase before construct RelNode using TableAPI > -- > > Key: FLINK-3754 > URL: https://issues.apache.org/jira/browse/FLINK-3754 > Project: Flink > Issue Type: Improvement >
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278512#comment-15278512 ]

ASF GitHub Bot commented on FLINK-3754:
---
Github user yjshen commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62715347

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala ---
@@ -92,8 +93,7 @@ abstract class StreamTableEnvironment(
   def ingest(tableName: String): Table = {
     if (isRegistered(tableName)) {
-      relBuilder.scan(tableName)
-      new Table(relBuilder.build(), this)
+      new Table(this, CatalogNode(tableName, getTable(tableName), getTypeFactory))
     } else {
       throw new TableException(s"Table \'$tableName\' was not found in the registry.")
--- End diff --

As we discussed in https://github.com/apache/flink/pull/1958#discussion_r62680699, should we keep this TableException?
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278505#comment-15278505 ] ASF GitHub Bot commented on FLINK-3754: --- Github user yjshen commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62714636 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala --- @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.table.plan.logical + +import scala.collection.JavaConverters._ + +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataTypeFactory +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rel.logical.LogicalProject +import org.apache.calcite.schema.{Table => CTable} +import org.apache.calcite.tools.RelBuilder + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import org.apache.flink.api.java.operators.join.JoinType +import org.apache.flink.api.table.{StreamTableEnvironment, TableEnvironment} +import org.apache.flink.api.table.expressions._ +import org.apache.flink.api.table.typeutils.TypeConverter +import org.apache.flink.api.table.validate.ValidationException + +case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extends UnaryNode { + override def output: Seq[Attribute] = projectList.map(_.toAttribute) + + override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = { +val afterResolve = super.resolveExpressions(tableEnv).asInstanceOf[Project] +val newProjectList = + afterResolve.projectList.zipWithIndex.map { case (e, i) => +e match { + case u @ UnresolvedAlias(child, optionalAliasName) => child match { +case ne: NamedExpression => ne +case e if !e.valid => u +case c @ Cast(ne: NamedExpression, _) => Alias(c, ne.name) +case other => Alias(other, optionalAliasName.getOrElse(s"_c$i")) + } + case _ => throw new IllegalArgumentException +} +} +Project(newProjectList, child) + } + + override def toRelNode(relBuilder: RelBuilder): RelBuilder = { +val allAlias = projectList.forall(_.isInstanceOf[Alias]) +child.toRelNode(relBuilder) +if (allAlias) { + // Calcite's RelBuilder does not translate identity projects even if they rename fields. + // Add a projection ourselves (will be automatically removed by translation rules). 
+ relBuilder.push( +LogicalProject.create(relBuilder.peek(), + projectList.map(_.toRexNode(relBuilder)).asJava, + projectList.map(_.name).asJava)) +} else { + relBuilder.project(projectList.map(_.toRexNode(relBuilder)): _*) +} + } +} + +case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) extends UnaryNode { + override def output: Seq[Attribute] = +throw new UnresolvedException("Invalid call to output on AliasNode") + + override def toRelNode(relBuilder: RelBuilder): RelBuilder = +throw new UnresolvedException("Invalid call to toRelNode on AliasNode") + + override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = { +if (aliasList.length > child.output.length) { + failValidation("Aliasing more fields than we actually have") +} else if (!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) { + failValidation("`as` only allow string arguments") +} else { + val names = aliasList.map(_.asInstanceOf[UnresolvedFieldReference].name) + val input = child.output + Project( +names.zip(input).map { case (name, attr) => + Alias(attr, name)} ++ input.drop(names.length), child) +} + } +} + +case class
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278482#comment-15278482 ] ASF GitHub Bot commented on FLINK-3754: --- Github user yjshen commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62712828 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala --- @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.table.plan.logical + +import scala.collection.JavaConverters._ + +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataTypeFactory +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rel.logical.LogicalProject +import org.apache.calcite.schema.{Table => CTable} +import org.apache.calcite.tools.RelBuilder + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import org.apache.flink.api.java.operators.join.JoinType +import org.apache.flink.api.table.{StreamTableEnvironment, TableEnvironment} +import org.apache.flink.api.table.expressions._ +import org.apache.flink.api.table.typeutils.TypeConverter +import org.apache.flink.api.table.validate.ValidationException + +case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extends UnaryNode { + override def output: Seq[Attribute] = projectList.map(_.toAttribute) + + override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = { +val afterResolve = super.resolveExpressions(tableEnv).asInstanceOf[Project] +val newProjectList = + afterResolve.projectList.zipWithIndex.map { case (e, i) => +e match { + case u @ UnresolvedAlias(child, optionalAliasName) => child match { +case ne: NamedExpression => ne +case e if !e.valid => u +case c @ Cast(ne: NamedExpression, _) => Alias(c, ne.name) +case other => Alias(other, optionalAliasName.getOrElse(s"_c$i")) + } + case _ => throw new IllegalArgumentException +} +} +Project(newProjectList, child) + } + + override def toRelNode(relBuilder: RelBuilder): RelBuilder = { +val allAlias = projectList.forall(_.isInstanceOf[Alias]) +child.toRelNode(relBuilder) +if (allAlias) { + // Calcite's RelBuilder does not translate identity projects even if they rename fields. + // Add a projection ourselves (will be automatically removed by translation rules). 
+ relBuilder.push( +LogicalProject.create(relBuilder.peek(), + projectList.map(_.toRexNode(relBuilder)).asJava, + projectList.map(_.name).asJava)) +} else { + relBuilder.project(projectList.map(_.toRexNode(relBuilder)): _*) +} + } +} + +case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) extends UnaryNode { + override def output: Seq[Attribute] = +throw new UnresolvedException("Invalid call to output on AliasNode") + + override def toRelNode(relBuilder: RelBuilder): RelBuilder = +throw new UnresolvedException("Invalid call to toRelNode on AliasNode") + + override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = { +if (aliasList.length > child.output.length) { + failValidation("Aliasing more fields than we actually have") +} else if (!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) { + failValidation("`as` only allow string arguments") --- End diff -- `Alias only accept name expressions as arguments` ? > Add a validation phase before construct RelNode using TableAPI > -- > > Key: FLINK-3754 > URL: https://issues.apache.org/jira/browse/FLINK-3754 >
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278437#comment-15278437 ] ASF GitHub Bot commented on FLINK-3754: --- Github user yjshen commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62709108 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala --- @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.logical + +import org.apache.calcite.tools.RelBuilder + +import org.apache.flink.api.table.TableEnvironment +import org.apache.flink.api.table.expressions._ +import org.apache.flink.api.table.trees.TreeNode +import org.apache.flink.api.table.validate._ + +/** + * LogicalNode is created and validated as we construct query plan using Table API. 
+ * + * The main validation procedure is separated into two phases: + * Expressions' resolution and transformation (#resolveExpressions(TableEnvironment)): + * + * translate UnresolvedFieldReference into ResolvedFieldReference + * using child operator's output + * translate Call(UnresolvedFunction) into solid Expression + * generate alias names for query output + * + * + * + * LogicalNode validation (#validate(TableEnvironment)): + * + * check no UnresolvedFieldReference exists any more + * check if all expressions have children of needed type + * check each logical operator have desired input + * + * Once we pass the validation phase, we can safely convert LogicalNode into Calcite's RelNode. + * + * Note: this is adapted from Apache Spark's LogicalPlan. + */ +abstract class LogicalNode extends TreeNode[LogicalNode] { + def output: Seq[Attribute] + + def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = { +// resolve references and function calls +transformExpressionsUp { + case u @ UnresolvedFieldReference(name) => +resolveChildren(name).getOrElse(u) + case c @ Call(name, children) if c.childrenValid => +tableEnv.getFunctionCatalog.lookupFunction(name, children) +} + } + + def toRelNode(relBuilder: RelBuilder): RelBuilder --- End diff -- You mean add a ``` final def toRelNode(relBuilder: RelBuilder): RelNode = relBuilder.build() ``` to get rid of `relBuilder.build()`? > Add a validation phase before construct RelNode using TableAPI > -- > > Key: FLINK-3754 > URL: https://issues.apache.org/jira/browse/FLINK-3754 > Project: Flink > Issue Type: Improvement > Components: Table API >Affects Versions: 1.0.0 >Reporter: Yijie Shen >Assignee: Yijie Shen > > Unlike sql string's execution, which have a separate validation phase before > RelNode construction, Table API lacks the counterparts and the validation is > scattered in many places. > I suggest to add a single validation phase and detect problems as early as > possible. 
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
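As a rough illustration of the two phases described in the `LogicalNode` Scaladoc quoted in this thread (resolve first, then validate), here is a minimal sketch. `UnresolvedField`, `ResolvedField`, `resolve` and `validate` are toy names made up for this example; they are not the classes or methods from the pull request, and only the first validation check (no unresolved reference remains) is modelled.

```scala
object TwoPhaseSketch {
  sealed trait Expr
  case class UnresolvedField(name: String) extends Expr
  case class ResolvedField(name: String, typeName: String) extends Expr

  // Phase 1: replace each unresolved reference that matches a field of the
  // child's output; references that cannot be resolved are left untouched.
  def resolve(exprs: Seq[Expr], childOutput: Seq[ResolvedField]): Seq[Expr] =
    exprs.map {
      case u @ UnresolvedField(name) =>
        childOutput.find(_.name == name).getOrElse(u)
      case other => other
    }

  // Phase 2: fail if any unresolved reference survived phase 1.
  def validate(exprs: Seq[Expr]): Either[String, Seq[ResolvedField]] = {
    val unresolved = exprs.collect { case UnresolvedField(n) => n }
    if (unresolved.nonEmpty) {
      val names = unresolved.mkString(", ")
      Left(s"Cannot resolve field(s): $names")
    } else {
      Right(exprs.collect { case r: ResolvedField => r })
    }
  }
}
```

Keeping resolution lenient and validation strict means all problems are reported in one place, before anything is handed to Calcite.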
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278426#comment-15278426 ] ASF GitHub Bot commented on FLINK-3754: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62708060 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala --- @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.logical + +import org.apache.calcite.tools.RelBuilder + +import org.apache.flink.api.table.TableEnvironment +import org.apache.flink.api.table.expressions._ +import org.apache.flink.api.table.trees.TreeNode +import org.apache.flink.api.table.validate._ + +/** + * LogicalNode is created and validated as we construct query plan using Table API. 
+ * + * The main validation procedure is separated into two phases: + * Expressions' resolution and transformation (#resolveExpressions(TableEnvironment)): + * + * translate UnresolvedFieldReference into ResolvedFieldReference + * using child operator's output + * translate Call(UnresolvedFunction) into solid Expression + * generate alias names for query output + * + * + * + * LogicalNode validation (#validate(TableEnvironment)): + * + * check no UnresolvedFieldReference exists any more + * check if all expressions have children of needed type + * check each logical operator have desired input + * + * Once we pass the validation phase, we can safely convert LogicalNode into Calcite's RelNode. + * + * Note: this is adapted from Apache Spark's LogicalPlan. + */ +abstract class LogicalNode extends TreeNode[LogicalNode] { + def output: Seq[Attribute] + + def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = { +// resolve references and function calls +transformExpressionsUp { + case u @ UnresolvedFieldReference(name) => +resolveChildren(name).getOrElse(u) + case c @ Call(name, children) if c.childrenValid => +tableEnv.getFunctionCatalog.lookupFunction(name, children) +} + } + + def toRelNode(relBuilder: RelBuilder): RelBuilder --- End diff -- Yes, renaming the method would be good. I thought, we could also add a `toRelNode()` method which actually returns a `RelNode` in addition to the renamed method. The renamed method would then only be required for subclasses of `LogicalNode` and other classes would not need to call `build()` on the `RelBuilder` but receive a finalized `RelNode`. 
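The split suggested above (a renamed builder-manipulating method for subclasses plus a `toRelNode` that returns a finished node) can be sketched as follows. This is a self-contained toy, not the PR's code: `ToyBuilder` only imitates the stack behaviour of Calcite's `RelBuilder`, plan nodes are plain strings, and `construct`, `Scan` and `Distinct` are illustrative names.

```scala
object BuilderSketch {
  // Toy stand-in for Calcite's RelBuilder: a mutable stack of plan fragments.
  final class ToyBuilder {
    private var stack: List[String] = Nil

    def push(node: String): ToyBuilder = { stack = node :: stack; this }

    // Replace the top of the stack with a node wrapping it.
    def wrap(op: String): ToyBuilder = {
      stack = s"$op(${stack.head})" :: stack.tail
      this
    }

    // Pop and return the finished plan.
    def build(): String = { val top = stack.head; stack = stack.tail; top }
  }

  abstract class Node {
    // Subclasses only mutate the builder's stack and hand the builder back.
    protected[BuilderSketch] def construct(b: ToyBuilder): ToyBuilder

    // Callers get a finished plan and never call build() themselves.
    final def toRelNode(b: ToyBuilder): String = construct(b).build()
  }

  case class Scan(table: String) extends Node {
    protected[BuilderSketch] def construct(b: ToyBuilder): ToyBuilder =
      b.push(s"Scan($table)")
  }

  case class Distinct(child: Node) extends Node {
    protected[BuilderSketch] def construct(b: ToyBuilder): ToyBuilder =
      child.construct(b).wrap("Distinct")
  }
}
```

In this toy, `Distinct(Scan("t")).toRelNode(new ToyBuilder)` returns the finished plan directly, so code outside the node hierarchy never sees the intermediate builder state.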
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278407#comment-15278407 ] ASF GitHub Bot commented on FLINK-3754: --- Github user yjshen commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62705691 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala --- @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.logical + +import org.apache.calcite.tools.RelBuilder + +import org.apache.flink.api.table.TableEnvironment +import org.apache.flink.api.table.expressions._ +import org.apache.flink.api.table.trees.TreeNode +import org.apache.flink.api.table.validate._ + +/** + * LogicalNode is created and validated as we construct query plan using Table API. 
+ * + * The main validation procedure is separated into two phases: + * Expressions' resolution and transformation (#resolveExpressions(TableEnvironment)): + * + * translate UnresolvedFieldReference into ResolvedFieldReference + * using child operator's output + * translate Call(UnresolvedFunction) into solid Expression + * generate alias names for query output + * + * + * + * LogicalNode validation (#validate(TableEnvironment)): + * + * check no UnresolvedFieldReference exists any more + * check if all expressions have children of needed type + * check each logical operator have desired input + * + * Once we pass the validation phase, we can safely convert LogicalNode into Calcite's RelNode. + * + * Note: this is adapted from Apache Spark's LogicalPlan. + */ +abstract class LogicalNode extends TreeNode[LogicalNode] { + def output: Seq[Attribute] + + def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = { +// resolve references and function calls +transformExpressionsUp { + case u @ UnresolvedFieldReference(name) => +resolveChildren(name).getOrElse(u) + case c @ Call(name, children) if c.childrenValid => +tableEnv.getFunctionCatalog.lookupFunction(name, children) +} + } + + def toRelNode(relBuilder: RelBuilder): RelBuilder --- End diff -- `toRelNode` I am using here may not be a proper name, this function is manipulating the argument `relBuilder`'s inner stack using Calcite API: `relBuilder.join`, `relBuilder.distinct` etc, and returning the `relBuilder` with a changed stack, much like the `StringBuilder`'s `append` method. Maybe the I can just rename it to `construct` or `apply`? 
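The naming question in this comment comes from the method mutating a builder's internal stack and handing the same builder back. Below is a minimal sketch of that pattern. `MiniBuilder`, `Node`, and `construct` are hypothetical names chosen for illustration; this is not Calcite's `RelBuilder` and not the PR's code.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a stack-based plan builder; NOT Calcite's RelBuilder.
final class MiniBuilder {
  private val stack = mutable.ArrayBuffer.empty[String]

  // Push a leaf plan onto the stack and return this builder for chaining.
  def scan(table: String): MiniBuilder = {
    stack += s"Scan($table)"
    this
  }

  // Replace the top entry with a node that wraps it.
  def distinct(): MiniBuilder = {
    val top = stack.remove(stack.length - 1)
    stack += s"Distinct($top)"
    this
  }

  // Pop two inputs and push a join over them.
  def join(): MiniBuilder = {
    val right = stack.remove(stack.length - 1)
    val left = stack.remove(stack.length - 1)
    stack += s"Join($left, $right)"
    this
  }

  // Look at the plan currently on top of the stack.
  def peek: String = stack.last
}

// Hypothetical logical nodes: `construct` only mutates the builder and returns it.
sealed trait Node { def construct(b: MiniBuilder): MiniBuilder }

final case class Scan(table: String) extends Node {
  def construct(b: MiniBuilder): MiniBuilder = b.scan(table)
}

final case class Distinct(child: Node) extends Node {
  def construct(b: MiniBuilder): MiniBuilder = child.construct(b).distinct()
}

final case class Join(left: Node, right: Node) extends Node {
  def construct(b: MiniBuilder): MiniBuilder = {
    left.construct(b)
    right.construct(b)
    b.join()
  }
}
```

Because every call returns the builder it received, the return value carries no new information. That is the argument for a verb such as `construct` over a name that suggests a fresh `RelNode` is produced.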
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278385#comment-15278385 ] ASF GitHub Bot commented on FLINK-3754: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62703535 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala --- @@ -152,13 +160,13 @@ abstract class TableEnvironment(val config: TableConfig) { * * @param name The name under which the table is registered. * @param table The table to register in the catalog -* @throws TableException if another table is registered under the provided name. +* @throws ValidationException if another table is registered under the provided name. */ - @throws[TableException] + @throws[ValidationException] protected def registerTableInternal(name: String, table: AbstractTable): Unit = { if (isRegistered(name)) { - throw new TableException(s"Table \'$name\' already exists. " + + throw new ValidationException(s"Table \'$name\' already exists. " + --- End diff -- Yes, that's a good point > Add a validation phase before construct RelNode using TableAPI > -- > > Key: FLINK-3754 > URL: https://issues.apache.org/jira/browse/FLINK-3754 > Project: Flink > Issue Type: Improvement > Components: Table API >Affects Versions: 1.0.0 >Reporter: Yijie Shen >Assignee: Yijie Shen > > Unlike sql string's execution, which have a separate validation phase before > RelNode construction, Table API lacks the counterparts and the validation is > scattered in many places. > I suggest to add a single validation phase and detect problems as early as > possible. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278377#comment-15278377 ] ASF GitHub Bot commented on FLINK-3754: --- Github user yjshen commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62703065 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala --- @@ -152,13 +160,13 @@ abstract class TableEnvironment(val config: TableConfig) { * * @param name The name under which the table is registered. * @param table The table to register in the catalog -* @throws TableException if another table is registered under the provided name. +* @throws ValidationException if another table is registered under the provided name. */ - @throws[TableException] + @throws[ValidationException] protected def registerTableInternal(name: String, table: AbstractTable): Unit = { if (isRegistered(name)) { - throw new TableException(s"Table \'$name\' already exists. " + + throw new ValidationException(s"Table \'$name\' already exists. " + --- End diff -- That makes sense to me. Following that reasoning, should unsupported operations on `StreamTableEnvironment` throw `TableException`? > Add a validation phase before construct RelNode using TableAPI > -- > > Key: FLINK-3754 > URL: https://issues.apache.org/jira/browse/FLINK-3754 > Project: Flink > Issue Type: Improvement > Components: Table API >Affects Versions: 1.0.0 >Reporter: Yijie Shen >Assignee: Yijie Shen > > Unlike sql string's execution, which have a separate validation phase before > RelNode construction, Table API lacks the counterparts and the validation is > scattered in many places. > I suggest to add a single validation phase and detect problems as early as > possible. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
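The split discussed in this exchange can be sketched as follows: a name clash at registration is a user error and raises `ValidationException`, while an operation that is not supported at all raises `TableException`. The two exception classes below are simplified stand-ins, and `MiniTableEnv` and `MiniStreamTableEnv` are hypothetical names. None of this is the actual Flink code, and the unsupported `join` is only an example operation assumed for the sketch.

```scala
import scala.collection.mutable

// Simplified stand-ins for the exception types named in the diff.
class TableException(msg: String) extends RuntimeException(msg)
class ValidationException(msg: String) extends RuntimeException(msg)

// Hypothetical, heavily simplified environment; not Flink's TableEnvironment.
class MiniTableEnv {
  private val tables = mutable.Map.empty[String, AnyRef]

  def isRegistered(name: String): Boolean = tables.contains(name)

  // A name clash is a user error, so it is reported as a validation failure.
  def registerTable(name: String, table: AnyRef): Unit = {
    if (isRegistered(name)) {
      throw new ValidationException(s"Table '$name' already exists.")
    }
    tables(name) = table
  }
}

// Hypothetical streaming variant: an operation that is not supported at all
// is reported with TableException rather than ValidationException.
class MiniStreamTableEnv extends MiniTableEnv {
  def join(left: String, right: String): Nothing =
    throw new TableException("Join is not supported on streaming tables in this sketch.")
}
```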
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278209#comment-15278209 ] ASF GitHub Bot commented on FLINK-3754: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62685356 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala --- @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.validate + +import scala.reflect.ClassTag +import scala.util.{Failure, Success, Try} + +import org.apache.flink.api.table.expressions._ +import org.apache.flink.api.table.validate.FunctionCatalog.FunctionBuilder + +/** + * A catalog for looking up user defined functions, used by an Analyzer. + * + * Note: this is adapted from Apache Spark's FunctionRegistry. + */ +trait FunctionCatalog { --- End diff -- Do we need this class except for some of the tests? 
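For context on what such a catalog does, here is a minimal sketch of a name-to-builder lookup in the spirit of the `lookupFunction(name, children)` call seen elsewhere in this PR. The expression types, the `SimpleCatalog` class, and the case-insensitive lookup are assumptions made for illustration. They are not taken from the PR.

```scala
import scala.collection.mutable

// Simplified stand-in for the exception type used in the PR.
class ValidationException(msg: String) extends RuntimeException(msg)

object FunctionCatalogSketch {
  // Hypothetical, minimal expression tree.
  sealed trait Expr
  final case class Lit(value: Int) extends Expr
  final case class Abs(child: Expr) extends Expr

  // A builder turns already-resolved child expressions into a concrete expression.
  type FunctionBuilder = Seq[Expr] => Expr

  class SimpleCatalog {
    private val builders = mutable.Map.empty[String, FunctionBuilder]

    // Names are stored lower-cased so that lookup is case-insensitive
    // (an assumption of this sketch).
    def register(name: String, builder: FunctionBuilder): Unit =
      builders(name.toLowerCase) = builder

    // Resolve a function call by name, failing validation if it is unknown.
    def lookupFunction(name: String, children: Seq[Expr]): Expr =
      builders.get(name.toLowerCase) match {
        case Some(build) => build(children)
        case None => throw new ValidationException(s"Undefined function: $name")
      }
  }
}
```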
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278205#comment-15278205 ] ASF GitHub Bot commented on FLINK-3754: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62684793 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala --- @@ -472,61 +339,39 @@ class Table( tableEnv.emitToSink(this, configuredSink) } - private def createRenamingProject(exprs: Seq[RexNode]): LogicalProject = { - -val names = exprs.map{ e => - e.getKind match { -case SqlKind.AS => - e.asInstanceOf[RexCall].getOperands.get(1) -.asInstanceOf[RexLiteral].getValue -.asInstanceOf[NlsString].getValue -case SqlKind.INPUT_REF => - relNode.getRowType.getFieldNames.get(e.asInstanceOf[RexInputRef].getIndex) -case _ => - throw new PlanGenException("Unexpected expression type encountered.") - } - -} -LogicalProject.create(relNode, exprs.toList.asJava, names.toList.asJava) - } - private def checkUniqueNames(exprs: Seq[Expression]): Unit = { --- End diff -- Can we move this check into `Project`? > Add a validation phase before construct RelNode using TableAPI > -- > > Key: FLINK-3754 > URL: https://issues.apache.org/jira/browse/FLINK-3754 > Project: Flink > Issue Type: Improvement > Components: Table API >Affects Versions: 1.0.0 >Reporter: Yijie Shen >Assignee: Yijie Shen > > Unlike sql string's execution, which have a separate validation phase before > RelNode construction, Table API lacks the counterparts and the validation is > scattered in many places. > I suggest to add a single validation phase and detect problems as early as > possible. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278214#comment-15278214 ] ASF GitHub Bot commented on FLINK-3754: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1958#issuecomment-218182019 Hi @yjshen, sorry for the time it took to review the PR. I like the approach and the PR looks good, IMO. I had a few minor comments and questions. Let me know what you think. > Add a validation phase before construct RelNode using TableAPI > -- > > Key: FLINK-3754 > URL: https://issues.apache.org/jira/browse/FLINK-3754 > Project: Flink > Issue Type: Improvement > Components: Table API >Affects Versions: 1.0.0 >Reporter: Yijie Shen >Assignee: Yijie Shen > > Unlike sql string's execution, which have a separate validation phase before > RelNode construction, Table API lacks the counterparts and the validation is > scattered in many places. > I suggest to add a single validation phase and detect problems as early as > possible. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278204#comment-15278204 ] ASF GitHub Bot commented on FLINK-3754: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62684644 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala --- @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.table.plan.logical + +import scala.collection.JavaConverters._ + +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataTypeFactory +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rel.logical.LogicalProject +import org.apache.calcite.schema.{Table => CTable} +import org.apache.calcite.tools.RelBuilder + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import org.apache.flink.api.java.operators.join.JoinType +import org.apache.flink.api.table.{StreamTableEnvironment, TableEnvironment} +import org.apache.flink.api.table.expressions._ +import org.apache.flink.api.table.typeutils.TypeConverter +import org.apache.flink.api.table.validate.ValidationException + +case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extends UnaryNode { + override def output: Seq[Attribute] = projectList.map(_.toAttribute) + + override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = { +val afterResolve = super.resolveExpressions(tableEnv).asInstanceOf[Project] +val newProjectList = + afterResolve.projectList.zipWithIndex.map { case (e, i) => +e match { + case u @ UnresolvedAlias(child, optionalAliasName) => child match { +case ne: NamedExpression => ne +case e if !e.valid => u +case c @ Cast(ne: NamedExpression, _) => Alias(c, ne.name) +case other => Alias(other, optionalAliasName.getOrElse(s"_c$i")) + } + case _ => throw new IllegalArgumentException +} +} +Project(newProjectList, child) + } + + override def toRelNode(relBuilder: RelBuilder): RelBuilder = { +val allAlias = projectList.forall(_.isInstanceOf[Alias]) +child.toRelNode(relBuilder) +if (allAlias) { + // Calcite's RelBuilder does not translate identity projects even if they rename fields. + // Add a projection ourselves (will be automatically removed by translation rules). 
+ relBuilder.push( +LogicalProject.create(relBuilder.peek(), + projectList.map(_.toRexNode(relBuilder)).asJava, + projectList.map(_.name).asJava)) +} else { + relBuilder.project(projectList.map(_.toRexNode(relBuilder)): _*) +} + } +} + +case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) extends UnaryNode { + override def output: Seq[Attribute] = +throw new UnresolvedException("Invalid call to output on AliasNode") + + override def toRelNode(relBuilder: RelBuilder): RelBuilder = +throw new UnresolvedException("Invalid call to toRelNode on AliasNode") + + override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = { +if (aliasList.length > child.output.length) { + failValidation("Aliasing more fields than we actually have") +} else if (!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) { + failValidation("`as` only allow string arguments") +} else { + val names = aliasList.map(_.asInstanceOf[UnresolvedFieldReference].name) + val input = child.output + Project( +names.zip(input).map { case (name, attr) => + Alias(attr, name)} ++ input.drop(names.length), child) +} + } +} + +case class
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278212#comment-15278212 ] ASF GitHub Bot commented on FLINK-3754: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62685650 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/exceptions.scala --- @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.validate --- End diff -- Can we move this to `org.apache.flink.api.table` and also add the `TableException` and `ExpressionParserException`? > Add a validation phase before construct RelNode using TableAPI > -- > > Key: FLINK-3754 > URL: https://issues.apache.org/jira/browse/FLINK-3754 > Project: Flink > Issue Type: Improvement > Components: Table API >Affects Versions: 1.0.0 >Reporter: Yijie Shen >Assignee: Yijie Shen > > Unlike sql string's execution, which have a separate validation phase before > RelNode construction, Table API lacks the counterparts and the validation is > scattered in many places. > I suggest to add a single validation phase and detect problems as early as > possible. 
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278199#comment-15278199 ] ASF GitHub Bot commented on FLINK-3754: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62684054 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala --- @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.table.plan.logical + +import scala.collection.JavaConverters._ + +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataTypeFactory +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rel.logical.LogicalProject +import org.apache.calcite.schema.{Table => CTable} +import org.apache.calcite.tools.RelBuilder + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import org.apache.flink.api.java.operators.join.JoinType +import org.apache.flink.api.table.{StreamTableEnvironment, TableEnvironment} +import org.apache.flink.api.table.expressions._ +import org.apache.flink.api.table.typeutils.TypeConverter +import org.apache.flink.api.table.validate.ValidationException + +case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extends UnaryNode { + override def output: Seq[Attribute] = projectList.map(_.toAttribute) + + override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = { +val afterResolve = super.resolveExpressions(tableEnv).asInstanceOf[Project] +val newProjectList = + afterResolve.projectList.zipWithIndex.map { case (e, i) => +e match { + case u @ UnresolvedAlias(child, optionalAliasName) => child match { +case ne: NamedExpression => ne +case e if !e.valid => u +case c @ Cast(ne: NamedExpression, _) => Alias(c, ne.name) +case other => Alias(other, optionalAliasName.getOrElse(s"_c$i")) + } + case _ => throw new IllegalArgumentException +} +} +Project(newProjectList, child) + } + + override def toRelNode(relBuilder: RelBuilder): RelBuilder = { +val allAlias = projectList.forall(_.isInstanceOf[Alias]) +child.toRelNode(relBuilder) +if (allAlias) { + // Calcite's RelBuilder does not translate identity projects even if they rename fields. + // Add a projection ourselves (will be automatically removed by translation rules). 
+ relBuilder.push( +LogicalProject.create(relBuilder.peek(), + projectList.map(_.toRexNode(relBuilder)).asJava, + projectList.map(_.name).asJava)) +} else { + relBuilder.project(projectList.map(_.toRexNode(relBuilder)): _*) +} + } +} + +case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) extends UnaryNode { + override def output: Seq[Attribute] = +throw new UnresolvedException("Invalid call to output on AliasNode") + + override def toRelNode(relBuilder: RelBuilder): RelBuilder = +throw new UnresolvedException("Invalid call to toRelNode on AliasNode") + + override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = { +if (aliasList.length > child.output.length) { + failValidation("Aliasing more fields than we actually have") +} else if (!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) { + failValidation("`as` only allow string arguments") +} else { + val names = aliasList.map(_.asInstanceOf[UnresolvedFieldReference].name) + val input = child.output + Project( +names.zip(input).map { case (name, attr) => + Alias(attr, name)} ++ input.drop(names.length), child) +} + } +} + +case class
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278196#comment-15278196 ] ASF GitHub Bot commented on FLINK-3754: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62683790 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala --- @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.table.plan.logical + +import scala.collection.JavaConverters._ + +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataTypeFactory +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rel.logical.LogicalProject +import org.apache.calcite.schema.{Table => CTable} +import org.apache.calcite.tools.RelBuilder + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import org.apache.flink.api.java.operators.join.JoinType +import org.apache.flink.api.table.{StreamTableEnvironment, TableEnvironment} +import org.apache.flink.api.table.expressions._ +import org.apache.flink.api.table.typeutils.TypeConverter +import org.apache.flink.api.table.validate.ValidationException + +case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extends UnaryNode { + override def output: Seq[Attribute] = projectList.map(_.toAttribute) + + override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = { +val afterResolve = super.resolveExpressions(tableEnv).asInstanceOf[Project] +val newProjectList = + afterResolve.projectList.zipWithIndex.map { case (e, i) => +e match { + case u @ UnresolvedAlias(child, optionalAliasName) => child match { +case ne: NamedExpression => ne +case e if !e.valid => u +case c @ Cast(ne: NamedExpression, _) => Alias(c, ne.name) +case other => Alias(other, optionalAliasName.getOrElse(s"_c$i")) + } + case _ => throw new IllegalArgumentException +} +} +Project(newProjectList, child) + } + + override def toRelNode(relBuilder: RelBuilder): RelBuilder = { +val allAlias = projectList.forall(_.isInstanceOf[Alias]) +child.toRelNode(relBuilder) +if (allAlias) { + // Calcite's RelBuilder does not translate identity projects even if they rename fields. + // Add a projection ourselves (will be automatically removed by translation rules). 
+ relBuilder.push( +LogicalProject.create(relBuilder.peek(), + projectList.map(_.toRexNode(relBuilder)).asJava, + projectList.map(_.name).asJava)) +} else { + relBuilder.project(projectList.map(_.toRexNode(relBuilder)): _*) +} + } +} + +case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) extends UnaryNode { + override def output: Seq[Attribute] = +throw new UnresolvedException("Invalid call to output on AliasNode") + + override def toRelNode(relBuilder: RelBuilder): RelBuilder = +throw new UnresolvedException("Invalid call to toRelNode on AliasNode") + + override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = { +if (aliasList.length > child.output.length) { + failValidation("Aliasing more fields than we actually have") +} else if (!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) { + failValidation("`as` only allow string arguments") --- End diff -- I think this error message might be confusing because the Scala API does not use String arguments. > Add a validation phase before construct RelNode using TableAPI > -- > > Key: FLINK-3754 > URL:
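The aliasing rule in `AliasNode.resolveExpressions` renames the first fields and keeps the rest unchanged. It can be sketched over plain field names as below. `FieldRef` and `Literal` are hypothetical stand-ins for the real expression classes. The error wording is only one possible phrasing that avoids the word "string"; it is not the message the PR ended up with.

```scala
// Simplified stand-in for the exception type used in the diff.
class ValidationException(msg: String) extends RuntimeException(msg)

object AliasSketch {
  // Hypothetical, simplified expression types; the real ones live in flink-table.
  sealed trait Expr
  final case class FieldRef(name: String) extends Expr
  final case class Literal(value: Any) extends Expr

  // Rename the first `aliases.length` input fields and keep the remaining names as they are.
  def applyAliases(aliases: Seq[Expr], input: Seq[String]): Seq[String] = {
    if (aliases.length > input.length) {
      throw new ValidationException(
        s"Cannot alias ${aliases.length} fields: the input has only ${input.length}.")
    }
    val names = aliases.map {
      case FieldRef(name) => name
      case other =>
        // Worded without "string" so it also reads correctly for the Scala API (assumed wording).
        throw new ValidationException(s"`as` only accepts field names, but got: $other")
    }
    names ++ input.drop(names.length)
  }
}
```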
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278207#comment-15278207 ] ASF GitHub Bot commented on FLINK-3754: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62685176 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala --- @@ -472,61 +339,39 @@ class Table( tableEnv.emitToSink(this, configuredSink) } - private def createRenamingProject(exprs: Seq[RexNode]): LogicalProject = { - -val names = exprs.map{ e => - e.getKind match { -case SqlKind.AS => - e.asInstanceOf[RexCall].getOperands.get(1) -.asInstanceOf[RexLiteral].getValue -.asInstanceOf[NlsString].getValue -case SqlKind.INPUT_REF => - relNode.getRowType.getFieldNames.get(e.asInstanceOf[RexInputRef].getIndex) -case _ => - throw new PlanGenException("Unexpected expression type encountered.") - } - -} -LogicalProject.create(relNode, exprs.toList.asJava, names.toList.asJava) - } - private def checkUniqueNames(exprs: Seq[Expression]): Unit = { val names: mutable.Set[String] = mutable.Set() exprs.foreach { - case n: Naming => + case n: Alias => // explicit name if (names.contains(n.name)) { - throw new IllegalArgumentException(s"Duplicate field name $n.name.") + throw new ValidationException(s"Duplicate field name $n.name.") } else { names.add(n.name) } case u: UnresolvedFieldReference => // simple field forwarding if (names.contains(u.name)) { - throw new IllegalArgumentException(s"Duplicate field name $u.name.") + throw new ValidationException(s"Duplicate field name $u.name.") } else { names.add(u.name) } case _ => // Do nothing } } + @inline protected def validate(logicalNode: => LogicalNode): Table = { +new Table(tableEnv, logicalNode.validate(tableEnv)) --- End diff -- Would it be OK to remove this method and move the `new Table` call to each API method? I think this would make the code easier to read. 
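One detail in the quoted `checkUniqueNames` diff is worth checking. In a Scala `s` interpolator, `$n.name` substitutes `n` and then appends the literal text `.name`. A member access needs braces, as in `${n.name}`. Below is a minimal sketch of a duplicate-name check written that way. The `Alias` case class is a simplified, hypothetical stand-in for the real expression class.

```scala
import scala.collection.mutable

// Simplified stand-in for the exception type used in the diff.
class ValidationException(msg: String) extends RuntimeException(msg)

object UniqueNamesSketch {
  // Hypothetical, simplified named expression.
  final case class Alias(name: String)

  // Fail validation on the first field name that occurs twice.
  def checkUniqueNames(exprs: Seq[Alias]): Unit = {
    val names = mutable.Set.empty[String]
    exprs.foreach { n =>
      // mutable.Set#add returns false if the element was already present.
      if (!names.add(n.name)) {
        throw new ValidationException(s"Duplicate field name ${n.name}.")
      }
    }
  }

  // Side-by-side comparison of the two interpolation forms.
  val a = Alias("x")
  val withoutBraces = s"Duplicate field name $a.name." // interpolates `a`, then the text ".name."
  val withBraces = s"Duplicate field name ${a.name}."  // interpolates the member `a.name`
}
```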
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278180#comment-15278180 ] ASF GitHub Bot commented on FLINK-3754: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62681920 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala --- @@ -25,15 +25,29 @@ import org.apache.calcite.sql.SqlOperator import org.apache.calcite.sql.fun.SqlStdOperatorTable import org.apache.calcite.tools.RelBuilder -import org.apache.flink.api.common.typeinfo.BasicTypeInfo -import org.apache.flink.api.table.typeutils.TypeConverter +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, NumericTypeInfo, TypeInformation} +import org.apache.flink.api.table.typeutils.{TypeCheckUtils, TypeConverter} +import org.apache.flink.api.table.validate.ExprValidationResult -abstract class BinaryArithmetic extends BinaryExpression { self: Product => +abstract class BinaryArithmetic extends BinaryExpression { def sqlOperator: SqlOperator override def toRexNode(implicit relBuilder: RelBuilder): RexNode = { relBuilder.call(sqlOperator, children.map(_.toRexNode)) } + + override def dataType = left.dataType --- End diff -- Is this a sensible default? Shouldn't it be the data type with the higher precision / larger value range of both inputs, e.g., `Double` in case of `Double` and `Int`, etc? > Add a validation phase before construct RelNode using TableAPI > -- > > Key: FLINK-3754 > URL: https://issues.apache.org/jira/browse/FLINK-3754 > Project: Flink > Issue Type: Improvement > Components: Table API >Affects Versions: 1.0.0 >Reporter: Yijie Shen >Assignee: Yijie Shen > > Unlike sql string's execution, which have a separate validation phase before > RelNode construction, Table API lacks the counterparts and the validation is > scattered in many places. 
> I suggest to add a single validation phase and detect problems as early as > possible. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
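The alternative raised in this comment, taking the wider of the two operand types instead of the left one, can be sketched as below. The `NumType` hierarchy and its ranking are assumptions made for illustration. They simplify real promotion rules and do not use Flink's `TypeInformation` classes.

```scala
object WideningSketch {
  // Hypothetical numeric types, ranked from narrowest to widest value range.
  sealed abstract class NumType(val rank: Int)
  case object ByteT extends NumType(0)
  case object ShortT extends NumType(1)
  case object IntT extends NumType(2)
  case object LongT extends NumType(3)
  case object FloatT extends NumType(4)
  case object DoubleT extends NumType(5)

  // Result type of a binary arithmetic expression: the wider of the two operand types.
  def resultType(left: NumType, right: NumType): NumType =
    if (left.rank >= right.rank) left else right
}
```

With this rule, `Int + Double` and `Double + Int` both yield `Double`, whereas a `left.dataType` default makes the result depend on operand order.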
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278192#comment-15278192 ] ASF GitHub Bot commented on FLINK-3754: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62683416 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala --- @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.logical + +import org.apache.calcite.tools.RelBuilder + +import org.apache.flink.api.table.TableEnvironment +import org.apache.flink.api.table.expressions._ +import org.apache.flink.api.table.trees.TreeNode +import org.apache.flink.api.table.validate._ + +/** + * LogicalNode is created and validated as we construct query plan using Table API. 
+ * + * The main validation procedure is separated into two phases: + * Expressions' resolution and transformation (#resolveExpressions(TableEnvironment)): + * --- End diff -- We use wiki-style syntax instead of HTML in the ScalaDocs of `flink-table` > Add a validation phase before construct RelNode using TableAPI > -- > > Key: FLINK-3754 > URL: https://issues.apache.org/jira/browse/FLINK-3754 > Project: Flink > Issue Type: Improvement > Components: Table API >Affects Versions: 1.0.0 >Reporter: Yijie Shen >Assignee: Yijie Shen > > Unlike sql string's execution, which have a separate validation phase before > RelNode construction, Table API lacks the counterparts and the validation is > scattered in many places. > I suggest to add a single validation phase and detect problems as early as > possible. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
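For illustration, the doc comment under review might look roughly like this in Scaladoc wiki syntax, with `-` list items and `'''bold'''` markers in place of HTML tags. The list structure is inferred from the flattened text quoted above, so it is a guess at the intended layout, and `LogicalNodeDocSketch` is a placeholder name.

```scala
/**
  * LogicalNode is created and validated as we construct a query plan using the Table API.
  *
  * The main validation procedure is separated into two phases:
  *
  * '''Expression resolution and transformation''' (`resolveExpressions`):
  *
  *  - translate UnresolvedFieldReference into ResolvedFieldReference
  *    using the child operator's output
  *  - translate Call(UnresolvedFunction) into a solid Expression
  *  - generate alias names for query output
  *
  * '''LogicalNode validation''' (`validate`):
  *
  *  - check that no UnresolvedFieldReference exists any more
  *  - check that all expressions have children of the needed type
  *  - check that each logical operator has the desired input
  *
  * Once we pass the validation phase, we can safely convert LogicalNode into Calcite's RelNode.
  */
abstract class LogicalNodeDocSketch
```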
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278203#comment-15278203 ] ASF GitHub Bot commented on FLINK-3754: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62684422 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala --- @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.table.plan.logical + +import scala.collection.JavaConverters._ + +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataTypeFactory +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rel.logical.LogicalProject +import org.apache.calcite.schema.{Table => CTable} +import org.apache.calcite.tools.RelBuilder + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import org.apache.flink.api.java.operators.join.JoinType +import org.apache.flink.api.table.{StreamTableEnvironment, TableEnvironment} +import org.apache.flink.api.table.expressions._ +import org.apache.flink.api.table.typeutils.TypeConverter +import org.apache.flink.api.table.validate.ValidationException + +case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extends UnaryNode { + override def output: Seq[Attribute] = projectList.map(_.toAttribute) + + override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = { +val afterResolve = super.resolveExpressions(tableEnv).asInstanceOf[Project] +val newProjectList = + afterResolve.projectList.zipWithIndex.map { case (e, i) => +e match { + case u @ UnresolvedAlias(child, optionalAliasName) => child match { +case ne: NamedExpression => ne +case e if !e.valid => u +case c @ Cast(ne: NamedExpression, _) => Alias(c, ne.name) +case other => Alias(other, optionalAliasName.getOrElse(s"_c$i")) + } + case _ => throw new IllegalArgumentException +} +} +Project(newProjectList, child) + } + + override def toRelNode(relBuilder: RelBuilder): RelBuilder = { +val allAlias = projectList.forall(_.isInstanceOf[Alias]) +child.toRelNode(relBuilder) +if (allAlias) { + // Calcite's RelBuilder does not translate identity projects even if they rename fields. + // Add a projection ourselves (will be automatically removed by translation rules). 
+ relBuilder.push( +LogicalProject.create(relBuilder.peek(), + projectList.map(_.toRexNode(relBuilder)).asJava, + projectList.map(_.name).asJava)) +} else { + relBuilder.project(projectList.map(_.toRexNode(relBuilder)): _*) +} + } +} + +case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) extends UnaryNode { + override def output: Seq[Attribute] = +throw new UnresolvedException("Invalid call to output on AliasNode") + + override def toRelNode(relBuilder: RelBuilder): RelBuilder = +throw new UnresolvedException("Invalid call to toRelNode on AliasNode") + + override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = { +if (aliasList.length > child.output.length) { + failValidation("Aliasing more fields than we actually have") +} else if (!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) { + failValidation("`as` only allow string arguments") +} else { + val names = aliasList.map(_.asInstanceOf[UnresolvedFieldReference].name) + val input = child.output + Project( +names.zip(input).map { case (name, attr) => + Alias(attr, name)} ++ input.drop(names.length), child) +} + } +} + +case class
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278195#comment-15278195 ] ASF GitHub Bot commented on FLINK-3754: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62683621 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala --- @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.table.plan.logical + +import scala.collection.JavaConverters._ + +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataTypeFactory +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rel.logical.LogicalProject +import org.apache.calcite.schema.{Table => CTable} +import org.apache.calcite.tools.RelBuilder + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import org.apache.flink.api.java.operators.join.JoinType +import org.apache.flink.api.table.{StreamTableEnvironment, TableEnvironment} +import org.apache.flink.api.table.expressions._ +import org.apache.flink.api.table.typeutils.TypeConverter +import org.apache.flink.api.table.validate.ValidationException + +case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extends UnaryNode { + override def output: Seq[Attribute] = projectList.map(_.toAttribute) + + override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = { --- End diff -- Can we move the check of `Table.checkUniqueNames` to `Project`? > Add a validation phase before construct RelNode using TableAPI > -- > > Key: FLINK-3754 > URL: https://issues.apache.org/jira/browse/FLINK-3754 > Project: Flink > Issue Type: Improvement > Components: Table API >Affects Versions: 1.0.0 >Reporter: Yijie Shen >Assignee: Yijie Shen > > Unlike sql string's execution, which have a separate validation phase before > RelNode construction, Table API lacks the counterparts and the validation is > scattered in many places. > I suggest to add a single validation phase and detect problems as early as > possible. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
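As a rough illustration of what a unique-name check inside `Project` might look like, here is a self-contained sketch. It does not reproduce `Table.checkUniqueNames`; the type `NamedExpr`, the `Either`-based result, and the error wording are assumptions made for the example.

```scala
object UniqueNameCheckSketch {
  // Hypothetical stand-in for a named projection expression.
  final case class NamedExpr(name: String)

  /** Returns Left(message) if any output name occurs more than once, Right(()) otherwise. */
  def checkUniqueNames(projectList: Seq[NamedExpr]): Either[String, Unit] = {
    val names = projectList.map(_.name)
    // diff removes one occurrence of every distinct name, leaving only the repeats.
    val duplicates = names.diff(names.distinct).distinct
    if (duplicates.isEmpty) Right(())
    else Left(s"Field names must be unique, found duplicates: ${duplicates.mkString(", ")}")
  }
}
```

Placing such a check in the projection node itself would keep it with the rest of the per-operator validation, which seems to be the intent of the comment.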
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278198#comment-15278198 ] ASF GitHub Bot commented on FLINK-3754: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62683921 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala --- @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.table.plan.logical + +import scala.collection.JavaConverters._ + +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataTypeFactory +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rel.logical.LogicalProject +import org.apache.calcite.schema.{Table => CTable} +import org.apache.calcite.tools.RelBuilder + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import org.apache.flink.api.java.operators.join.JoinType +import org.apache.flink.api.table.{StreamTableEnvironment, TableEnvironment} +import org.apache.flink.api.table.expressions._ +import org.apache.flink.api.table.typeutils.TypeConverter +import org.apache.flink.api.table.validate.ValidationException + +case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extends UnaryNode { + override def output: Seq[Attribute] = projectList.map(_.toAttribute) + + override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = { +val afterResolve = super.resolveExpressions(tableEnv).asInstanceOf[Project] +val newProjectList = + afterResolve.projectList.zipWithIndex.map { case (e, i) => +e match { + case u @ UnresolvedAlias(child, optionalAliasName) => child match { +case ne: NamedExpression => ne +case e if !e.valid => u +case c @ Cast(ne: NamedExpression, _) => Alias(c, ne.name) +case other => Alias(other, optionalAliasName.getOrElse(s"_c$i")) + } + case _ => throw new IllegalArgumentException +} +} +Project(newProjectList, child) + } + + override def toRelNode(relBuilder: RelBuilder): RelBuilder = { +val allAlias = projectList.forall(_.isInstanceOf[Alias]) +child.toRelNode(relBuilder) +if (allAlias) { + // Calcite's RelBuilder does not translate identity projects even if they rename fields. + // Add a projection ourselves (will be automatically removed by translation rules). 
+ relBuilder.push( +LogicalProject.create(relBuilder.peek(), + projectList.map(_.toRexNode(relBuilder)).asJava, + projectList.map(_.name).asJava)) +} else { + relBuilder.project(projectList.map(_.toRexNode(relBuilder)): _*) +} + } +} + +case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) extends UnaryNode { + override def output: Seq[Attribute] = +throw new UnresolvedException("Invalid call to output on AliasNode") + + override def toRelNode(relBuilder: RelBuilder): RelBuilder = +throw new UnresolvedException("Invalid call to toRelNode on AliasNode") + + override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = { +if (aliasList.length > child.output.length) { + failValidation("Aliasing more fields than we actually have") +} else if (!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) { + failValidation("`as` only allow string arguments") +} else { + val names = aliasList.map(_.asInstanceOf[UnresolvedFieldReference].name) + val input = child.output + Project( +names.zip(input).map { case (name, attr) => + Alias(attr, name)} ++ input.drop(names.length), child) +} + } +} + +case class
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278181#comment-15278181 ] ASF GitHub Bot commented on FLINK-3754: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62682155 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala --- @@ -20,27 +20,98 @@ package org.apache.flink.api.table.expressions import org.apache.calcite.rex.RexNode import org.apache.calcite.tools.RelBuilder -case class UnresolvedFieldReference(override val name: String) extends LeafExpression { +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.validate.ExprValidationResult + +object NamedExpression { + private val curId = new java.util.concurrent.atomic.AtomicInteger() + def newExprId: Int = curId.getAndIncrement() +} + +trait NamedExpression extends Expression { + def name: String + def exprId: Int + def toAttribute: Attribute +} + +abstract class Attribute extends LeafExpression with NamedExpression { + override def toAttribute: Attribute = this + + def withName(newName: String): Attribute +} + +case class UnresolvedFieldReference(name: String) extends Attribute { + override def exprId: Int = throw new UnresolvedException(s"calling exprId on ${this.getClass}") + override def toString = "\"" + name + override def withName(newName: String): Attribute = UnresolvedFieldReference(newName) + + override def dataType: TypeInformation[_] = +throw new UnresolvedException(s"calling dataType on ${this.getClass}") + + override def validateInput(): ExprValidationResult = +ExprValidationResult.ValidationFailure(s"Unresolved reference $name") +} + +case class ResolvedFieldReference( +name: String, +dataType: TypeInformation[_])( +val exprId: Int = NamedExpression.newExprId) extends Attribute { + + override def toString = s"'$name" + override def toRexNode(implicit 
relBuilder: RelBuilder): RexNode = { relBuilder.field(name) } -} -case class ResolvedFieldReference(override val name: String) extends LeafExpression { - override def toString = s"'$name" + override def withName(newName: String): Attribute = { +if (newName == name) { + this +} else { + ResolvedFieldReference(newName, dataType)(exprId) +} + } } -case class Naming(child: Expression, override val name: String) extends UnaryExpression { +case class Alias(child: Expression, name: String) +extends UnaryExpression with NamedExpression { + val exprId: Int = NamedExpression.newExprId + override def toString = s"$child as '$name" override def toRexNode(implicit relBuilder: RelBuilder): RexNode = { relBuilder.alias(child.toRexNode, name) } - override def makeCopy(anyRefs: Seq[AnyRef]): this.type = { + override def dataType: TypeInformation[_] = child.dataType + + override def makeCopy(anyRefs: Array[AnyRef]): this.type = { val child: Expression = anyRefs.head.asInstanceOf[Expression] copy(child, name).asInstanceOf[this.type] } + + override def toAttribute: Attribute = { +if (valid) { + ResolvedFieldReference(name, child.dataType)(exprId) +} else { + UnresolvedFieldReference(name) +} + } +} + +case class UnresolvedAlias( +child: Expression, +aliasName: Option[String] = None) extends UnaryExpression with NamedExpression { + + override def name: String = +throw new UnresolvedException("Invalid call to name on UnresolvedAlias") + override def toAttribute: Attribute = --- End diff -- Can you add a new line here?
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278187#comment-15278187 ] ASF GitHub Bot commented on FLINK-3754: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62682986 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala --- @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.logical + +import org.apache.calcite.tools.RelBuilder + +import org.apache.flink.api.table.TableEnvironment +import org.apache.flink.api.table.expressions._ +import org.apache.flink.api.table.trees.TreeNode +import org.apache.flink.api.table.validate._ + +/** + * LogicalNode is created and validated as we construct query plan using Table API. 
+ * + * The main validation procedure is separated into two phases: + * Expressions' resolution and transformation (#resolveExpressions(TableEnvironment)): + * + * translate UnresolvedFieldReference into ResolvedFieldReference + * using child operator's output + * translate Call(UnresolvedFunction) into solid Expression + * generate alias names for query output + * + * + * + * LogicalNode validation (#validate(TableEnvironment)): + * + * check no UnresolvedFieldReference exists any more + * check if all expressions have children of needed type + * check each logical operator have desired input + * + * Once we pass the validation phase, we can safely convert LogicalNode into Calcite's RelNode. + * + * Note: this is adapted from Apache Spark's LogicalPlan. + */ +abstract class LogicalNode extends TreeNode[LogicalNode] { + def output: Seq[Attribute] + + def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = { +// resolve references and function calls +transformExpressionsUp { + case u @ UnresolvedFieldReference(name) => +resolveChildren(name).getOrElse(u) + case c @ Call(name, children) if c.childrenValid => +tableEnv.getFunctionCatalog.lookupFunction(name, children) +} + } + + def toRelNode(relBuilder: RelBuilder): RelBuilder --- End diff -- can we rename this method to `getRelBuilder` and make it protected (only accessible for subclasses of `LogicalNode`) and add a method `toRelNode(relBuilder: RelBuilder): RelNode` that actually returns a `RelNode`? > Add a validation phase before construct RelNode using TableAPI > -- > > Key: FLINK-3754 > URL: https://issues.apache.org/jira/browse/FLINK-3754 > Project: Flink > Issue Type: Improvement > Components: Table API >Affects Versions: 1.0.0 >Reporter: Yijie Shen >Assignee: Yijie Shen > > Unlike sql string's execution, which have a separate validation phase before > RelNode construction, Table API lacks the counterparts and the validation is > scattered in many places. 
> I suggest to add a single validation phase and detect problems as early as > possible. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
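The split proposed in this comment can be sketched as below. `RelNode` and `RelBuilder` here are minimal stand-ins, not Calcite's classes, and `Scan` and `Filter` are hypothetical operators; the only thing taken from the comment is the shape of the two methods. One assumption worth noting: in Scala a plain `protected` member cannot be called on another instance of the base class, so the sketch uses a qualified `protected[...]` to let a node call the method on its child.

```scala
object RelNodeSketch {
  // Minimal stand-ins for Calcite's RelNode and RelBuilder (assumptions, not the real classes).
  final case class RelNode(description: String)

  final class RelBuilder {
    private var stack: List[RelNode] = Nil
    def push(node: RelNode): RelBuilder = { stack = node :: stack; this }
    // Pops and returns the node on top of the stack.
    def build(): RelNode = {
      val top = stack.head
      stack = stack.tail
      top
    }
  }

  abstract class LogicalNode {
    // Internal step: push this node's translation onto the builder.
    // Qualified protected so that nodes in this scope can call it on their children.
    protected[RelNodeSketch] def getRelBuilder(relBuilder: RelBuilder): RelBuilder

    // Public entry point that returns the finished RelNode.
    final def toRelNode(relBuilder: RelBuilder): RelNode =
      getRelBuilder(relBuilder).build()
  }

  final case class Scan(table: String) extends LogicalNode {
    protected[RelNodeSketch] def getRelBuilder(relBuilder: RelBuilder): RelBuilder =
      relBuilder.push(RelNode(s"scan($table)"))
  }

  final case class Filter(condition: String, child: LogicalNode) extends LogicalNode {
    protected[RelNodeSketch] def getRelBuilder(relBuilder: RelBuilder): RelBuilder = {
      val builder = child.getRelBuilder(relBuilder)
      val input = builder.build()
      builder.push(RelNode(s"filter($condition, ${input.description})"))
    }
  }
}
```

With this shape, callers outside the plan package only see a method that returns a `RelNode`, and the builder-threading detail stays internal.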
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278172#comment-15278172 ] ASF GitHub Bot commented on FLINK-3754: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62681359 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala --- @@ -36,41 +39,59 @@ abstract sealed class Aggregation extends UnaryExpression { self: Product => } case class Sum(child: Expression) extends Aggregation { - override def toString = s"($child).sum" + override def toString = s"sum($child)" override def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = { relBuilder.aggregateCall(SqlStdOperatorTable.SUM, false, null, name, child.toRexNode) } + + override def dataType = child.dataType + + override def validateInput = TypeCheckUtils.assertNumericExpr(child.dataType, "sum") } case class Min(child: Expression) extends Aggregation { - override def toString = s"($child).min" + override def toString = s"min($child)" override def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = { relBuilder.aggregateCall(SqlStdOperatorTable.MIN, false, null, name, child.toRexNode) } + + override def dataType = child.dataType + + override def validateInput = TypeCheckUtils.assertOrderableExpr(child.dataType, "min") } case class Max(child: Expression) extends Aggregation { - override def toString = s"($child).max" + override def toString = s"max($child)" override def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = { relBuilder.aggregateCall(SqlStdOperatorTable.MAX, false, null, name, child.toRexNode) } + + override def dataType = child.dataType + + override def validateInput = TypeCheckUtils.assertOrderableExpr(child.dataType, "max") } case class Count(child: Expression) extends Aggregation { - override def toString = s"($child).count" + override def toString = 
s"count($child)" override def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = { relBuilder.aggregateCall(SqlStdOperatorTable.COUNT, false, null, name, child.toRexNode) } + + override def dataType = BasicTypeInfo.LONG_TYPE_INFO } case class Avg(child: Expression) extends Aggregation { - override def toString = s"($child).avg" + override def toString = s"avg($child)" override def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = { relBuilder.aggregateCall(SqlStdOperatorTable.AVG, false, null, name, child.toRexNode) } + + override def dataType = BasicTypeInfo.DOUBLE_TYPE_INFO --- End diff -- Our current implementation returns avg aggregates as the input data type. I think this should be changed to `child.dataType`. > Add a validation phase before construct RelNode using TableAPI > -- > > Key: FLINK-3754 > URL: https://issues.apache.org/jira/browse/FLINK-3754 > Project: Flink > Issue Type: Improvement > Components: Table API >Affects Versions: 1.0.0 >Reporter: Yijie Shen >Assignee: Yijie Shen > > Unlike sql string's execution, which have a separate validation phase before > RelNode construction, Table API lacks the counterparts and the validation is > scattered in many places. > I suggest to add a single validation phase and detect problems as early as > possible. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278174#comment-15278174 ] ASF GitHub Bot commented on FLINK-3754: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62681558 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala --- @@ -25,15 +25,29 @@ import org.apache.calcite.sql.SqlOperator import org.apache.calcite.sql.fun.SqlStdOperatorTable import org.apache.calcite.tools.RelBuilder -import org.apache.flink.api.common.typeinfo.BasicTypeInfo -import org.apache.flink.api.table.typeutils.TypeConverter +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, NumericTypeInfo, TypeInformation} +import org.apache.flink.api.table.typeutils.{TypeCheckUtils, TypeConverter} +import org.apache.flink.api.table.validate.ExprValidationResult -abstract class BinaryArithmetic extends BinaryExpression { self: Product => +abstract class BinaryArithmetic extends BinaryExpression { def sqlOperator: SqlOperator override def toRexNode(implicit relBuilder: RelBuilder): RexNode = { relBuilder.call(sqlOperator, children.map(_.toRexNode)) } + + override def dataType = left.dataType + + // TODO: tighten this rule once we implemented type coercion rules during validation + override def validateInput(): ExprValidationResult = { +if (!left.dataType.isInstanceOf[NumericTypeInfo[_]] || + !right.dataType.isInstanceOf[NumericTypeInfo[_]]) { + ExprValidationResult.ValidationFailure(s"$this require both operand Numeric, get" + --- End diff -- Please change to `"$this requires both operands to be Numeric"`
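A small self-contained sketch of the numeric-operand check with the grammatical wording ("requires both operands to be Numeric") is shown below. The `DataType` and `ValidationResult` hierarchies are simplified stand-ins for Flink's `TypeInformation` and `ExprValidationResult`, and `validateBinaryArithmetic` is a hypothetical helper rather than a method from the pull request.

```scala
object ArithmeticValidationSketch {
  // Simplified stand-ins for TypeInformation (assumptions for this example).
  sealed trait DataType { def isNumeric: Boolean }
  case object IntType extends DataType { val isNumeric = true }
  case object DoubleType extends DataType { val isNumeric = true }
  case object StringType extends DataType { val isNumeric = false }

  // Simplified stand-ins for ExprValidationResult.
  sealed trait ValidationResult
  case object ValidationSuccess extends ValidationResult
  final case class ValidationFailure(message: String) extends ValidationResult

  /** Succeeds only when both operand types are numeric. */
  def validateBinaryArithmetic(op: String, left: DataType, right: DataType): ValidationResult =
    if (left.isNumeric && right.isNumeric) ValidationSuccess
    else ValidationFailure(s"$op requires both operands to be Numeric, got $left and $right")
}
```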
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278188#comment-15278188 ] ASF GitHub Bot commented on FLINK-3754: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62683029 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala --- @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan.logical + +import org.apache.calcite.tools.RelBuilder + +import org.apache.flink.api.table.TableEnvironment +import org.apache.flink.api.table.expressions._ +import org.apache.flink.api.table.trees.TreeNode +import org.apache.flink.api.table.validate._ + +/** + * LogicalNode is created and validated as we construct query plan using Table API. 
+ * + * The main validation procedure is separated into two phases: + * Expressions' resolution and transformation (#resolveExpressions(TableEnvironment)): + * + * translate UnresolvedFieldReference into ResolvedFieldReference + * using child operator's output + * translate Call(UnresolvedFunction) into solid Expression + * generate alias names for query output + * + * + * + * LogicalNode validation (#validate(TableEnvironment)): + * + * check no UnresolvedFieldReference exists any more + * check if all expressions have children of needed type + * check each logical operator have desired input + * + * Once we pass the validation phase, we can safely convert LogicalNode into Calcite's RelNode. + * + * Note: this is adapted from Apache Spark's LogicalPlan. + */ +abstract class LogicalNode extends TreeNode[LogicalNode] { + def output: Seq[Attribute] + + def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = { +// resolve references and function calls +transformExpressionsUp { + case u @ UnresolvedFieldReference(name) => +resolveChildren(name).getOrElse(u) + case c @ Call(name, children) if c.childrenValid => +tableEnv.getFunctionCatalog.lookupFunction(name, children) +} + } + + def toRelNode(relBuilder: RelBuilder): RelBuilder + + def validate(tableEnv: TableEnvironment): LogicalNode = { +val resolvedNode = resolveExpressions(tableEnv) +resolvedNode.transformExpressionsUp { + case a: Attribute if !a.valid => +val from = children.flatMap(_.output).map(_.name).mkString(", ") +failValidation(s"cannot resolve [${a.name}] given input [$from]") + + case e: Expression if e.validateInput().isFailure => +e.validateInput() match { + case ExprValidationResult.ValidationFailure(message) => +failValidation(s"Expression $e failed on input check: $message") +} +} + } + + /** +* Resolves the given strings to a [[NamedExpression]] using the input from all child +* nodes of this LogicalPlan. 
+*/ + def resolveChildren(name: String): Option[NamedExpression] = +resolve(name, children.flatMap(_.output)) + + /** +* Performs attribute resolution given a name and a sequence of possible attributes. +*/ + def resolve(name: String, input: Seq[Attribute]): Option[NamedExpression] = { +// find all matches in input +val candidates = input.filter(_.name.equalsIgnoreCase(name)) +if (candidates.length > 1) { + failValidation(s"Reference $name is ambiguous") +} else if (candidates.length == 0) { + None +} else { + Some(candidates.head.withName(name)) +} + } + + def expressions: Seq[Expression] = { --- End diff -- I think this code is not used. > Add a validation phase before construct
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278185#comment-15278185 ] ASF GitHub Bot commented on FLINK-3754: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62682619 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/mathExpressions.scala --- @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.table.expressions + +import org.apache.calcite.rex.RexNode +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.calcite.tools.RelBuilder + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.typeutils.TypeCheckUtils +import org.apache.flink.api.table.validate.ExprValidationResult + +case class Abs(child: Expression) extends UnaryExpression { + override def dataType: TypeInformation[_] = child.dataType + + override def validateInput(): ExprValidationResult = +TypeCheckUtils.assertNumericExpr(child.dataType, "Abs") + + override def toString(): String = s"abs($child)" + + override def toRexNode(implicit relBuilder: RelBuilder): RexNode = { +relBuilder.call(SqlStdOperatorTable.ABS, child.toRexNode) + } +} + +case class Ceil(child: Expression) extends UnaryExpression { + override def dataType: TypeInformation[_] = LONG_TYPE_INFO + + override def validateInput(): ExprValidationResult = +TypeCheckUtils.assertNumericExpr(child.dataType, "Ceil") + + override def toString(): String = s"ceil($child)" + + override def toRexNode(implicit relBuilder: RelBuilder): RexNode = { +relBuilder.call(SqlStdOperatorTable.CEIL, child.toRexNode) + } +} + +case class Exp(child: Expression) extends UnaryExpression { + override def dataType: TypeInformation[_] = DOUBLE_TYPE_INFO + + // TODO: this could be loosened by enabling implicit cast + override def validateInput(): ExprValidationResult = { +if (child.dataType == DOUBLE_TYPE_INFO) { --- End diff -- Why do `Exp`, `Log10`, `Ln`, and `Power` require `Double` types? Shouldn't all numeric types be supported for these functions? 
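The question above (why `Exp`, `Log10`, `Ln`, and `Power` accept only `Double`) can be illustrated with a small sketch of a check that accepts every numeric type. This is not the code from the PR: `SimpleType`, `ValidationResult`, and `assertNumeric` are hypothetical stand-ins for Flink's `TypeInformation`, `ExprValidationResult`, and `TypeCheckUtils.assertNumericExpr`, kept dependency-free so the sketch is self-contained.

```scala
object NumericCheckSketch {
  // Hypothetical stand-ins for Flink's BasicTypeInfo constants.
  sealed trait SimpleType
  case object IntType extends SimpleType
  case object LongType extends SimpleType
  case object FloatType extends SimpleType
  case object DoubleType extends SimpleType
  case object StringType extends SimpleType

  // Hypothetical stand-in for ExprValidationResult.
  sealed trait ValidationResult
  case object ValidationSuccess extends ValidationResult
  final case class ValidationFailure(message: String) extends ValidationResult

  private val numericTypes: Set[SimpleType] =
    Set(IntType, LongType, FloatType, DoubleType)

  // Accepts any numeric input instead of insisting on Double.
  def assertNumeric(dataType: SimpleType, caller: String): ValidationResult =
    if (numericTypes.contains(dataType)) ValidationSuccess
    else ValidationFailure(s"$caller requires a numeric input, got $dataType")
}
```

With a check like this, `validateInput()` for `Exp` could mirror the one already used for `Abs` and `Ceil` in the diff, assuming the generated code can handle the non-`Double` inputs.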
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278183#comment-15278183 ] ASF GitHub Bot commented on FLINK-3754: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62682327 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala --- @@ -21,25 +21,47 @@ import org.apache.calcite.rex.RexNode import org.apache.calcite.sql.fun.SqlStdOperatorTable import org.apache.calcite.tools.RelBuilder -abstract class BinaryPredicate extends BinaryExpression { self: Product => } +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.api.table.validate.ExprValidationResult + +abstract class BinaryPredicate extends BinaryExpression { + override def dataType = BasicTypeInfo.BOOLEAN_TYPE_INFO + + override def validateInput(): ExprValidationResult = { +if (left.dataType == BasicTypeInfo.BOOLEAN_TYPE_INFO && +right.dataType == BasicTypeInfo.BOOLEAN_TYPE_INFO) { + ExprValidationResult.ValidationSuccess +} else { + ExprValidationResult.ValidationFailure(s"$this only accept child of Boolean Type, " + --- End diff -- Please change to `"$this only accepts children of Boolean type"`
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278160#comment-15278160 ] ASF GitHub Bot commented on FLINK-3754: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62680102 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala --- @@ -197,6 +209,17 @@ abstract class TableEnvironment(val config: TableConfig) { planner } + /** +* Returns the Calcite [[org.apache.calcite.rel.`type`.RelDataTypeFactory]] +* of this TableEnvironment. */ + private[flink] def getTypeFactory: RelDataTypeFactory = { --- End diff -- `getTypeFactory()` and `getTable()` are only called to create a `CatalogNode` which internally obtains the `rowType: RelDataType` of the table. I think we can simply return the `rowType` for a given table name and remove `getTypeFactory()` and `getTable()` from this class.
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278169#comment-15278169 ] ASF GitHub Bot commented on FLINK-3754: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62681024 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala --- @@ -17,13 +17,34 @@ */ package org.apache.flink.api.table.expressions -import java.util.concurrent.atomic.AtomicInteger - import org.apache.calcite.rex.RexNode import org.apache.calcite.tools.RelBuilder -abstract class Expression extends TreeNode[Expression] { self: Product => - def name: String = Expression.freshName("expression") +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.trees.TreeNode +import org.apache.flink.api.table.validate.ExprValidationResult + +abstract class Expression extends TreeNode[Expression] { + /** +* Returns the [[TypeInformation]] for evaluating this expression. +* It is sometimes not available until the expression is valid. +*/ + def dataType: TypeInformation[_] --- End diff -- Can we rename this field to `resultType`?
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278164#comment-15278164 ] ASF GitHub Bot commented on FLINK-3754: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62680699 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala --- @@ -152,13 +160,13 @@ abstract class TableEnvironment(val config: TableConfig) { * * @param name The name under which the table is registered. * @param table The table to register in the catalog -* @throws TableException if another table is registered under the provided name. +* @throws ValidationException if another table is registered under the provided name. */ - @throws[TableException] + @throws[ValidationException] protected def registerTableInternal(name: String, table: AbstractTable): Unit = { if (isRegistered(name)) { - throw new TableException(s"Table \'$name\' already exists. " + + throw new ValidationException(s"Table \'$name\' already exists. " + --- End diff -- I think we can keep `TableException` here. I would use `ValidationException` for all invalid expressions within Table API queries. `TableException` can be used for invalid use of the Table API in general (registering tables under existing names, ...) + current limitations (DataTypes, non-equality joins, etc.). Do you think such a separation would make sense?
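The separation proposed above can be sketched as follows. This is only an illustration under simplifying assumptions: the two exception classes are reduced to bare `RuntimeException` subclasses, and `Registry` with its methods is a hypothetical in-memory stand-in, not Flink's `TableEnvironment`.

```scala
object ExceptionSplitSketch {
  // Simplified stand-ins for the two exception types discussed above.
  class TableException(msg: String) extends RuntimeException(msg)
  class ValidationException(msg: String) extends RuntimeException(msg)

  // Hypothetical in-memory registry mapping table names to field names.
  class Registry {
    private val tables = scala.collection.mutable.Map.empty[String, Seq[String]]

    // Invalid use of the API in general: TableException.
    def register(name: String, fields: Seq[String]): Unit = {
      if (tables.contains(name)) {
        throw new TableException(s"Table '$name' already exists.")
      }
      tables(name) = fields
    }

    // Invalid reference inside a query: ValidationException.
    def resolveField(table: String, field: String): String = {
      val fields = tables.getOrElse(
        table,
        throw new ValidationException(s"Table '$table' was not found in the registry."))
      if (!fields.contains(field)) {
        throw new ValidationException(s"Field '$field' does not exist in table '$table'.")
      }
      field
    }
  }
}
```

A caller can then catch `ValidationException` to report a broken query while letting `TableException` signal a setup problem or an unsupported feature.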
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15278154#comment-15278154 ] ASF GitHub Bot commented on FLINK-3754: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1958#discussion_r62679391 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala --- @@ -92,8 +93,7 @@ abstract class StreamTableEnvironment( def ingest(tableName: String): Table = { if (isRegistered(tableName)) { - relBuilder.scan(tableName) - new Table(relBuilder.build(), this) + new Table(this, CatalogNode(tableName, getTable(tableName), getTypeFactory)) } else { throw new TableException(s"Table \'$tableName\' was not found in the registry.") --- End diff -- Throw a `ValidationException`
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15276348#comment-15276348 ] ASF GitHub Bot commented on FLINK-3754: --- Github user twalthr commented on the pull request: https://github.com/apache/flink/pull/1958#issuecomment-217862197 Yes, I will also take a look at it.
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15273989#comment-15273989 ] ASF GitHub Bot commented on FLINK-3754: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1958#issuecomment-217430461 I took a brief look at the PR. The overall structure of the validation looks good to me. Will do a more in-depth review in the next days. @twalthr, can you have another look as well? Thanks, Fabian
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15268580#comment-15268580 ] ASF GitHub Bot commented on FLINK-3754: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1958#issuecomment-216506763 That's a known issue, see FLINK-3860. No need to worry about this PR. I'll have a look soon, thanks for the update!
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15268572#comment-15268572 ] ASF GitHub Bot commented on FLINK-3754: --- Github user yjshen commented on the pull request: https://github.com/apache/flink/pull/1958#issuecomment-216505284
The build failed because of an unrelated test:
```
[INFO] flink-table SUCCESS [08:03 min]
[INFO] flink-connector-wikiedits .. FAILURE [02:02 min]
```
```
Running org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSourceTest
Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 120.051 sec <<< FAILURE! - in org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSourceTest
testWikipediaEditsSource(org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSourceTest) Time elapsed: 120.022 sec <<< ERROR!
java.lang.Exception: test timed out after 12 milliseconds
```
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15268560#comment-15268560 ] ASF GitHub Bot commented on FLINK-3754: --- Github user yjshen commented on the pull request: https://github.com/apache/flink/pull/1958#issuecomment-216502690 @zentol OK, I've closed it.
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15268552#comment-15268552 ] ASF GitHub Bot commented on FLINK-3754: --- Github user yjshen closed the pull request at: https://github.com/apache/flink/pull/1916
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15268551#comment-15268551 ] ASF GitHub Bot commented on FLINK-3754: --- Github user zentol commented on the pull request: https://github.com/apache/flink/pull/1958#issuecomment-216501884 Since this PR is a substitute, could you close the old one? Thanks.
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15268465#comment-15268465 ] ASF GitHub Bot commented on FLINK-3754: --- Github user yjshen commented on the pull request: https://github.com/apache/flink/pull/1958#issuecomment-216487312 This PR substitutes #1916 by squashing several previous commits into a single one for easier rebasing. @fhueske I've implemented eager validation here, can you take a look at this one?
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15268456#comment-15268456 ] ASF GitHub Bot commented on FLINK-3754: --- GitHub user yjshen opened a pull request: https://github.com/apache/flink/pull/1958 [FLINK-3754][Table]Add a validation phase before construct RelNode using TableAPI
This PR adds an extra **validation** phase for plans generated from the Table API. It matches the functionality of Calcite's Validator, which is called when we execute a query expressed as a SQL string. To do this, I inserted a new layer between the Table API and `RelNode` construction: the `Logical Plan`. The main validation procedure works as follows:
1. Construct a logical plan node
2. Resolve it using the schema and `FunctionCatalog`
3. Validate the type-annotated logical plan node
Once validation has finished successfully, it is safe to construct the `RelNode`.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/yjshen/flink eager_validation
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1958.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1958
commit ec6bf418e065d836f5399275d4ab24b9c29ab0fe Author: Yijie Shen Date: 2016-04-13T08:46:58Z Add an extra validation phase before construct RelNode. Squash previous commits into single one for easier rebase. The eight previous commits are: make TreeNode extends Product wip expressions validation, should create expressions for functions next add functions for math and string wip move table api on logicalNode resolve and validate next wip fix bug in validator, merge eval, add doc resolve comments
commit d31a782475903b16f57fee819fc1ddb830aaa597 Author: Yijie Shen Date: 2016-05-03T08:57:02Z do eager validation
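The three steps listed in the pull request description (construct, resolve, validate) can be sketched on a toy expression tree. Everything here is a simplified assumption for illustration: the `Expr` hierarchy, the string-based type names, and the functions `resolve`, `typeOf`, and `check` are hypothetical and are not the classes introduced by the PR.

```scala
object ValidationPhaseSketch {
  // Step 1: a toy expression tree built by the (hypothetical) API layer.
  sealed trait Expr
  final case class UnresolvedField(name: String) extends Expr
  final case class ResolvedField(name: String, dataType: String) extends Expr
  final case class Abs(child: Expr) extends Expr

  private val numericTypes = Set("Int", "Long", "Double")

  // Step 2: resolution replaces unresolved references using the schema.
  def resolve(e: Expr, schema: Map[String, String]): Either[String, Expr] = e match {
    case UnresolvedField(n) =>
      schema.get(n).map(t => ResolvedField(n, t)).toRight(s"Unknown field '$n'")
    case Abs(c) => resolve(c, schema).map(r => Abs(r))
    case r: ResolvedField => Right(r)
  }

  // Step 3: validation runs on the type-annotated tree and returns its type.
  def typeOf(e: Expr): Either[String, String] = e match {
    case ResolvedField(_, t) => Right(t)
    case Abs(c) =>
      typeOf(c).flatMap { t =>
        if (numericTypes.contains(t)) Right(t)
        else Left(s"abs requires a numeric input, got $t")
      }
    case UnresolvedField(n) => Left(s"Field '$n' is unresolved")
  }

  // Only when both steps succeed would it be safe to build the RelNode.
  def check(e: Expr, schema: Map[String, String]): Either[String, String] =
    resolve(e, schema).flatMap(typeOf)
}
```

For example, `check(Abs(UnresolvedField("a")), Map("a" -> "Int"))` succeeds, while the same expression over a `String` field is rejected before any plan is built.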
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15266825#comment-15266825 ] ASF GitHub Bot commented on FLINK-3754: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1916#issuecomment-216270686 Great, thanks!
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15262090#comment-15262090 ] ASF GitHub Bot commented on FLINK-3754: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1916#issuecomment-215414605 If we decide on eager validation, we can also simplify the code a bit, right? I might be wrong, but we could remove `PlanPreparation` and the recursive validation of `LogicalNode`, since the inputs of an operator would already have been validated. The checks of `Validator.validate()` could be moved to the respective `LogicalNode`. Not sure if we would still need the `RuleExecutor`.
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15261685#comment-15261685 ] ASF GitHub Bot commented on FLINK-3754: --- Github user yjshen commented on the pull request: https://github.com/apache/flink/pull/1916#issuecomment-215336755 Hi @fhueske @twalthr, thanks for the detailed review work! I've left some comments above to express my opinion and am looking forward to more discussion on this :)
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15261661#comment-15261661 ] ASF GitHub Bot commented on FLINK-3754: --- Github user yjshen commented on a diff in the pull request: https://github.com/apache/flink/pull/1916#discussion_r61382801 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala --- @@ -355,4 +380,26 @@ object TableEnvironment { new ScalaStreamTableEnv(executionEnvironment, tableConfig) } + /** +* The primary workflow for executing plan validation for that generated from Table API. +* The validation is intentionally designed as a lazy procedure and triggered when we +* are going to run on Flink core. +*/ + class PlanPreparation(val env: TableEnvironment, val logical: LogicalNode) { + +lazy val resolvedPlan: LogicalNode = env.getValidator.resolve(logical) + +def validate(): Unit = env.getValidator.validate(resolvedPlan) + +lazy val relNode: RelNode = { + env match { +case _: BatchTableEnvironment => --- End diff -- Yes, it shouldn't be distinguished. The difference should exist inside validator's rule set.
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15261658#comment-15261658 ] ASF GitHub Bot commented on FLINK-3754: --- Github user yjshen commented on the pull request: https://github.com/apache/flink/pull/1916#issuecomment-215332571
> I think it would be good to eagerly check each method call of the Table API. This would make debugging easier, because exceptions would be thrown where the error is caused. Please correct me if I am wrong, but I think we would not lose validation coverage compared to the coverage of this PR if we do eager validation? It might also be easier, because we do not need the recursive operator traversal (still the expression traversal though). Maybe we can even directly translate to RelNodes after validation, just like we do right now. I think a lot of this PR could be used for eager validation, not sure if it would be easily possible with the SqlNode validation approach.
Regarding the eager validation you mentioned, I think it could be accomplished by calling `validate()` each time another `Table` is constructed, in other words, each time a Table API method is called, by changing the current code from:
``` scala
class Table(
    private[flink] val tableEnv: TableEnvironment,
    private[flink] val planPreparation: PlanPreparation) {

  def this(tableEnv: TableEnvironment, logicalPlan: LogicalNode) = {
    this(tableEnv, new PlanPreparation(tableEnv, logicalPlan))
  }
```
to
``` scala
  def this(tableEnv: TableEnvironment, logicalPlan: LogicalNode) = {
    this(tableEnv, {
      val pp = new PlanPreparation(tableEnv, logicalPlan)
      pp.validate()
      pp
    })
  }
```
Adding a flag that marks a logical node as validated, and thereby avoiding the recursive logical plan traversal, would then be enough. Do I understand your idea correctly? On the other hand, I would prefer to postpone `RelNode` construction until we are about to run the query on Flink, since we only need the `RelNode` at that time.
> While reviewing the PR, I noticed that some classes seem to be partially derived from Spark's code base (e.g., TreeNode and RuleExecutor). I know there are some common patterns that apply in optimizers, but it is good style to give credit to the original source code. Can you list which classes are derived from Spark code and add a comment to them pointing to the source of the code?
OK, I will do a more detailed scan and give credit to all the original source code.
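A dependency-free sketch of what eager validation means for the caller, assuming a toy `Table` whose plan is just a list of field names: each API call validates its own arguments against an input that was itself validated when it was created, so no recursive traversal of the plan is needed. `LogicalNode`, `Table`, and `select` here are hypothetical simplifications, not the classes from the PR.

```scala
object EagerValidationSketch {
  class ValidationException(msg: String) extends RuntimeException(msg)

  // Toy logical node: only the field names it produces (hypothetical).
  final case class LogicalNode(fields: Seq[String])

  class Table(val plan: LogicalNode) {
    // Validation happens inside the API call, so an invalid query fails
    // at the line that caused the error rather than at execution time.
    def select(names: String*): Table = {
      val missing = names.filterNot(n => plan.fields.contains(n))
      if (missing.nonEmpty) {
        throw new ValidationException(
          s"Cannot resolve ${missing.mkString(", ")} among ${plan.fields.mkString(", ")}")
      }
      // The input plan was validated when it was built; only the new node is checked.
      new Table(LogicalNode(names))
    }
  }
}
```

In this sketch, `table.select("c")` on a table with fields `a` and `b` throws immediately, which is the debugging benefit argued for in the review.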
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15261609#comment-15261609 ] ASF GitHub Bot commented on FLINK-3754: --- Github user yjshen commented on a diff in the pull request: https://github.com/apache/flink/pull/1916#discussion_r61378731 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala --- @@ -215,7 +215,7 @@ class ScalarFunctionsTest { } - @Test + @Ignore --- End diff -- `exp`, `log`, `pow`, and `ln` should have `Double` as input. What I was thinking is that we should add some extra type coercion rules that insert a `cast` whenever it can be done safely (when an expression asks for a `Double` but we provide an `Int`), for example `Byte` to `Double`, `Long` to `Double`, and so on, and rewrite the expression as `pow(cast(x, Double))`.
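The type coercion idea described above can be sketched as a rewrite that wraps a non-`Double` numeric child in a cast before validation. This is an assumption-laden illustration: the `Expr` classes, the string type names, and the `coerce` function are hypothetical, and the set of castable types simply follows the examples given in the comment.

```scala
object TypeCoercionSketch {
  // Toy expressions carrying their result type as a string (hypothetical).
  sealed trait Expr { def dataType: String }
  final case class Field(name: String, dataType: String) extends Expr
  final case class Cast(child: Expr, dataType: String) extends Expr
  final case class Exp(child: Expr) extends Expr {
    val dataType: String = "Double"
  }

  // Types assumed to be implicitly castable to Double in this sketch.
  private val castableToDouble = Set("Byte", "Short", "Int", "Long", "Float")

  // Rewrites exp(x) to exp(cast(x, Double)) when x has a castable numeric type.
  def coerce(e: Expr): Expr = e match {
    case Exp(c) =>
      val child = coerce(c)
      if (castableToDouble.contains(child.dataType)) Exp(Cast(child, "Double"))
      else Exp(child)
    case Cast(c, t) => Cast(coerce(c), t)
    case f: Field => f
  }
}
```

After this rewrite, a strict `Double`-only `validateInput()` would accept `exp` over an `Int` field, while a `String` child is left untouched and still fails validation.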
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15260456#comment-15260456 ]

ASF GitHub Bot commented on FLINK-3754:
---

Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1916#issuecomment-215142967

Hi @yjshen, thanks for your patience. I also finished a first pass over the PR.

I'd like to propose a third alternative, in addition to the custom validation phase (this PR) and generating `SqlNode`s and using Calcite's validator. Both approaches would mean that the validation happens before the logical plan is translated into a `RelNode`. I think it would be good to eagerly check each method call of the Table API. This would make debugging easier, because exceptions would be thrown where the error is caused. Please correct me if I am wrong, but I think we would not lose validation coverage compared to the coverage of this PR if we do eager validation? It might also be easier, because we do not need the recursive operator traversal (we would still need the expression traversal, though). Maybe we can even directly translate to `RelNode`s after validation, just like we do right now. I think a lot of this PR could be used for eager validation; I am not sure whether that would be easily possible with the `SqlNode` validation approach.

What do you think about eager validation, @yjshen and @twalthr?

While reviewing the PR, I noticed that some classes seem to be partially derived from Spark's code base (e.g., `TreeNode` and `RuleExecutor`). I know there are some common patterns that apply in optimizers, but it is good style to give credit to the original source code. Can you list which classes are derived from Spark code and add a comment to them pointing to the source of the code?

Thanks, Fabian
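To make the eager alternative concrete, here is a minimal sketch of what "check each method call" could mean. Everything in it is an assumption for illustration: this `Table` is a toy that only tracks column names, and `ValidationException` is a stand-in exception type, not Flink's.

```scala
// Toy model of eager validation; not Flink's Table class.
object EagerSketch {
  final case class ValidationException(message: String) extends RuntimeException(message)

  case class Table(columns: Seq[String]) {
    // The check runs inside the API call, so the exception is thrown on the
    // line of user code that introduced the mistake.
    def select(fields: String*): Table = {
      val unknown = fields.filterNot(f => columns.contains(f))
      if (unknown.nonEmpty) {
        val names = unknown.mkString(", ")
        throw ValidationException("Unknown fields in select: " + names)
      }
      Table(fields.toList)
    }
  }
}
```

In this style the stack trace of a failing `select` points at the offending call, which is the debugging benefit argued for above. The cost is that every API method has to carry its own checks.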
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15260447#comment-15260447 ]

ASF GitHub Bot commented on FLINK-3754:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1916#discussion_r61291959

--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala ---
@@ -215,7 +215,7 @@ class ScalarFunctionsTest {
   }

-  @Test
+  @Ignore
--- End diff --

Why did you exclude these tests?
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15260442#comment-15260442 ]

ASF GitHub Bot commented on FLINK-3754:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1916#discussion_r61291865

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala ---
@@ -17,13 +17,34 @@
  */
 package org.apache.flink.api.table.expressions

-import java.util.concurrent.atomic.AtomicInteger
-
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.tools.RelBuilder

-abstract class Expression extends TreeNode[Expression] { self: Product =>
-  def name: String = Expression.freshName("expression")
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.trees.TreeNode
+import org.apache.flink.api.table.validate.ExprValidationResult
+
+abstract class Expression extends TreeNode[Expression] {
+  /**
+    * Returns the [[TypeInformation]] for evaluating this expression.
+    * It is sometimes available until the expression is valid.
+    */
+  def dataType: TypeInformation[_]
+
+  /**
+    * One pass validation of the expression tree in post order.
+    */
+  lazy val valid: Boolean = childrenValid && validateInput().isSuccess
+
+  def childrenValid: Boolean = children.forall(_.valid)
+
+  /**
+    * Check input data types, inputs number or other properties specified by this expression.
+    * Return `ValidationSuccess` if it pass the check,
+    * or `ValidationFailure` with supplement message explaining the error.
+    * Note: we should only call this method until `childrenValidated == true`
--- End diff --

`childrenValidated` -> `childrenValid`?
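The one-pass, post-order validation in the diff above can be tried out in isolation with the sketch below. It keeps the `valid`, `childrenValid` and `validateInput()` members from the diff, but everything else is assumed for the example: the `Type` values, the `Literal` and `Add` expressions, and the shape of `ValidationSuccess` / `ValidationFailure` are hypothetical and not taken from the PR.

```scala
// Simplified stand-in for the Expression class in the diff; not the PR's code.
object ValidationSketch {
  sealed trait ValidationResult { def isSuccess: Boolean }
  case object ValidationSuccess extends ValidationResult { val isSuccess = true }
  case class ValidationFailure(message: String) extends ValidationResult {
    val isSuccess = false
  }

  sealed trait Type
  case object IntT extends Type
  case object BoolT extends Type

  abstract class Expr {
    def children: Seq[Expr]
    def dataType: Type
    // Computed at most once per node. Children are checked first and the
    // && short-circuits, so validateInput() only runs on valid children.
    lazy val valid: Boolean = childrenValid && validateInput().isSuccess
    def childrenValid: Boolean = children.forall(_.valid)
    // Only meaningful once childrenValid is true.
    def validateInput(): ValidationResult = ValidationSuccess
  }

  case class Literal(value: Any, dataType: Type) extends Expr {
    val children: Seq[Expr] = Nil
  }

  case class Add(left: Expr, right: Expr) extends Expr {
    val children: Seq[Expr] = Seq(left, right)
    def dataType: Type = left.dataType
    override def validateInput(): ValidationResult =
      if (left.dataType == IntT && right.dataType == IntT) ValidationSuccess
      else ValidationFailure("Add expects Int operands")
  }
}
```

Because `valid` is a `lazy val`, each node is validated once even if it is reached through several paths, and an invalid subtree makes every ancestor invalid without running the ancestor's own input check.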
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15260439#comment-15260439 ]

ASF GitHub Bot commented on FLINK-3754:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1916#discussion_r61291698

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala ---
@@ -17,13 +17,34 @@
  */
 package org.apache.flink.api.table.expressions

-import java.util.concurrent.atomic.AtomicInteger
-
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.tools.RelBuilder

-abstract class Expression extends TreeNode[Expression] { self: Product =>
-  def name: String = Expression.freshName("expression")
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.trees.TreeNode
+import org.apache.flink.api.table.validate.ExprValidationResult
+
+abstract class Expression extends TreeNode[Expression] {
+  /**
+    * Returns the [[TypeInformation]] for evaluating this expression.
+    * It is sometimes available until the expression is valid.
--- End diff --

+not ?
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15260438#comment-15260438 ]

ASF GitHub Bot commented on FLINK-3754:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1916#discussion_r61291621

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala ---
@@ -355,4 +380,26 @@ object TableEnvironment {
     new ScalaStreamTableEnv(executionEnvironment, tableConfig)
   }

+  /**
+    * The primary workflow for executing plan validation for that generated from Table API.
+    * The validation is intentionally designed as a lazy procedure and triggered when we
+    * are going to run on Flink core.
+    */
+  class PlanPreparation(val env: TableEnvironment, val logical: LogicalNode) {
+
+    lazy val resolvedPlan: LogicalNode = env.getValidator.resolve(logical)
+
+    def validate(): Unit = env.getValidator.validate(resolvedPlan)
+
+    lazy val relNode: RelNode = {
+      env match {
+        case _: BatchTableEnvironment =>
--- End diff --

Why do you distinguish here? It's the same code, no?
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15260016#comment-15260016 ]

ASF GitHub Bot commented on FLINK-3754:
---

Github user twalthr commented on the pull request:

    https://github.com/apache/flink/pull/1916#issuecomment-215061267

@yjshen I looked through the code changes. I was quite impressed that your PR touches nearly every class of the current API.

Basically your changes seem to work. However, I'm not sure if we want to implement the validation phase for every scalar function ourselves. Calcite already comes with type inference, type checking and validation capabilities, and I don't know if we want to reinvent the wheel at this point. Your approach inserts a layer under the Table API for doing the validation. Instead, this layer could also translate the plan into a SQL tree (on top of RelNodes). We could then let Calcite do the work of validation.

This could also solve another problem that I faced when working on FLINK-3580. If you take a look at Calcite's `StandardConvertletTable`, you will see that Calcite also does some conversions which we would need to implement ourselves if we do not base the Table API on top of SQL.

We need to discuss how we want to proceed. Neither solution is perfect.
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15254544#comment-15254544 ]

ASF GitHub Bot commented on FLINK-3754:
---

Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1916#issuecomment-213567820

Thanks for the additional information @yjshen! I'm a bit behind with PR reviews, but will definitely have a look at the beginning of next week.

Thanks, Fabian
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15253874#comment-15253874 ]

ASF GitHub Bot commented on FLINK-3754:
---

Github user yjshen commented on the pull request:

    https://github.com/apache/flink/pull/1916#issuecomment-213419043

The type annotation work is done from bottom to top.

First, we know the schema of each of the two inputs, and we know that the `List[] expression` in `Project` is used to process one row of table data as input and to output one value per expression. Therefore, we can infer the output schema of `Project` (in the current implementation this is expressed as `def output: Seq[Attribute]`) if we know each expression's `dataType`.

For example, `Add`'s dataType is the same as its input's, `Or`'s dataType is always `Boolean`, and `pow(a, b)`'s dataType is always `Double`. However, we can infer an expression's `dataType` only if we understand every kind of expression. The main problem here is that only `Call` (unresolved function) nodes are generated during expression construction, so we must first resolve them into concrete `Expression`s. `FunctionCatalog` is introduced here as a mapping from `FunctionName -> Expression`; with it, the translation is a simple lookup in the catalog.
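As a rough illustration of the resolution step described in this comment, the sketch below turns an unresolved `Call` into a typed expression through a catalog lookup, after which `dataType` can be read off bottom-up. It is a simplification under stated assumptions: the `Type` values, the `Resolved` trait, the `lookup` and `resolve` functions and the `Either`-based error reporting are hypothetical, and typed nodes here can only contain already-typed children, which the real expression tree does not require.

```scala
// Hypothetical sketch of Call resolution via a function catalog; not the PR's code.
object ResolutionSketch {
  sealed trait Type
  case object IntT extends Type
  case object DoubleT extends Type
  case object BooleanT extends Type

  sealed trait Expr
  // Expressions whose data type is known.
  sealed trait Resolved extends Expr { def dataType: Type }
  case class Field(name: String, dataType: Type) extends Resolved
  case class Add(left: Resolved, right: Resolved) extends Resolved {
    def dataType: Type = left.dataType // same as its input
  }
  case class Or(left: Resolved, right: Resolved) extends Resolved {
    val dataType: Type = BooleanT // always Boolean
  }
  case class Pow(base: Resolved, exponent: Resolved) extends Resolved {
    val dataType: Type = DoubleT // always Double
  }
  // An unresolved function call, as produced while the expression is built.
  case class Call(name: String, args: Seq[Expr]) extends Expr

  // The catalog: function name plus resolved arguments to a typed expression.
  def lookup(name: String, args: Seq[Resolved]): Option[Resolved] = (name, args) match {
    case ("pow", Seq(b, e)) => Some(Pow(b, e))
    case ("or", Seq(l, r)) => Some(Or(l, r))
    case _ => None
  }

  // Resolve arguments first, then the call itself (bottom-up).
  def resolve(e: Expr): Either[String, Resolved] = e match {
    case r: Resolved => Right(r)
    case Call(name, args) =>
      val resolvedArgs = args.map(resolve)
      resolvedArgs.collectFirst { case Left(err) => err } match {
        case Some(err) => Left(err)
        case None =>
          val rs = resolvedArgs.collect { case Right(r) => r }
          lookup(name, rs).toRight("Unknown function: " + name)
      }
  }
}
```

Once every `Call` has been replaced this way, the output schema of an operator such as `Project` follows directly from the `dataType` of each of its expressions.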
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15253850#comment-15253850 ]

ASF GitHub Bot commented on FLINK-3754:
---

Github user yjshen commented on the pull request:

    https://github.com/apache/flink/pull/1916#issuecomment-213411425

For ease of review, I would like to explain this PR in more detail.

When the Table API is called to construct a query, the query is first built as an operator tree, with `LogicalNode` representing the tree nodes. Therefore, once we have finished constructing a query and are about to translate it into a dataset/datastream program, the constructed logical plan looks like:

```
Aggregate(List of group by, List of aggregate)
+- Filter (condition expression)
   +- Union
      +- Project (select column expression)
         +- TableScan(DS1)
      +- Project (select column expression)
         +- TableScan(DS2)
```

At this point, only the leaf node, TableScan (`CatalogNode` in the current implementation), is equipped with type information. To validate the plan, we first need to **annotate** the full logical plan with type information and then use that type information to **validate** it.
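The annotate-then-validate idea for the operator tree above can be sketched as follows. This is a toy model with assumed names: `Attribute` carries a type name as a plain string, conditions and projections are plain strings, `Aggregate` is left out for brevity, and `validate` returns a list of error messages, none of which is claimed to match the PR.

```scala
// Toy logical plan whose schema is derived from the leaves upwards; not the PR's code.
object PlanSketch {
  case class Attribute(name: String, typeName: String)

  sealed trait LogicalNode { def output: Seq[Attribute] }
  // Only the leaf carries type information up front.
  case class TableScan(table: String, output: Seq[Attribute]) extends LogicalNode
  case class Project(columns: Seq[String], child: LogicalNode) extends LogicalNode {
    // Selected attributes of the child, in the requested order.
    def output: Seq[Attribute] = columns.flatMap(c => child.output.find(_.name == c))
  }
  case class Filter(condition: String, child: LogicalNode) extends LogicalNode {
    def output: Seq[Attribute] = child.output
  }
  case class Union(left: LogicalNode, right: LogicalNode) extends LogicalNode {
    def output: Seq[Attribute] = left.output
  }

  // Collect all problems in one traversal, children before parents.
  def validate(node: LogicalNode): Seq[String] = node match {
    case TableScan(_, _) => Nil
    case Project(cols, child) =>
      val missing = cols.filterNot(c => child.output.exists(_.name == c))
      validate(child) ++ missing.map(c => "Unknown column: " + c)
    case Filter(_, child) => validate(child)
    case Union(l, r) =>
      val sameTypes = l.output.map(_.typeName) == r.output.map(_.typeName)
      val mismatch = if (sameTypes) Nil else Seq("Union inputs have different schemas")
      validate(l) ++ validate(r) ++ mismatch
  }
}
```

Each operator derives its `output` from its children, so type information flows from the scans to the root, and the validator can then compare schemas, for example across the two inputs of `Union`.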
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15253808#comment-15253808 ] ASF GitHub Bot commented on FLINK-3754: --- Github user yjshen commented on a diff in the pull request: https://github.com/apache/flink/pull/1916#discussion_r60728611 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala --- @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.table.plan.logical + +import scala.collection.JavaConverters._ + +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataTypeFactory +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rel.logical.LogicalProject +import org.apache.calcite.schema.{Table => CTable} +import org.apache.calcite.tools.RelBuilder + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.api.java.operators.join.JoinType +import org.apache.flink.api.table.expressions._ +import org.apache.flink.api.table.typeutils.TypeConverter +import org.apache.flink.api.table.validate.ValidationException + +case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extends UnaryNode { + override def output: Seq[Attribute] = projectList.map(_.toAttribute) + + override def toRelNode(relBuilder: RelBuilder): RelBuilder = { +def allAlias: Boolean = { + projectList.forall { proj => +proj match { + case Alias(r: ResolvedFieldReference, name) => true + case _ => false +} + } +} +child.toRelNode(relBuilder) +if (allAlias) { + relBuilder.push( +LogicalProject.create(relBuilder.peek(), + projectList.map(_.toRexNode(relBuilder)).asJava, + projectList.map(_.name).asJava)) +} else { + relBuilder.project(projectList.map(_.toRexNode(relBuilder)): _*) +} + } +} + +case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) extends UnaryNode { + override def output: Seq[Attribute] = +throw new UnresolvedException("Invalid call to output on AliasNode") + + override def toRelNode(relBuilder: RelBuilder): RelBuilder = +throw new UnresolvedException("Invalid call to toRelNode on AliasNode") + + override lazy val resolved: Boolean = false +} + +case class Distinct(child: LogicalNode) extends UnaryNode { + override def output: Seq[Attribute] = child.output + + override def toRelNode(relBuilder: RelBuilder): RelBuilder = { +child.toRelNode(relBuilder) +relBuilder.distinct() + } +} + +case 
class Filter(condition: Expression, child: LogicalNode) extends UnaryNode { + override def output: Seq[Attribute] = child.output + + override def toRelNode(relBuilder: RelBuilder): RelBuilder = { +child.toRelNode(relBuilder) +relBuilder.filter(condition.toRexNode(relBuilder)) + } +} + +case class Aggregate( +groupingExpressions: Seq[Expression], +aggregateExpressions: Seq[NamedExpression], +child: LogicalNode) extends UnaryNode { + + override def output: Seq[Attribute] = { +(groupingExpressions ++ aggregateExpressions) map { agg => + agg match { +case ne: NamedExpression => ne.toAttribute +case e => Alias(e, e.toString).toAttribute + } +} + } + + override def toRelNode(relBuilder: RelBuilder): RelBuilder = { +child.toRelNode(relBuilder) +relBuilder.aggregate( + relBuilder.groupKey(groupingExpressions.map(_.toRexNode(relBuilder)).asJava), + aggregateExpressions.filter(_.isInstanceOf[Alias]).map { e => +e match { + case Alias(agg: Aggregation, name) => agg.toAggCall(name)(relBuilder) + case
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15253806#comment-15253806 ]

ASF GitHub Bot commented on FLINK-3754:
---

Github user yjshen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1916#discussion_r60728312

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala ---
@@ -199,12 +151,12 @@ class Table(
     *   tab.filter('name === "Fred")
     * }}}
     */
-  def filter(predicate: Expression): Table = {
-
-    relBuilder.push(relNode)
-    relBuilder.filter(predicate.toRexNode(relBuilder))
-
-    new Table(relBuilder.build(), tableEnv)
+  def filter(predicate: Expression): Table = withPlan {
+    //logicalPlan match {
+    //  case j: Join => j.copy(condition = Some(predicate))
+    //  case o => Filter(predicate, logicalPlan)
+    //}
--- End diff --

Will remove this in the next commit.
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15250128#comment-15250128 ]

ASF GitHub Bot commented on FLINK-3754:
---

Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1916#issuecomment-212485745

Thanks @yjshen for working on this issue! Unified validation and exceptions are a big improvement, IMO. I'll try to have a look soon.

@twalthr, can you have a look at the changes done on the expressions?
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15250009#comment-15250009 ]

ASF GitHub Bot commented on FLINK-3754:
---

GitHub user yjshen opened a pull request:

    https://github.com/apache/flink/pull/1916

    [FLINK-3754][Table]Add a validation phase before construct RelNode using TableAPI

This PR adds an extra **validation** phase for plans generated from the Table API, matching the functionality of Calcite's validator, which is invoked when we execute a query expressed as a SQL string.

To do this, I inserted a new layer between the Table API and `RelNode` construction: the `Logical Plan`.

The main procedure of validation and RelNode construction works as follows:

1. Construct the logical plan on the fly.
2. When we are about to execute the plan:
3. Do resolution using the table/dataset/datastream schema.
4. Do validation on the type-annotated logical plan tree.
5. Translate the logical plan into `RelNode` once validation has finished successfully.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/yjshen/flink extra_validation

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1916.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1916

commit 7fb102af1fa52dab2f0c80785f75bf7e0d8a7062
Author: Yijie Shen
Date: 2016-04-13T08:46:58Z

    make TreeNode extends Product

commit ab75d4857cb203714ee037aea2e55de776c4dd32
Author: Yijie Shen
Date: 2016-04-15T14:51:20Z

    wip expressions validation, should create expressions for functions next

commit 61e4bb09d754fe0aca7624e3fcd70d364e4154d3
Author: Yijie Shen
Date: 2016-04-16T07:02:07Z

    add functions for math and string

commit 6abbfad0a11a3874150baa4dcabf6caab37cf0be
Author: Yijie Shen
Date: 2016-04-18T16:49:04Z

    wip move table api on logicalNode

commit 64ecdbef273895e4527fb6b5120d92acb0d20542
Author: Yijie Shen
Date: 2016-04-19T05:20:47Z

    resolve and validate next

commit dc42a44cc47cdb619076074eda95b84157554337
Author: Yijie Shen
Date: 2016-04-19T19:26:58Z

    wip

commit c04292abe6484e095171a66c399c16e50e98870a
Author: Yijie Shen
Date: 2016-04-20T12:26:25Z

    fix bug in validator, merge eval, add doc
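The lazy resolve, validate and translate sequence from the pull request description could be modelled as in the sketch below. It borrows only the idea of `lazy val` phases; `LogicalPlan`, `RelNodeStub`, the schema map and `ValidationException` are hypothetical stand-ins, and the generated string merely stands in for a real `RelNode`.

```scala
// Hypothetical model of the lazily triggered phases; not the PR's PlanPreparation.
object PipelineSketch {
  final case class ValidationException(message: String) extends RuntimeException(message)

  // Stand-ins for the logical plan and for Calcite's RelNode.
  case class LogicalPlan(table: String, columns: Seq[String])
  case class RelNodeStub(description: String)

  class PlanPreparation(schemas: Map[String, Set[String]], logical: LogicalPlan) {
    // Resolution: look up the schema of the scanned table, on first use only.
    lazy val resolvedSchema: Set[String] =
      schemas.getOrElse(logical.table, throw ValidationException("Unknown table: " + logical.table))

    // Validation runs against the resolved schema.
    def validate(): Unit = {
      val unknown = logical.columns.filterNot(c => resolvedSchema.contains(c))
      if (unknown.nonEmpty) {
        throw ValidationException("Unknown columns: " + unknown.mkString(", "))
      }
    }

    // Translation happens only after validation has succeeded.
    lazy val relNode: RelNodeStub = {
      validate()
      val cols = logical.columns.mkString(", ")
      RelNodeStub("SELECT " + cols + " FROM " + logical.table)
    }
  }
}
```

Constructing a `PlanPreparation` never throws in this sketch. Errors surface only when `relNode` is first requested, which mirrors the "when we are about to execute the plan" wording of the description.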
[jira] [Commented] (FLINK-3754) Add a validation phase before construct RelNode using TableAPI
[ https://issues.apache.org/jira/browse/FLINK-3754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15241166#comment-15241166 ]

Fabian Hueske commented on FLINK-3754:
--

Thanks [~yijieshen] for working on this issue. It would be really good to have the validation a bit more centralized. Unfortunately, Calcite's RelBuilder does not perform validation such as type checks. Flink could do some of these checks earlier than it does now. A good starting point should be the test cases that check for incorrect usage of the Table API.