http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/SelectionAnalyzer.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/SelectionAnalyzer.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/SelectionAnalyzer.scala
new file mode 100644
index 0000000..625fdbf
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/SelectionAnalyzer.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.expressions.analysis
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.expressions.Expression
+import org.apache.flink.api.table.trees.Analyzer
+
+/**
+ * [[Analyzer]] for selection expressions.
+ *
+ * Its rule sequence consists of a `ResolveFieldReferences` built from `inputFields`, followed by
+ * `VerifyNoNestedAggregates`, `InsertAutoCasts` and `TypeCheck`.
+ *
+ * @param inputFields pairs of a String and a type (presumably field name and field type of the
+ *                    input; confirm against `ResolveFieldReferences`)
+ */
+class SelectionAnalyzer(inputFields: Seq[(String, TypeInformation[_])])
+  extends Analyzer[Expression] {
+
+  def rules = {
+    // the only rule that needs constructor arguments
+    val resolveFields = new ResolveFieldReferences(inputFields)
+    Seq(resolveFields, new VerifyNoNestedAggregates, new InsertAutoCasts, new TypeCheck)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/TypeCheck.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/TypeCheck.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/TypeCheck.scala
new file mode 100644
index 0000000..b724561
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/TypeCheck.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions.analysis
+
+import org.apache.flink.api.table.expressions.Expression
+import org.apache.flink.api.table.trees.Rule
+import org.apache.flink.api.table.{_}
+
+import scala.collection.mutable
+
+/**
+ * Rule that makes sure we call [[Expression.typeInfo]] on each [[Expression]] at least once.
+ * Expressions are expected to perform type verification in this method.
+ */
+class TypeCheck extends Rule[Expression] {
+
+  /**
+   * Requests `typeInfo` from every node visited by `transformPre` and collects the messages of
+   * all [[ExpressionException]]s raised while doing so.
+   *
+   * @param expr the expression to check
+   * @return the result of the traversal; every visited node is handed back unchanged
+   * @throws ExpressionException if at least one node failed; it carries all collected messages
+   */
+  def apply(expr: Expression) = {
+    val errors = mutable.MutableList[String]()
+
+    val result = expr.transformPre {
+      // named `node` so that it does not shadow the `expr` reported in the error below
+      case node: Expression =>
+        // simply get the typeInfo from the expression. this will perform type analysis
+        try {
+          node.typeInfo
+        } catch {
+          case e: ExpressionException =>
+            errors += e.getMessage
+        }
+        node
+    }
+
+    if (errors.nonEmpty) {
+      throw new ExpressionException(
+        s"""Invalid expression "$expr": ${errors.mkString(" ")}""")
+    }
+
+    result
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyBoolean.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyBoolean.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyBoolean.scala
new file mode 100644
index 0000000..e75dd20
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyBoolean.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions.analysis
+
+import org.apache.flink.api.table.expressions.{NopExpression, Expression}
+import org.apache.flink.api.table.trees.Rule
+import org.apache.flink.api.table.{_}
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+
+import scala.collection.mutable
+
+/**
+ * [[Rule]] that verifies that the result type of an [[Expression]] is Boolean. This is required
+ * for filter/join predicates.
+ */
+class VerifyBoolean extends Rule[Expression] {
+
+  /**
+   * Returns `expr` unchanged if it is a [[NopExpression]] or if its type is Boolean.
+   *
+   * @param expr the expression to verify
+   * @return `expr` itself
+   * @throws ExpressionException if `expr` is not a [[NopExpression]] and its type is not Boolean
+   */
+  def apply(expr: Expression) = {
+    // a NopExpression is accepted without asking for its type
+    if (!expr.isInstanceOf[NopExpression] &&
+      expr.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO) {
+      throw new ExpressionException(
+        s"Expression $expr of type ${expr.typeInfo} is not boolean.")
+    }
+
+    expr
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoAggregates.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoAggregates.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoAggregates.scala
new file mode 100644
index 0000000..09dbf88
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoAggregates.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions.analysis
+
+import org.apache.flink.api.table.ExpressionException
+import org.apache.flink.api.table.expressions.{Aggregation, Expression}
+
+import scala.collection.mutable
+
+import org.apache.flink.api.table.trees.Rule
+
+/**
+ * Rule that verifies that an expression does not contain aggregate operations. Right now, join
+ * predicates and filter predicates cannot contain aggregates.
+ */
+class VerifyNoAggregates extends Rule[Expression] {
+
+  /**
+   * Records one message for every [[Aggregation]] visited by `transformPre`.
+   *
+   * @param expr the expression to verify
+   * @return the result of the traversal if no aggregation was found
+   * @throws ExpressionException if at least one aggregation was found
+   */
+  def apply(expr: Expression) = {
+    val problems = mutable.MutableList[String]()
+
+    val traversed = expr.transformPre {
+      case aggregation: Aggregation =>
+        problems += s"""Aggregations are not allowed in join/filter predicates."""
+        aggregation
+    }
+
+    if (problems.isEmpty) {
+      traversed
+    } else {
+      throw new ExpressionException(
+        s"""Invalid expression "$expr": ${problems.mkString(" ")}""")
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoNestedAggregates.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoNestedAggregates.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoNestedAggregates.scala
new file mode 100644
index 0000000..07acf1e
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoNestedAggregates.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions.analysis
+
+import org.apache.flink.api.table.ExpressionException
+import org.apache.flink.api.table.expressions.{Expression, Aggregation}
+
+import scala.collection.mutable
+
+import org.apache.flink.api.table.trees.Rule
+
+/**
+ * Rule that rejects expressions in which an aggregate operation appears below another
+ * aggregate operation.
+ */
+class VerifyNoNestedAggregates extends Rule[Expression] {
+
+  /**
+   * For every [[Aggregation]] visited by `transformPre`, checks whether its child contains
+   * another [[Aggregation]] and records a message if so.
+   *
+   * @param expr the expression to verify
+   * @return the result of the traversal if no nested aggregation was found
+   * @throws ExpressionException if at least one nested aggregation was found
+   */
+  def apply(expr: Expression) = {
+    val problems = mutable.MutableList[String]()
+
+    val traversed = expr.transformPre {
+      case outer: Aggregation =>
+        val hasNestedAggregation = outer.child.exists(_.isInstanceOf[Aggregation])
+        if (hasNestedAggregation) {
+          problems += s"""Found nested aggregation inside "$outer"."""
+        }
+        outer
+    }
+
+    if (problems.isEmpty) {
+      traversed
+    } else {
+      throw new ExpressionException(
+        s"""Invalid expression "$expr": ${problems.mkString(" ")}""")
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
new file mode 100644
index 0000000..e866ea0
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.expressions
+
+import org.apache.flink.api.table.ExpressionException
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, IntegerTypeInfo, 
NumericTypeInfo, TypeInformation}
+
+/**
+ * Base class of binary arithmetic expressions. Both operands must be numeric and must have the
+ * same type.
+ */
+abstract class BinaryArithmetic extends BinaryExpression { self: Product =>
+
+  /**
+   * @return the type of the left operand
+   * @throws ExpressionException if an operand is not numeric or the operand types differ
+   */
+  def typeInfo = {
+    if (!left.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
+      throw new ExpressionException(
+        s"""Non-numeric operand ${left} of type ${left.typeInfo} in $this""")
+    }
+    if (!right.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
+      throw new ExpressionException(
+        s"""Non-numeric operand "${right}" of type ${right.typeInfo} in $this""")
+    }
+    if (left.typeInfo != right.typeInfo) {
+      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
+        s"${right.typeInfo} in $this")
+    }
+    left.typeInfo
+  }
+}
+
+/**
+ * Addition. In contrast to the check inherited from [[BinaryArithmetic]], operands may also be
+ * of String type.
+ */
+case class Plus(left: Expression, right: Expression) extends BinaryArithmetic {
+
+  /**
+   * @return the type of the left operand
+   * @throws ExpressionException if an operand is neither numeric nor a String, or if the
+   *                             operand types differ
+   */
+  override def typeInfo = {
+    if (!left.typeInfo.isInstanceOf[NumericTypeInfo[_]] &&
+      !(left.typeInfo == BasicTypeInfo.STRING_TYPE_INFO)) {
+      throw new ExpressionException(s"Non-numeric operand type ${left.typeInfo} in $this")
+    }
+    if (!right.typeInfo.isInstanceOf[NumericTypeInfo[_]] &&
+      !(right.typeInfo == BasicTypeInfo.STRING_TYPE_INFO)) {
+      throw new ExpressionException(s"Non-numeric operand type ${right.typeInfo} in $this")
+    }
+    if (left.typeInfo != right.typeInfo) {
+      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
+        s"${right.typeInfo} in $this")
+    }
+    left.typeInfo
+  }
+
+  override def toString = s"($left + $right)"
+}
+
+/**
+ * Arithmetic negation of a numeric expression.
+ */
+case class UnaryMinus(child: Expression) extends UnaryExpression {
+
+  /** Type of `child`; throws an [[ExpressionException]] if that type is not numeric. */
+  def typeInfo = child.typeInfo match {
+    case _: NumericTypeInfo[_] =>
+      child.typeInfo
+    case _ =>
+      throw new ExpressionException(
+        s"""Non-numeric operand ${child} of type ${child.typeInfo} in $this""")
+  }
+
+  override def toString: String = s"-($child)"
+}
+
+/** Subtraction; operand checks and result type come from [[BinaryArithmetic]]. */
+case class Minus(left: Expression, right: Expression) extends BinaryArithmetic {
+  override def toString: String = s"($left - $right)"
+}
+
+/** Division; operand checks and result type come from [[BinaryArithmetic]]. */
+case class Div(left: Expression, right: Expression) extends BinaryArithmetic {
+  override def toString: String = s"($left / $right)"
+}
+
+/** Multiplication; operand checks and result type come from [[BinaryArithmetic]]. */
+case class Mul(left: Expression, right: Expression) extends BinaryArithmetic {
+  override def toString: String = s"($left * $right)"
+}
+
+/** Modulo; operand checks and result type come from [[BinaryArithmetic]]. */
+case class Mod(left: Expression, right: Expression) extends BinaryArithmetic {
+  // rendered with `%`; the former `*` made a Mod indistinguishable from a Mul in messages
+  override def toString = s"($left % $right)"
+}
+
+/**
+ * Absolute value of a numeric expression.
+ */
+case class Abs(child: Expression) extends UnaryExpression {
+
+  /**
+   * @return the type of `child`
+   * @throws ExpressionException if the type of `child` is not numeric
+   */
+  def typeInfo = {
+    // same operand verification as UnaryMinus, so that TypeCheck reports non-numeric operands
+    if (!child.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
+      throw new ExpressionException(
+        s"""Non-numeric operand ${child} of type ${child.typeInfo} in $this""")
+    }
+    child.typeInfo
+  }
+
+  override def toString = s"abs($child)"
+}
+
+/**
+ * Base class of binary bitwise expressions. Both operands must be of an integer type and must
+ * have the same type.
+ */
+abstract class BitwiseBinaryArithmetic extends BinaryExpression { self: Product =>
+
+  /**
+   * @return the type of the left operand if it is Long, otherwise Int
+   * @throws ExpressionException if an operand is not of an integer type or the operand types
+   *                             differ
+   */
+  def typeInfo: TypeInformation[_] = {
+    if (!left.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
+      throw new ExpressionException(
+        s"""Non-integer operand ${left} of type ${left.typeInfo} in $this""")
+    }
+    if (!right.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
+      throw new ExpressionException(
+        s"""Non-integer operand "${right}" of type ${right.typeInfo} in $this""")
+    }
+    if (left.typeInfo != right.typeInfo) {
+      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
+        s"${right.typeInfo} in $this")
+    }
+    // every integer type other than Long is reported as Int
+    if (left.typeInfo == BasicTypeInfo.LONG_TYPE_INFO) {
+      left.typeInfo
+    } else {
+      BasicTypeInfo.INT_TYPE_INFO
+    }
+  }
+}
+
+/** Bitwise AND; operand checks and result type come from [[BitwiseBinaryArithmetic]]. */
+case class BitwiseAnd(left: Expression, right: Expression) extends BitwiseBinaryArithmetic {
+  override def toString: String = s"($left & $right)"
+}
+
+/** Bitwise OR; operand checks and result type come from [[BitwiseBinaryArithmetic]]. */
+case class BitwiseOr(left: Expression, right: Expression) extends BitwiseBinaryArithmetic {
+  override def toString: String = s"($left | $right)"
+}
+
+
+/** Bitwise XOR; operand checks and result type come from [[BitwiseBinaryArithmetic]]. */
+case class BitwiseXor(left: Expression, right: Expression) extends BitwiseBinaryArithmetic {
+  override def toString: String = s"($left ^ $right)"
+}
+
+/**
+ * Bitwise complement of an integer-typed expression.
+ */
+case class BitwiseNot(child: Expression) extends UnaryExpression {
+
+  /**
+   * Type of `child` if it is Long, otherwise Int. Throws an [[ExpressionException]] if the
+   * type of `child` is not an integer type.
+   */
+  def typeInfo: TypeInformation[_] = child.typeInfo match {
+    case _: IntegerTypeInfo[_] =>
+      if (child.typeInfo == BasicTypeInfo.LONG_TYPE_INFO) {
+        child.typeInfo
+      } else {
+        BasicTypeInfo.INT_TYPE_INFO
+      }
+    case _ =>
+      throw new ExpressionException(
+        s"""Non-integer operand ${child} of type ${child.typeInfo} in $this""")
+  }
+
+  override def toString: String = s"~($child)"
+}
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
new file mode 100644
index 0000000..9fae862
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.expressions
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.table.ExpressionException
+
+/**
+ * Cast of `child` to the type `tpe`.
+ *
+ * A cast is accepted when the target type is String, or when both the target type and the type
+ * of `child` are basic types.
+ */
+case class Cast(child: Expression, tpe: TypeInformation[_]) extends UnaryExpression {
+
+  /** The target type `tpe`; throws an [[ExpressionException]] for an unsupported cast. */
+  def typeInfo = {
+    val targetIsString = BasicTypeInfo.STRING_TYPE_INFO == tpe
+    // the type of child is only requested when the target is a basic, non-String type
+    if (targetIsString || (tpe.isBasicType && child.typeInfo.isBasicType)) {
+      tpe
+    } else {
+      throw new ExpressionException(
+        s"Invalid cast: $this. Casts are only valid betwixt primitive types.")
+    }
+  }
+
+  override def toString: String = s"$child.cast($tpe)"
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
new file mode 100644
index 0000000..687ea7a
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.expressions
+
+import org.apache.flink.api.table.ExpressionException
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, NumericTypeInfo}
+
+/**
+ * Base class of binary comparisons. Both operands must be numeric and must have the same type.
+ */
+abstract class BinaryComparison extends BinaryExpression { self: Product =>
+
+  /**
+   * @return the Boolean type
+   * @throws ExpressionException if an operand is not numeric or the operand types differ
+   */
+  def typeInfo = {
+    if (!left.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
+      throw new ExpressionException(s"Non-numeric operand ${left} in $this")
+    }
+    if (!right.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
+      throw new ExpressionException(s"Non-numeric operand ${right} in $this")
+    }
+    if (left.typeInfo != right.typeInfo) {
+      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
+        s"${right.typeInfo} in $this")
+    }
+    BasicTypeInfo.BOOLEAN_TYPE_INFO
+  }
+}
+
+/**
+ * Equality comparison. Unlike the check inherited from [[BinaryComparison]], operands need not
+ * be numeric; they only have to be of the same type.
+ */
+case class EqualTo(left: Expression, right: Expression) extends BinaryComparison {
+
+  /**
+   * @return the Boolean type
+   * @throws ExpressionException if the operand types differ
+   */
+  override def typeInfo = {
+    if (left.typeInfo != right.typeInfo) {
+      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
+        s"${right.typeInfo} in $this")
+    }
+    BasicTypeInfo.BOOLEAN_TYPE_INFO
+  }
+
+  override def toString = s"$left === $right"
+}
+
+/**
+ * Inequality comparison. Unlike the check inherited from [[BinaryComparison]], operands need
+ * not be numeric; they only have to be of the same type.
+ */
+case class NotEqualTo(left: Expression, right: Expression) extends BinaryComparison {
+
+  /**
+   * @return the Boolean type
+   * @throws ExpressionException if the operand types differ
+   */
+  override def typeInfo = {
+    if (left.typeInfo != right.typeInfo) {
+      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
+        s"${right.typeInfo} in $this")
+    }
+    BasicTypeInfo.BOOLEAN_TYPE_INFO
+  }
+
+  override def toString = s"$left !== $right"
+}
+
+/** Comparison `left > right`; operand checks and result type come from [[BinaryComparison]]. */
+case class GreaterThan(left: Expression, right: Expression) extends BinaryComparison {
+  override def toString: String = s"$left > $right"
+}
+
+/** Comparison `left >= right`; operand checks and result type come from [[BinaryComparison]]. */
+case class GreaterThanOrEqual(left: Expression, right: Expression) extends BinaryComparison {
+  override def toString: String = s"$left >= $right"
+}
+
+/** Comparison `left < right`; operand checks and result type come from [[BinaryComparison]]. */
+case class LessThan(left: Expression, right: Expression) extends BinaryComparison {
+  override def toString: String = s"$left < $right"
+}
+
+/** Comparison `left <= right`; operand checks and result type come from [[BinaryComparison]]. */
+case class LessThanOrEqual(left: Expression, right: Expression) extends BinaryComparison {
+  override def toString: String = s"$left <= $right"
+}
+
+/** Null test on `child`. Its type is Boolean; the type of `child` is not inspected. */
+case class IsNull(child: Expression) extends UnaryExpression {
+  def typeInfo = BasicTypeInfo.BOOLEAN_TYPE_INFO
+
+  override def toString: String = s"($child).isNull"
+}
+
+/** Non-null test on `child`. Its type is Boolean; the type of `child` is not inspected. */
+case class IsNotNull(child: Expression) extends UnaryExpression {
+  def typeInfo = BasicTypeInfo.BOOLEAN_TYPE_INFO
+
+  override def toString: String = s"($child).isNotNull"
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
new file mode 100644
index 0000000..a649aed
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.expressions
+
+import org.apache.flink.api.table.ExpressionException
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+/**
+ * Reference to a field by name only. It has no type: asking for `typeInfo` always fails.
+ */
+case class UnresolvedFieldReference(override val name: String) extends LeafExpression {
+
+  /** Always throws an [[ExpressionException]]. */
+  def typeInfo = throw new ExpressionException(s"Unresolved field reference: $this")
+
+  // rendered as a double quote followed by the name (no closing quote)
+  override def toString = "\"" + name
+}
+
+/**
+ * Reference to a field whose type is known.
+ *
+ * @param name the name of the field
+ * @param tpe  the type reported by `typeInfo`
+ */
+case class ResolvedFieldReference(
+    override val name: String,
+    tpe: TypeInformation[_]) extends LeafExpression {
+
+  override def toString: String = s"'$name"
+
+  def typeInfo = tpe
+}
+
+/**
+ * Gives `child` the name `name`. The type is the type of `child`.
+ */
+case class Naming(child: Expression, override val name: String) extends UnaryExpression {
+  override def toString: String = s"$child as '$name"
+
+  def typeInfo = child.typeInfo
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
new file mode 100644
index 0000000..f909cab
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.expressions
+
+import java.util.Date
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.scala.table.ImplicitExpressionOperations
+
+/**
+ * Factory for [[Literal]]s that derives the type from the runtime class of the value.
+ */
+object Literal {
+
+  /**
+   * Creates a [[Literal]] for an Int, Long, Double, Float, String, Boolean or
+   * `java.util.Date` value.
+   *
+   * @param l the value of the literal
+   * @return a literal holding `l` together with the matching basic type
+   * @throws org.apache.flink.api.table.ExpressionException if `l` is of any other type
+   */
+  def apply(l: Any): Literal = l match {
+    case int: Int => Literal(int, BasicTypeInfo.INT_TYPE_INFO)
+    case long: Long => Literal(long, BasicTypeInfo.LONG_TYPE_INFO)
+    case double: Double => Literal(double, BasicTypeInfo.DOUBLE_TYPE_INFO)
+    case float: Float => Literal(float, BasicTypeInfo.FLOAT_TYPE_INFO)
+    case str: String => Literal(str, BasicTypeInfo.STRING_TYPE_INFO)
+    case bool: Boolean => Literal(bool, BasicTypeInfo.BOOLEAN_TYPE_INFO)
+    case date: Date => Literal(date, BasicTypeInfo.DATE_TYPE_INFO)
+    // fail with a descriptive message instead of a bare scala.MatchError
+    case other =>
+      val className = if (other == null) "null" else other.getClass.getName
+      throw new org.apache.flink.api.table.ExpressionException(
+        s"Cannot create a literal from value '$other' of unsupported type $className.")
+  }
+}
+
+/**
+ * Constant value in an expression tree.
+ *
+ * @param value the constant
+ * @param tpe   the type reported by `typeInfo`
+ */
+case class Literal(value: Any, tpe: TypeInformation[_])
+  extends LeafExpression with ImplicitExpressionOperations {
+
+  // the literal itself is the expression that ImplicitExpressionOperations works on
+  def expr = this
+
+  def typeInfo = tpe
+
+  override def toString: String = s"$value"
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala
new file mode 100644
index 0000000..eaf0463
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.expressions
+
+import org.apache.flink.api.table.ExpressionException
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+
+/**
+ * Base class of binary logical expressions. Both operands must be Boolean.
+ */
+abstract class BinaryPredicate extends BinaryExpression { self: Product =>
+
+  /**
+   * @return the Boolean type
+   * @throws ExpressionException if the type of either operand is not Boolean
+   */
+  def typeInfo = {
+    if (left.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO ||
+      right.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO) {
+      throw new ExpressionException(s"Non-boolean operand types ${left.typeInfo} and " +
+        s"${right.typeInfo} in $this")
+    }
+    BasicTypeInfo.BOOLEAN_TYPE_INFO
+  }
+}
+
+/**
+ * Logical negation of a Boolean expression.
+ */
+case class Not(child: Expression) extends UnaryExpression {
+
+  /**
+   * @return the Boolean type
+   * @throws ExpressionException if the type of `child` is not Boolean
+   */
+  def typeInfo = {
+    if (child.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO) {
+      throw new ExpressionException(s"Non-boolean operand type ${child.typeInfo} in $this")
+    }
+    BasicTypeInfo.BOOLEAN_TYPE_INFO
+  }
+
+  // name obtained from Expression.freshName, based on the name of the child
+  override val name = Expression.freshName("not-" + child.name)
+
+  override def toString = s"!($child)"
+}
+
+/** Logical conjunction; operand checks and result type come from [[BinaryPredicate]]. */
+case class And(left: Expression, right: Expression) extends BinaryPredicate {
+
+  // name obtained from Expression.freshName, based on the names of both operands
+  override val name = Expression.freshName(left.name + "-and-" + right.name)
+
+  override def toString: String = s"$left && $right"
+}
+
+/** Logical disjunction; operand checks and result type come from [[BinaryPredicate]]. */
+case class Or(left: Expression, right: Expression) extends BinaryPredicate {
+
+  // name obtained from Expression.freshName, based on the names of both operands
+  override val name = Expression.freshName(left.name + "-or-" + right.name)
+
+  override def toString: String = s"$left || $right"
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala
new file mode 100644
index 0000000..c5c8c94
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table
+
+/**
+ * This package contains the base class of AST nodes and all the expression language AST
+ * classes. Expression trees should not be manually constructed by users. They are implicitly
+ * constructed from the implicit DSL conversions in
+ * [[org.apache.flink.api.scala.table.ImplicitExpressionConversions]] and
+ * [[org.apache.flink.api.scala.table.ImplicitExpressionOperations]]. For the Java API,
+ * expression trees should be generated from a string parser that parses expressions and
+ * creates AST nodes.
+ */
+package object expressions

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
new file mode 100644
index 0000000..a39d601
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.expressions
+
+import org.apache.flink.api.table.ExpressionException
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, IntegerTypeInfo}
+
+/**
+ * Expression node for a substring operation on a string operand, with a begin index and an
+ * end index operand.
+ */
+case class Substring(
+    str: Expression,
+    beginIndex: Expression,
+    endIndex: Expression) extends Expression {
+
+  override def children: Seq[Expression] = Seq(str, beginIndex, endIndex)
+
+  /**
+   * String type. Throws an [[ExpressionException]] if the operand is not a String or if one
+   * of the indices is not of an integer type.
+   */
+  def typeInfo = {
+    // Rejects an index operand whose type is not an IntegerTypeInfo.
+    def requireIntegerIndex(label: String, index: Expression): Unit = {
+      if (!index.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
+        throw new ExpressionException(
+          s"""$label index must be an integer type in $this, is ${index.typeInfo}.""")
+      }
+    }
+
+    if (str.typeInfo != BasicTypeInfo.STRING_TYPE_INFO) {
+      throw new ExpressionException(
+        s"""Operand must be of type String in $this, is ${str.typeInfo}.""")
+    }
+    requireIntegerIndex("Begin", beginIndex)
+    requireIntegerIndex("End", endIndex)
+
+    BasicTypeInfo.STRING_TYPE_INFO
+  }
+
+  override def toString = "(" + str + ").substring(" + beginIndex + ", " + endIndex + ")"
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/package.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/package.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/package.scala
new file mode 100644
index 0000000..bdcb22c
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/package.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api
+
+/**
+ * == Table API ==
+ *
+ * This package contains the generic part of the Table API. It can be used with Flink
+ * Streaming and Flink Batch, from Scala as well as from Java.
+ *
+ * When using the Table API, a user creates a [[org.apache.flink.api.table.Table]] from
+ * a DataSet or DataStream. Relational operations can then be performed on this table. A
+ * table can also be converted back to a DataSet or DataStream.
+ *
+ * Packages [[org.apache.flink.api.scala.table]] and [[org.apache.flink.api.java.table]]
+ * contain the language specific part of the API. Refer to these packages for documentation
+ * on how the Table API can be used in Java and Scala.
+ */
+package object table

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
new file mode 100644
index 0000000..2cbd8fa
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.parser
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.table.ExpressionException
+import org.apache.flink.api.table.plan.As
+import org.apache.flink.api.table.expressions._
+
+import scala.util.parsing.combinator.{PackratParsers, JavaTokenParsers}
+
+/**
+ * Parser for expressions inside a String. This parses exactly the same expressions that
+ * would be accepted by the Scala Expression DSL.
+ *
+ * See [[org.apache.flink.api.scala.table.ImplicitExpressionConversions]] and
+ * [[org.apache.flink.api.scala.table.ImplicitExpressionOperations]] for the constructs
+ * available in the Scala Expression DSL. This parser must be kept in sync with the Scala DSL
+ * defined in the above files.
+ *
+ * Entry points are [[parseExpression]] and [[parseExpressionList]]; both throw an
+ * [[ExpressionException]] when the input cannot be parsed completely.
+ */
+object ExpressionParser extends JavaTokenParsers with PackratParsers {
+  case class Keyword(key: String)
+
+  // Convert the keyword into a case-insensitive Parser. The key is quoted (\Q..\E) so that
+  // it is matched literally.
+  implicit def keyword2Parser(kw: Keyword): Parser[String] = {
+    ("""(?i)\Q""" + kw.key + """\E""").r
+  }
+
+  // KeyWord
+
+  lazy val AS: Keyword = Keyword("as")
+  lazy val COUNT: Keyword = Keyword("count")
+  lazy val AVG: Keyword = Keyword("avg")
+  lazy val MIN: Keyword = Keyword("min")
+  lazy val MAX: Keyword = Keyword("max")
+  lazy val SUM: Keyword = Keyword("sum")
+
+  // Literals
+
+  // A whole number followed by 'L' or 'l' is a Long literal. The `<~` combinator drops the
+  // suffix from the parse result, so the Long case has to be mapped directly on that
+  // alternative; it cannot be detected afterwards by looking for a trailing 'L' in the text.
+  // All other numbers are classified by their textual form: plain digits are Int, an 'f'/'F'
+  // suffix is Float, everything else is Double.
+  lazy val numberLiteral: PackratParser[Expression] =
+    ((wholeNumber <~ ("L" | "l")) ^^ { digits => Literal(digits.toLong) }) |
+      ((floatingPointNumber | decimalNumber | wholeNumber) ^^ {
+        str =>
+          if (str.matches("""-?\d+""")) {
+            Literal(str.toInt)
+          } else if (str.endsWith("f") || str.endsWith("F")) {
+            Literal(str.toFloat)
+          } else {
+            Literal(str.toDouble)
+          }
+      })
+
+  // The surrounding quotes are stripped; escape sequences are kept as written.
+  lazy val singleQuoteStringLiteral: Parser[Expression] =
+    ("'" + """([^'\p{Cntrl}\\]|\\[\\'"bfnrt]|\\u[a-fA-F0-9]{4})*""" + "'").r ^^ {
+      str => Literal(str.substring(1, str.length - 1))
+    }
+
+  lazy val stringLiteralFlink: PackratParser[Expression] = super.stringLiteral ^^ {
+    str => Literal(str.substring(1, str.length - 1))
+  }
+
+  lazy val boolLiteral: PackratParser[Expression] = ("true" | "false") ^^ {
+    str => Literal(str.toBoolean)
+  }
+
+  lazy val literalExpr: PackratParser[Expression] =
+    numberLiteral |
+      stringLiteralFlink | singleQuoteStringLiteral |
+      boolLiteral
+
+  lazy val fieldReference: PackratParser[Expression] = ident ^^ {
+    case sym => UnresolvedFieldReference(sym)
+  }
+
+  lazy val atom: PackratParser[Expression] =
+    ( "(" ~> expression <~ ")" ) | literalExpr | fieldReference
+
+  // suffix ops
+  lazy val isNull: PackratParser[Expression] = atom <~ ".isNull" ^^ { e => IsNull(e) }
+  lazy val isNotNull: PackratParser[Expression] = atom <~ ".isNotNull" ^^ { e => IsNotNull(e) }
+
+  lazy val abs: PackratParser[Expression] = atom <~ ".abs" ^^ { e => Abs(e) }
+
+  // Aggregations are accepted both in suffix form (a.sum) and in function form (sum(a)).
+  lazy val sum: PackratParser[Expression] =
+    (atom <~ ".sum" ^^ { e => Sum(e) }) | (SUM ~ "(" ~> atom <~ ")" ^^ { e => Sum(e) })
+  lazy val min: PackratParser[Expression] =
+    (atom <~ ".min" ^^ { e => Min(e) }) | (MIN ~ "(" ~> atom <~ ")" ^^ { e => Min(e) })
+  lazy val max: PackratParser[Expression] =
+    (atom <~ ".max" ^^ { e => Max(e) }) | (MAX ~ "(" ~> atom <~ ")" ^^ { e => Max(e) })
+  lazy val count: PackratParser[Expression] =
+    (atom <~ ".count" ^^ { e => Count(e) }) | (COUNT ~ "(" ~> atom <~ ")" ^^ { e => Count(e) })
+  lazy val avg: PackratParser[Expression] =
+    (atom <~ ".avg" ^^ { e => Avg(e) }) | (AVG ~ "(" ~> atom <~ ")" ^^ { e => Avg(e) })
+
+  lazy val cast: PackratParser[Expression] =
+    atom <~ ".cast(BYTE)" ^^ { e => Cast(e, BasicTypeInfo.BYTE_TYPE_INFO) } |
+    atom <~ ".cast(SHORT)" ^^ { e => Cast(e, BasicTypeInfo.SHORT_TYPE_INFO) } |
+    atom <~ ".cast(INT)" ^^ { e => Cast(e, BasicTypeInfo.INT_TYPE_INFO) } |
+    atom <~ ".cast(LONG)" ^^ { e => Cast(e, BasicTypeInfo.LONG_TYPE_INFO) } |
+    atom <~ ".cast(FLOAT)" ^^ { e => Cast(e, BasicTypeInfo.FLOAT_TYPE_INFO) } |
+    atom <~ ".cast(DOUBLE)" ^^ { e => Cast(e, BasicTypeInfo.DOUBLE_TYPE_INFO) } |
+    atom <~ ".cast(BOOL)" ^^ { e => Cast(e, BasicTypeInfo.BOOLEAN_TYPE_INFO) } |
+    atom <~ ".cast(BOOLEAN)" ^^ { e => Cast(e, BasicTypeInfo.BOOLEAN_TYPE_INFO) } |
+    atom <~ ".cast(STRING)" ^^ { e => Cast(e, BasicTypeInfo.STRING_TYPE_INFO) } |
+    atom <~ ".cast(DATE)" ^^ { e => Cast(e, BasicTypeInfo.DATE_TYPE_INFO) }
+
+  lazy val as: PackratParser[Expression] = atom ~ ".as(" ~ fieldReference ~ ")" ^^ {
+    case e ~ _ ~ as ~ _ => Naming(e, as.name)
+  }
+
+  lazy val substring: PackratParser[Expression] =
+    atom ~ ".substring(" ~ expression ~ "," ~ expression ~ ")" ^^ {
+      case e ~ _ ~ from ~ _ ~ to ~ _ => Substring(e, from, to)
+
+    }
+
+  // Without an explicit end index the largest Int is used as end index.
+  lazy val substringWithoutEnd: PackratParser[Expression] =
+    atom ~ ".substring(" ~ expression ~ ")" ^^ {
+      case e ~ _ ~ from ~ _ => Substring(e, from, Literal(Integer.MAX_VALUE))
+
+    }
+
+  lazy val suffix =
+    isNull | isNotNull |
+      abs | sum | min | max | count | avg | cast |
+      substring | substringWithoutEnd | atom
+
+
+  // unary ops
+
+  lazy val unaryNot: PackratParser[Expression] = "!" ~> suffix ^^ { e => Not(e) }
+
+  lazy val unaryMinus: PackratParser[Expression] = "-" ~> suffix ^^ { e => UnaryMinus(e) }
+
+  lazy val unaryBitwiseNot: PackratParser[Expression] = "~" ~> suffix ^^ { e => BitwiseNot(e) }
+
+  lazy val unary = unaryNot | unaryMinus | unaryBitwiseNot | suffix
+
+  // binary bitwise opts
+
+  lazy val binaryBitwise = unary * (
+    "&" ^^^ { (a:Expression, b:Expression) => BitwiseAnd(a,b) } |
+      "|" ^^^ { (a:Expression, b:Expression) => BitwiseOr(a,b) } |
+      "^" ^^^ { (a:Expression, b:Expression) => BitwiseXor(a,b) } )
+
+  // arithmetic
+
+  lazy val product = binaryBitwise * (
+    "*" ^^^ { (a:Expression, b:Expression) => Mul(a,b) } |
+      "/" ^^^ { (a:Expression, b:Expression) => Div(a,b) } |
+      "%" ^^^ { (a:Expression, b:Expression) => Mod(a,b) } )
+
+  lazy val term = product * (
+    "+" ^^^ { (a:Expression, b:Expression) => Plus(a,b) } |
+     "-" ^^^ { (a:Expression, b:Expression) => Minus(a,b) } )
+
+  // Comparison
+
+  lazy val equalTo: PackratParser[Expression] = term ~ ("===" | "=") ~ term ^^ {
+    case l ~ _ ~ r => EqualTo(l, r)
+  }
+
+  lazy val notEqualTo: PackratParser[Expression] = term ~ ("!==" | "!=" | "<>") ~ term ^^ {
+    case l ~ _ ~ r => NotEqualTo(l, r)
+  }
+
+  lazy val greaterThan: PackratParser[Expression] = term ~ ">" ~ term ^^ {
+    case l ~ _ ~ r => GreaterThan(l, r)
+  }
+
+  lazy val greaterThanOrEqual: PackratParser[Expression] = term ~ ">=" ~ term ^^ {
+    case l ~ _ ~ r => GreaterThanOrEqual(l, r)
+  }
+
+  lazy val lessThan: PackratParser[Expression] = term ~ "<" ~ term ^^ {
+    case l ~ _ ~ r => LessThan(l, r)
+  }
+
+  lazy val lessThanOrEqual: PackratParser[Expression] = term ~ "<=" ~ term ^^ {
+    case l ~ _ ~ r => LessThanOrEqual(l, r)
+  }
+
+  lazy val comparison: PackratParser[Expression] =
+      equalTo | notEqualTo |
+      greaterThan | greaterThanOrEqual |
+      lessThan | lessThanOrEqual | term
+
+  // logic
+
+  lazy val logic = comparison * (
+    "&&" ^^^ { (a:Expression, b:Expression) => And(a,b) } |
+      "||" ^^^ { (a:Expression, b:Expression) => Or(a,b) } )
+
+  // alias
+
+  lazy val alias: PackratParser[Expression] = logic ~ AS ~ fieldReference ^^ {
+    case e ~ _ ~ name => Naming(e, name.name)
+  } | logic
+
+  lazy val expression: PackratParser[Expression] = alias
+
+  lazy val expressionList: Parser[List[Expression]] = rep1sep(expression, ",")
+
+  /**
+   * Parses a comma-separated, non-empty list of expressions.
+   *
+   * @param expression the text to parse; it must be consumed completely
+   * @return the parsed expressions in the order in which they appear
+   * @throws ExpressionException if the text cannot be parsed
+   */
+  def parseExpressionList(expression: String): List[Expression] = {
+    parseAll(expressionList, expression) match {
+      case Success(lst, _) => lst
+
+      case Failure(msg, _) => throw new ExpressionException("Could not parse expression: " + msg)
+
+      case Error(msg, _) => throw new ExpressionException("Could not parse expression: " + msg)
+    }
+  }
+
+  /**
+   * Parses a single expression.
+   *
+   * @param exprString the text to parse; it must be consumed completely
+   * @return the parsed expression
+   * @throws ExpressionException if the text cannot be parsed
+   */
+  def parseExpression(exprString: String): Expression = {
+    parseAll(expression, exprString) match {
+      case Success(lst, _) => lst
+
+      case fail =>
+        throw new ExpressionException("Could not parse expression: " + fail.toString)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/ExpandAggregations.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/ExpandAggregations.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/ExpandAggregations.scala
new file mode 100644
index 0000000..2e09f39
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/ExpandAggregations.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan
+
+import org.apache.flink.api.table.expressions.analysis.SelectionAnalyzer
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.java.aggregation.Aggregations
+
+import scala.collection.mutable
+
+/**
+ * This is used to expand a [[Select]] that contains aggregations. If it is called on a
+ * [[Select]] without aggregations it is simply returned.
+ *
+ * This select:
+ * {{{
+ *   in.select('key, 'value.avg)
+ * }}}
+ *
+ * is transformed to this expansion:
+ * {{{
+ *   in
+ *     .select('key, 'value, Literal(1) as 'intermediate.1)
+ *     .aggregate('value.sum, 'intermediate.1.sum)
+ *     .select('key, 'value / 'intermediate.1)
+ * }}}
+ *
+ * If the input of the [[Select]] is a [[GroupBy]] this is preserved before the aggregation.
+ */
+object ExpandAggregations {
+  def apply(select: Select): PlanNode = select match {
+    case Select(input, selection) =>
+
+      // (intermediate expression, basic aggregation) -> name of the intermediate field
+      val aggregations = mutable.HashMap[(Expression, Aggregations), String]()
+      // fields that the first (pre-aggregation) select has to produce
+      val intermediateFields = mutable.HashSet[Expression]()
+      // for every aggregation expression the references to its intermediate results
+      val aggregationIntermediates = mutable.HashMap[Aggregation, 
Seq[Expression]]()
+
+      var intermediateCount = 0
+      var resultCount = 0
+      // First pass: only run for its side effects on the collections above, the
+      // transformed expressions themselves are discarded.
+      selection foreach {  f =>
+        f.transformPre {
+          case agg: Aggregation =>
+            val intermediateReferences = 
agg.getIntermediateFields.zip(agg.getAggregations) map {
+              case (expr, basicAgg) =>
+                resultCount += 1
+                val resultName = s"result.$resultCount"
+                aggregations.get((expr, basicAgg)) match {
+                  // the same (expression, aggregation) pair was already registered,
+                  // reuse its intermediate field
+                  case Some(intermediateName) =>
+                    Naming(ResolvedFieldReference(intermediateName, 
expr.typeInfo), resultName)
+                  case None =>
+                    intermediateCount = intermediateCount + 1
+                    val intermediateName = s"intermediate.$intermediateCount"
+                    intermediateFields += Naming(expr, intermediateName)
+                    aggregations((expr, basicAgg)) = intermediateName
+                    Naming(ResolvedFieldReference(intermediateName, 
expr.typeInfo), resultName)
+                }
+            }
+
+            aggregationIntermediates(agg) = intermediateReferences
+            // Return a NOP so that we don't add the children of the aggregation
+            // to intermediate fields. We already added the necessary fields to the list
+            // of intermediate fields.
+            NopExpression()
+
+          case fa: ResolvedFieldReference =>
+            // plain field references outside of aggregations are passed through unchanged
+            if (!fa.name.startsWith("intermediate")) {
+              intermediateFields += Naming(fa, fa.name)
+            }
+            fa
+        }
+      }
+
+      if (aggregations.isEmpty) {
+        // no aggregations, just return
+        return select
+      }
+
+      // also add the grouping keys to the set of intermediate fields, because we use a Set,
+      // they are only added when not already present
+      // NOTE(review): the inner match only handles ResolvedFieldReference; any other kind of
+      // grouping expression would raise a MatchError here - confirm that callers guarantee
+      // resolved field references.
+      input match {
+        case GroupBy(_, groupingFields) =>
+          groupingFields foreach {
+            case fa: ResolvedFieldReference =>
+              intermediateFields += Naming(fa, fa.name)
+          }
+        case _ => // Nothing to add
+      }
+
+      // (intermediate field name, basic aggregation) pairs for the Aggregate node
+      val basicAggregations = aggregations.map {
+        case ((expr, basicAgg), fieldName) =>
+          (fieldName, basicAgg)
+      }
+
+      // Second pass: replace every aggregation by the final expression that is computed
+      // from its intermediate results.
+      val finalFields = selection.map {  f =>
+        f.transformPre {
+          case agg: Aggregation =>
+            val intermediates = aggregationIntermediates(agg)
+            agg.getFinalField(intermediates)
+        }
+      }
+
+      // intermediates are analyzed against the fields of the original input ...
+      val intermediateAnalyzer = new SelectionAnalyzer(input.outputFields)
+      val analyzedIntermediates = 
intermediateFields.toSeq.map(intermediateAnalyzer.analyze)
+
+      // ... and the final fields against the analyzed intermediates
+      val finalAnalyzer =
+        new SelectionAnalyzer(analyzedIntermediates.map(e => (e.name, 
e.typeInfo)))
+      val analyzedFinals = finalFields.map(finalAnalyzer.analyze)
+
+      val result = input match {
+        // keep the grouping between the intermediate select and the aggregation
+        case GroupBy(groupByInput, groupingFields) =>
+          Select(
+            Aggregate(
+              GroupBy(
+                Select(groupByInput, analyzedIntermediates),
+                groupingFields),
+              basicAggregations.toSeq),
+            analyzedFinals)
+
+        case _ =>
+          Select(
+            Aggregate(
+              Select(input, analyzedIntermediates),
+              basicAggregations.toSeq),
+            analyzedFinals)
+
+      }
+
+      result
+
+    case _ => select
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
new file mode 100644
index 0000000..ba8aba4
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan
+
+import java.lang.reflect.Modifier
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.parser.ExpressionParser
+import org.apache.flink.api.table.expressions.{Expression, Naming, 
ResolvedFieldReference, UnresolvedFieldReference}
+import org.apache.flink.api.table.typeinfo.RowTypeInfo
+import org.apache.flink.api.table.{ExpressionException, Table}
+
+import scala.language.reflectiveCalls
+
+/**
+ * Base class for translators that transform the logical plan in a [[Table]] to an executable
+ * Flink plan and also for creating a [[Table]] from a DataSet or DataStream.
+ */
+abstract class PlanTranslator {
+
+  // The underlying representation only has to expose its type information.
+  type Representation[A] <: { def getType(): TypeInformation[A] }
+
+  /**
+   * Translates the given Table API [[PlanNode]] back to the underlying representation, i.e,
+   * a DataSet or a DataStream.
+   */
+  def translate[A](op: PlanNode)(implicit tpe: TypeInformation[A]): Representation[A]
+
+  /**
+   * Creates a [[Table]] from a DataSet or a DataStream (the underlying representation).
+   */
+  def createTable[A](
+      repr: Representation[A],
+      inputType: CompositeType[A],
+      expressions: Array[Expression],
+      resultFields: Seq[(String, TypeInformation[_])]): Table
+
+  /**
+   * Creates a [[Table]] from the given DataSet or DataStream.
+   */
+  def createTable[A](repr: Representation[A]): Table = {
+
+    val fields = repr.getType() match {
+      case c: CompositeType[A] => c.getFieldNames.map(UnresolvedFieldReference)
+
+      case tpe => Array() // createTable will throw an exception for this later
+    }
+    createTable(
+      repr,
+      fields.toArray.asInstanceOf[Array[Expression]],
+      checkDeterministicFields = false)
+  }
+
+  /**
+   * Creates a [[Table]] from the given DataSet or DataStream while only taking those
+   * fields mentioned in the field expression.
+   */
+  def createTable[A](repr: Representation[A], expression: String): Table = {
+
+    val fields = ExpressionParser.parseExpressionList(expression)
+
+    createTable(repr, fields.toArray, checkDeterministicFields = true)
+  }
+
+  /**
+   * Creates a [[Table]] from the given DataSet or DataStream while only taking those
+   * fields mentioned in the fields parameter.
+   *
+   * When checkDeterministicFields is true check whether the fields of the underlying
+   * [[TypeInformation]] have a deterministic ordering. This is only the case for Tuples
+   * and Case classes. For a POJO, the field order is not obvious, this can lead to problems
+   * when a user renames fields and assumes a certain ordering.
+   *
+   * Throws an [[ExpressionException]] if the input is not of composite type, if its class is
+   * a non-static member class or has no canonical name, if the number of fields does not
+   * match, if a field expression is not a plain field reference or if field names are
+   * ambiguous.
+   */
+  def createTable[A](
+      repr: Representation[A],
+      fields: Array[Expression],
+      checkDeterministicFields: Boolean = true): Table = {
+
+    // shortcut for DataSet[Row] or DataStream[Row]
+    // NOTE(review): the Table created in the RowTypeInfo case is the value of a match that
+    // is used as a statement, so it is discarded and Row inputs continue with the generic
+    // path below. Returning it here would skip the renaming in `fields`; confirm the
+    // intended behavior before turning this into an early return.
+    repr.getType() match {
+      case rowTypeInfo: RowTypeInfo =>
+        val expressions = rowTypeInfo.getFieldNames map {
+          name => (name, rowTypeInfo.getTypeAt(name))
+        }
+        new Table(
+          Root(repr, expressions))
+
+      case c: CompositeType[A] => // is ok
+
+      case tpe => throw new ExpressionException("Only DataSets or DataStreams of composite " +
+        "type can be transformed to a Table. These would be tuples, case classes and " +
+        "POJOs. Type is: " + tpe)
+
+    }
+
+    val clazz = repr.getType().getTypeClass
+    if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers))
+        || clazz.getCanonicalName() == null) {
+      throw new ExpressionException("Cannot create Table from DataSet or DataStream of type " +
+        clazz.getName + ". Only top-level classes or static member classes " +
+        "are supported.")
+    }
+
+    val inputType = repr.getType().asInstanceOf[CompositeType[A]]
+
+    if (!inputType.hasDeterministicFieldOrder && checkDeterministicFields) {
+      throw new ExpressionException(s"You cannot rename fields upon Table creation: " +
+        s"Field order of input type $inputType is not deterministic." )
+    }
+
+    if (fields.length != inputType.getFieldNames.length) {
+      throw new ExpressionException("Number of selected fields: '" + fields.mkString(",") +
+        "' and number of fields in input type " + inputType + " do not match.")
+    }
+
+    val newFieldNames = fields map {
+      case UnresolvedFieldReference(name) => name
+      case e =>
+        throw new ExpressionException("Only field references allowed in 'as' operation, " +
+          "offending expression: " + e)
+    }
+
+    if (newFieldNames.toSet.size != newFieldNames.size) {
+      throw new ExpressionException(s"Ambiguous field names in ${fields.mkString(", ")}")
+    }
+
+    // the i-th new name is given the type of the i-th field of the input type
+    val resultFields: Seq[(String, TypeInformation[_])] = newFieldNames.zipWithIndex map {
+      case (name, index) => (name, inputType.getTypeAt(index))
+    }
+
+    // rename every input field to its new name
+    val inputFields = inputType.getFieldNames
+    val fieldMappings = inputFields.zip(resultFields)
+    val expressions: Array[Expression] = fieldMappings map {
+      case (oldName, (newName, tpe)) => Naming(ResolvedFieldReference(oldName, tpe), newName)
+    }
+
+    createTable(repr, inputType, expressions, resultFields)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/operations.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/operations.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/operations.scala
new file mode 100644
index 0000000..7ec34d7
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/operations.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.aggregation.Aggregations
+import org.apache.flink.api.table.expressions.Expression
+import org.apache.flink.api.table.trees.TreeNode
+
+/**
+ * Base class for all Table API operations.
+ */
+sealed abstract class PlanNode extends TreeNode[PlanNode] { self: Product =>
+  // Name and type of each field that this operation produces.
+  def outputFields: Seq[(String, TypeInformation[_])]
+}
+
+/**
+ * Leaf operation that wraps a [[org.apache.flink.api.scala.DataSet]] or
+ * [[org.apache.flink.streaming.api.scala.DataStream]] so that it can be used as a
+ * [[org.apache.flink.api.table.Table]]. It has no children.
+ */
+case class Root[T](
+    input: T,
+    outputFields: Seq[(String, TypeInformation[_])]) extends PlanNode {
+
+  val children = Nil
+
+  override def toString = "Root(" + outputFields + ")"
+}
+
+/**
+ * Operation that joins two [[org.apache.flink.api.table.Table]]s. A "filter" and a "select"
+ * should be applied after a join operation. The output consists of the fields of the left
+ * input followed by the fields of the right input.
+ */
+case class Join(left: PlanNode, right: PlanNode) extends PlanNode {
+
+  val children = Seq(left, right)
+
+  def outputFields = {
+    val leftFields = left.outputFields
+    leftFields ++ right.outputFields
+  }
+
+  override def toString = "Join(" + left + ", " + right + ")"
+}
+
+/**
+ * Operation that filters out elements that do not match the predicate expression. The
+ * fields of the input are passed on unchanged.
+ */
+case class Filter(input: PlanNode, predicate: Expression) extends PlanNode {
+
+  val children = Seq(input)
+
+  def outputFields = input.outputFields
+
+  override def toString = "Filter(" + input + ", " + predicate + ")"
+}
+
+/**
+ * Selection expression. Similar to an SQL SELECT statement. The expressions can select
+ * fields and perform arithmetic or logic operations. The expressions can also perform
+ * aggregates on fields. Every selected expression contributes one output field, named and
+ * typed after the expression.
+ */
+case class Select(input: PlanNode, selection: Seq[Expression]) extends PlanNode {
+
+  val children = Seq(input)
+
+  def outputFields = selection.toSeq.map(expr => (expr.name, expr.typeInfo))
+
+  override def toString = selection.mkString(s"Select($input, ", ",", ")")
+}
+
+/**
+ * Operation that gives new names to fields. Use this to disambiguate fields before a join
+ * operation. The names are assigned by position; the field types are taken from the input.
+ */
+case class As(input: PlanNode, names: Seq[String]) extends PlanNode {
+
+  val children = Seq(input)
+
+  val outputFields = input.outputFields.zip(names).map { pair =>
+    val (oldField, newName) = pair
+    (newName, oldField._2)
+  }
+
+  override def toString = names.mkString(s"As($input, ", ",", ")")
+}
+
+/**
+ * Grouping operation. Keys are specified using field references. A group by operation is
+ * only useful when performing a select with aggregates afterwards. The fields of the input
+ * are passed on unchanged.
+ *
+ * @param input the operation whose output is grouped
+ * @param fields the grouping keys
+ */
+case class GroupBy(input: PlanNode, fields: Seq[Expression]) extends PlanNode {
+
+  val children = Seq(input)
+
+  def outputFields = input.outputFields
+
+  override def toString = fields.mkString(s"GroupBy($input, ", ",", ")")
+}
+
+/**
+ * Internal operation. Selection operations containing aggregates are expanded to an
+ * [[Aggregate]] and a simple [[Select]]. The fields of the input are passed on unchanged.
+ */
+case class Aggregate(
+    input: PlanNode,
+    aggregations: Seq[(String, Aggregations)]) extends PlanNode {
+
+  val children = Seq(input)
+
+  def outputFields = input.outputFields
+
+  override def toString = aggregations.mkString(s"Aggregate($input, ", ",", ")")
+}
+
+/**
+ * UnionAll operation, union all elements from left and right. The output fields are those
+ * of the left input.
+ */
+case class UnionAll(left: PlanNode, right: PlanNode) extends PlanNode{
+
+  val children = Seq(left, right)
+
+  def outputFields = left.outputFields
+
+  override def toString = "Union(" + left + ", " + right + ")"
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/package.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/package.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/package.scala
new file mode 100644
index 0000000..a598483
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/package.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table
+
+/**
+ * The operations in this package are created by calling methods on [[Table]]. They
+ * should not be manually created by users of the API.
+ */
+package object plan

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala
new file mode 100644
index 0000000..932f9df
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime
+
+import org.apache.flink.api.table.Row
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable
+import org.apache.flink.api.java.aggregation.AggregationFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+
+/**
+ * Group-reduce function that feeds every row of a group through a list of aggregation
+ * functions and emits one row per group.
+ *
+ * The function at index `i` of `functions` consumes the field at `fieldPositions(i)` of each
+ * row and finally overwrites that field with the aggregate. The last row read from the group
+ * is reused as the output row, so all other fields carry the values of that last row.
+ *
+ * @param fieldPositions positions of the aggregated fields, parallel to `functions`
+ * @param functions aggregation functions, parallel to `fieldPositions`
+ */
+@Combinable
+class ExpressionAggregateFunction(
+    private val fieldPositions: Seq[Int],
+    private val functions: Seq[AggregationFunction[Any]])
+  extends RichGroupReduceFunction[Row, Row] {
+
+  /** Puts every aggregation function into its initial state. */
+  override def open(conf: Configuration): Unit = {
+    var i = 0
+    val len = functions.length
+    while (i < len) {
+      functions(i).initializeAggregate()
+      i += 1
+    }
+  }
+
+  /**
+   * Aggregates all rows of `in` and emits a single result row to `out`.
+   * Nothing is emitted when `in` contains no rows.
+   */
+  override def reduce(in: java.lang.Iterable[Row], out: Collector[Row]): Unit = {
+
+    // Local aliases of the fields (presumably to keep member access out of the loops).
+    val fieldPositions = this.fieldPositions
+    val functions = this.functions
+    val len = functions.length
+
+    var current: Row = null
+
+    val values = in.iterator()
+    while (values.hasNext) {
+      current = values.next()
+
+      var i = 0
+      while (i < len) {
+        functions(i).aggregate(current.productElement(fieldPositions(i)))
+        i += 1
+      }
+    }
+
+    // An empty input leaves `current` unset. There is no row to write the aggregates into,
+    // and the functions are still in their initial state, so there is nothing to emit.
+    if (current != null) {
+      var i = 0
+      while (i < len) {
+        current.setField(fieldPositions(i), functions(i).getAggregate)
+        // Reset immediately so the function is ready for the next group.
+        functions(i).initializeAggregate()
+        i += 1
+      }
+
+      out.collect(current)
+    }
+  }
+
+}
+
+/**
+ * Group-reduce function that forwards the first row of its input and ignores all
+ * remaining rows.
+ */
+@Combinable
+class NoExpressionAggregateFunction() extends RichGroupReduceFunction[Row, Row] {
+
+  /** Emits the first row of `in` to `out`; emits nothing when `in` is empty. */
+  override def reduce(in: java.lang.Iterable[Row], out: Collector[Row]): Unit = {
+    val values = in.iterator()
+    // Skip empty input instead of handing a null record to the collector.
+    if (values.hasNext) {
+      out.collect(values.next())
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala
new file mode 100644
index 0000000..4e50272
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime
+
+import org.apache.flink.api.common.functions.{FilterFunction, 
RichFilterFunction}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.codegen.GenerateFilter
+import org.apache.flink.api.table.expressions.Expression
+import org.apache.flink.configuration.Configuration
+
+/**
+ * Filter function backed by an expression predicate. [[GenerateFilter]] produces a
+ * [[FilterFunction]] for the predicate when `open` runs and no generated function is present
+ * yet; every call to [[filter()]] is delegated to that generated function.
+ */
+class ExpressionFilterFunction[T](
+    predicate: Expression,
+    inputType: CompositeType[T],
+    config: TableConfig = TableConfig.DEFAULT) extends RichFilterFunction[T] {
+
+  var compiledFilter: FilterFunction[T] = null
+
+  override def open(c: Configuration): Unit = {
+    // Generate only once; later calls to open reuse the existing function.
+    val needsCompilation = compiledFilter == null
+    if (needsCompilation) {
+      val userClassLoader = getRuntimeContext.getUserCodeClassLoader
+      val generator = new GenerateFilter[T](inputType, predicate, userClassLoader, config)
+      compiledFilter = generator.generate()
+    }
+  }
+
+  override def filter(in: T): Boolean = compiledFilter.filter(in)
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala
new file mode 100644
index 0000000..cf2c90f
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime
+
+import org.apache.flink.api.common.functions.{FlatJoinFunction, 
RichFlatJoinFunction}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.codegen.GenerateJoin
+import org.apache.flink.api.table.expressions.Expression
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+
+/**
+ * Proxy function that takes an expression predicate and output fields. These are compiled
+ * at runtime and calls to [[join()]] are forwarded to the compiled code.
+ */
+class ExpressionJoinFunction[L, R, O](
+    predicate: Expression,
+    leftType: CompositeType[L],
+    rightType: CompositeType[R],
+    resultType: CompositeType[O],
+    outputFields: Seq[Expression],
+    config: TableConfig = TableConfig.DEFAULT) extends RichFlatJoinFunction[L, R, O] {
+
+  var compiledJoin: FlatJoinFunction[L, R, O] = null
+
+  /**
+   * Generates the join function unless one is already present. The null guard matches
+   * ExpressionFilterFunction and ExpressionSelectFunction, so a repeated call to `open`
+   * does not run code generation again.
+   */
+  override def open(c: Configuration): Unit = {
+    if (compiledJoin == null) {
+      val codegen = new GenerateJoin[L, R, O](
+        leftType,
+        rightType,
+        resultType,
+        predicate,
+        outputFields,
+        getRuntimeContext.getUserCodeClassLoader,
+        config)
+      compiledJoin = codegen.generate()
+    }
+  }
+
+  /** Forwards the pair of records to the generated join function. */
+  override def join(left: L, right: R, out: Collector[O]): Unit = {
+    compiledJoin.join(left, right, out)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala
new file mode 100644
index 0000000..ab7adb1
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime
+
+import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.expressions.Expression
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.codegen.GenerateSelect
+import org.apache.flink.configuration.Configuration
+
+/**
+ * Map function backed by a list of output expressions. [[GenerateSelect]] produces a
+ * [[MapFunction]] for the expressions when `open` runs and no generated function is present
+ * yet; every call to [[map()]] is delegated to that generated function.
+ */
+class ExpressionSelectFunction[I, O](
+    inputType: CompositeType[I],
+    resultType: CompositeType[O],
+    outputFields: Seq[Expression],
+    config: TableConfig = TableConfig.DEFAULT) extends RichMapFunction[I, O] {
+
+  var compiledSelect: MapFunction[I, O] = null
+
+  override def open(c: Configuration): Unit = {
+    // Generate only once; later calls to open reuse the existing function.
+    val needsCompilation = compiledSelect == null
+    if (needsCompilation) {
+      val userClassLoader = getRuntimeContext.getUserCodeClassLoader
+      val generator =
+        new GenerateSelect[I, O](inputType, resultType, outputFields, userClassLoader, config)
+      compiledSelect = generator.generate()
+    }
+  }
+
+  def map(in: I): O = compiledSelect.map(in)
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala
new file mode 100644
index 0000000..a1bc4b7
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table
+
+/**
+ * The functions in this package are used for transforming Table API operations to
+ * Java API operations.
+ */
+package object runtime

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/Analyzer.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/Analyzer.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/Analyzer.scala
new file mode 100644
index 0000000..87051cf
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/Analyzer.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.trees
+
+/**
+ * Base class for tree analyzers/transformers. Subclasses supply the rule chain through
+ * `rules`. `analyze` runs the rules in order: each rule is applied again and again until its
+ * result equals its input, and that result is then handed to the next rule. The result of the
+ * last rule is returned.
+ */
+abstract class Analyzer[A <: TreeNode[A]] {
+
+  def rules: Seq[Rule[A]]
+
+  final def analyze(expr: A): A = {
+    // Re-applies a single rule until the tree it returns equals the tree it was given.
+    @scala.annotation.tailrec
+    def untilStable(rule: Rule[A], tree: A): A = {
+      val rewritten = rule(tree)
+      if (rewritten fastEquals tree) rewritten else untilStable(rule, rewritten)
+    }
+
+    rules.foldLeft(expr) { (tree, rule) => untilStable(rule, tree) }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/Rule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/Rule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/Rule.scala
new file mode 100644
index 0000000..b8a27cb
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/Rule.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.trees
+
+/**
+ * Base class for a rule that is part of an [[Analyzer]] rule chain. Method `apply` gets a tree
+ * and must return a tree. The returned tree can also be the input tree. In an [[Analyzer]]
+ * rule chain the result tree of one [[Rule]] is fed into the next [[Rule]] in the chain.
+ *
+ * A [[Rule]] is repeatedly applied to a tree until the tree does not change between
+ * rule applications.
+ */
+abstract class Rule[A <: TreeNode[A]] {
+  def apply(expr: A): A
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala
new file mode 100644
index 0000000..84f1d7e
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.trees
+
+/**
+ * Generic base class for trees that can be transformed and traversed.
+ *
+ * Concrete nodes must also be a `Product` (see the self type): the transformations walk
+ * `productIterator` and rebuild changed nodes reflectively through `makeCopy`.
+ */
+abstract class TreeNode[A <: TreeNode[A]] { self: A with Product =>
+
+  /**
+   * List of child nodes that should be considered when doing transformations. Other values
+   * in the Product will not be transformed, only handed through.
+   */
+  def children: Seq[A]
+
+  /**
+   * Tests for equality by first testing for reference equality.
+   */
+  def fastEquals(other: TreeNode[_]): Boolean = this.eq(other) || this == other
+
+  /**
+   * Applies `rule` to this node first and then transforms the children of the result in the
+   * same way. Nodes on which `rule` is not defined are kept unchanged.
+   */
+  def transformPre(rule: PartialFunction[A, A]): A = {
+    val afterTransform = rule.applyOrElse(this, identity[A])
+
+    if (afterTransform fastEquals this) {
+      this.transformChildrenPre(rule)
+    } else {
+      afterTransform.transformChildrenPre(rule)
+    }
+  }
+
+  /**
+   * Runs `transformPre` on every constructor argument that is also listed in `children`.
+   * Returns this node if no child changed, otherwise a copy created by `makeCopy`.
+   */
+  def transformChildrenPre(rule: PartialFunction[A, A]): A = {
+    var changed = false
+    // toArray forces evaluation of the lazy iterator, so `changed` is final before it is read.
+    val newArgs = productIterator.map {
+      case child: A if children.contains(child) =>
+        val newChild = child.transformPre(rule)
+        if (newChild fastEquals child) {
+          child
+        } else {
+          changed = true
+          newChild
+        }
+      case other: AnyRef => other
+      case null => null
+    }.toArray
+
+    if (changed) makeCopy(newArgs) else this
+  }
+
+  /**
+   * Transforms the children first and then applies `rule` to the resulting node.
+   * Nodes on which `rule` is not defined are kept unchanged.
+   */
+  def transformPost(rule: PartialFunction[A, A]): A = {
+    val afterChildren = transformChildrenPost(rule)
+    if (afterChildren fastEquals this) {
+      rule.applyOrElse(this, identity[A])
+    } else {
+      rule.applyOrElse(afterChildren, identity[A])
+    }
+  }
+
+  /**
+   * Runs `transformPost` on every constructor argument that is also listed in `children`.
+   * Returns this node if no child changed, otherwise a copy created by `makeCopy`.
+   */
+  def transformChildrenPost(rule: PartialFunction[A, A]): A = {
+    var changed = false
+    // toArray forces evaluation, toSeq does not seem to work here
+    val newArgs = productIterator.map {
+      case child: A if children.contains(child) =>
+        val newChild = child.transformPost(rule)
+        if (newChild fastEquals child) {
+          child
+        } else {
+          changed = true
+          newChild
+        }
+      case other: AnyRef => other
+      case null => null
+    }.toArray
+
+    if (changed) makeCopy(newArgs) else this
+  }
+
+  /**
+   * Returns true if `predicate` holds for this node or for any node reached by `transformPre`.
+   * The traversal does not stop at the first match.
+   */
+  def exists(predicate: A => Boolean): Boolean = {
+    var exists = false
+    this.transformPre {
+      case e: A =>
+        if (predicate(e)) {
+          exists = true
+        }
+        // Return the node unchanged: the transformation is used only to visit the nodes.
+        e
+    }
+    exists
+  }
+
+  /**
+   * Creates a new copy of this expression with new children. This is used during
+   * transformation if children change. This must be overridden by tree nodes that don't have
+   * the constructor arguments in the same order as the `children`.
+   *
+   * @param newArgs the complete constructor argument list for the copy
+   * @throws RuntimeException if no constructor with arguments exists or the arguments do not
+   *                          fit the constructor
+   */
+  def makeCopy(newArgs: Seq[AnyRef]): this.type = {
+    // Use the first public constructor that takes at least one argument.
+    val defaultCtor = this.getClass.getConstructors
+      .find { _.getParameterTypes.size > 0 }
+      .getOrElse(throw new RuntimeException(
+        "No constructor with arguments found for " + this.getClass.getName + "."))
+    try {
+      defaultCtor.newInstance(newArgs.toArray: _*).asInstanceOf[this.type]
+    } catch {
+      case iae: IllegalArgumentException =>
+        // Report the offending node and keep the cause instead of printing to stdout.
+        throw new RuntimeException(
+          "Should never happen. Could not copy " + this + " with the given arguments.", iae)
+    }
+  }
+}
+

Reply via email to