[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-17 Thread yjshen
Github user yjshen commented on the pull request:

https://github.com/apache/flink/pull/1958#issuecomment-219902602
  
@fhueske @twalthr thank you for your reviews and detailed comments throughout 
this issue 👍. I'm quite excited to have it merged :)




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1958




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-17 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1958#issuecomment-219812166
  
Merging this PR.




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-17 Thread twalthr
Github user twalthr commented on the pull request:

https://github.com/apache/flink/pull/1958#issuecomment-219650388
  
@fhueske I looked through the changes.
You have my +1 to merge.




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-13 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1958#issuecomment-219085233
  
Hi @yjshen, thanks a lot for the update! This PR is good to merge, IMO. 
@twalthr, let me know if you want to have another look as well. Otherwise, 
I'll merge this PR in the next few days. 
Very happy to have this improvement :-)




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r63206959
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
 ---
@@ -74,6 +109,8 @@ case class IsNull(child: Expression) extends 
UnaryExpression {
   override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
 relBuilder.isNull(child.toRexNode)
   }
+
+  override def resultType = BOOLEAN_TYPE_INFO
--- End diff --

Oh yes, sure. You're right. I did not expand the code (GitHub had folded 
some lines), so I thought this belonged to `GreaterThan`.




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-13 Thread yjshen
Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r63180341
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
 ---
@@ -74,6 +109,8 @@ case class IsNull(child: Expression) extends 
UnaryExpression {
   override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
 relBuilder.isNull(child.toRexNode)
   }
+
+  override def resultType = BOOLEAN_TYPE_INFO
--- End diff --

`IsNull` accepts columns of any type and returns whether the current cell is 
`null`; therefore, the `resultType` is `Boolean`. `IsNull` extends 
`UnaryExpression`, which does not provide a default `resultType` implementation, 
so we need to override it.
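
To make the inheritance point concrete, here is a minimal sketch; the class 
shapes below are simplified stand-ins for the real `Expression` hierarchy, not 
the actual Flink code:

```scala
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo.TypeInformation

// Simplified stand-ins for illustration only.
trait Expression {
  def resultType: TypeInformation[_]
}

abstract class UnaryExpression extends Expression {
  def child: Expression
  // No default resultType here, so every concrete subclass must supply one.
}

case class IsNull(child: Expression) extends UnaryExpression {
  // A null check yields a Boolean regardless of the child's type.
  override def resultType: TypeInformation[_] = BOOLEAN_TYPE_INFO
}
```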




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r63177787
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
 ---
@@ -74,6 +109,8 @@ case class IsNull(child: Expression) extends 
UnaryExpression {
   override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
 relBuilder.isNull(child.toRexNode)
   }
+
+  override def resultType = BOOLEAN_TYPE_INFO
--- End diff --

I think we do not need to override here.




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-11 Thread yjshen
Github user yjshen commented on the pull request:

https://github.com/apache/flink/pull/1958#issuecomment-218511092
  
@fhueske @twalthr thanks for the reviews! I've updated the code just now.
Looking forward to more feedback on this :)




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-11 Thread twalthr
Github user twalthr commented on the pull request:

https://github.com/apache/flink/pull/1958#issuecomment-218475561
  
@yjshen great work! PR looks very good. I had only some minor refactoring 
comments.




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-11 Thread yjshen
Github user yjshen commented on the pull request:

https://github.com/apache/flink/pull/1958#issuecomment-218477620
  
Hi @twalthr, thanks very much for the review! I'll address your 
comments shortly :)




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-11 Thread yjshen
Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62856490
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/UnresolvedException.scala
 ---
@@ -15,9 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.api.table
+package org.apache.flink.api.table.expressions
 
-/**
- * General Exception for all errors during table handling.
- */
-class TableException(msg: String) extends RuntimeException(msg)
+case class UnresolvedException(msg: String) extends RuntimeException(msg)
--- End diff --

OK, will do this




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-11 Thread yjshen
Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62855247
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala
 ---
@@ -18,85 +18,26 @@
 package org.apache.flink.api.table.expressions
 
 import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.SqlOperator
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.table.validate.{ExprValidationResult, 
ValidationFailure}
 
 /**
   * General expression for unresolved function calls. The function can be 
a built-in
   * scalar function or a user-defined scalar function.
   */
-case class Call(functionName: String, args: Expression*) extends 
Expression {
+case class Call(functionName: String, args: Seq[Expression]) extends 
Expression {
--- End diff --

While resolving `Call` into concrete expressions, `FunctionCatalog` is 
used, and its `withChildren` method looks up constructors to create the new 
expression as follows:

1. look up a constructor that takes a `Seq[Expression]` as its argument;
2. if step 1 finds nothing, look up a constructor that matches the exact number 
of arguments, like the `MyFunc` example you provided.

Does this make sense?
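
A rough sketch of that two-step lookup, to make it concrete; the reflection 
details here are illustrative assumptions, not the actual `FunctionCatalog` code:

```scala
trait Expression  // simplified stand-in

// Step 1: prefer a constructor taking a single Seq[Expression];
// Step 2: otherwise match on the exact number of arguments.
def withChildren[T](clazz: Class[T], children: Seq[Expression]): T = {
  val ctors = clazz.getDeclaredConstructors

  val seqCtor = ctors.find(_.getParameterTypes.toSeq == Seq(classOf[Seq[_]]))
  val arityCtor = ctors.find(_.getParameterCount == children.size)

  seqCtor.map(_.newInstance(children).asInstanceOf[T])
    .orElse(arityCtor.map(_.newInstance(children: _*).asInstanceOf[T]))
    .getOrElse(throw new IllegalArgumentException(
      s"No matching constructor on ${clazz.getName}"))
}
```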




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-11 Thread yjshen
Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62853599
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
 ---
@@ -130,19 +130,17 @@ trait ImplicitExpressionOperations {
 * @param endIndex last character of the substring (starting at 1, 
inclusive)
 * @return substring
 */
-  def substring(beginIndex: Expression, endIndex: Expression) = {
-Call(BuiltInFunctionNames.SUBSTRING, expr, beginIndex, endIndex)
-  }
+  def substring(beginIndex: Expression, endIndex: Expression) =
+SubString(expr, beginIndex, endIndex)
 
   /**
 * Creates a substring of the given string beginning at the given index 
to the end.
 *
 * @param beginIndex first character of the substring (starting at 1, 
inclusive)
 * @return substring
 */
-  def substring(beginIndex: Expression) = {
-Call(BuiltInFunctionNames.SUBSTRING, expr, beginIndex)
-  }
+  def substring(beginIndex: Expression) =
+new SubString(expr, beginIndex)
--- End diff --

Since `SubString` is defined as `case class SubString(str: Expression, 
begin: Expression, end: Expression)`, the compiler-generated `apply` method has 
that same three-argument signature. The `new` here therefore cannot be removed; 
without it the compiler complains that it cannot resolve `SubString` with this 
two-argument signature.
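
A minimal, self-contained sketch of why `new` is needed; the secondary 
constructor and the `Literal` default below are assumptions for illustration:

```scala
trait Expression
case class Literal(value: Any) extends Expression

case class SubString(str: Expression, begin: Expression, end: Expression)
    extends Expression {
  // Secondary constructor: substring from `begin` to the end of the string.
  def this(str: Expression, begin: Expression) =
    this(str, begin, Literal(Int.MaxValue))
}

object Demo {
  val s: Expression = Literal("flink")
  val b: Expression = Literal(2)

  val threeArgs = SubString(s, b, Literal(4)) // generated apply: primary ctor only
  val twoArgs   = new SubString(s, b)         // needs `new`: no two-arg apply
  // val broken = SubString(s, b)             // does not compile
}
```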




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-11 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62851933
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala
 ---
@@ -18,85 +18,26 @@
 package org.apache.flink.api.table.expressions
 
 import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.SqlOperator
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.table.validate.{ExprValidationResult, 
ValidationFailure}
 
 /**
   * General expression for unresolved function calls. The function can be 
a built-in
   * scalar function or a user-defined scalar function.
   */
-case class Call(functionName: String, args: Expression*) extends 
Expression {
+case class Call(functionName: String, args: Seq[Expression]) extends 
Expression {
--- End diff --

The reason I used varargs here was that they make the API easier to use 
once custom functions are possible: `.filter(Call("MYFUNC", 'f1, 'f2))`. 
But we can also find another way later.
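
One way to keep both conveniences, sketched under the assumption that the case 
class keeps `Seq[Expression]` (this is just an option for later, not what the 
PR does):

```scala
trait Expression  // simplified stand-in

case class Call(functionName: String, args: Seq[Expression]) extends Expression

object Call {
  // Varargs overload for call sites like .filter(Call("MYFUNC", 'f1, 'f2)).
  // Taking the first argument explicitly avoids an erasure clash with the
  // compiler-generated apply(String, Seq[Expression]).
  def apply(functionName: String, first: Expression, rest: Expression*): Call =
    Call(functionName, first +: rest)
}
```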




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-11 Thread yjshen
Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62853748
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/TypeCoercion.scala
 ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+object TypeCoercion {
--- End diff --

You are right, will fix this.




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-11 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62852367
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
 ---
@@ -20,19 +20,55 @@ package org.apache.flink.api.table.expressions
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.tools.RelBuilder
 
-import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, 
TypeInformation}
 import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.table.validate._
 
-case class Cast(child: Expression, tpe: TypeInformation[_]) extends 
UnaryExpression {
+case class Cast(child: Expression, resultType: TypeInformation[_]) extends 
UnaryExpression {
 
-  override def toString = s"$child.cast($tpe)"
+  override def toString = s"$child.cast($resultType)"
 
   override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-relBuilder.cast(child.toRexNode, TypeConverter.typeInfoToSqlType(tpe))
+relBuilder.cast(child.toRexNode, 
TypeConverter.typeInfoToSqlType(resultType))
   }
 
-  override def makeCopy(anyRefs: Seq[AnyRef]): this.type = {
+  override def makeCopy(anyRefs: Array[AnyRef]): this.type = {
 val child: Expression = anyRefs.head.asInstanceOf[Expression]
-copy(child, tpe).asInstanceOf[this.type]
+copy(child, resultType).asInstanceOf[this.type]
+  }
+
+  override def validateInput(): ExprValidationResult = {
+if (Cast.canCast(child.resultType, resultType)) {
+  ValidationSuccess
+} else {
+  ValidationFailure(s"Unsupported cast from ${child.resultType} to 
$resultType")
+}
+  }
+}
+
+object Cast {
--- End diff --

I would also move this into `typeutils`. We may need it somewhere else too.
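
For illustration, a minimal sketch of what such a shared helper could look like 
after the move; the rules below are simplified assumptions, not the PR's actual 
cast matrix:

```scala
package org.apache.flink.api.table.typeutils

import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, TypeInformation}

object CastRules {
  // Simplified: identity casts, numeric-to-numeric casts,
  // and casts of anything to String are allowed.
  def canCast(from: TypeInformation[_], to: TypeInformation[_]): Boolean =
    (from, to) match {
      case (f, t) if f == t                               => true
      case (_: NumericTypeInfo[_], _: NumericTypeInfo[_]) => true
      case (_, STRING_TYPE_INFO)                          => true
      case _                                              => false
    }
}
```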




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-11 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62851129
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala
 ---
@@ -18,85 +18,26 @@
 package org.apache.flink.api.table.expressions
 
 import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.SqlOperator
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.table.validate.{ExprValidationResult, 
ValidationFailure}
 
 /**
   * General expression for unresolved function calls. The function can be 
a built-in
   * scalar function or a user-defined scalar function.
   */
-case class Call(functionName: String, args: Expression*) extends 
Expression {
+case class Call(functionName: String, args: Seq[Expression]) extends 
Expression {
 
   override def children: Seq[Expression] = args
 
   override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-relBuilder.call(
-  BuiltInFunctionNames.toSqlOperator(functionName),
-  args.map(_.toRexNode): _*)
+throw new UnresolvedException(s"trying to convert UnresolvedFunction 
$functionName to RexNode")
   }
 
   override def toString = s"\\$functionName(${args.mkString(", ")})"
 
-  override def makeCopy(newArgs: Seq[AnyRef]): this.type = {
-val copy = Call(
-  newArgs.head.asInstanceOf[String],
-  newArgs.drop(1).asInstanceOf[Seq[Expression]]: _*)
+  override def resultType =
+throw new UnresolvedException(s"calling dataType on Unresolved 
Function $functionName")
--- End diff --

`resultType` instead of `dataType`, and remove the space in `Unresolved 
Function` (it should read `UnresolvedFunction`).
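
Applying both fixes, the overridden method would read:

```scala
override def resultType =
  throw new UnresolvedException(
    s"calling resultType on UnresolvedFunction $functionName")
```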




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-11 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62849325
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/UnresolvedException.scala
 ---
@@ -15,9 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.api.table
+package org.apache.flink.api.table.expressions
 
-/**
- * General Exception for all errors during table handling.
- */
-class TableException(msg: String) extends RuntimeException(msg)
+case class UnresolvedException(msg: String) extends RuntimeException(msg)
--- End diff --

Could you put this into `exceptions.scala` and document it?
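
For example, a hedged sketch of what the documented version in 
`exceptions.scala` might look like:

```scala
/**
  * Exception for operations on unresolved expressions or plan nodes,
  * e.g. calling resultType before resolution has taken place.
  */
case class UnresolvedException(msg: String) extends RuntimeException(msg)
```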




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-11 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62848946
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/TypeCoercion.scala
 ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+object TypeCoercion {
--- End diff --

I would put this into the `typeutils` package. The `expressions` package 
should only contain expressions.




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-11 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62843680
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
 ---
@@ -130,19 +130,17 @@ trait ImplicitExpressionOperations {
 * @param endIndex last character of the substring (starting at 1, 
inclusive)
 * @return substring
 */
-  def substring(beginIndex: Expression, endIndex: Expression) = {
-Call(BuiltInFunctionNames.SUBSTRING, expr, beginIndex, endIndex)
-  }
+  def substring(beginIndex: Expression, endIndex: Expression) =
+SubString(expr, beginIndex, endIndex)
 
   /**
 * Creates a substring of the given string beginning at the given index 
to the end.
 *
 * @param beginIndex first character of the substring (starting at 1, 
inclusive)
 * @return substring
 */
-  def substring(beginIndex: Expression) = {
-Call(BuiltInFunctionNames.SUBSTRING, expr, beginIndex)
-  }
+  def substring(beginIndex: Expression) =
+new SubString(expr, beginIndex)
--- End diff --

`new` can be removed.




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-11 Thread yjshen
Github user yjshen commented on the pull request:

https://github.com/apache/flink/pull/1958#issuecomment-218447948
  
Hi @fhueske, I've just finished my work. Can you give it another pass of 
review? Thanks!




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread yjshen
Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62745064
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/expression/ScalarFunctionsTest.scala
 ---
@@ -215,7 +215,7 @@ class ScalarFunctionsTest {
 
   }
 
-  @Test
+  @Ignore
--- End diff --

This will be enabled again once type coercion rules are implemented.




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread yjshen
Github user yjshen commented on the pull request:

https://github.com/apache/flink/pull/1958#issuecomment-218278581
  
Hi @fhueske, part of the comments have been resolved and the PR updated; two 
TODOs remain:

- [ ] Type coercion for expressions, for example:
  - `Add(int, long)` should be auto-coerced to `Add(cast(long), long)`
  - expressions expecting `double` as input but getting `int`, such as 
`ln(int)`, should be auto-cast to `ln(cast(double))`

- [ ] Rewrite of `TreeNode`, `FunctionCatalog`, etc.
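
A minimal sketch of the widening rule behind the first TODO; the `widerTypeOf` 
helper and the precedence order are illustrative assumptions, not the final 
coercion rules:

```scala
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo.TypeInformation

object CoercionSketch {
  // Assumed numeric widening order, narrowest to widest.
  private val precedence: Seq[TypeInformation[_]] =
    Seq(BYTE_TYPE_INFO, SHORT_TYPE_INFO, INT_TYPE_INFO,
        LONG_TYPE_INFO, FLOAT_TYPE_INFO, DOUBLE_TYPE_INFO)

  // The wider of two numeric types, if both participate in widening.
  def widerTypeOf(a: TypeInformation[_],
                  b: TypeInformation[_]): Option[TypeInformation[_]] = {
    val (ia, ib) = (precedence.indexOf(a), precedence.indexOf(b))
    if (ia < 0 || ib < 0) None else Some(precedence(math.max(ia, ib)))
  }
}

// widerTypeOf(INT_TYPE_INFO, LONG_TYPE_INFO) == Some(LONG_TYPE_INFO),
// so Add(int, long) would be rewritten to Add(cast(int as long), long).
```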




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread yjshen
Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62742492
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 ---
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import scala.collection.JavaConverters._
+
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataTypeFactory
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.calcite.schema.{Table => CTable}
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.flink.api.table.{StreamTableEnvironment, 
TableEnvironment}
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.table.validate.ValidationException
+
+case class Project(projectList: Seq[NamedExpression], child: LogicalNode) 
extends UnaryNode {
+  override def output: Seq[Attribute] = projectList.map(_.toAttribute)
+
+  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode 
= {
+val afterResolve = 
super.resolveExpressions(tableEnv).asInstanceOf[Project]
+val newProjectList =
+  afterResolve.projectList.zipWithIndex.map { case (e, i) =>
+e match {
+  case u @ UnresolvedAlias(child, optionalAliasName) => child 
match {
+case ne: NamedExpression => ne
+case e if !e.valid => u
+case c @ Cast(ne: NamedExpression, _) => Alias(c, ne.name)
+case other => Alias(other, 
optionalAliasName.getOrElse(s"_c$i"))
+  }
+  case _ => throw new IllegalArgumentException
+}
+}
+Project(newProjectList, child)
+  }
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
+val allAlias = projectList.forall(_.isInstanceOf[Alias])
+child.toRelNode(relBuilder)
+if (allAlias) {
+  // Calcite's RelBuilder does not translate identity projects even if 
they rename fields.
+  //   Add a projection ourselves (will be automatically removed by 
translation rules).
+  relBuilder.push(
+LogicalProject.create(relBuilder.peek(),
+  projectList.map(_.toRexNode(relBuilder)).asJava,
+  projectList.map(_.name).asJava))
+} else {
+  relBuilder.project(projectList.map(_.toRexNode(relBuilder)): _*)
+}
+  }
+}
+
+case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) 
extends UnaryNode {
+  override def output: Seq[Attribute] =
+throw new UnresolvedException("Invalid call to output on AliasNode")
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder =
+throw new UnresolvedException("Invalid call to toRelNode on AliasNode")
+
+  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode 
= {
+if (aliasList.length > child.output.length) {
+  failValidation("Aliasing more fields than we actually have")
+} else if 
(!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) {
+  failValidation("`as` only allow string arguments")
+} else {
+  val names = 
aliasList.map(_.asInstanceOf[UnresolvedFieldReference].name)
+  val input = child.output
+  Project(
+names.zip(input).map { case (name, attr) =>
+  Alias(attr, name)} ++ input.drop(names.length), child)
+}
+  }
+}
+
+case class Distinct(child: LogicalNode) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
+child.toRelNode(relBuilder)
+

[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread yjshen
Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62742390
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 ---
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import scala.collection.JavaConverters._
+
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataTypeFactory
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.calcite.schema.{Table => CTable}
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.flink.api.table.{StreamTableEnvironment, 
TableEnvironment}
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.table.validate.ValidationException
+
+case class Project(projectList: Seq[NamedExpression], child: LogicalNode) 
extends UnaryNode {
+  override def output: Seq[Attribute] = projectList.map(_.toAttribute)
+
+  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode 
= {
--- End diff --

Moved




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62731427
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
 ---
@@ -92,8 +93,7 @@ abstract class StreamTableEnvironment(
   def ingest(tableName: String): Table = {
 
 if (isRegistered(tableName)) {
-  relBuilder.scan(tableName)
-  new Table(relBuilder.build(), this)
+  new Table(this, CatalogNode(tableName, getTable(tableName), 
getTypeFactory))
 }
 else {
   throw new TableException(s"Table \'$tableName\' was not found in the 
registry.")
--- End diff --

I think a ValidationException would be more appropriate, because `ingest()` 
belongs to a query IMO.




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62715643
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 ---
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import scala.collection.JavaConverters._
+
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataTypeFactory
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.calcite.schema.{Table => CTable}
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.flink.api.table.{StreamTableEnvironment, 
TableEnvironment}
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.table.validate.ValidationException
+
+case class Project(projectList: Seq[NamedExpression], child: LogicalNode) 
extends UnaryNode {
+  override def output: Seq[Attribute] = projectList.map(_.toAttribute)
+
+  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode 
= {
+val afterResolve = 
super.resolveExpressions(tableEnv).asInstanceOf[Project]
+val newProjectList =
+  afterResolve.projectList.zipWithIndex.map { case (e, i) =>
+e match {
+  case u @ UnresolvedAlias(child, optionalAliasName) => child 
match {
+case ne: NamedExpression => ne
+case e if !e.valid => u
+case c @ Cast(ne: NamedExpression, _) => Alias(c, ne.name)
+case other => Alias(other, 
optionalAliasName.getOrElse(s"_c$i"))
+  }
+  case _ => throw new IllegalArgumentException
+}
+}
+Project(newProjectList, child)
+  }
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
+val allAlias = projectList.forall(_.isInstanceOf[Alias])
+child.toRelNode(relBuilder)
+if (allAlias) {
+  // Calcite's RelBuilder does not translate identity projects even if 
they rename fields.
+  //   Add a projection ourselves (will be automatically removed by 
translation rules).
+  relBuilder.push(
+LogicalProject.create(relBuilder.peek(),
+  projectList.map(_.toRexNode(relBuilder)).asJava,
+  projectList.map(_.name).asJava))
+} else {
+  relBuilder.project(projectList.map(_.toRexNode(relBuilder)): _*)
+}
+  }
+}
+
+case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) 
extends UnaryNode {
+  override def output: Seq[Attribute] =
+throw new UnresolvedException("Invalid call to output on AliasNode")
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder =
+throw new UnresolvedException("Invalid call to toRelNode on AliasNode")
+
+  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode 
= {
+if (aliasList.length > child.output.length) {
+  failValidation("Aliasing more fields than we actually have")
+} else if 
(!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) {
+  failValidation("`as` only allow string arguments")
--- End diff --

👍  




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread yjshen
Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62715347
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
 ---
@@ -92,8 +93,7 @@ abstract class StreamTableEnvironment(
   def ingest(tableName: String): Table = {
 
 if (isRegistered(tableName)) {
-  relBuilder.scan(tableName)
-  new Table(relBuilder.build(), this)
+  new Table(this, CatalogNode(tableName, getTable(tableName), 
getTypeFactory))
 }
 else {
   throw new TableException(s"Table \'$tableName\' was not found in the 
registry.")
--- End diff --

As we discussed in 
https://github.com/apache/flink/pull/1958#discussion_r62680699, shouldn't we 
keep this `TableException`?




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread yjshen
Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62714636
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 ---
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import scala.collection.JavaConverters._
+
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataTypeFactory
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.calcite.schema.{Table => CTable}
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.flink.api.table.{StreamTableEnvironment, 
TableEnvironment}
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.table.validate.ValidationException
+
+case class Project(projectList: Seq[NamedExpression], child: LogicalNode) 
extends UnaryNode {
+  override def output: Seq[Attribute] = projectList.map(_.toAttribute)
+
+  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode 
= {
+val afterResolve = 
super.resolveExpressions(tableEnv).asInstanceOf[Project]
+val newProjectList =
+  afterResolve.projectList.zipWithIndex.map { case (e, i) =>
+e match {
+  case u @ UnresolvedAlias(child, optionalAliasName) => child 
match {
+case ne: NamedExpression => ne
+case e if !e.valid => u
+case c @ Cast(ne: NamedExpression, _) => Alias(c, ne.name)
+case other => Alias(other, 
optionalAliasName.getOrElse(s"_c$i"))
+  }
+  case _ => throw new IllegalArgumentException
+}
+}
+Project(newProjectList, child)
+  }
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
+val allAlias = projectList.forall(_.isInstanceOf[Alias])
+child.toRelNode(relBuilder)
+if (allAlias) {
+  // Calcite's RelBuilder does not translate identity projects even if 
they rename fields.
+  //   Add a projection ourselves (will be automatically removed by 
translation rules).
+  relBuilder.push(
+LogicalProject.create(relBuilder.peek(),
+  projectList.map(_.toRexNode(relBuilder)).asJava,
+  projectList.map(_.name).asJava))
+} else {
+  relBuilder.project(projectList.map(_.toRexNode(relBuilder)): _*)
+}
+  }
+}
+
+case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) 
extends UnaryNode {
+  override def output: Seq[Attribute] =
+throw new UnresolvedException("Invalid call to output on AliasNode")
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder =
+throw new UnresolvedException("Invalid call to toRelNode on AliasNode")
+
+  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode 
= {
+if (aliasList.length > child.output.length) {
+  failValidation("Aliasing more fields than we actually have")
+} else if 
(!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) {
+  failValidation("`as` only allow string arguments")
+} else {
+  val names = 
aliasList.map(_.asInstanceOf[UnresolvedFieldReference].name)
+  val input = child.output
+  Project(
+names.zip(input).map { case (name, attr) =>
+  Alias(attr, name)} ++ input.drop(names.length), child)
+}
+  }
+}
+
+case class Distinct(child: LogicalNode) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
+child.toRelNode(relBuilder)
+

[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62711917
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala
 ---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.table.TableEnvironment
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.trees.TreeNode
+import org.apache.flink.api.table.validate._
+
+/**
+  * LogicalNode is created and validated as we construct query plan using 
Table API.
+  *
+  * The main validation procedure is separated into two phases:
+  * Expressions' resolution and transformation 
(#resolveExpressions(TableEnvironment)):
+  * 
+  *   translate UnresolvedFieldReference into ResolvedFieldReference
+  * using child operator's output
+  *   translate Call(UnresolvedFunction) into solid Expression
+  *   generate alias names for query output
+  *   
+  * 
+  *
+  * LogicalNode validation (#validate(TableEnvironment)):
+  * 
+  *   check no UnresolvedFieldReference exists any more
+  *   check if all expressions have children of needed type
+  *   check each logical operator have desired input
+  * 
+  * Once we pass the validation phase, we can safely convert LogicalNode 
into Calcite's RelNode.
+  *
+  * Note: this is adapted from Apache Spark's LogicalPlan.
+  */
+abstract class LogicalNode extends TreeNode[LogicalNode] {
+  def output: Seq[Attribute]
+
+  def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = {
+// resolve references and function calls
+transformExpressionsUp {
+  case u @ UnresolvedFieldReference(name) =>
+resolveChildren(name).getOrElse(u)
+  case c @ Call(name, children) if c.childrenValid =>
+tableEnv.getFunctionCatalog.lookupFunction(name, children)
+}
+  }
+
+  def toRelNode(relBuilder: RelBuilder): RelBuilder
--- End diff --

jup. :-)




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread yjshen
Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62709108
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala
 ---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.table.TableEnvironment
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.trees.TreeNode
+import org.apache.flink.api.table.validate._
+
+/**
+  * LogicalNode is created and validated as we construct query plan using 
Table API.
+  *
+  * The main validation procedure is separated into two phases:
+  * Expressions' resolution and transformation 
(#resolveExpressions(TableEnvironment)):
+  * 
+  *   translate UnresolvedFieldReference into ResolvedFieldReference
+  * using child operator's output
+  *   translate Call(UnresolvedFunction) into solid Expression
+  *   generate alias names for query output
+  *   
+  * 
+  *
+  * LogicalNode validation (#validate(TableEnvironment)):
+  * 
+  *   check no UnresolvedFieldReference exists any more
+  *   check if all expressions have children of needed type
+  *   check each logical operator have desired input
+  * 
+  * Once we pass the validation phase, we can safely convert LogicalNode 
into Calcite's RelNode.
+  *
+  * Note: this is adapted from Apache Spark's LogicalPlan.
+  */
+abstract class LogicalNode extends TreeNode[LogicalNode] {
+  def output: Seq[Attribute]
+
+  def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = {
+// resolve references and function calls
+transformExpressionsUp {
+  case u @ UnresolvedFieldReference(name) =>
+resolveChildren(name).getOrElse(u)
+  case c @ Call(name, children) if c.childrenValid =>
+tableEnv.getFunctionCatalog.lookupFunction(name, children)
+}
+  }
+
+  def toRelNode(relBuilder: RelBuilder): RelBuilder
--- End diff --

You mean add a
```
final def toRelNode(relBuilder: RelBuilder): RelNode = relBuilder.build()
```
to get rid of `relBuilder.build()`?




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62708060
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala
 ---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.table.TableEnvironment
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.trees.TreeNode
+import org.apache.flink.api.table.validate._
+
+/**
+  * LogicalNode is created and validated as we construct query plan using 
Table API.
+  *
+  * The main validation procedure is separated into two phases:
+  * Expressions' resolution and transformation 
(#resolveExpressions(TableEnvironment)):
+  * 
+  *   translate UnresolvedFieldReference into ResolvedFieldReference
+  * using child operator's output
+  *   translate Call(UnresolvedFunction) into solid Expression
+  *   generate alias names for query output
+  *   
+  * 
+  *
+  * LogicalNode validation (#validate(TableEnvironment)):
+  * 
+  *   check no UnresolvedFieldReference exists any more
+  *   check if all expressions have children of needed type
+  *   check each logical operator have desired input
+  * 
+  * Once we pass the validation phase, we can safely convert LogicalNode 
into Calcite's RelNode.
+  *
+  * Note: this is adapted from Apache Spark's LogicalPlan.
+  */
+abstract class LogicalNode extends TreeNode[LogicalNode] {
+  def output: Seq[Attribute]
+
+  def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = {
+// resolve references and function calls
+transformExpressionsUp {
+  case u @ UnresolvedFieldReference(name) =>
+resolveChildren(name).getOrElse(u)
+  case c @ Call(name, children) if c.childrenValid =>
+tableEnv.getFunctionCatalog.lookupFunction(name, children)
+}
+  }
+
+  def toRelNode(relBuilder: RelBuilder): RelBuilder
--- End diff --

Yes, renaming the method would be good. I thought we could also add a 
`toRelNode()` method which actually returns a `RelNode`, in addition to the 
renamed method. The renamed method would then only be required for subclasses 
of `LogicalNode`, and other classes would not need to call `build()` on the 
`RelBuilder` but would receive a finalized `RelNode`.
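
A sketch of the proposed split; the `construct` name follows the rename floated 
elsewhere in this thread and is only illustrative:

```scala
import org.apache.calcite.rel.RelNode
import org.apache.calcite.tools.RelBuilder

abstract class LogicalNode {
  // Subclasses push their operator onto the builder's stack,
  // StringBuilder-style, and return the same builder.
  protected def construct(relBuilder: RelBuilder): RelBuilder

  // Callers receive a finalized RelNode and never call build() themselves.
  final def toRelNode(relBuilder: RelBuilder): RelNode = {
    construct(relBuilder)
    relBuilder.build()
  }
}
```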




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread yjshen
Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62705691
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala
 ---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.table.TableEnvironment
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.trees.TreeNode
+import org.apache.flink.api.table.validate._
+
+/**
+  * LogicalNode is created and validated as we construct query plan using 
Table API.
+  *
+  * The main validation procedure is separated into two phases:
+  * Expressions' resolution and transformation 
(#resolveExpressions(TableEnvironment)):
+  * 
+  *   translate UnresolvedFieldReference into ResolvedFieldReference
+  * using child operator's output
+  *   translate Call(UnresolvedFunction) into solid Expression
+  *   generate alias names for query output
+  *   
+  * 
+  *
+  * LogicalNode validation (#validate(TableEnvironment)):
+  * 
+  *   check no UnresolvedFieldReference exists any more
+  *   check if all expressions have children of needed type
+  *   check each logical operator have desired input
+  * 
+  * Once we pass the validation phase, we can safely convert LogicalNode 
into Calcite's RelNode.
+  *
+  * Note: this is adapted from Apache Spark's LogicalPlan.
+  */
+abstract class LogicalNode extends TreeNode[LogicalNode] {
+  def output: Seq[Attribute]
+
+  def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = {
+// resolve references and function calls
+transformExpressionsUp {
+  case u @ UnresolvedFieldReference(name) =>
+resolveChildren(name).getOrElse(u)
+  case c @ Call(name, children) if c.childrenValid =>
+tableEnv.getFunctionCatalog.lookupFunction(name, children)
+}
+  }
+
+  def toRelNode(relBuilder: RelBuilder): RelBuilder
--- End diff --

`toRelNode` may not be a proper name for this function: it manipulates the 
argument `relBuilder`'s inner stack through the Calcite API (`relBuilder.join`, 
`relBuilder.distinct`, etc.) and returns the `relBuilder` with a changed stack, 
much like `StringBuilder`'s `append` method. 
Maybe I can just rename it to `construct` or `apply`?
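
For illustration, the stack-like behavior looks roughly like this (table and 
field names are made up; a sketch, not code from this PR):

    import org.apache.calcite.rel.RelNode
    import org.apache.calcite.sql.fun.SqlStdOperatorTable
    import org.apache.calcite.tools.RelBuilder

    def example(relBuilder: RelBuilder): RelNode =
      relBuilder
        .scan("orders")                        // push a TableScan onto the stack
        .filter(relBuilder.call(
          SqlStdOperatorTable.IS_NOT_NULL,
          relBuilder.field("amount")))         // pop, wrap in a Filter, push
        .distinct()                            // pop, wrap in an Aggregate, push
        .build()                               // pop the finished RelNode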




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62703535
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
 ---
@@ -152,13 +160,13 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *
 * @param name The name under which the table is registered.
 * @param table The table to register in the catalog
-* @throws TableException if another table is registered under the 
provided name.
+* @throws ValidationException if another table is registered under the 
provided name.
 */
-  @throws[TableException]
+  @throws[ValidationException]
   protected def registerTableInternal(name: String, table: AbstractTable): 
Unit = {
 
 if (isRegistered(name)) {
-  throw new TableException(s"Table \'$name\' already exists. " +
+  throw new ValidationException(s"Table \'$name\' already exists. " +
--- End diff --

Yes, that's a good point




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread yjshen
Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62703065
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
 ---
@@ -152,13 +160,13 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *
 * @param name The name under which the table is registered.
 * @param table The table to register in the catalog
-* @throws TableException if another table is registered under the 
provided name.
+* @throws ValidationException if another table is registered under the 
provided name.
 */
-  @throws[TableException]
+  @throws[ValidationException]
   protected def registerTableInternal(name: String, table: AbstractTable): 
Unit = {
 
 if (isRegistered(name)) {
-  throw new TableException(s"Table \'$name\' already exists. " +
+  throw new ValidationException(s"Table \'$name\' already exists. " +
--- End diff --

It makes sense to me.
In this way, should unsupported operations for `StreamTableEnvironment` 
throw a `TableException`?
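
A rough sketch of the resulting convention (the messages are made up; both 
exception types already appear in this PR):

    // ValidationException: the user's query itself is invalid
    throw new ValidationException(s"Table '$name' already exists.")

    // TableException: the query is valid, but the backend cannot run it,
    // e.g. an operation not yet supported on streaming tables
    throw new TableException("Distinct is not supported on streaming tables.")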




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62685650
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/exceptions.scala
 ---
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.validate
--- End diff --

Can we move this to `org.apache.flink.api.table` and also add the 
`TableException` and `ExpressionParserException`?




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62680102
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
 ---
@@ -197,6 +209,17 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 planner
   }
 
+  /**
+* Returns the Calcite 
[[org.apache.calcite.rel.`type`.RelDataTypeFactory]]
+* of this TableEnvironment. */
+  private[flink] def getTypeFactory: RelDataTypeFactory = {
--- End diff --

`getTypeFactory()` and `getTable()` are only called to create a 
`CatalogNode`, which internally obtains the `rowType: RelDataType` of the table. 
I think we can simply return the `rowType` for a given table name and remove 
`getTypeFactory()` and `getTable()` from this class. 
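
A sketch of that narrowing, assuming `getTable()` and `getTypeFactory` stay 
around as private internals of `TableEnvironment` (the name `getRowType` is 
just a suggestion):

    import org.apache.calcite.rel.`type`.RelDataType

    /** Returns the row type of a registered table, hiding the Calcite internals. */
    private[flink] def getRowType(name: String): RelDataType =
      getTable(name).getRowType(getTypeFactory)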




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1958#issuecomment-218182019
  
Hi @yjshen, sorry for the time it took to review the PR. I like the 
approach and the PR looks good, IMO. I had a few minor comments and questions. 
Let me know what you think.




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62683790
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 ---
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import scala.collection.JavaConverters._
+
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataTypeFactory
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.calcite.schema.{Table => CTable}
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.flink.api.table.{StreamTableEnvironment, 
TableEnvironment}
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.table.validate.ValidationException
+
+case class Project(projectList: Seq[NamedExpression], child: LogicalNode) 
extends UnaryNode {
+  override def output: Seq[Attribute] = projectList.map(_.toAttribute)
+
+  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode 
= {
+val afterResolve = 
super.resolveExpressions(tableEnv).asInstanceOf[Project]
+val newProjectList =
+  afterResolve.projectList.zipWithIndex.map { case (e, i) =>
+e match {
+  case u @ UnresolvedAlias(child, optionalAliasName) => child 
match {
+case ne: NamedExpression => ne
+case e if !e.valid => u
+case c @ Cast(ne: NamedExpression, _) => Alias(c, ne.name)
+case other => Alias(other, 
optionalAliasName.getOrElse(s"_c$i"))
+  }
+  case _ => throw new IllegalArgumentException
+}
+}
+Project(newProjectList, child)
+  }
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
+val allAlias = projectList.forall(_.isInstanceOf[Alias])
+child.toRelNode(relBuilder)
+if (allAlias) {
+  // Calcite's RelBuilder does not translate identity projects even if 
they rename fields.
+  //   Add a projection ourselves (will be automatically removed by 
translation rules).
+  relBuilder.push(
+LogicalProject.create(relBuilder.peek(),
+  projectList.map(_.toRexNode(relBuilder)).asJava,
+  projectList.map(_.name).asJava))
+} else {
+  relBuilder.project(projectList.map(_.toRexNode(relBuilder)): _*)
+}
+  }
+}
+
+case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) 
extends UnaryNode {
+  override def output: Seq[Attribute] =
+throw new UnresolvedException("Invalid call to output on AliasNode")
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder =
+throw new UnresolvedException("Invalid call to toRelNode on AliasNode")
+
+  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode 
= {
+if (aliasList.length > child.output.length) {
+  failValidation("Aliasing more fields than we actually have")
+} else if 
(!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) {
+  failValidation("`as` only allow string arguments")
--- End diff --

I think this error message might be confusing because the Scala API does 
not use String arguments. 
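
For reference, the Scala API refers to fields with Symbols, so a String-specific 
message would be misleading there (a tiny usage sketch, assuming the usual 
implicit expression conversions are in scope):

    // Scala Table API: Symbols become UnresolvedFieldReferences
    val renamed = table.as('name, 'age)

    // only the Java Table API parses a String: table.as("name, age")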




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62683921
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 ---
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import scala.collection.JavaConverters._
+
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataTypeFactory
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.calcite.schema.{Table => CTable}
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.flink.api.table.{StreamTableEnvironment, 
TableEnvironment}
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.table.validate.ValidationException
+
+case class Project(projectList: Seq[NamedExpression], child: LogicalNode) 
extends UnaryNode {
+  override def output: Seq[Attribute] = projectList.map(_.toAttribute)
+
+  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode 
= {
+val afterResolve = 
super.resolveExpressions(tableEnv).asInstanceOf[Project]
+val newProjectList =
+  afterResolve.projectList.zipWithIndex.map { case (e, i) =>
+e match {
+  case u @ UnresolvedAlias(child, optionalAliasName) => child 
match {
+case ne: NamedExpression => ne
+case e if !e.valid => u
+case c @ Cast(ne: NamedExpression, _) => Alias(c, ne.name)
+case other => Alias(other, 
optionalAliasName.getOrElse(s"_c$i"))
+  }
+  case _ => throw new IllegalArgumentException
+}
+}
+Project(newProjectList, child)
+  }
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
+val allAlias = projectList.forall(_.isInstanceOf[Alias])
+child.toRelNode(relBuilder)
+if (allAlias) {
+  // Calcite's RelBuilder does not translate identity projects even if 
they rename fields.
+  //   Add a projection ourselves (will be automatically removed by 
translation rules).
+  relBuilder.push(
+LogicalProject.create(relBuilder.peek(),
+  projectList.map(_.toRexNode(relBuilder)).asJava,
+  projectList.map(_.name).asJava))
+} else {
+  relBuilder.project(projectList.map(_.toRexNode(relBuilder)): _*)
+}
+  }
+}
+
+case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) 
extends UnaryNode {
+  override def output: Seq[Attribute] =
+throw new UnresolvedException("Invalid call to output on AliasNode")
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder =
+throw new UnresolvedException("Invalid call to toRelNode on AliasNode")
+
+  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode 
= {
+if (aliasList.length > child.output.length) {
+  failValidation("Aliasing more fields than we actually have")
+} else if 
(!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) {
+  failValidation("`as` only allow string arguments")
+} else {
+  val names = 
aliasList.map(_.asInstanceOf[UnresolvedFieldReference].name)
+  val input = child.output
+  Project(
+names.zip(input).map { case (name, attr) =>
+  Alias(attr, name)} ++ input.drop(names.length), child)
+}
+  }
+}
+
+case class Distinct(child: LogicalNode) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
+child.toRelNode(relBuilder)
+

[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62685356
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
 ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.validate
+
+import scala.reflect.ClassTag
+import scala.util.{Failure, Success, Try}
+
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.validate.FunctionCatalog.FunctionBuilder
+
+/**
+  * A catalog for looking up user defined functions, used by an Analyzer.
+  *
+  * Note: this is adapted from Apache Spark's FunctionRegistry.
+  */
+trait FunctionCatalog {
--- End diff --

Do we need this class except for some of the tests?
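
If it stays, one thing the trait buys us is swappable catalogs for tests; a 
sketch, assuming `FunctionBuilder` is essentially `Seq[Expression] => Expression` 
and `lookupFunction(name, children)` is the trait's only abstract member:

    import org.apache.flink.api.table.expressions.{Call, Expression}
    import org.apache.flink.api.table.validate.FunctionCatalog

    // hypothetical trimmed-down catalog for unit tests
    class TestFunctionCatalog(builders: Map[String, Seq[Expression] => Expression])
      extends FunctionCatalog {

      override def lookupFunction(name: String, children: Seq[Expression]): Expression =
        builders.get(name.toLowerCase) match {
          case Some(build) => build(children)
          case None        => Call(name, children) // stays unresolved; validation flags it
        }
    }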




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62685176
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
 ---
@@ -472,61 +339,39 @@ class Table(
 tableEnv.emitToSink(this, configuredSink)
   }
 
-  private def createRenamingProject(exprs: Seq[RexNode]): LogicalProject = 
{
-
-val names = exprs.map{ e =>
-  e.getKind match {
-case SqlKind.AS =>
-  e.asInstanceOf[RexCall].getOperands.get(1)
-.asInstanceOf[RexLiteral].getValue
-.asInstanceOf[NlsString].getValue
-case SqlKind.INPUT_REF =>
-  
relNode.getRowType.getFieldNames.get(e.asInstanceOf[RexInputRef].getIndex)
-case _ =>
-  throw new PlanGenException("Unexpected expression type 
encountered.")
-  }
-
-}
-LogicalProject.create(relNode, exprs.toList.asJava, 
names.toList.asJava)
-  }
-
   private def checkUniqueNames(exprs: Seq[Expression]): Unit = {
 val names: mutable.Set[String] = mutable.Set()
 
 exprs.foreach {
-  case n: Naming =>
+  case n: Alias =>
 // explicit name
 if (names.contains(n.name)) {
-  throw new IllegalArgumentException(s"Duplicate field name 
$n.name.")
+  throw new ValidationException(s"Duplicate field name $n.name.")
 } else {
   names.add(n.name)
 }
   case u: UnresolvedFieldReference =>
 // simple field forwarding
 if (names.contains(u.name)) {
-  throw new IllegalArgumentException(s"Duplicate field name 
$u.name.")
+  throw new ValidationException(s"Duplicate field name $u.name.")
 } else {
   names.add(u.name)
 }
   case _ => // Do nothing
 }
   }
 
+  @inline protected def validate(logicalNode: => LogicalNode): Table = {
+new Table(tableEnv, logicalNode.validate(tableEnv))
--- End diff --

Would it be OK to remove this method and move the `new Table` call to each 
API method? 
I think this would make the code easier to read.
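
i.e., each operator method would construct and validate inline; a sketch using 
`filter`, assuming the PR's `Filter` node and the table's `logicalPlan` field 
keep their current shape:

    def filter(predicate: Expression): Table = {
      new Table(tableEnv, Filter(predicate, logicalPlan).validate(tableEnv))
    }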




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62684793
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
 ---
@@ -472,61 +339,39 @@ class Table(
 tableEnv.emitToSink(this, configuredSink)
   }
 
-  private def createRenamingProject(exprs: Seq[RexNode]): LogicalProject = 
{
-
-val names = exprs.map{ e =>
-  e.getKind match {
-case SqlKind.AS =>
-  e.asInstanceOf[RexCall].getOperands.get(1)
-.asInstanceOf[RexLiteral].getValue
-.asInstanceOf[NlsString].getValue
-case SqlKind.INPUT_REF =>
-  
relNode.getRowType.getFieldNames.get(e.asInstanceOf[RexInputRef].getIndex)
-case _ =>
-  throw new PlanGenException("Unexpected expression type 
encountered.")
-  }
-
-}
-LogicalProject.create(relNode, exprs.toList.asJava, 
names.toList.asJava)
-  }
-
   private def checkUniqueNames(exprs: Seq[Expression]): Unit = {
--- End diff --

Can we move this check into `Project`?
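
A sketch of the check folded into `Project` (the exact override point is an 
assumption):

    override def validate(tableEnv: TableEnvironment): LogicalNode = {
      val resolved = super.validate(tableEnv).asInstanceOf[Project]
      val names = resolved.projectList.map(_.name)
      val duplicates = names.diff(names.distinct).distinct
      if (duplicates.nonEmpty) {
        failValidation(s"Duplicate field name(s): ${duplicates.mkString(", ")}.")
      }
      resolved
    }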




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62683416
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala
 ---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.table.TableEnvironment
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.trees.TreeNode
+import org.apache.flink.api.table.validate._
+
+/**
+  * LogicalNode is created and validated as we construct query plan using 
Table API.
+  *
+  * The main validation procedure is separated into two phases:
+  * Expressions' resolution and transformation 
(#resolveExpressions(TableEnvironment)):
+  * 
--- End diff --

We use wiki-style syntax instead of HTML in the ScalaDocs of `flink-table`.




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62684644
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 ---
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import scala.collection.JavaConverters._
+
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataTypeFactory
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.calcite.schema.{Table => CTable}
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.flink.api.table.{StreamTableEnvironment, 
TableEnvironment}
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.table.validate.ValidationException
+
+case class Project(projectList: Seq[NamedExpression], child: LogicalNode) 
extends UnaryNode {
+  override def output: Seq[Attribute] = projectList.map(_.toAttribute)
+
+  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode 
= {
+val afterResolve = 
super.resolveExpressions(tableEnv).asInstanceOf[Project]
+val newProjectList =
+  afterResolve.projectList.zipWithIndex.map { case (e, i) =>
+e match {
+  case u @ UnresolvedAlias(child, optionalAliasName) => child 
match {
+case ne: NamedExpression => ne
+case e if !e.valid => u
+case c @ Cast(ne: NamedExpression, _) => Alias(c, ne.name)
+case other => Alias(other, 
optionalAliasName.getOrElse(s"_c$i"))
+  }
+  case _ => throw new IllegalArgumentException
+}
+}
+Project(newProjectList, child)
+  }
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
+val allAlias = projectList.forall(_.isInstanceOf[Alias])
+child.toRelNode(relBuilder)
+if (allAlias) {
+  // Calcite's RelBuilder does not translate identity projects even if 
they rename fields.
+  //   Add a projection ourselves (will be automatically removed by 
translation rules).
+  relBuilder.push(
+LogicalProject.create(relBuilder.peek(),
+  projectList.map(_.toRexNode(relBuilder)).asJava,
+  projectList.map(_.name).asJava))
+} else {
+  relBuilder.project(projectList.map(_.toRexNode(relBuilder)): _*)
+}
+  }
+}
+
+case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) 
extends UnaryNode {
+  override def output: Seq[Attribute] =
+throw new UnresolvedException("Invalid call to output on AliasNode")
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder =
+throw new UnresolvedException("Invalid call to toRelNode on AliasNode")
+
+  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode 
= {
+if (aliasList.length > child.output.length) {
+  failValidation("Aliasing more fields than we actually have")
+} else if 
(!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) {
+  failValidation("`as` only allow string arguments")
+} else {
+  val names = 
aliasList.map(_.asInstanceOf[UnresolvedFieldReference].name)
+  val input = child.output
+  Project(
+names.zip(input).map { case (name, attr) =>
+  Alias(attr, name)} ++ input.drop(names.length), child)
+}
+  }
+}
+
+case class Distinct(child: LogicalNode) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
+child.toRelNode(relBuilder)
+

[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62684422
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 ---
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import scala.collection.JavaConverters._
+
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataTypeFactory
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.calcite.schema.{Table => CTable}
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.flink.api.table.{StreamTableEnvironment, 
TableEnvironment}
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.table.validate.ValidationException
+
+case class Project(projectList: Seq[NamedExpression], child: LogicalNode) 
extends UnaryNode {
+  override def output: Seq[Attribute] = projectList.map(_.toAttribute)
+
+  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode 
= {
+val afterResolve = 
super.resolveExpressions(tableEnv).asInstanceOf[Project]
+val newProjectList =
+  afterResolve.projectList.zipWithIndex.map { case (e, i) =>
+e match {
+  case u @ UnresolvedAlias(child, optionalAliasName) => child 
match {
+case ne: NamedExpression => ne
+case e if !e.valid => u
+case c @ Cast(ne: NamedExpression, _) => Alias(c, ne.name)
+case other => Alias(other, 
optionalAliasName.getOrElse(s"_c$i"))
+  }
+  case _ => throw new IllegalArgumentException
+}
+}
+Project(newProjectList, child)
+  }
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
+val allAlias = projectList.forall(_.isInstanceOf[Alias])
+child.toRelNode(relBuilder)
+if (allAlias) {
+  // Calcite's RelBuilder does not translate identity projects even if 
they rename fields.
+  //   Add a projection ourselves (will be automatically removed by 
translation rules).
+  relBuilder.push(
+LogicalProject.create(relBuilder.peek(),
+  projectList.map(_.toRexNode(relBuilder)).asJava,
+  projectList.map(_.name).asJava))
+} else {
+  relBuilder.project(projectList.map(_.toRexNode(relBuilder)): _*)
+}
+  }
+}
+
+case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) 
extends UnaryNode {
+  override def output: Seq[Attribute] =
+throw new UnresolvedException("Invalid call to output on AliasNode")
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder =
+throw new UnresolvedException("Invalid call to toRelNode on AliasNode")
+
+  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode 
= {
+if (aliasList.length > child.output.length) {
+  failValidation("Aliasing more fields than we actually have")
+} else if 
(!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) {
+  failValidation("`as` only allow string arguments")
+} else {
+  val names = 
aliasList.map(_.asInstanceOf[UnresolvedFieldReference].name)
+  val input = child.output
+  Project(
+names.zip(input).map { case (name, attr) =>
+  Alias(attr, name)} ++ input.drop(names.length), child)
+}
+  }
+}
+
+case class Distinct(child: LogicalNode) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
+child.toRelNode(relBuilder)
+

[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62684054
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 ---
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import scala.collection.JavaConverters._
+
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataTypeFactory
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.calcite.schema.{Table => CTable}
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.flink.api.table.{StreamTableEnvironment, 
TableEnvironment}
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.table.validate.ValidationException
+
+case class Project(projectList: Seq[NamedExpression], child: LogicalNode) 
extends UnaryNode {
+  override def output: Seq[Attribute] = projectList.map(_.toAttribute)
+
+  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode 
= {
+val afterResolve = 
super.resolveExpressions(tableEnv).asInstanceOf[Project]
+val newProjectList =
+  afterResolve.projectList.zipWithIndex.map { case (e, i) =>
+e match {
+  case u @ UnresolvedAlias(child, optionalAliasName) => child 
match {
+case ne: NamedExpression => ne
+case e if !e.valid => u
+case c @ Cast(ne: NamedExpression, _) => Alias(c, ne.name)
+case other => Alias(other, 
optionalAliasName.getOrElse(s"_c$i"))
+  }
+  case _ => throw new IllegalArgumentException
+}
+}
+Project(newProjectList, child)
+  }
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
+val allAlias = projectList.forall(_.isInstanceOf[Alias])
+child.toRelNode(relBuilder)
+if (allAlias) {
+  // Calcite's RelBuilder does not translate identity projects even if 
they rename fields.
+  //   Add a projection ourselves (will be automatically removed by 
translation rules).
+  relBuilder.push(
+LogicalProject.create(relBuilder.peek(),
+  projectList.map(_.toRexNode(relBuilder)).asJava,
+  projectList.map(_.name).asJava))
+} else {
+  relBuilder.project(projectList.map(_.toRexNode(relBuilder)): _*)
+}
+  }
+}
+
+case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) 
extends UnaryNode {
+  override def output: Seq[Attribute] =
+throw new UnresolvedException("Invalid call to output on AliasNode")
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder =
+throw new UnresolvedException("Invalid call to toRelNode on AliasNode")
+
+  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode 
= {
+if (aliasList.length > child.output.length) {
+  failValidation("Aliasing more fields than we actually have")
+} else if 
(!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) {
+  failValidation("`as` only allow string arguments")
+} else {
+  val names = 
aliasList.map(_.asInstanceOf[UnresolvedFieldReference].name)
+  val input = child.output
+  Project(
+names.zip(input).map { case (name, attr) =>
+  Alias(attr, name)} ++ input.drop(names.length), child)
+}
+  }
+}
+
+case class Distinct(child: LogicalNode) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
+child.toRelNode(relBuilder)
+

[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62683621
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 ---
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import scala.collection.JavaConverters._
+
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataTypeFactory
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.calcite.schema.{Table => CTable}
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.flink.api.table.{StreamTableEnvironment, 
TableEnvironment}
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.table.validate.ValidationException
+
+case class Project(projectList: Seq[NamedExpression], child: LogicalNode) 
extends UnaryNode {
+  override def output: Seq[Attribute] = projectList.map(_.toAttribute)
+
+  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode 
= {
--- End diff --

Can we move the check of `Table.checkUniqueNames` to `Project`?




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62681558
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
 ---
@@ -25,15 +25,29 @@ import org.apache.calcite.sql.SqlOperator
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.tools.RelBuilder
 
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
NumericTypeInfo, TypeInformation}
+import org.apache.flink.api.table.typeutils.{TypeCheckUtils, TypeConverter}
+import org.apache.flink.api.table.validate.ExprValidationResult
 
-abstract class BinaryArithmetic extends BinaryExpression { self: Product =>
+abstract class BinaryArithmetic extends BinaryExpression {
   def sqlOperator: SqlOperator
 
   override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
 relBuilder.call(sqlOperator, children.map(_.toRexNode))
   }
+
+  override def dataType = left.dataType
+
+  // TODO: tighten this rule once we implemented type coercion rules 
during validation
+  override def validateInput(): ExprValidationResult = {
+if (!left.dataType.isInstanceOf[NumericTypeInfo[_]] ||
+  !right.dataType.isInstanceOf[NumericTypeInfo[_]]) {
+  ExprValidationResult.ValidationFailure(s"$this require both operand 
Numeric, get" +
--- End diff --

Please change to `"$this requireS both operandS to be Numeric"`




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62683029
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala
 ---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.table.TableEnvironment
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.trees.TreeNode
+import org.apache.flink.api.table.validate._
+
+/**
+  * LogicalNode is created and validated as we construct query plan using 
Table API.
+  *
+  * The main validation procedure is separated into two phases:
+  * Expressions' resolution and transformation 
(#resolveExpressions(TableEnvironment)):
+  * 
+  *   translate UnresolvedFieldReference into ResolvedFieldReference
+  * using child operator's output
+  *   translate Call(UnresolvedFunction) into solid Expression
+  *   generate alias names for query output
+  *   
+  * 
+  *
+  * LogicalNode validation (#validate(TableEnvironment)):
+  * 
+  *   check no UnresolvedFieldReference exists any more
+  *   check if all expressions have children of needed type
+  *   check each logical operator have desired input
+  * 
+  * Once we pass the validation phase, we can safely convert LogicalNode 
into Calcite's RelNode.
+  *
+  * Note: this is adapted from Apache Spark's LogicalPlan.
+  */
+abstract class LogicalNode extends TreeNode[LogicalNode] {
+  def output: Seq[Attribute]
+
+  def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = {
+// resolve references and function calls
+transformExpressionsUp {
+  case u @ UnresolvedFieldReference(name) =>
+resolveChildren(name).getOrElse(u)
+  case c @ Call(name, children) if c.childrenValid =>
+tableEnv.getFunctionCatalog.lookupFunction(name, children)
+}
+  }
+
+  def toRelNode(relBuilder: RelBuilder): RelBuilder
+
+  def validate(tableEnv: TableEnvironment): LogicalNode = {
+val resolvedNode = resolveExpressions(tableEnv)
+resolvedNode.transformExpressionsUp {
+  case a: Attribute if !a.valid =>
+val from = children.flatMap(_.output).map(_.name).mkString(", ")
+failValidation(s"cannot resolve [${a.name}] given input [$from]")
+
+  case e: Expression if e.validateInput().isFailure =>
+e.validateInput() match {
+  case ExprValidationResult.ValidationFailure(message) =>
+failValidation(s"Expression $e failed on input check: 
$message")
+}
+}
+  }
+
+  /**
+* Resolves the given strings to a [[NamedExpression]] using the input 
from all child
+* nodes of this LogicalPlan.
+*/
+  def resolveChildren(name: String): Option[NamedExpression] =
+resolve(name, children.flatMap(_.output))
+
+  /**
+*  Performs attribute resolution given a name and a sequence of 
possible attributes.
+*/
+  def resolve(name: String, input: Seq[Attribute]): 
Option[NamedExpression] = {
+// find all matches in input
+val candidates = input.filter(_.name.equalsIgnoreCase(name))
+if (candidates.length > 1) {
+  failValidation(s"Reference $name is ambiguous")
+} else if (candidates.length == 0) {
+  None
+} else {
+  Some(candidates.head.withName(name))
+}
+  }
+
+  def expressions: Seq[Expression] = {
--- End diff --

I think this code is not used.



[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62682986
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala
 ---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.table.TableEnvironment
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.trees.TreeNode
+import org.apache.flink.api.table.validate._
+
+/**
+  * LogicalNode is created and validated as we construct query plan using 
Table API.
+  *
+  * The main validation procedure is separated into two phases:
+  * Expressions' resolution and transformation 
(#resolveExpressions(TableEnvironment)):
+  * 
+  *   translate UnresolvedFieldReference into ResolvedFieldReference
+  * using child operator's output
+  *   translate Call(UnresolvedFunction) into solid Expression
+  *   generate alias names for query output
+  *   
+  * 
+  *
+  * LogicalNode validation (#validate(TableEnvironment)):
+  * 
+  *   check no UnresolvedFieldReference exists any more
+  *   check if all expressions have children of needed type
+  *   check each logical operator have desired input
+  * 
+  * Once we pass the validation phase, we can safely convert LogicalNode 
into Calcite's RelNode.
+  *
+  * Note: this is adapted from Apache Spark's LogicalPlan.
+  */
+abstract class LogicalNode extends TreeNode[LogicalNode] {
+  def output: Seq[Attribute]
+
+  def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = {
+// resolve references and function calls
+transformExpressionsUp {
+  case u @ UnresolvedFieldReference(name) =>
+resolveChildren(name).getOrElse(u)
+  case c @ Call(name, children) if c.childrenValid =>
+tableEnv.getFunctionCatalog.lookupFunction(name, children)
+}
+  }
+
+  def toRelNode(relBuilder: RelBuilder): RelBuilder
--- End diff --

Can we rename this method to `getRelBuilder`, make it protected (only 
accessible for subclasses of `LogicalNode`), and add a method 
`toRelNode(relBuilder: RelBuilder): RelNode` that actually returns a `RelNode`?




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62682619
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/mathExpressions.scala
 ---
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.typeutils.TypeCheckUtils
+import org.apache.flink.api.table.validate.ExprValidationResult
+
+case class Abs(child: Expression) extends UnaryExpression {
+  override def dataType: TypeInformation[_] = child.dataType
+
+  override def validateInput(): ExprValidationResult =
+TypeCheckUtils.assertNumericExpr(child.dataType, "Abs")
+
+  override def toString(): String = s"abs($child)"
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+relBuilder.call(SqlStdOperatorTable.ABS, child.toRexNode)
+  }
+}
+
+case class Ceil(child: Expression) extends UnaryExpression {
+  override def dataType: TypeInformation[_] = LONG_TYPE_INFO
+
+  override def validateInput(): ExprValidationResult =
+TypeCheckUtils.assertNumericExpr(child.dataType, "Ceil")
+
+  override def toString(): String = s"ceil($child)"
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+relBuilder.call(SqlStdOperatorTable.CEIL, child.toRexNode)
+  }
+}
+
+case class Exp(child: Expression) extends UnaryExpression {
+  override def dataType: TypeInformation[_] = DOUBLE_TYPE_INFO
+
+  // TODO: this could be loosened by enabling implicit cast
+  override def validateInput(): ExprValidationResult = {
+if (child.dataType == DOUBLE_TYPE_INFO) {
--- End diff --

Why do `Exp`, `Log10`, `Ln`, and `Power` require `Double` types? Shouldn't 
all numeric types be supported for these functions?
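
e.g., they could reuse the numeric check this PR already introduces; a sketch 
for `Exp` inside `mathExpressions.scala` (whether Calcite then inserts the 
widening cast automatically would need checking):

    case class Exp(child: Expression) extends UnaryExpression {
      override def dataType: TypeInformation[_] = DOUBLE_TYPE_INFO

      // accept any numeric child instead of insisting on DOUBLE
      override def validateInput(): ExprValidationResult =
        TypeCheckUtils.assertNumericExpr(child.dataType, "Exp")

      override def toString(): String = s"exp($child)"

      override def toRexNode(implicit relBuilder: RelBuilder): RexNode =
        relBuilder.call(SqlStdOperatorTable.EXP, child.toRexNode)
    }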




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62682327
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala
 ---
@@ -21,25 +21,47 @@ import org.apache.calcite.rex.RexNode
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.tools.RelBuilder
 
-abstract class BinaryPredicate extends BinaryExpression { self: Product => 
}
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.table.validate.ExprValidationResult
+
+abstract class BinaryPredicate extends BinaryExpression {
+  override def dataType = BasicTypeInfo.BOOLEAN_TYPE_INFO
+
+  override def validateInput(): ExprValidationResult = {
+if (left.dataType == BasicTypeInfo.BOOLEAN_TYPE_INFO &&
+right.dataType == BasicTypeInfo.BOOLEAN_TYPE_INFO) {
+  ExprValidationResult.ValidationSuccess
+} else {
+  ExprValidationResult.ValidationFailure(s"$this only accept child of 
Boolean Type, " +
--- End diff --

Please change to `"$this only accepts children of Boolean type"`




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62682155
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
 ---
@@ -20,27 +20,98 @@ package org.apache.flink.api.table.expressions
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.tools.RelBuilder
 
-case class UnresolvedFieldReference(override val name: String) extends 
LeafExpression {
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.validate.ExprValidationResult
+
+object NamedExpression {
+  private val curId = new java.util.concurrent.atomic.AtomicInteger()
+  def newExprId: Int = curId.getAndIncrement()
+}
+
+trait NamedExpression extends Expression {
+  def name: String
+  def exprId: Int
+  def toAttribute: Attribute
+}
+
+abstract class Attribute extends LeafExpression with NamedExpression {
+  override def toAttribute: Attribute = this
+
+  def withName(newName: String): Attribute
+}
+
+case class UnresolvedFieldReference(name: String) extends Attribute {
+  override def exprId: Int = throw new UnresolvedException(s"calling 
exprId on ${this.getClass}")
+
   override def toString = "\"" + name
 
+  override def withName(newName: String): Attribute = 
UnresolvedFieldReference(newName)
+
+  override def dataType: TypeInformation[_] =
+throw new UnresolvedException(s"calling dataType on ${this.getClass}")
+
+  override def validateInput(): ExprValidationResult =
+ExprValidationResult.ValidationFailure(s"Unresolved reference $name")
+}
+
+case class ResolvedFieldReference(
+name: String,
+dataType: TypeInformation[_])(
+val exprId: Int = NamedExpression.newExprId) extends Attribute {
+
+  override def toString = s"'$name"
+
   override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
 relBuilder.field(name)
   }
-}
 
-case class ResolvedFieldReference(override val name: String) extends 
LeafExpression {
-  override def toString = s"'$name"
+  override def withName(newName: String): Attribute = {
+if (newName == name) {
+  this
+} else {
+  ResolvedFieldReference(newName, dataType)(exprId)
+}
+  }
 }
 
-case class Naming(child: Expression, override val name: String) extends 
UnaryExpression {
+case class Alias(child: Expression, name: String)
+extends UnaryExpression with NamedExpression {
+  val exprId: Int = NamedExpression.newExprId
+
   override def toString = s"$child as '$name"
 
   override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
 relBuilder.alias(child.toRexNode, name)
   }
 
-  override def makeCopy(anyRefs: Seq[AnyRef]): this.type = {
+  override def dataType: TypeInformation[_] = child.dataType
+
+  override def makeCopy(anyRefs: Array[AnyRef]): this.type = {
 val child: Expression = anyRefs.head.asInstanceOf[Expression]
 copy(child, name).asInstanceOf[this.type]
   }
+
+  override def toAttribute: Attribute = {
+if (valid) {
+  ResolvedFieldReference(name, child.dataType)(exprId)
+} else {
+  UnresolvedFieldReference(name)
+}
+  }
+}
+
+case class UnresolvedAlias(
+child: Expression,
+aliasName: Option[String] = None) extends UnaryExpression with 
NamedExpression {
+
+  override def name: String =
+throw new UnresolvedException("Invalid call to name on 
UnresolvedAlias")
+  override def toAttribute: Attribute =
--- End diff --

Can you add a new line here?




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62681920
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
 ---
@@ -25,15 +25,29 @@ import org.apache.calcite.sql.SqlOperator
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.tools.RelBuilder
 
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
NumericTypeInfo, TypeInformation}
+import org.apache.flink.api.table.typeutils.{TypeCheckUtils, TypeConverter}
+import org.apache.flink.api.table.validate.ExprValidationResult
 
-abstract class BinaryArithmetic extends BinaryExpression { self: Product =>
+abstract class BinaryArithmetic extends BinaryExpression {
   def sqlOperator: SqlOperator
 
   override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
 relBuilder.call(sqlOperator, children.map(_.toRexNode))
   }
+
+  override def dataType = left.dataType
--- End diff --

Is this a sensible default? Shouldn't it be the data type with the higher 
precision / larger value range of both inputs, e.g., `Double` in case of 
`Double` and `Int`, etc?
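
A minimal sketch of such a widening rule (the helper name `widerTypeOf` and the simple ordering are assumptions for illustration, ignoring e.g. the `Long`-to-`Float` precision subtleties):

``` scala
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo.TypeInformation

// hypothetical helper: pick the wider of two numeric types,
// e.g. widerTypeOf(INT_TYPE_INFO, DOUBLE_TYPE_INFO) == DOUBLE_TYPE_INFO
def widerTypeOf(a: TypeInformation[_], b: TypeInformation[_]): TypeInformation[_] = {
  // numeric types ordered by increasing width / value range
  val byWidth = Seq(BYTE_TYPE_INFO, SHORT_TYPE_INFO, INT_TYPE_INFO,
    LONG_TYPE_INFO, FLOAT_TYPE_INFO, DOUBLE_TYPE_INFO)
  if (byWidth.indexOf(a) >= byWidth.indexOf(b)) a else b
}

// BinaryArithmetic could then use:
//   override def dataType = widerTypeOf(left.dataType, right.dataType)
```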




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62681359
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala
 ---
@@ -36,41 +39,59 @@ abstract sealed class Aggregation extends 
UnaryExpression { self: Product =>
 }
 
 case class Sum(child: Expression) extends Aggregation {
-  override def toString = s"($child).sum"
+  override def toString = s"sum($child)"
 
   override def toAggCall(name: String)(implicit relBuilder: RelBuilder): 
AggCall = {
 relBuilder.aggregateCall(SqlStdOperatorTable.SUM, false, null, name, 
child.toRexNode)
   }
+
+  override def dataType = child.dataType
+
+  override def validateInput = 
TypeCheckUtils.assertNumericExpr(child.dataType, "sum")
 }
 
 case class Min(child: Expression) extends Aggregation {
-  override def toString = s"($child).min"
+  override def toString = s"min($child)"
 
   override def toAggCall(name: String)(implicit relBuilder: RelBuilder): 
AggCall = {
 relBuilder.aggregateCall(SqlStdOperatorTable.MIN, false, null, name, 
child.toRexNode)
   }
+
+  override def dataType = child.dataType
+
+  override def validateInput = 
TypeCheckUtils.assertOrderableExpr(child.dataType, "min")
 }
 
 case class Max(child: Expression) extends Aggregation {
-  override def toString = s"($child).max"
+  override def toString = s"max($child)"
 
   override def toAggCall(name: String)(implicit relBuilder: RelBuilder): 
AggCall = {
 relBuilder.aggregateCall(SqlStdOperatorTable.MAX, false, null, name, 
child.toRexNode)
   }
+
+  override def dataType = child.dataType
+
+  override def validateInput = 
TypeCheckUtils.assertOrderableExpr(child.dataType, "max")
 }
 
 case class Count(child: Expression) extends Aggregation {
-  override def toString = s"($child).count"
+  override def toString = s"count($child)"
 
   override def toAggCall(name: String)(implicit relBuilder: RelBuilder): 
AggCall = {
 relBuilder.aggregateCall(SqlStdOperatorTable.COUNT, false, null, name, 
child.toRexNode)
   }
+
+  override def dataType = BasicTypeInfo.LONG_TYPE_INFO
 }
 
 case class Avg(child: Expression) extends Aggregation {
-  override def toString = s"($child).avg"
+  override def toString = s"avg($child)"
 
   override def toAggCall(name: String)(implicit relBuilder: RelBuilder): 
AggCall = {
 relBuilder.aggregateCall(SqlStdOperatorTable.AVG, false, null, name, 
child.toRexNode)
   }
+
+  override def dataType = BasicTypeInfo.DOUBLE_TYPE_INFO
--- End diff --

Our current implementation returns avg aggregates as the input data type. I 
think this should be changed to `child.dataType`.




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62681024
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala
 ---
@@ -17,13 +17,34 @@
  */
 package org.apache.flink.api.table.expressions
 
-import java.util.concurrent.atomic.AtomicInteger
-
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.tools.RelBuilder
 
-abstract class Expression extends TreeNode[Expression] { self: Product =>
-  def name: String = Expression.freshName("expression")
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.trees.TreeNode
+import org.apache.flink.api.table.validate.ExprValidationResult
+
+abstract class Expression extends TreeNode[Expression] {
+  /**
+* Returns the [[TypeInformation]] for evaluating this expression.
+* It is sometimes not available until the expression is valid.
+*/
+  def dataType: TypeInformation[_]
--- End diff --

can we rename this field to `resultType`?




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62680699
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
 ---
@@ -152,13 +160,13 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 *
 * @param name The name under which the table is registered.
 * @param table The table to register in the catalog
-* @throws TableException if another table is registered under the 
provided name.
+* @throws ValidationException if another table is registered under the 
provided name.
 */
-  @throws[TableException]
+  @throws[ValidationException]
   protected def registerTableInternal(name: String, table: AbstractTable): 
Unit = {
 
 if (isRegistered(name)) {
-  throw new TableException(s"Table \'$name\' already exists. " +
+  throw new ValidationException(s"Table \'$name\' already exists. " +
--- End diff --

I think we can keep `TableException` here. I would use 
`ValidationException` for all invalid expressions within Table API queries. 
`TableException` can be used for invalid use of the Table API in general 
(registering tables under existing names, ...) + current limitations 
(DataTypes, non-equality joins, etc.). Do you think such a separation would 
make sense?
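
If we go that route, a minimal sketch of the split could look like this (class shapes assumed, not taken from the PR):

``` scala
// thrown for invalid expressions within Table API queries,
// e.g. a type error found during the validation phase
class ValidationException(msg: String) extends RuntimeException(msg)

// thrown for invalid use of the Table API in general and for current
// limitations, e.g. registering a table under an existing name
class TableException(msg: String) extends RuntimeException(msg)
```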




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-10 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1958#discussion_r62679391
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
 ---
@@ -92,8 +93,7 @@ abstract class StreamTableEnvironment(
   def ingest(tableName: String): Table = {
 
 if (isRegistered(tableName)) {
-  relBuilder.scan(tableName)
-  new Table(relBuilder.build(), this)
+  new Table(this, CatalogNode(tableName, getTable(tableName), 
getTypeFactory))
 }
 else {
   throw new TableException(s"Table \'$tableName\' was not found in the 
registry.")
--- End diff --

Throw a `ValidationException`




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-09 Thread twalthr
Github user twalthr commented on the pull request:

https://github.com/apache/flink/pull/1958#issuecomment-217862197
  
Yes, I will also take a look at it.




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-06 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1958#issuecomment-217430461
  
I took a brief look at the PR. The overall structure of the validation 
looks good to me. Will do a more in-depth review in the next few days. @twalthr, 
can you have another look as well?
Thanks, Fabian




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-03 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1958#issuecomment-216506763
  
That's a known issue, see FLINK-3860, so nothing to worry about for this PR. 
I'll have a look soon, thanks for the update!




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-03 Thread yjshen
Github user yjshen commented on the pull request:

https://github.com/apache/flink/pull/1958#issuecomment-216505284
  
Test failure due to irrelevant test:
```
[INFO] flink-table ........................... SUCCESS [08:03 min]

[INFO] flink-connector-wikiedits ............. FAILURE [02:02 min]
```
```
Running org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSourceTest
Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 120.051 sec <<< FAILURE! - in org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSourceTest
testWikipediaEditsSource(org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSourceTest)  Time elapsed: 120.022 sec  <<< ERROR!
java.lang.Exception: test timed out after 120000 milliseconds
```




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-03 Thread yjshen
Github user yjshen commented on the pull request:

https://github.com/apache/flink/pull/1958#issuecomment-216502690
  
@zentol OK, I've closed it.




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-03 Thread yjshen
Github user yjshen closed the pull request at:

https://github.com/apache/flink/pull/1916




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-03 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/1958#issuecomment-216501884
  
Since this PR is a substitute, could you close the old one? Thanks.




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-03 Thread yjshen
Github user yjshen commented on the pull request:

https://github.com/apache/flink/pull/1958#issuecomment-216487312
  
This PR substitutes #1916, squashing several previous commits into a single 
one for easier rebasing.

@fhueske I've implemented eager validation here, can you take a look at 
this one?




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-03 Thread yjshen
GitHub user yjshen opened a pull request:

https://github.com/apache/flink/pull/1958

[FLINK-3754][Table]Add a validation phase before construct RelNode using 
TableAPI

This PR aims at adding an extra **validation** phase for plans generated 
from the Table API, matching the functionality of Calcite's Validator, which 
is invoked when we execute a query expressed as a SQL string.

In order to do this, I inserted a new layer between the Table API and `RelNode` 
construction: the `Logical Plan`.

The main validation procedure works as follows:

1. Construct a logical plan node
2. Do resolution using the schema and the `FunctionCatalog`
3. Do validation on the type-annotated logical plan node

After validation finishes successfully, it is safe to construct the 
`RelNode`.
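
A rough sketch of how these steps chain together (the method and class names below are placeholders for illustration, not necessarily the PR's exact API):

``` scala
// hypothetical driver for the three steps above
def prepare(plan: LogicalNode, catalog: FunctionCatalog): LogicalNode = {
  // step 2: resolve field references and function calls against the
  // input schemas and the FunctionCatalog
  val resolved = resolve(plan, catalog)
  // step 3: type-check the now fully annotated plan, failing fast
  if (!resolved.valid) {
    throw new ValidationException("plan failed validation")
  }
  resolved // only now is it safe to construct the RelNode from it
}
```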

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yjshen/flink eager_validation

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1958.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1958


commit ec6bf418e065d836f5399275d4ab24b9c29ab0fe
Author: Yijie Shen 
Date:   2016-04-13T08:46:58Z

Add an extra validation phase before construct RelNode.

Squash previous commits into single one for easier rebase.
The eight previous commits are:

  make TreeNode extends Product
  wip expressions validation, should create expressions for functions next
  add functions for math and string
  wip move table api on logicalNode
  resolve and validate next
  wip
  fix bug in validator, merge eval, add doc
  resolve comments

commit d31a782475903b16f57fee819fc1ddb830aaa597
Author: Yijie Shen 
Date:   2016-05-03T08:57:02Z

do eager validation






[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-02 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1916#issuecomment-216270686
  
Great, thanks!




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-05-02 Thread yjshen
Github user yjshen commented on the pull request:

https://github.com/apache/flink/pull/1916#issuecomment-216269369
  
Will start to work on eager validation now. :)




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-04-28 Thread yjshen
Github user yjshen commented on the pull request:

https://github.com/apache/flink/pull/1916#issuecomment-215422057
  
@fhueske If that's the preferred way, I will try to do some simplifications 
and see.




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-04-28 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1916#issuecomment-215414605
  
If we decide on eager validation, we can also simplify the code a bit, 
right? 

I might be wrong, but we could remove `PlanPreparation` and the recursive 
validation of `LogicalNode`, since the inputs of an operator would already 
have been validated. The checks of `Validator.validate()` could be moved to the 
respective `LogicalNode`. Not sure if we would still need the `RuleExecutor`.
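
A sketch of what that simplification could look like, with the checks moved into the nodes themselves (shape assumed from this discussion, not actual PR code):

``` scala
// hypothetical eagerly-validated operator: each node checks itself when
// it is constructed, so its inputs are always already valid and no
// recursive traversal (and no separate PlanPreparation) is needed
abstract class LogicalNode {
  def children: Seq[LogicalNode]
  // operator-specific checks, moved here from Validator.validate()
  protected def validateInput(): Unit
  // called by the Table API right after constructing the node
  final def validated(): this.type = {
    validateInput()
    this
  }
}
```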




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-04-28 Thread yjshen
Github user yjshen commented on the pull request:

https://github.com/apache/flink/pull/1916#issuecomment-215336755
  
Hi @fhueske @twalthr, thanks for the detailed review work! 👍 
I've left some comments above to express my opinion; looking forward to more 
discussion on this :)




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-04-28 Thread yjshen
Github user yjshen commented on the pull request:

https://github.com/apache/flink/pull/1916#issuecomment-215334972
  
@twalthr, translating Table API calls to `SqlNode`s and using Calcite's 
`validator` seems like another reasonable solution, though it might be hard to 
do `eager validation` that way, as @fhueske proposed. 
Yes, I agree we should discuss which way is the preferred solution :)




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-04-28 Thread yjshen
Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1916#discussion_r61382801
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
 ---
@@ -355,4 +380,26 @@ object TableEnvironment {
 new ScalaStreamTableEnv(executionEnvironment, tableConfig)
   }
 
+  /**
+* The primary workflow for executing plan validation for that 
generated from Table API.
+* The validation is intentionally designed as a lazy procedure and 
triggered when we
+* are going to run on Flink core.
+*/
+  class PlanPreparation(val env: TableEnvironment, val logical: 
LogicalNode) {
+
+lazy val resolvedPlan: LogicalNode = env.getValidator.resolve(logical)
+
+def validate(): Unit = env.getValidator.validate(resolvedPlan)
+
+lazy val relNode: RelNode = {
+  env match {
+case _: BatchTableEnvironment =>
--- End diff --

Yes, it shouldn't be distinguished. The difference should exist inside 
the validator's rule set.




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-04-28 Thread yjshen
Github user yjshen commented on the pull request:

https://github.com/apache/flink/pull/1916#issuecomment-215332571
  
> I think it would be good to eagerly check each method call of the Table 
API. This would make debugging easier, because exceptions would be thrown where 
the error is caused. Please correct me if I am wrong, but I think we would not 
lose validation coverage compared to the coverage of this PR if we do eager 
validation? It might also be easier, because we do not need the recursive 
operator traversal (still the expression traversal though). Maybe we can even 
directly translate to RelNodes after validation, just like we do right now. I 
think a lot of this PR could be used for eager validation, not sure if it would 
be easily possible with the SqlNode validation approach. 

Regarding the eager validation you mentioned, I think that could be 
accomplished by calling `validate()` each time another `Table` is constructed, 
in other words, each time a Table API method is called, changing the 
current code from:
``` scala
class Table(
    private[flink] val tableEnv: TableEnvironment,
    private[flink] val planPreparation: PlanPreparation) {

  def this(tableEnv: TableEnvironment, logicalPlan: LogicalNode) = {
    this(tableEnv, new PlanPreparation(tableEnv, logicalPlan))
  }
```
to 
``` scala
def this(tableEnv: TableEnvironment, logicalPlan: LogicalNode) = {
  this(tableEnv, {
    val pp = new PlanPreparation(tableEnv, logicalPlan)
    pp.validate()
    pp
  })
}
```
and also adding an additional flag that marks the logical node as validated, 
thereby avoiding the recursive logical plan traversal, would be enough. Do I 
understand your idea correctly?
On the other hand, I prefer to postpone `RelNode` construction until we 
are going to run the query on Flink, since we only need the `RelNode` at that time.

> While reviewing the PR, I noticed that some classes seem to be partially 
derived from Spark's code base (e.g., TreeNode and RuleExecutor). I know there 
are some common patterns that apply in optimizers, but it is good style to give 
credit to the original source code. 
Can you list which classes are derived from Spark code and add a comment to 
them pointing to the source of the code?

OK, I will do a more detailed scan and give credit to all the original 
source code.




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-04-28 Thread yjshen
Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1916#discussion_r61378731
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala
 ---
@@ -215,7 +215,7 @@ class ScalarFunctionsTest {
 
   }
 
-  @Test
+  @Ignore
--- End diff --

`exp`, `log`, `pow`, and `ln` should have `Double` as input. What I was 
thinking is that we should add some extra `type coercion` rules and insert a 
`cast` when we can do it safely (when an expression asks for a `Double` but we 
provide an `Int`), for example `Byte` to `Double`, `Long` to `Double`, and so 
on, rewriting the expression as `pow(cast(x, Double))`.
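
A sketch of such a coercion rule (the `Cast` expression and the rule shape here are assumptions for illustration):

``` scala
import org.apache.flink.api.common.typeinfo.BasicTypeInfo.DOUBLE_TYPE_INFO

// hypothetical rewrite: wrap numeric-but-not-Double children of
// Double-only functions in a safe cast, e.g. exp over an Int column
// becomes exp(cast(x, Double))
def coerceToDouble(e: Expression): Expression = e match {
  case Exp(child) if child.dataType != DOUBLE_TYPE_INFO =>
    Exp(Cast(child, DOUBLE_TYPE_INFO))
  case Ln(child) if child.dataType != DOUBLE_TYPE_INFO =>
    Ln(Cast(child, DOUBLE_TYPE_INFO))
  case other => other
}
```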




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-04-27 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1916#issuecomment-215142967
  
Hi @yjshen, thanks for your patience. I also finished a first pass over the 
PR. 

I'd like to propose a third alternative, in addition to the custom 
validation phase (this PR) and generating `SqlNode`s and using Calcite's 
validator. Both approaches would mean that the validation happens before the 
logical plan is translated into a `RelNode`. I think it would be good to 
eagerly check each method call of the Table API. This would make debugging 
easier, because exceptions would be thrown where the error is caused. Please 
correct me if I am wrong, but I think we would not lose validation coverage 
compared to the coverage of this PR if we do eager validation? It might also be 
easier, because we do not need the recursive operator traversal (still the 
expression traversal though). Maybe we can even directly translate to 
`RelNode`s after validation, just like we do right now. I think a lot of this 
PR could be used for eager validation; not sure if it would be easily possible 
with the `SqlNode` validation approach. 
What do you think about eager validation, @yjshen and @twalthr?

While reviewing the PR, I noticed that some classes seem to be partially 
derived from Spark's code base (e.g., `TreeNode` and `RuleExecutor`). I know 
there are some common patterns that apply in optimizers, but it is good style 
to give credit to the original source code. 
Can you list which classes are derived from Spark code and add a comment to 
them pointing to the source of the code?

Thanks, Fabian




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-04-27 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1916#discussion_r61291959
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala
 ---
@@ -215,7 +215,7 @@ class ScalarFunctionsTest {
 
   }
 
-  @Test
+  @Ignore
--- End diff --

Why did you exclude these tests?




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-04-27 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1916#discussion_r61291865
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala
 ---
@@ -17,13 +17,34 @@
  */
 package org.apache.flink.api.table.expressions
 
-import java.util.concurrent.atomic.AtomicInteger
-
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.tools.RelBuilder
 
-abstract class Expression extends TreeNode[Expression] { self: Product =>
-  def name: String = Expression.freshName("expression")
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.trees.TreeNode
+import org.apache.flink.api.table.validate.ExprValidationResult
+
+abstract class Expression extends TreeNode[Expression] {
+  /**
+* Returns the [[TypeInformation]] for evaluating this expression.
+* It is sometimes available until the expression is valid.
+*/
+  def dataType: TypeInformation[_]
+
+  /**
+* One pass validation of the expression tree in post order.
+*/
+  lazy val valid: Boolean = childrenValid && validateInput().isSuccess
+
+  def childrenValid: Boolean = children.forall(_.valid)
+
+  /**
+* Check input data types, inputs number or other properties specified 
by this expression.
+* Return `ValidationSuccess` if it pass the check,
+* or `ValidationFailure` with supplement message explaining the error.
+* Note: we should only call this method until `childrenValidated == 
true`
--- End diff --

`childrenValidated` -> `childrenValid`?




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-04-27 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1916#discussion_r61291698
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala
 ---
@@ -17,13 +17,34 @@
  */
 package org.apache.flink.api.table.expressions
 
-import java.util.concurrent.atomic.AtomicInteger
-
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.tools.RelBuilder
 
-abstract class Expression extends TreeNode[Expression] { self: Product =>
-  def name: String = Expression.freshName("expression")
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.trees.TreeNode
+import org.apache.flink.api.table.validate.ExprValidationResult
+
+abstract class Expression extends TreeNode[Expression] {
+  /**
+* Returns the [[TypeInformation]] for evaluating this expression.
+* It is sometimes available until the expression is valid.
--- End diff --

+not ?




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-04-27 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1916#discussion_r61291621
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
 ---
@@ -355,4 +380,26 @@ object TableEnvironment {
 new ScalaStreamTableEnv(executionEnvironment, tableConfig)
   }
 
+  /**
+* The primary workflow for executing plan validation for that 
generated from Table API.
+* The validation is intentionally designed as a lazy procedure and 
triggered when we
+* are going to run on Flink core.
+*/
+  class PlanPreparation(val env: TableEnvironment, val logical: 
LogicalNode) {
+
+lazy val resolvedPlan: LogicalNode = env.getValidator.resolve(logical)
+
+def validate(): Unit = env.getValidator.validate(resolvedPlan)
+
+lazy val relNode: RelNode = {
+  env match {
+case _: BatchTableEnvironment =>
--- End diff --

Why do you distinguish here? It's the same code, no?




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-04-27 Thread twalthr
Github user twalthr commented on the pull request:

https://github.com/apache/flink/pull/1916#issuecomment-215061267
  
@yjshen I looked through the code changes. I was quite impressed that your 
PR touches nearly every class of the current API.

Basically your changes seem to work, however, I'm not sure if we want to 
implement the validation phase for every scalar function ourselves. Actually, 
Calcite already comes with type inference, type checking and validation 
capabilities. I don't know if we want to reinvent the wheel at this point. Your 
approach inserts a layer under the Table API for doing the validation. However, 
this layer could instead translate the plan into a SQL tree (on top of 
RelNodes). We could then let Calcite do the work of validation.

This could also solve another problem that I faced when working on 
FLINK-3580. If you take a look at `StandardConvertletTable` of Calcite, you see 
that Calcite also does some conversions which we also need to implement 
ourselves if we do not base the Table API on top of SQL.

We need to discuss how we want to proceed. Neither solution is perfect.
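
For reference, delegating to Calcite would roughly mean going through its `Frameworks`/`Planner` API; a sketch (the `frameworkConfig` is a placeholder, and the Table API would build the `SqlNode` tree directly instead of parsing a string as done here):

``` scala
import org.apache.calcite.tools.{Frameworks, Planner}

val planner: Planner = Frameworks.getPlanner(frameworkConfig) // config assumed
val parsed = planner.parse("SELECT a, b FROM t WHERE c > 0")  // placeholder query
val validated = planner.validate(parsed) // Calcite's validator type-checks here
val relRoot = planner.rel(validated)     // RelNode tree, built only after validation
```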




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-04-25 Thread twalthr
Github user twalthr commented on the pull request:

https://github.com/apache/flink/pull/1916#issuecomment-214332719
  
Thanks for the contribution @yjshen. I will also have a look at it tomorrow.




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-04-24 Thread yjshen
Github user yjshen commented on the pull request:

https://github.com/apache/flink/pull/1916#issuecomment-213900827
  
@fhueske, thanks for your work. Looking forward to more discussion on this 
:)




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-04-22 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1916#issuecomment-213567820
  
Thanks for the additional information @yjshen! I'm a bit behind with PR 
reviews, but will definitely have a look at the beginning of next week. Thanks, Fabian




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-04-22 Thread yjshen
Github user yjshen commented on the pull request:

https://github.com/apache/flink/pull/1916#issuecomment-213419043
  
The type annotation work is done bottom-up:

Firstly, we know the schema of each of the two inputs, and we know that the 
`List[Expression]` in `Project` is used to manipulate one row of table data as 
input, producing one value per expression. Therefore, we can infer the output 
schema of `Project` (in the current impl this is expressed as `def output: 
Seq[Attribute]`) as long as we know each expression's `dataType`. 

For example, `Add`'s dataType is the same as its inputs', `Or`'s dataType is 
always `Boolean`, and `pow(a, b)`'s dataType is always `Double`. However, only 
if we understand all kinds of expressions are we able to infer their 
`dataType`. The main problem here is that expression construction only produces 
`Call`s (unresolved functions), so we should first resolve them into concrete 
`Expression`s. `FunctionCatalog` is introduced here as a mapping from 
`FunctionName -> Expression`, so we can easily finish the translation by 
looking it up.
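
A small self-contained sketch of that bottom-up inference, with toy classes (not the PR's actual ones):

``` scala
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo.TypeInformation

// toy expression tree illustrating bottom-up dataType inference
sealed trait Expr { def dataType: TypeInformation[_] }
case class Field(name: String, dataType: TypeInformation[_]) extends Expr
case class Add(l: Expr, r: Expr) extends Expr {
  def dataType = l.dataType        // same as its inputs
}
case class Or(l: Expr, r: Expr) extends Expr {
  def dataType = BOOLEAN_TYPE_INFO // always Boolean
}
case class Pow(b: Expr, e: Expr) extends Expr {
  def dataType = DOUBLE_TYPE_INFO  // always Double
}

// a tiny "FunctionCatalog": function name -> expression constructor
val catalog: Map[String, Seq[Expr] => Expr] =
  Map("pow" -> { args => Pow(args(0), args(1)) })

// resolving a Call("pow", args) and reading off its dataType:
val resolved = catalog("pow")(Seq(Field("a", INT_TYPE_INFO), Field("b", INT_TYPE_INFO)))
// resolved.dataType == DOUBLE_TYPE_INFO
```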




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-04-22 Thread yjshen
Github user yjshen commented on the pull request:

https://github.com/apache/flink/pull/1916#issuecomment-213411425
  
For ease of review, I would like to explain this PR in more detail.

When Table API methods are called to construct a query, the query is first 
built as an operator tree; `LogicalNode` is used to represent the tree nodes. 
Therefore, after we have finished constructing a query and are about to 
translate it into a DataSet/DataStream program, the constructed logical plan 
looks like:
```
Aggregate(List of group by, List of aggregate)
+- Filter (condition expression)
   +- Union
      +- Project (select column expression)
      |  +- TableScan(DS1)
      +- Project (select column expression)
         +- TableScan(DS2)
```
At this point, only the leaf nodes, the TableScans (`CatalogNode` in the 
current implementation), are equipped with type information. In order to 
validate the plan, we first need to **annotate** the full logical plan with 
type information and then use that information to do the **validation** work.
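
For illustration, a query of roughly this shape would produce such a plan (table, field, and method names made up; the API details are approximate):

``` scala
// hypothetical query producing the plan sketched above
val t1 = tEnv.fromDataSet(ds1).select('a, 'b)   // Project over TableScan(DS1)
val t2 = tEnv.fromDataSet(ds2).select('a, 'b)   // Project over TableScan(DS2)
val result = t1.union(t2)                       // Union
  .filter('b > 10)                              // Filter
  .groupBy('a)
  .select('a, 'b.sum)                           // Aggregate(group by, aggregate)
```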




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-04-22 Thread yjshen
Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1916#discussion_r60728611
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 ---
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.logical
+
+import scala.collection.JavaConverters._
+
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataTypeFactory
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.calcite.schema.{Table => CTable}
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.table.validate.ValidationException
+
+case class Project(projectList: Seq[NamedExpression], child: LogicalNode) 
extends UnaryNode {
+  override def output: Seq[Attribute] = projectList.map(_.toAttribute)
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
+def allAlias: Boolean = {
+  projectList.forall { proj =>
+proj match {
+  case Alias(r: ResolvedFieldReference, name) => true
+  case _ => false
+}
+  }
+}
+child.toRelNode(relBuilder)
+if (allAlias) {
+  relBuilder.push(
+LogicalProject.create(relBuilder.peek(),
+  projectList.map(_.toRexNode(relBuilder)).asJava,
+  projectList.map(_.name).asJava))
+} else {
+  relBuilder.project(projectList.map(_.toRexNode(relBuilder)): _*)
+}
+  }
+}
+
+case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) 
extends UnaryNode {
+  override def output: Seq[Attribute] =
+throw new UnresolvedException("Invalid call to output on AliasNode")
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder =
+throw new UnresolvedException("Invalid call to toRelNode on AliasNode")
+
+  override lazy val resolved: Boolean = false
+}
+
+case class Distinct(child: LogicalNode) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
+child.toRelNode(relBuilder)
+relBuilder.distinct()
+  }
+}
+
+case class Filter(condition: Expression, child: LogicalNode) extends 
UnaryNode {
+  override def output: Seq[Attribute] = child.output
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
+child.toRelNode(relBuilder)
+relBuilder.filter(condition.toRexNode(relBuilder))
+  }
+}
+
+case class Aggregate(
+groupingExpressions: Seq[Expression],
+aggregateExpressions: Seq[NamedExpression],
+child: LogicalNode) extends UnaryNode {
+
+  override def output: Seq[Attribute] = {
+(groupingExpressions ++ aggregateExpressions) map { agg =>
+  agg match {
+case ne: NamedExpression => ne.toAttribute
+case e => Alias(e, e.toString).toAttribute
+  }
+}
+  }
+
+  override def toRelNode(relBuilder: RelBuilder): RelBuilder = {
+child.toRelNode(relBuilder)
+relBuilder.aggregate(
+  
relBuilder.groupKey(groupingExpressions.map(_.toRexNode(relBuilder)).asJava),
+  aggregateExpressions.filter(_.isInstanceOf[Alias]).map { e =>
+e match {
+  case Alias(agg: Aggregation, name) => 
agg.toAggCall(name)(relBuilder)
+  case _ => null // this should never happen
+}
+  }.asJava)
+  }
+}
+
+case class Union(left: LogicalNode, right: LogicalNode) extends BinaryNode 
{
+  override def output: Seq[Attribute] = left.output

[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-04-22 Thread yjshen
Github user yjshen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1916#discussion_r60728312
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
 ---
@@ -199,12 +151,12 @@ class Table(
 *   tab.filter('name === "Fred")
 * }}}
 */
-  def filter(predicate: Expression): Table = {
-
-relBuilder.push(relNode)
-relBuilder.filter(predicate.toRexNode(relBuilder))
-
-new Table(relBuilder.build(), tableEnv)
+  def filter(predicate: Expression): Table = withPlan {
+//logicalPlan match {
+//  case j: Join => j.copy(condition = Some(predicate))
+//  case o => Filter(predicate, logicalPlan)
+//}
--- End diff --

Will remove this in the next commit.




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-04-20 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1916#issuecomment-212485745
  
Thanks @yjshen for working on this issue! Unified validation and exceptions 
are a big improvement, IMO. I'll try to have a look soon.

@twalthr, can you have a look at the changes done on the expressions?




[GitHub] flink pull request: [FLINK-3754][Table]Add a validation phase befo...

2016-04-20 Thread yjshen
GitHub user yjshen opened a pull request:

https://github.com/apache/flink/pull/1916

[FLINK-3754][Table]Add a validation phase before construct RelNode using 
TableAPI

This PR aims at adding an extra **validation** phase for plans generated 
from the Table API, matching the functionality of Calcite's Validator, which 
is invoked when we execute a query expressed as a SQL string.

In order to do this, I inserted a new layer between the Table API and `RelNode` 
construction: the `Logical Plan`.

The main procedure of validation and RelNode construction works as 
follows:

1. Construct the logical plan on the fly
2. When we are about to execute the plan:
3. Do resolution using the table/dataset/datastream schema
4. Do validation on the type-annotated logical plan tree
5. Translate the logical plan into a `RelNode` once validation has finished 
successfully.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yjshen/flink extra_validation

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1916.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1916


commit 7fb102af1fa52dab2f0c80785f75bf7e0d8a7062
Author: Yijie Shen 
Date:   2016-04-13T08:46:58Z

make TreeNode extends Product

commit ab75d4857cb203714ee037aea2e55de776c4dd32
Author: Yijie Shen 
Date:   2016-04-15T14:51:20Z

wip expressions validation, should create expressions for functions next

commit 61e4bb09d754fe0aca7624e3fcd70d364e4154d3
Author: Yijie Shen 
Date:   2016-04-16T07:02:07Z

add functions for math and string

commit 6abbfad0a11a3874150baa4dcabf6caab37cf0be
Author: Yijie Shen 
Date:   2016-04-18T16:49:04Z

wip move table api on logicalNode

commit 64ecdbef273895e4527fb6b5120d92acb0d20542
Author: Yijie Shen 
Date:   2016-04-19T05:20:47Z

resolve and validate next

commit dc42a44cc47cdb619076074eda95b84157554337
Author: Yijie Shen 
Date:   2016-04-19T19:26:58Z

wip

commit c04292abe6484e095171a66c399c16e50e98870a
Author: Yijie Shen 
Date:   2016-04-20T12:26:25Z

fix bug in validator, merge eval, add doc



