http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala
new file mode 100644
index 0000000..8b72cb6
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.table
+
+import org.apache.flink.api.table._
+import org.apache.flink.api.table.tree.{UnresolvedFieldReference, Expression}
+import org.apache.flink.api.common.typeutils.CompositeType
+
+import org.apache.flink.api.scala._
+
+/**
+ * Methods for converting a [[DataSet]] to a [[Table]]. A [[DataSet]] is
+ * wrapped in this by the implicit conversions in [[org.apache.flink.api.scala.table]].
+ */
+class DataSetConversions[T](set: DataSet[T], inputType: CompositeType[T]) {
+
+  /**
+   * Converts the [[DataSet]] to a [[Table]]. The field names can be specified like this:
+   *
+   * {{{
+   *   val in: DataSet[(String, Int)] = ...
+   *   val table = in.as('a, 'b)
+   * }}}
+   *
+   * This results in a [[Table]] that has field `a` of type `String` and field `b`
+   * of type `Int`.
+   */
+  def as(fields: Expression*): Table[ScalaBatchTranslator] = {
+     new ScalaBatchTranslator().createTable(set, fields.toArray)
+  }
+
+  /**
+   * Converts the [[DataSet]] to a [[Table]]. The field names will be taken from the field names
+   * of the input type.
+   *
+   * Example:
+   *
+   * {{{
+   *   val in: DataSet[(String, Int)] = ...
+   *   val table = in.toTable
+   * }}}
+   *
+   * Here, the result is a [[Table]] that has field `_1` of type `String` and field `_2`
+   * of type `Int`.
+   */
+  def toTable: Table[ScalaBatchTranslator] = {
+    val resultFields = inputType.getFieldNames.map(UnresolvedFieldReference)
+    as(resultFields: _*)
+  }
+
+}
+
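
A short usage sketch for the two conversion methods above, assuming the
implicits from org.apache.flink.api.scala.table are imported (the data
values are made up):

  import org.apache.flink.api.scala._
  import org.apache.flink.api.scala.table._

  val env = ExecutionEnvironment.getExecutionEnvironment
  val input = env.fromElements(("Hello", 2), ("Ciao", 3))

  // as(...) assigns explicit field names ...
  val named = input.as('word, 'count)
  // ... while toTable keeps the tuple field names _1 and _2
  val positional = input.toTable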

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala
new file mode 100644
index 0000000..5da02c4
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table
+
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table._
+import org.apache.flink.api.table.tree.{Expression, UnresolvedFieldReference}
+import org.apache.flink.streaming.api.scala.DataStream
+
+class DataStreamConversions[T](stream: DataStream[T], inputType: CompositeType[T]) {
+
+  /**
+   * Converts the [[DataStream]] to a [[Table]]. The field names can be specified like this:
+   *
+   * {{{
+   *   val in: DataStream[(String, Int)] = ...
+   *   val table = in.as('a, 'b)
+   * }}}
+   *
+   * This results in a [[Table]] that has field `a` of type `String` and field `b`
+   * of type `Int`.
+   */
+
+  def as(fields: Expression*): Table[ScalaStreamingTranslator] = {
+     new ScalaStreamingTranslator().createTable(
+       stream,
+       fields.toArray,
+       checkDeterministicFields = true).asInstanceOf[Table[ScalaStreamingTranslator]]
+  }
+
+  /**
+   * Converts the [[DataStream]] to a [[Table]]. The field names will be taken from the field
+   * names of the input type.
+   *
+   * Example:
+   *
+   * {{{
+   *   val in: DataStream[(String, Int)] = ...
+   *   val table = in.toTable
+   * }}}
+   *
+   * This results in a [[Table]] that has field `_1` of type `String` and field `_2`
+   * of type `Int`.
+   */
+
+  def toTable: Table[ScalaStreamingTranslator] = {
+    val resultFields = inputType.getFieldNames.map(UnresolvedFieldReference)
+    as(resultFields: _*)
+  }
+
+}
+
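
The streaming side mirrors the batch conversions. A sketch, assuming
fromElements is available on the Scala StreamExecutionEnvironment in this
Flink version:

  import org.apache.flink.streaming.api.scala._
  import org.apache.flink.api.scala.table._

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val stream = env.fromElements(("Hello", 2), ("Ciao", 3))

  // Only select and filter are translated on streams at this point
  val table = stream.as('word, 'count).filter('count > 2)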

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala
new file mode 100644
index 0000000..4b25a50
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table
+
+
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.table.JavaBatchTranslator
+import org.apache.flink.api.table.tree.Expression
+import org.apache.flink.api.scala.wrap
+import org.apache.flink.api.table.operations._
+import org.apache.flink.api.table.Table
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.DataSet
+
+import scala.reflect.ClassTag
+
+
+/**
+ * [[TableTranslator]] for creating [[Table]]s from Scala [[DataSet]]s and
+ * translating them back to Scala [[DataSet]]s.
+ */
+class ScalaBatchTranslator extends TableTranslator {
+
+  private val javaTranslator = new JavaBatchTranslator
+
+  type Representation[A] = DataSet[A]
+
+  def createTable[A](
+      repr: DataSet[A],
+      fields: Array[Expression]): Table[ScalaBatchTranslator] = {
+
+    val result = javaTranslator.createTable(repr.javaSet, fields)
+
+    new Table[ScalaBatchTranslator](result.operation, this)
+  }
+
+  override def translate[O](op: Operation)(implicit tpe: TypeInformation[O]): DataSet[O] = {
+    // fake it till you make it ...
+    wrap(javaTranslator.translate(op))(ClassTag.AnyRef.asInstanceOf[ClassTag[O]])
+  }
+
+  override def createTable[A](
+      repr: Representation[A],
+      inputType: CompositeType[A],
+      expressions: Array[Expression],
+      resultFields: Seq[(String, TypeInformation[_])]): Table[this.type] = {
+
+    val result = javaTranslator.createTable(repr.javaSet, inputType, expressions, resultFields)
+
+    Table(result.operation, this)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala
new file mode 100644
index 0000000..f17f273
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.table.JavaStreamingTranslator
+import org.apache.flink.api.table.Table
+import org.apache.flink.api.table.operations._
+import org.apache.flink.api.table.tree.Expression
+import org.apache.flink.streaming.api.scala.{DataStream, javaToScalaStream}
+
+/**
+ * [[TableTranslator]] for creating [[Table]]s from Scala [[DataStream]]s and
+ * translating them back to Scala [[DataStream]]s.
+ *
+ * This is very limited right now. Only select and filter are implemented. Also, the expression
+ * operations must be extended to allow windowing operations.
+ */
+class ScalaStreamingTranslator extends TableTranslator {
+
+  private val javaTranslator = new JavaStreamingTranslator
+
+  override type Representation[A] = DataStream[A]
+
+  override def translate[O](op: Operation)(implicit tpe: TypeInformation[O]): DataStream[O] = {
+    // fake it till you make it ...
+    javaToScalaStream(javaTranslator.translate(op))
+  }
+
+  override def createTable[A](
+      repr: Representation[A],
+      inputType: CompositeType[A],
+      expressions: Array[Expression],
+      resultFields: Seq[(String, TypeInformation[_])]): Table[this.type] = {
+
+    val result =
+      javaTranslator.createTable(repr.getJavaStream, inputType, expressions, resultFields)
+
+    new Table(result.operation, this)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
new file mode 100644
index 0000000..e08ceb3
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.table
+
+import org.apache.flink.api.table.tree._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+import scala.language.implicitConversions
+
+/**
+ * These are all the operations that can be used to construct an [[Expression]] AST for expression
+ * operations.
+ *
+ * These operations must be kept in sync with the parser in
+ * [[org.apache.flink.api.table.parser.ExpressionParser]].
+ */
+trait ImplicitExpressionOperations {
+  def expr: Expression
+
+  def && (other: Expression) = And(expr, other)
+  def || (other: Expression) = Or(expr, other)
+
+  def > (other: Expression) = GreaterThan(expr, other)
+  def >= (other: Expression) = GreaterThanOrEqual(expr, other)
+  def < (other: Expression) = LessThan(expr, other)
+  def <= (other: Expression) = LessThanOrEqual(expr, other)
+
+  def === (other: Expression) = EqualTo(expr, other)
+  def !== (other: Expression) = NotEqualTo(expr, other)
+
+  def unary_! = Not(expr)
+  def unary_- = UnaryMinus(expr)
+
+  def isNull = IsNull(expr)
+  def isNotNull = IsNotNull(expr)
+
+  def + (other: Expression) = Plus(expr, other)
+  def - (other: Expression) = Minus(expr, other)
+  def / (other: Expression) = Div(expr, other)
+  def * (other: Expression) = Mul(expr, other)
+  def % (other: Expression) = Mod(expr, other)
+
+  def & (other: Expression) = BitwiseAnd(expr, other)
+  def | (other: Expression) = BitwiseOr(expr, other)
+  def ^ (other: Expression) = BitwiseXor(expr, other)
+  def unary_~ = BitwiseNot(expr)
+
+  def abs = Abs(expr)
+
+  def sum = Sum(expr)
+  def min = Min(expr)
+  def max = Max(expr)
+  def count = Count(expr)
+  def avg = Avg(expr)
+
+  def substring(beginIndex: Expression, endIndex: Expression = Literal(Int.MaxValue)) = {
+    Substring(expr, beginIndex, endIndex)
+  }
+
+  def cast(toType: TypeInformation[_]) = Cast(expr, toType)
+
+  def as(name: Symbol) = Naming(expr, name.name)
+}
+
+/**
+ * Implicit conversions from Scala Literals to Expression [[Literal]] and from [[Expression]]
+ * to [[ImplicitExpressionOperations]].
+ */
+trait ImplicitExpressionConversions {
+  implicit class WithOperations(e: Expression) extends ImplicitExpressionOperations {
+    def expr = e
+  }
+
+  implicit class SymbolExpression(s: Symbol) extends ImplicitExpressionOperations {
+    def expr = UnresolvedFieldReference(s.name)
+  }
+
+  implicit class LiteralLongExpression(l: Long) extends ImplicitExpressionOperations {
+    def expr = Literal(l)
+  }
+
+  implicit class LiteralIntExpression(i: Int) extends ImplicitExpressionOperations {
+    def expr = Literal(i)
+  }
+
+  implicit class LiteralFloatExpression(f: Float) extends ImplicitExpressionOperations {
+    def expr = Literal(f)
+  }
+
+  implicit class LiteralDoubleExpression(d: Double) extends ImplicitExpressionOperations {
+    def expr = Literal(d)
+  }
+
+  implicit class LiteralStringExpression(str: String) extends ImplicitExpressionOperations {
+    def expr = Literal(str)
+  }
+
+  implicit class LiteralBooleanExpression(bool: Boolean) extends ImplicitExpressionOperations {
+    def expr = Literal(bool)
+  }
+
+  implicit def symbol2FieldExpression(sym: Symbol): Expression = UnresolvedFieldReference(sym.name)
+  implicit def int2Literal(i: Int): Expression = Literal(i)
+  implicit def long2Literal(l: Long): Expression = Literal(l)
+  implicit def double2Literal(d: Double): Expression = Literal(d)
+  implicit def float2Literal(d: Float): Expression = Literal(d)
+  implicit def string2Literal(str: String): Expression = Literal(str)
+  implicit def boolean2Literal(bool: Boolean): Expression = Literal(bool)
+}
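
A sketch of what these implicits buy you: symbols and plain Scala literals are
lifted into the Expression AST, so the operators build trees rather than
evaluating anything. BasicTypeInfo comes from flink-core:

  import org.apache.flink.api.common.typeinfo.BasicTypeInfo
  import org.apache.flink.api.scala.table._
  import org.apache.flink.api.table.tree.Expression

  // And(GreaterThan(UnresolvedFieldReference(count), Literal(10)), NotEqualTo(...))
  val predicate: Expression = 'count > 10 && 'word !== "Hello"

  // Renaming and casting are ordinary methods on the lifted expression
  val projection: Expression = ('count + 1).cast(BasicTypeInfo.STRING_TYPE_INFO) as 'countStr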

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
new file mode 100644
index 0000000..10084de
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.{Row, Table}
+import org.apache.flink.streaming.api.scala.DataStream
+
+import scala.language.implicitConversions
+
+/**
+ * == Table API (Scala) ==
+ *
+ * Importing this package with:
+ *
+ * {{{
+ *   import org.apache.flink.api.scala.table._
+ * }}}
+ *
+ * imports implicit conversions for converting a [[DataSet]] or [[DataStream]] to a
+ * [[Table]]. This can be used to perform SQL-like queries on data. Please have
+ * a look at [[Table]] to see which operations are supported and
+ * [[org.apache.flink.api.scala.table.ImplicitExpressionOperations]] to see how an
+ * expression can be specified.
+ *
+ * When writing a query you can use Scala Symbols to refer to field names. One would
+ * refer to field `a` by writing `'a`. Sometimes it is necessary to manually convert a
+ * Scala literal to an Expression `Literal`; in those cases use `Literal`, as in `Literal(3)`.
+ *
+ * Example:
+ *
+ * {{{
+ *   import org.apache.flink.api.scala._
+ *   import org.apache.flink.api.scala.table._
+ *
+ *   val env = ExecutionEnvironment.getExecutionEnvironment
+ *   val input = env.fromElements(("Hello", 2), ("Hello", 5), ("Ciao", 3))
+ *   val result = input.as('word, 'count).groupBy('word).select('word, 'count.avg)
+ *   result.print()
+ *
+ *   env.execute()
+ * }}}
+ *
+ * A [[Table]] can be converted back to the underlying API
+ * representation using `as`:
+ *
+ * {{{
+ *   case class Word(word: String, count: Int)
+ *
+ *   val result = in.select(...).as('word, 'count)
+ *   val set = result.as[Word]
+ * }}}
+ */
+package object table extends ImplicitExpressionConversions {
+
+  implicit def dataSet2DataSetConversions[T](set: DataSet[T]): DataSetConversions[T] = {
+    new DataSetConversions[T](set, set.getType.asInstanceOf[CompositeType[T]])
+  }
+
+  implicit def table2RowDataSet(
+      table: Table[ScalaBatchTranslator]): DataSet[Row] = {
+    table.as[Row]
+  }
+
+  implicit def rowDataSet2Table(
+      rowDataSet: DataSet[Row]): Table[ScalaBatchTranslator] = {
+    rowDataSet.toTable
+  }
+
+  implicit def dataStream2DataSetConversions[T](
+      stream: DataStream[T]): DataStreamConversions[T] = {
+    new DataStreamConversions[T](
+      stream,
+      stream.getJavaStream.getType.asInstanceOf[CompositeType[T]])
+  }
+
+  implicit def table2RowDataStream(
+      table: Table[ScalaStreamingTranslator]): DataStream[Row] = {
+    table.as[Row]
+  }
+
+  implicit def rowDataStream2Table(
+      rowDataStream: DataStream[Row]): Table[ScalaStreamingTranslator] = {
+    rowDataStream.toTable
+  }
+}
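
With these implicits in scope, a query result can be used directly where a
DataSet is expected. A sketch (table2RowDataSet does the final conversion):

  import org.apache.flink.api.scala._
  import org.apache.flink.api.scala.table._
  import org.apache.flink.api.table.Row

  val env = ExecutionEnvironment.getExecutionEnvironment
  val rows: DataSet[Row] = env
    .fromElements(("a", 1), ("b", 2))
    .as('word, 'count)
    .select('word, 'count + 1)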

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/ExpressionException.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/ExpressionException.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/ExpressionException.scala
new file mode 100644
index 0000000..51c0a4d
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/ExpressionException.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table
+
+/**
+ * Exception for all errors occurring during expression evaluation.
+ */
+class ExpressionException(msg: String) extends RuntimeException(msg)

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Row.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Row.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Row.scala
new file mode 100644
index 0000000..e3baab3
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Row.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table
+
+/**
+ * This is used for executing Table API operations. We use manually generated
+ * TypeInfo to check the field types and create serializers and comparators.
+ */
+class Row(arity: Int) extends Product {
+
+  private val fields = new Array[Any](arity)
+
+  def productArity = fields.length
+
+  def productElement(i: Int): Any = fields(i)
+
+  def setField(i: Int, value: Any): Unit = fields(i) = value
+
+  def canEqual(that: Any) = false
+
+  override def toString = fields.mkString(",")
+
+}
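
Row behaves like an untyped tuple; a small sketch of the Product contract it
implements:

  import org.apache.flink.api.table.Row

  val row = new Row(2)
  row.setField(0, "Hello")
  row.setField(1, 42)

  assert(row.productArity == 2)
  assert(row.productElement(1) == 42)
  println(row) // prints: Hello,42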

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala
new file mode 100644
index 0000000..38a1760
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.analysis.{GroupByAnalyzer, SelectionAnalyzer, PredicateAnalyzer}
+import org.apache.flink.api.table.operations._
+import org.apache.flink.api.table.parser.ExpressionParser
+import org.apache.flink.api.table.tree.{ResolvedFieldReference, UnresolvedFieldReference, Expression}
+
+/**
+ * The abstraction for writing Table API programs. Similar to how the batch and streaming APIs
+ * have [[org.apache.flink.api.scala.DataSet]] and
+ * [[org.apache.flink.streaming.api.scala.DataStream]].
+ *
+ * Use the methods of [[Table]] to transform data or to revert back to the underlying
+ * batch or streaming representation.
+ */
+case class Table[A <: TableTranslator](
+    private[flink] val operation: Operation,
+    private[flink] val operationTranslator: A) {
+
+
+  /**
+   * Converts the result of this operation back to a [[org.apache.flink.api.scala.DataSet]] or
+   * [[org.apache.flink.streaming.api.scala.DataStream]].
+   */
+  def as[O](implicit tpe: TypeInformation[O]): operationTranslator.Representation[O] = {
+    operationTranslator.translate(operation)
+  }
+
+  /**
+   * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions
+   * can contain complex expressions and aggregations.
+   *
+   * Example:
+   *
+   * {{{
+   *   in.select('key, 'value.avg + " The average" as 'average, 'other.substring(0, 10))
+   * }}}
+   */
+  def select(fields: Expression*): Table[A] = {
+    val analyzer = new SelectionAnalyzer(operation.outputFields)
+    val analyzedFields = fields.map(analyzer.analyze)
+    val fieldNames = analyzedFields map(_.name)
+    if (fieldNames.toSet.size != fieldNames.size) {
+      throw new ExpressionException(s"Resulting fields names are not unique in 
expression" +
+        s""" "${fields.mkString(", ")}".""")
+    }
+    this.copy(operation = Select(operation, analyzedFields))
+  }
+
+  /**
+   * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions
+   * can contain complex expressions and aggregations.
+   *
+   * Example:
+   *
+   * {{{
+   *   in.select("key, value.avg + " The average" as average, 
other.substring(0, 10)")
+   * }}}
+   */
+  def select(fields: String): Table[A] = {
+    val fieldExprs = ExpressionParser.parseExpressionList(fields)
+    select(fieldExprs: _*)
+  }
+
+  /**
+   * Renames the fields of the expression result. Use this to disambiguate fields before
+   * joining two operations.
+   *
+   * Example:
+   *
+   * {{{
+   *   in.as('a, 'b)
+   * }}}
+   */
+  def as(fields: Expression*): Table[A] = {
+    fields forall {
+      f => f.isInstanceOf[UnresolvedFieldReference]
+    } match {
+      case true =>
+      case false => throw new ExpressionException("Only field expressions allowed in as().")
+    }
+    this.copy(operation = As(operation, fields.toArray map { _.name }))
+  }
+
+  /**
+   * Renames the fields of the expression result. Use this to disambiguate fields before
+   * joining two operations.
+   *
+   * Example:
+   *
+   * {{{
+   *   in.as("a, b")
+   * }}}
+   */
+  def as(fields: String): Table[A] = {
+    val fieldExprs = ExpressionParser.parseExpressionList(fields)
+    as(fieldExprs: _*)
+  }
+
+  /**
+   * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
+   * clause.
+   *
+   * Example:
+   *
+   * {{{
+   *   in.filter('name === "Fred")
+   * }}}
+   */
+  def filter(predicate: Expression): Table[A] = {
+    val analyzer = new PredicateAnalyzer(operation.outputFields)
+    val analyzedPredicate = analyzer.analyze(predicate)
+    this.copy(operation = Filter(operation, analyzedPredicate))
+  }
+
+  /**
+   * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
+   * clause.
+   *
+   * Example:
+   *
+   * {{{
+   *   in.filter("name === 'Fred'")
+   * }}}
+   */
+  def filter(predicate: String): Table[A] = {
+    val predicateExpr = ExpressionParser.parseExpression(predicate)
+    filter(predicateExpr)
+  }
+
+  /**
+   * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
+   * clause.
+   *
+   * Example:
+   *
+   * {{{
+   *   in.where('name === "Fred")
+   * }}}
+   */
+  def where(predicate: Expression): Table[A] = {
+    filter(predicate)
+  }
+
+  /**
+   * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
+   * clause.
+   *
+   * Example:
+   *
+   * {{{
+   *   in.filter("name === 'Fred'")
+   * }}}
+   */
+  def where(predicate: String): Table[A] = {
+    filter(predicate)
+  }
+
+  /**
+   * Groups the elements on some grouping keys. Use this before a selection with aggregations
+   * to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement.
+   *
+   * Example:
+   *
+   * {{{
+   *   in.groupBy('key).select('key, 'value.avg)
+   * }}}
+   */
+  def groupBy(fields: Expression*): Table[A] = {
+    val analyzer = new GroupByAnalyzer(operation.outputFields)
+    val analyzedFields = fields.map(analyzer.analyze)
+
+    val illegalKeys = analyzedFields filter {
+      case fe: ResolvedFieldReference => false // OK
+      case e => true
+    }
+
+    if (illegalKeys.nonEmpty) {
+      throw new ExpressionException("Illegal key expressions: " + 
illegalKeys.mkString(", "))
+    }
+
+    this.copy(operation = GroupBy(operation, analyzedFields))
+  }
+
+  /**
+   * Groups the elements on some grouping keys. Use this before a selection with aggregations
+   * to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement.
+   *
+   * Example:
+   *
+   * {{{
+   *   in.groupBy("key").select("key, value.avg")
+   * }}}
+   */
+  def groupBy(fields: String): Table[A] = {
+    val fieldsExpr = ExpressionParser.parseExpressionList(fields)
+    groupBy(fieldsExpr: _*)
+  }
+
+  /**
+   * Joins two [[Table]]s. Similar to an SQL join. The fields of the two joined
+   * operations must not overlap; use [[as]] to rename fields if necessary. You can use
+   * where and select clauses after a join to further specify the behaviour of the join.
+   *
+   * Example:
+   *
+   * {{{
+   *   left.join(right).where('a === 'b && 'c > 3).select('a, 'b, 'd)
+   * }}}
+   */
+  def join(right: Table[A]): Table[A] = {
+    val leftInputNames = operation.outputFields.map(_._1).toSet
+    val rightInputNames = right.operation.outputFields.map(_._1).toSet
+    if (leftInputNames.intersect(rightInputNames).nonEmpty) {
+      throw new ExpressionException(
+        "Overlapping fields names on join input, result would be ambiguous: " +
+          operation.outputFields.mkString(", ") +
+          " and " +
+          right.operation.outputFields.mkString(", ") )
+    }
+    this.copy(operation = Join(operation, right.operation))
+  }
+
+  override def toString: String = s"Expression($operation)"
+}
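
A sketch tying the operations above together; the field names of the two join
inputs are kept disjoint ('pName) because join rejects overlapping names:

  import org.apache.flink.api.scala._
  import org.apache.flink.api.scala.table._

  val env = ExecutionEnvironment.getExecutionEnvironment
  val orders = env.fromElements((1, "apple", 3), (2, "pear", 5)).as('id, 'name, 'amount)
  val prices = env.fromElements(("apple", 0.5), ("pear", 0.7)).as('pName, 'price)

  val result = orders
    .join(prices)
    .where('name === 'pName && 'amount > 2)
    .select('name, ('amount * 'price) as 'total)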

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/Analyzer.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/Analyzer.scala
new file mode 100644
index 0000000..5db17d5
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/Analyzer.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.analysis
+
+import org.apache.flink.api.table.tree.Expression
+
+/**
+ * Base class for expression analyzers/transformers. Analyzers must implement method `rules` to
+ * provide the chain of rules that are invoked one after another. The expression resulting
+ * from one rule is fed into the next rule and the final result is returned from method `analyze`.
+ */
+abstract class Analyzer {
+
+  def rules: Seq[Rule]
+
+  final def analyze(expr: Expression): Expression = {
+    var currentTree = expr
+    for (rule <- rules) {
+      currentTree = rule(currentTree)
+    }
+    currentTree
+  }
+}
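
A minimal sketch of the contract: a Rule is an Expression => Expression
transform and an Analyzer is just an ordered chain of them (both names below
are hypothetical):

  import org.apache.flink.api.table.analysis.{Analyzer, Rule}
  import org.apache.flink.api.table.tree.Expression

  object IdentityRule extends Rule {
    def apply(expr: Expression): Expression = expr // leaves the tree untouched
  }

  class NoOpAnalyzer extends Analyzer {
    def rules: Seq[Rule] = Seq(IdentityRule) // analyze() threads the expression through these
  }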

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/ExtractEquiJoinFields.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/ExtractEquiJoinFields.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/ExtractEquiJoinFields.scala
new file mode 100644
index 0000000..6381a9b
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/ExtractEquiJoinFields.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.analysis
+
+import org.apache.flink.api.table.tree._
+import org.apache.flink.api.common.typeutils.CompositeType
+
+import scala.collection.mutable
+
+/**
+ * Equi-join field extractor for Join Predicates and CoGroup predicates. The result is a modified
+ * expression without the equi-join predicates together with indices of the join fields
+ * from both the left and right input.
+ */
+object ExtractEquiJoinFields {
+  def apply(leftType: CompositeType[_], rightType: CompositeType[_], predicate: Expression) = {
+
+    val joinFieldsLeft = mutable.MutableList[Int]()
+    val joinFieldsRight = mutable.MutableList[Int]()
+
+    val equiJoinExprs = mutable.MutableList[EqualTo]()
+    // First get all `===` expressions that are not below an `Or`
+    predicate.transformPre {
+      case or@Or(_, _) => NopExpression()
+      case eq@EqualTo(le: ResolvedFieldReference, re: ResolvedFieldReference) =>
+        if (leftType.hasField(le.name) && rightType.hasField(re.name)) {
+          joinFieldsLeft += leftType.getFieldIndex(le.name)
+          joinFieldsRight += rightType.getFieldIndex(re.name)
+        } else if (leftType.hasField(re.name) && rightType.hasField(le.name)) {
+          joinFieldsLeft += leftType.getFieldIndex(re.name)
+          joinFieldsRight += rightType.getFieldIndex(le.name)
+        } else {
+          // not an equi-join predicate
+        }
+        equiJoinExprs += eq
+        eq
+    }
+
+    // then remove the equi join expressions from the predicate
+    val resultExpr = predicate.transformPost {
+      // For OR, we can eliminate the OR since the equi join
+      // predicate is evaluated before the expression is evaluated
+      case or@Or(NopExpression(), _) => NopExpression()
+      case or@Or(_, NopExpression()) => NopExpression()
+      // For AND we replace it with the other expression, since the
+      // equi join predicate will always be true
+      case and@And(NopExpression(), other) => other
+      case and@And(other, NopExpression()) => other
+      case eq : EqualTo if equiJoinExprs.contains(eq) =>
+        NopExpression()
+    }
+
+    (resultExpr, joinFieldsLeft.toArray, joinFieldsRight.toArray)
+  }
+}
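
A sketch of the call site, with hypothetical inputs: for a predicate like
'a === 'b && 'c > 3 (a from the left input, b from the right) this yields the
leftover predicate 'c > 3 plus the key indices of a and b:

  import org.apache.flink.api.common.typeutils.CompositeType
  import org.apache.flink.api.table.analysis.ExtractEquiJoinFields
  import org.apache.flink.api.table.tree.Expression

  def splitEquiJoin(left: CompositeType[_], right: CompositeType[_], pred: Expression) = {
    // returns (remainingPredicate, leftKeyIndices, rightKeyIndices)
    ExtractEquiJoinFields(left, right, pred)
  }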

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/GroupByAnalyzer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/GroupByAnalyzer.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/GroupByAnalyzer.scala
new file mode 100644
index 0000000..4106a4c
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/GroupByAnalyzer.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.analysis
+
+import org.apache.flink.api.table._
+import org.apache.flink.api.table.tree.{ResolvedFieldReference, Expression}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+import scala.collection.mutable
+
+
+/**
+ * Analyzer for grouping expressions. Only field expressions are allowed as grouping expressions.
+ */
+class GroupByAnalyzer(inputFields: Seq[(String, TypeInformation[_])]) extends Analyzer {
+
+  def rules = Seq(new ResolveFieldReferences(inputFields), CheckGroupExpression)
+
+  object CheckGroupExpression extends Rule {
+
+    def apply(expr: Expression) = {
+      val errors = mutable.MutableList[String]()
+
+      expr match {
+        case f: ResolvedFieldReference => // this is OK
+        case other =>
+          throw new ExpressionException(
+            s"""Invalid grouping expression "$expr". Only field references are 
allowed.""")
+      }
+      expr
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/InsertAutoCasts.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/InsertAutoCasts.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/InsertAutoCasts.scala
new file mode 100644
index 0000000..29e76cd
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/InsertAutoCasts.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.analysis
+
+import org.apache.flink.api.table.tree._
+import org.apache.flink.api.common.typeinfo.{IntegerTypeInfo, BasicTypeInfo}
+
+/**
+ * [[Rule]] that adds casts in arithmetic operations.
+ */
+class InsertAutoCasts extends Rule {
+
+  def apply(expr: Expression) = {
+    val result = expr.transformPost {
+
+      case plus@Plus(o1, o2) =>
+        // Plus is a special case since we can cast anything to String for String concat
+        if (o1.typeInfo != o2.typeInfo && o1.typeInfo.isBasicType && o2.typeInfo.isBasicType) {
+          if (o1.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo(
+            o2.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
+            Plus(Cast(o1, o2.typeInfo), o2)
+          } else if (o2.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo(
+            o1.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
+            Plus(o1, Cast(o2, o1.typeInfo))
+          } else if (o1.typeInfo == BasicTypeInfo.STRING_TYPE_INFO) {
+            Plus(o1, Cast(o2, BasicTypeInfo.STRING_TYPE_INFO))
+          } else if (o2.typeInfo == BasicTypeInfo.STRING_TYPE_INFO) {
+            Plus(Cast(o1, BasicTypeInfo.STRING_TYPE_INFO), o2)
+          } else {
+            plus
+          }
+        } else {
+          plus
+        }
+
+      case ba: BinaryExpression if ba.isInstanceOf[BinaryArithmetic] ||
+        ba.isInstanceOf[BinaryComparison] =>
+        val o1 = ba.left
+        val o2 = ba.right
+        if (o1.typeInfo != o2.typeInfo && o1.typeInfo.isBasicType && o2.typeInfo.isBasicType) {
+          if (o1.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo(
+            o2.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
+            ba.makeCopy(Seq(Cast(o1, o2.typeInfo), o2))
+          } else if (o2.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo(
+            o1.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
+            ba.makeCopy(Seq(o1, Cast(o2, o1.typeInfo)))
+          } else {
+            ba
+          }
+        } else {
+          ba
+        }
+
+      case ba: BinaryExpression if ba.isInstanceOf[BitwiseBinaryArithmetic] =>
+        val o1 = ba.left
+        val o2 = ba.right
+        if (o1.typeInfo != o2.typeInfo && o1.typeInfo.isInstanceOf[IntegerTypeInfo[_]] &&
+          o2.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
+          if (o1.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo(
+            o2.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
+            ba.makeCopy(Seq(Cast(o1, o2.typeInfo), o2))
+          } else if (o2.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo(
+            o1.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
+            ba.makeCopy(Seq(o1, Cast(o2, o1.typeInfo)))
+          } else {
+            ba
+          }
+        } else {
+          ba
+        }
+    }
+
+    result
+  }
+}
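
A sketch of the effect on a mixed-type expression, assuming the tree
constructors behave as they are used elsewhere in this patch:

  import org.apache.flink.api.common.typeinfo.BasicTypeInfo
  import org.apache.flink.api.table.analysis.InsertAutoCasts
  import org.apache.flink.api.table.tree._

  val intField = ResolvedFieldReference("a", BasicTypeInfo.INT_TYPE_INFO)

  // Int can be cast to Double but not the other way around, so the rule
  // should rewrite this into Plus(Cast(a, Double), Literal(3.0))
  val casted = new InsertAutoCasts().apply(Plus(intField, Literal(3.0)))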

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/PredicateAnalyzer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/PredicateAnalyzer.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/PredicateAnalyzer.scala
new file mode 100644
index 0000000..0c74695
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/PredicateAnalyzer.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.analysis
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+/**
+ * Analyzer for predicates, i.e. filter operations and where clauses of joins.
+ */
+class PredicateAnalyzer(inputFields: Seq[(String, TypeInformation[_])]) extends Analyzer {
+  def rules = Seq(
+    new ResolveFieldReferences(inputFields),
+    new InsertAutoCasts,
+    new TypeCheck,
+    new VerifyNoAggregates,
+    new VerifyBoolean)
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/ResolveFieldReferences.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/ResolveFieldReferences.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/ResolveFieldReferences.scala
new file mode 100644
index 0000000..3f7cbd2
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/ResolveFieldReferences.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.analysis
+
+import org.apache.flink.api.table.tree.{ResolvedFieldReference, UnresolvedFieldReference, Expression}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table._
+
+import scala.collection.mutable
+
+/**
+ * Rule that resolves field references. This rule verifies that field references point to existing
+ * fields of the input operation and creates [[ResolvedFieldReference]]s that hold the field
+ * [[TypeInformation]] in addition to the field name.
+ */
+class ResolveFieldReferences(inputFields: Seq[(String, TypeInformation[_])]) extends Rule {
+
+  def apply(expr: Expression) = {
+    val errors = mutable.MutableList[String]()
+
+    val result = expr.transformPost {
+      case fe@UnresolvedFieldReference(fieldName) =>
+        inputFields.find { _._1 == fieldName } match {
+          case Some((_, tpe)) => ResolvedFieldReference(fieldName, tpe)
+
+          case None =>
+            errors +=
+              s"Field '$fieldName' is not valid for input fields 
${inputFields.mkString(",")}"
+            fe
+        }
+    }
+
+    if (errors.length > 0) {
+      throw new ExpressionException(
+        s"""Invalid expression "$expr": ${errors.mkString(" ")}""")
+    }
+
+    result
+
+  }
+}
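
A sketch of the rule in isolation, with a hand-built input schema:

  import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
  import org.apache.flink.api.table.analysis.ResolveFieldReferences
  import org.apache.flink.api.table.tree._

  val inputFields: Seq[(String, TypeInformation[_])] = Seq(
    "word" -> BasicTypeInfo.STRING_TYPE_INFO,
    "count" -> BasicTypeInfo.INT_TYPE_INFO)

  // "count" resolves to a ResolvedFieldReference carrying INT_TYPE_INFO;
  // an unknown field name would throw an ExpressionException instead
  val resolved = new ResolveFieldReferences(inputFields)
    .apply(GreaterThan(UnresolvedFieldReference("count"), Literal(1)))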

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/Rule.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/Rule.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/Rule.scala
new file mode 100644
index 0000000..34c5750
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/Rule.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.analysis
+
+import org.apache.flink.api.table.tree.Expression
+
+/**
+ * Base class for a rule that is part of an [[Analyzer]] rule chain. Method `apply` takes an
+ * [[Expression]] and must return an expression. The returned [[Expression]] can also be
+ * the input [[Expression]]. In an [[Analyzer]] rule chain the result [[Expression]] of one
+ * [[Rule]] is fed into the next [[Rule]] in the chain.
+ * [[Rule]] is fed into the next [[Rule]] in the chain.
+ */
+abstract class Rule {
+  def apply(expr: Expression): Expression
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/SelectionAnalyzer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/SelectionAnalyzer.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/SelectionAnalyzer.scala
new file mode 100644
index 0000000..70aadb8
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/SelectionAnalyzer.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.analysis
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+/**
+ * This analyzes selection expressions.
+ */
+class SelectionAnalyzer(inputFields: Seq[(String, TypeInformation[_])]) extends Analyzer {
+
+  def rules = Seq(
+    new ResolveFieldReferences(inputFields),
+    new VerifyNoNestedAggregates,
+    new InsertAutoCasts,
+    new TypeCheck)
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/TypeCheck.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/TypeCheck.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/TypeCheck.scala
new file mode 100644
index 0000000..3304726
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/TypeCheck.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.analysis
+
+import org.apache.flink.api.table.tree.Expression
+import org.apache.flink.api.table.{_}
+
+import scala.collection.mutable
+
+/**
+ * Rule that makes sure we call [[Expression.typeInfo]] on each [[Expression]] at least once.
+ * Expressions are expected to perform type verification in this method.
+ */
+class TypeCheck extends Rule {
+
+  def apply(expr: Expression) = {
+    val errors = mutable.MutableList[String]()
+
+    val result = expr.transformPre {
+      case expr: Expression => {
+        // simply get the typeInfo from the expression. this will perform type analysis
+        try {
+          expr.typeInfo
+        } catch {
+          case e: ExpressionException =>
+            errors += e.getMessage
+        }
+        expr
+      }
+    }
+
+    if (errors.length > 0) {
+      throw new ExpressionException(
+        s"""Invalid expression "$expr": ${errors.mkString(" ")}""")
+    }
+
+    result
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/VerifyBoolean.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/VerifyBoolean.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/VerifyBoolean.scala
new file mode 100644
index 0000000..d554c44
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/VerifyBoolean.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.analysis
+
+import org.apache.flink.api.table.tree.{NopExpression, Expression}
+import org.apache.flink.api.table._
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+
+/**
+ * [[Rule]] that verifies that the result type of an [[Expression]] is Boolean.
+ * This is required for filter and join predicates.
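+ *
+ * For example (a sketch), a predicate like {{{ 'a < 10 }}} passes this rule, while
+ * an Int-typed expression like {{{ 'a + 10 }}} is rejected with an
+ * [[ExpressionException]].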
+ */
+class VerifyBoolean extends Rule {
+
+  def apply(expr: Expression) = {
+    if (!expr.isInstanceOf[NopExpression] &&
+        expr.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO) {
+      throw new ExpressionException(
+        s"Expression $expr of type ${expr.typeInfo} is not boolean.")
+    }
+
+    expr
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/VerifyNoAggregates.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/VerifyNoAggregates.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/VerifyNoAggregates.scala
new file mode 100644
index 0000000..6f5201c
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/VerifyNoAggregates.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.analysis
+
+import org.apache.flink.api.table.ExpressionException
+import org.apache.flink.api.table.tree.{Aggregation, Expression}
+
+import scala.collection.mutable
+
+/**
+ * Rule that verifies that an expression does not contain aggregate operations.
+ * Right now, join predicates and filter predicates cannot contain aggregates.
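+ *
+ * For example (a sketch, using the expression DSL of this API), a join predicate
+ * like {{{ 'a === 'b }}} is accepted, while {{{ 'a.sum === 'b }}} would be rejected.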
+ */
+class VerifyNoAggregates extends Rule {
+
+  def apply(expr: Expression) = {
+    val errors = mutable.MutableList[String]()
+
+    val result = expr.transformPre {
+      case agg: Aggregation =>
+        errors += "Aggregations are not allowed in join/filter predicates."
+        agg
+    }
+
+    if (errors.nonEmpty) {
+      throw new ExpressionException(
+        s"""Invalid expression "$expr": ${errors.mkString(" ")}""")
+    }
+
+    result
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/VerifyNoNestedAggregates.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/VerifyNoNestedAggregates.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/VerifyNoNestedAggregates.scala
new file mode 100644
index 0000000..9055355
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/VerifyNoNestedAggregates.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.analysis
+
+import org.apache.flink.api.table.ExpressionException
+import org.apache.flink.api.table.tree.{Expression, Aggregation}
+
+import scala.collection.mutable
+
+/**
+ * Rule that verifies that an expression does not contain aggregate operations
+ * as children of aggregate operations.
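+ *
+ * For example (a sketch), {{{ 'a.sum }}} is accepted, while a nested aggregation
+ * like {{{ 'a.sum.max }}} would be reported as invalid.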
+ */
+class VerifyNoNestedAggregates extends Rule {
+
+  def apply(expr: Expression) = {
+    val errors = mutable.MutableList[String]()
+
+    val result = expr.transformPre {
+      case agg: Aggregation =>
+        if (agg.child.exists(_.isInstanceOf[Aggregation])) {
+          errors += s"""Found nested aggregation inside "$agg"."""
+        }
+        agg
+    }
+
+    if (errors.nonEmpty) {
+      throw new ExpressionException(
+        s"""Invalid expression "$expr": ${errors.mkString(" ")}""")
+    }
+
+    result
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala
new file mode 100644
index 0000000..5ec8579
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala
@@ -0,0 +1,635 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.codegen
+
+import java.util.concurrent.atomic.AtomicInteger
+import org.apache.flink.api.table.tree._
+import org.apache.flink.api.table.typeinfo.{RenamingProxyTypeInfo, RowTypeInfo}
+import org.apache.flink.api.table.{ExpressionException, tree}
+import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo, BasicTypeInfo, TypeInformation}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.typeutils.{TupleTypeInfo, PojoTypeInfo}
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.slf4j.LoggerFactory
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/** Base class for all code generation classes. This provides the functionality for
+  * generating code from an [[Expression]] tree. Derived classes must embed this in
+  * a lambda function to form an executable code block.
+  *
+  * @param inputs List of input variable names with corresponding [[TypeInformation]].
+  * @param nullCheck Whether the generated code should include checks for NULL values.
+  * @param cl The ClassLoader that is used to create the Scala reflection ToolBox.
+  * @tparam R The type of the generated code block. In most cases a lambda function
+  *           such as "(IN1, IN2) => OUT".
+  */
+abstract class ExpressionCodeGenerator[R](
+    inputs: Seq[(String, CompositeType[_])],
+    val nullCheck: Boolean = false,
+    cl: ClassLoader) {
+
+  protected val log = LoggerFactory.getLogger(classOf[ExpressionCodeGenerator[_]])
+
+  import scala.reflect.runtime.{universe => ru}
+  import scala.reflect.runtime.universe._
+
+  if (cl == null) {
+    throw new IllegalArgumentException("ClassLoader must not be null.")
+  }
+
+  import scala.tools.reflect.ToolBox
+
+  protected val (mirror, toolBox) = ReflectionLock.synchronized {
+    val mirror = runtimeMirror(cl)
+    (mirror, mirror.mkToolBox())
+  }
+
+  // This is to be implemented by subclasses. We have it like this so that we
+  // only call it from here, while holding the Scala reflection lock.
+  protected def generateInternal(): R
+
+  final def generate(): R = {
+    ReflectionLock.synchronized {
+      generateInternal()
+    }
+  }
+
+  val cache = mutable.HashMap[Expression, GeneratedExpression]()
+
+  protected def generateExpression(expr: Expression): GeneratedExpression = {
+    // Doesn't work yet, because we insert the same code twice and reuse variable names:
+    //    cache.getOrElseUpdate(expr, generateExpressionInternal(expr))
+    generateExpressionInternal(expr)
+  }
+
+  protected def generateExpressionInternal(expr: Expression): GeneratedExpression = {
+    val nullTerm = freshTermName("isNull")
+    val resultTerm = freshTermName("result")
+
+    // For binary expressions that must only be evaluated when both operands are
+    // non-null. This will write to nullTerm and resultTerm, so don't use those
+    // term names after using this function.
+    def generateIfNonNull(left: Expression, right: Expression, resultType: TypeInformation[_])
+                         (expr: (TermName, TermName) => Tree): Seq[Tree] = {
+      val leftCode = generateExpression(left)
+      val rightCode = generateExpression(right)
+
+      if (nullCheck) {
+        leftCode.code ++ rightCode.code ++ q"""
+        val $nullTerm = ${leftCode.nullTerm} || ${rightCode.nullTerm}
+        val $resultTerm = if ($nullTerm) {
+          ${defaultPrimitive(resultType)}
+        } else {
+          ${expr(leftCode.resultTerm, rightCode.resultTerm)}
+        }
+        """.children
+      } else {
+        leftCode.code ++ rightCode.code :+ q"""
+        val $resultTerm = ${expr(leftCode.resultTerm, rightCode.resultTerm)}
+        """
+      }
+    }
+
+    val cleanedExpr = expr match {
+      case tree.Naming(namedExpr, _) => namedExpr
+      case _ => expr
+    }
+
+    val code: Seq[Tree] = cleanedExpr match {
+
+      case tree.Literal(null, typeInfo) =>
+        if (nullCheck) {
+          q"""
+            val $nullTerm = true
+            val $resultTerm = null
+          """.children
+        } else {
+          Seq( q"""
+            val $resultTerm = null
+          """)
+        }
+
+      case tree.Literal(intValue: Int, INT_TYPE_INFO) =>
+        if (nullCheck) {
+          q"""
+            val $nullTerm = false
+            val $resultTerm = $intValue
+          """.children
+        } else {
+          Seq( q"""
+            val $resultTerm = $intValue
+          """)
+        }
+
+      case tree.Literal(longValue: Long, LONG_TYPE_INFO) =>
+        if (nullCheck) {
+          q"""
+            val $nullTerm = false
+            val $resultTerm = $longValue
+          """.children
+        } else {
+          Seq( q"""
+            val $resultTerm = $longValue
+          """)
+        }
+
+      case tree.Literal(doubleValue: Double, DOUBLE_TYPE_INFO) =>
+        if (nullCheck) {
+          q"""
+            val $nullTerm = false
+            val $resultTerm = $doubleValue
+          """.children
+        } else {
+          Seq( q"""
+              val $resultTerm = $doubleValue
+          """)
+        }
+
+      case tree.Literal(floatValue: Float, FLOAT_TYPE_INFO) =>
+        if (nullCheck) {
+          q"""
+            val $nullTerm = false
+            val $resultTerm = $floatValue
+          """.children
+        } else {
+          Seq( q"""
+              val $resultTerm = $floatValue
+          """)
+        }
+
+      case tree.Literal(strValue: String, STRING_TYPE_INFO) =>
+        if (nullCheck) {
+          q"""
+            val $nullTerm = false
+            val $resultTerm = $strValue
+          """.children
+        } else {
+          Seq( q"""
+              val $resultTerm = $strValue
+          """)
+        }
+
+      case tree.Literal(boolValue: Boolean, BOOLEAN_TYPE_INFO) =>
+        if (nullCheck) {
+          q"""
+            val $nullTerm = false
+            val $resultTerm = $boolValue
+          """.children
+        } else {
+          Seq( q"""
+              val $resultTerm = $boolValue
+          """)
+        }
+
+      case Substring(str, beginIndex, endIndex) =>
+        val strCode = generateExpression(str)
+        val beginIndexCode = generateExpression(beginIndex)
+        val endIndexCode = generateExpression(endIndex)
+        if (nullCheck) {
+          // $resultTerm must be bound at the top level so later splices can refer to it.
+          strCode.code ++ beginIndexCode.code ++ endIndexCode.code ++ q"""
+            val $nullTerm =
+              ${strCode.nullTerm} || ${beginIndexCode.nullTerm} || ${endIndexCode.nullTerm}
+            val $resultTerm = if ($nullTerm) {
+              ${defaultPrimitive(str.typeInfo)}
+            } else {
+              if (${endIndexCode.resultTerm} == Int.MaxValue) {
+                (${strCode.resultTerm}).substring(${beginIndexCode.resultTerm})
+              } else {
+                (${strCode.resultTerm}).substring(
+                  ${beginIndexCode.resultTerm},
+                  ${endIndexCode.resultTerm})
+              }
+            }
+          """.children
+        } else {
+          strCode.code ++ beginIndexCode.code ++ endIndexCode.code :+ q"""
+            val $resultTerm = if (${endIndexCode.resultTerm} == Int.MaxValue) {
+              (${strCode.resultTerm}).substring(${beginIndexCode.resultTerm})
+            } else {
+              (${strCode.resultTerm}).substring(
+                ${beginIndexCode.resultTerm},
+                ${endIndexCode.resultTerm})
+            }
+          """
+        }
+
+      case tree.Cast(child: Expression, STRING_TYPE_INFO) =>
+        val childGen = generateExpression(child)
+        val castCode = if (nullCheck) {
+          q"""
+            val $nullTerm = ${childGen.nullTerm}
+            val $resultTerm = if ($nullTerm) {
+              null
+            } else {
+              ${childGen.resultTerm}.toString
+            }
+          """.children
+        } else {
+          Seq( q"""
+            val $resultTerm = ${childGen.resultTerm}.toString
+          """)
+        }
+        childGen.code ++ castCode
+
+      case tree.Cast(child: Expression, INT_TYPE_INFO) =>
+        val childGen = generateExpression(child)
+        val castCode = if (nullCheck) {
+          q"""
+            val $nullTerm = ${childGen.nullTerm}
+            val $resultTerm = ${childGen.resultTerm}.toInt
+          """.children
+        } else {
+          Seq( q"""
+            val $resultTerm = ${childGen.resultTerm}.toInt
+          """)
+        }
+        childGen.code ++ castCode
+
+      case tree.Cast(child: Expression, LONG_TYPE_INFO) =>
+        val childGen = generateExpression(child)
+        val castCode = if (nullCheck) {
+          q"""
+            val $nullTerm = ${childGen.nullTerm}
+            val $resultTerm = ${childGen.resultTerm}.toLong
+          """.children
+        } else {
+          Seq( q"""
+            val $resultTerm = ${childGen.resultTerm}.toLong
+          """)
+        }
+        childGen.code ++ castCode
+
+      case tree.Cast(child: Expression, FLOAT_TYPE_INFO) =>
+        val childGen = generateExpression(child)
+        val castCode = if (nullCheck) {
+          q"""
+            val $nullTerm = ${childGen.nullTerm}
+            val $resultTerm = ${childGen.resultTerm}.toFloat
+          """.children
+        } else {
+          Seq( q"""
+            val $resultTerm = ${childGen.resultTerm}.toFloat
+          """)
+        }
+        childGen.code ++ castCode
+
+      case tree.Cast(child: Expression, DOUBLE_TYPE_INFO) =>
+        val childGen = generateExpression(child)
+        val castCode = if (nullCheck) {
+          q"""
+            val $nullTerm = ${childGen.nullTerm}
+            val $resultTerm = ${childGen.resultTerm}.toDouble
+          """.children
+        } else {
+          Seq( q"""
+            val $resultTerm = ${childGen.resultTerm}.toDouble
+          """)
+        }
+        childGen.code ++ castCode
+
+      case ResolvedFieldReference(fieldName, fieldTpe: TypeInformation[_]) =>
+        inputs find { i => i._2.hasField(fieldName) } match {
+          case Some((inputName, inputTpe)) =>
+            val fieldCode = getField(newTermName(inputName), inputTpe, fieldName, fieldTpe)
+            if (nullCheck) {
+              q"""
+                val $resultTerm = $fieldCode
+                val $nullTerm = $resultTerm == null
+              """.children
+            } else {
+              Seq( q"""
+                val $resultTerm = $fieldCode
+              """)
+            }
+
+          case None => throw new ExpressionException("Could not get accessor for " +
+            fieldName + " in inputs " + inputs.mkString(", ") + ".")
+        }
+
+      case GreaterThan(left, right) =>
+        generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) {
+          (leftTerm, rightTerm) => q"$leftTerm > $rightTerm"
+        }
+
+      case GreaterThanOrEqual(left, right) =>
+        generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) {
+          (leftTerm, rightTerm) => q"$leftTerm >= $rightTerm"
+        }
+
+      case LessThan(left, right) =>
+        generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) {
+          (leftTerm, rightTerm) => q"$leftTerm < $rightTerm"
+        }
+
+      case LessThanOrEqual(left, right) =>
+        generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) {
+          (leftTerm, rightTerm) => q"$leftTerm <= $rightTerm"
+        }
+
+      case EqualTo(left, right) =>
+        generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) {
+          (leftTerm, rightTerm) => q"$leftTerm == $rightTerm"
+        }
+
+      case NotEqualTo(left, right) =>
+        generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) {
+          (leftTerm, rightTerm) => q"$leftTerm != $rightTerm"
+        }
+
+      case And(left, right) =>
+        generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) {
+          (leftTerm, rightTerm) => q"$leftTerm && $rightTerm"
+        }
+
+      case Or(left, right) =>
+        generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) {
+          (leftTerm, rightTerm) => q"$leftTerm || $rightTerm"
+        }
+
+      case Plus(left, right) =>
+        generateIfNonNull(left, right, expr.typeInfo) {
+          (leftTerm, rightTerm) => q"$leftTerm + $rightTerm"
+        }
+
+      case Minus(left, right) =>
+        generateIfNonNull(left, right, expr.typeInfo) {
+          (leftTerm, rightTerm) => q"$leftTerm - $rightTerm"
+        }
+
+      case Div(left, right) =>
+        generateIfNonNull(left, right, expr.typeInfo) {
+          (leftTerm, rightTerm) => q"$leftTerm / $rightTerm"
+        }
+
+      case Mul(left, right) =>
+        generateIfNonNull(left, right, expr.typeInfo) {
+          (leftTerm, rightTerm) => q"$leftTerm * $rightTerm"
+        }
+
+      case Mod(left, right) =>
+        generateIfNonNull(left, right, expr.typeInfo) {
+          (leftTerm, rightTerm) => q"$leftTerm % $rightTerm"
+        }
+
+      case UnaryMinus(child) =>
+        val childCode = generateExpression(child)
+        if (nullCheck) {
+          childCode.code ++ q"""
+            val $nullTerm = ${childCode.nullTerm}
+            val $resultTerm = if ($nullTerm) {
+              ${defaultPrimitive(child.typeInfo)}
+            } else {
+              -(${childCode.resultTerm})
+            }
+          """.children
+        } else {
+          childCode.code :+ q"""
+              val $resultTerm = -(${childCode.resultTerm})
+          """
+        }
+
+      case BitwiseAnd(left, right) =>
+        generateIfNonNull(left, right, expr.typeInfo) {
+          (leftTerm, rightTerm) => q"$leftTerm & $rightTerm"
+        }
+
+      case BitwiseOr(left, right) =>
+        generateIfNonNull(left, right, expr.typeInfo) {
+          (leftTerm, rightTerm) => q"$leftTerm | $rightTerm"
+        }
+
+      case BitwiseXor(left, right) =>
+        generateIfNonNull(left, right, expr.typeInfo) {
+          (leftTerm, rightTerm) => q"$leftTerm ^ $rightTerm"
+        }
+
+      case BitwiseNot(child) =>
+        val childCode = generateExpression(child)
+        if (nullCheck) {
+          childCode.code ++ q"""
+            val $nullTerm = ${childCode.nullTerm}
+            val $resultTerm = if ($nullTerm) {
+              ${defaultPrimitive(child.typeInfo)}
+            } else {
+              ~(${childCode.resultTerm})
+            }
+          """.children
+        } else {
+          childCode.code :+ q"""
+              val $resultTerm = ~(${childCode.resultTerm})
+          """
+        }
+
+      case Not(child) =>
+        val childCode = generateExpression(child)
+        if (nullCheck) {
+          childCode.code ++ q"""
+            val $nullTerm = ${childCode.nullTerm}
+            val $resultTerm = if ($nullTerm) {
+              ${defaultPrimitive(child.typeInfo)}
+            } else {
+              !(${childCode.resultTerm})
+            }
+          """.children
+        } else {
+          childCode.code :+ q"""
+              val $resultTerm = !(${childCode.resultTerm})
+          """
+        }
+
+      case IsNull(child) =>
+        val childCode = generateExpression(child)
+        if (nullCheck) {
+          // The result is Boolean, so use the Boolean default value here.
+          childCode.code ++ q"""
+            val $nullTerm = ${childCode.nullTerm}
+            val $resultTerm = if ($nullTerm) {
+              ${defaultPrimitive(BOOLEAN_TYPE_INFO)}
+            } else {
+              (${childCode.resultTerm}) == null
+            }
+          """.children
+        } else {
+          childCode.code :+ q"""
+              val $resultTerm = (${childCode.resultTerm}) == null
+          """
+        }
+
+      case IsNotNull(child) =>
+        val childCode = generateExpression(child)
+        if (nullCheck) {
+          // The result is Boolean, so use the Boolean default value here.
+          childCode.code ++ q"""
+            val $nullTerm = ${childCode.nullTerm}
+            val $resultTerm = if ($nullTerm) {
+              ${defaultPrimitive(BOOLEAN_TYPE_INFO)}
+            } else {
+              (${childCode.resultTerm}) != null
+            }
+          """.children
+        } else {
+          childCode.code :+ q"""
+              val $resultTerm = (${childCode.resultTerm}) != null
+          """
+        }
+
+      case Abs(child) =>
+        val childCode = generateExpression(child)
+        if (nullCheck) {
+          childCode.code ++ q"""
+            val $nullTerm = ${childCode.nullTerm}
+            val $resultTerm = if ($nullTerm) {
+              ${defaultPrimitive(child.typeInfo)}
+            } else {
+              Math.abs(${childCode.resultTerm})
+            }
+          """.children
+        } else {
+          childCode.code :+ q"""
+              val $resultTerm = Math.abs(${childCode.resultTerm})
+          """
+        }
+
+      case _ => throw new ExpressionException("Could not generate code for expression " + expr)
+    }
+
+    GeneratedExpression(code, resultTerm, nullTerm)
+  }
+
+  case class GeneratedExpression(code: Seq[Tree], resultTerm: TermName, nullTerm: TermName)
+
+  // We don't have c.freshName
+  // According to http://docs.scala-lang.org/overviews/quasiquotes/hygiene.html
+  // it's coming for 2.11. We can't wait that long...
+  def freshTermName(name: String): TermName = {
+    newTermName(s"$name$$${freshNameCounter.getAndIncrement}")
+  }
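+  // For example, freshTermName("result") might yield a term name like "result$3";
+  // the numeric suffix comes from the shared counter below.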
+
+  val freshNameCounter = new AtomicInteger
+
+  protected def getField(
+      inputTerm: TermName,
+      inputType: CompositeType[_],
+      fieldName: String,
+      fieldType: TypeInformation[_]): Tree = {
+    val accessor = fieldAccessorFor(inputType, fieldName)
+    accessor match {
+      case ObjectFieldAccessor(fieldName) =>
+        val fieldTerm = newTermName(fieldName)
+        q"$inputTerm.$fieldTerm.asInstanceOf[${typeTermForTypeInfo(fieldType)}]"
+
+      case ObjectMethodAccessor(methodName) =>
+        val methodTerm = newTermName(methodName)
+        q"$inputTerm.$methodTerm().asInstanceOf[${typeTermForTypeInfo(fieldType)}]"
+
+      case ProductAccessor(i) =>
+        q"$inputTerm.productElement($i).asInstanceOf[${typeTermForTypeInfo(fieldType)}]"
+    }
+  }
+
+  sealed abstract class FieldAccessor
+
+  case class ObjectFieldAccessor(fieldName: String) extends FieldAccessor
+
+  case class ObjectMethodAccessor(methodName: String) extends FieldAccessor
+
+  case class ProductAccessor(i: Int) extends FieldAccessor
+
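+  // Examples: a case-class or POJO field "name" maps to ObjectFieldAccessor("name"),
+  // while a Row field maps to a ProductAccessor carrying the field's index.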
+  def fieldAccessorFor(elementType: CompositeType[_], fieldName: String): FieldAccessor = {
+    elementType match {
+      case ri: RowTypeInfo =>
+        ProductAccessor(elementType.getFieldIndex(fieldName))
+
+      case cc: CaseClassTypeInfo[_] =>
+        ObjectFieldAccessor(fieldName)
+
+      case javaTup: TupleTypeInfo[_] =>
+        ObjectFieldAccessor(fieldName)
+
+      case pj: PojoTypeInfo[_] =>
+        ObjectFieldAccessor(fieldName)
+
+      case proxy: RenamingProxyTypeInfo[_] =>
+        val underlying = proxy.getUnderlyingType
+        val fieldIndex = proxy.getFieldIndex(fieldName)
+        fieldAccessorFor(underlying, underlying.getFieldNames()(fieldIndex))
+    }
+  }
+
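+  // These placeholder values are spliced into the null branch when nullCheck is
+  // enabled; they are never meaningful on their own, since generated code is
+  // expected to consult the null flag first.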
+  protected def defaultPrimitive(tpe: TypeInformation[_]) = tpe match {
+    case BasicTypeInfo.INT_TYPE_INFO => ru.Literal(Constant(-1))
+    case BasicTypeInfo.LONG_TYPE_INFO => ru.Literal(Constant(-1L))
+    case BasicTypeInfo.SHORT_TYPE_INFO => ru.Literal(Constant(-1.toShort))
+    case BasicTypeInfo.BYTE_TYPE_INFO => ru.Literal(Constant(-1.toByte))
+    case BasicTypeInfo.FLOAT_TYPE_INFO => ru.Literal(Constant(-1.0.toFloat))
+    case BasicTypeInfo.DOUBLE_TYPE_INFO => ru.Literal(Constant(-1.toDouble))
+    case BasicTypeInfo.BOOLEAN_TYPE_INFO => ru.Literal(Constant(false))
+    case BasicTypeInfo.STRING_TYPE_INFO => ru.Literal(Constant("<empty>"))
+    case BasicTypeInfo.CHAR_TYPE_INFO => ru.Literal(Constant('\0'))
+    case _ => ru.Literal(Constant(null))
+  }
+
+  protected def typeTermForTypeInfo(typeInfo: TypeInformation[_]): Tree = {
+    val tpe = typeForTypeInfo(typeInfo)
+    tq"$tpe"
+  }
+
+  // We need two separate methods here because typeForTypeInfo is recursive when
+  // generating the type for a type with generic parameters.
+  protected def typeForTypeInfo(tpe: TypeInformation[_]): Type = tpe match {
+
+    // From PrimitiveArrayTypeInfo we would get class "int[]", Scala reflection
+    // does not seem to like this, so we manually give the correct type here.
+    case PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO => typeOf[Array[Int]]
+    case PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO => typeOf[Array[Long]]
+    case PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO => typeOf[Array[Short]]
+    case PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO => typeOf[Array[Byte]]
+    case PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO => typeOf[Array[Float]]
+    case PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO => typeOf[Array[Double]]
+    case PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO => typeOf[Array[Boolean]]
+    case PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO => typeOf[Array[Char]]
+
+    case _ =>
+      val clazz = mirror.staticClass(tpe.getTypeClass.getCanonicalName)
+
+      clazz.selfType.erasure match {
+        case ExistentialType(_, underlying) => underlying
+
+        case tpe@TypeRef(prefix, sym, Nil) =>
+          // Non-generic type, just return the type
+          tpe
+
+        case TypeRef(prefix, sym, tpeParams) =>
+          val genericTypeInfos = tpe.getGenericParameters.asScala
+          if (tpeParams.length != genericTypeInfos.length) {
+            throw new RuntimeException("Number of type parameters does not match.")
+          }
+          val typeParams = genericTypeInfos.map(typeForTypeInfo)
+          // TODO: remove, added only for migration of the line below,
+          // as suggested by the compiler
+          import compat._
+          TypeRef(prefix, sym, typeParams.toList)
+      }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateBinaryPredicate.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateBinaryPredicate.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateBinaryPredicate.scala
new file mode 100644
index 0000000..9c9faa4
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateBinaryPredicate.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.codegen
+
+import org.apache.flink.api.table.tree.Expression
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.slf4j.LoggerFactory
+
+/**
+ * Code generator for binary predicates, i.e. join or cogroup predicates.
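+ *
+ * A usage sketch (the element types and the predicate expression are illustrative
+ * assumptions):
+ *
+ * {{{
+ *   val generator = new GenerateBinaryPredicate[L, R](
+ *     leftType, rightType, predicateExpr, getClass.getClassLoader)
+ *   val predicate: (L, R) => Boolean = generator.generate()
+ * }}}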
+ */
+class GenerateBinaryPredicate[L, R](
+    leftType: CompositeType[L],
+    rightType: CompositeType[R],
+    predicate: Expression,
+    cl: ClassLoader)
+  extends ExpressionCodeGenerator[(L, R) => Boolean](
+    Seq(("input0", leftType), ("input1", rightType)),
+    cl = cl) {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  import scala.reflect.runtime.{universe => ru}
+  import scala.reflect.runtime.universe._
+
+  override protected def generateInternal(): ((L, R) => Boolean) = {
+    val pred = generateExpression(predicate)
+
+    val in0 = newTermName("input0")
+    val in1 = newTermName("input1")
+
+    val leftTpe = typeTermForTypeInfo(leftType)
+    val rightTpe = typeTermForTypeInfo(rightType)
+
+    val code = if (nullCheck) {
+      q"""
+        ($in0: $leftTpe, $in1: $rightTpe) => {
+          ..${pred.code}
+          if (${pred.nullTerm}) {
+            false
+          } else {
+            ${pred.resultTerm}
+          }
+        }
+      """
+    } else {
+      q"""
+        ($in0: $leftTpe, $in1: $rightTpe) => {
+          ..${pred.code}
+          ${pred.resultTerm}
+        }
+      """
+    }
+
+    LOG.debug(s"""Generated binary predicate "$predicate":\n$code""")
+    toolBox.eval(code).asInstanceOf[(L, R) => Boolean]
+  }
+}
