http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateBinaryResultAssembler.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateBinaryResultAssembler.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateBinaryResultAssembler.scala
new file mode 100644
index 0000000..253cac9
--- /dev/null
+++ 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateBinaryResultAssembler.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.codegen
+
+import org.apache.flink.api.table.tree.Expression
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.slf4j.LoggerFactory
+
+/**
+ * Code generator for assembling the result of a binary operation.
+ */
+class GenerateBinaryResultAssembler[L, R, O](
+    leftTypeInfo: CompositeType[L],
+    rightTypeInfo: CompositeType[R],
+    resultTypeInfo: CompositeType[O],
+    outputFields: Seq[Expression],
+    cl: ClassLoader)
+  extends GenerateResultAssembler[(L, R, O) => O](
+    Seq(("input0", leftTypeInfo), ("input1", rightTypeInfo)),
+    cl = cl) {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  import scala.reflect.runtime.universe._
+
+  /**
+   * Generates a function `(input0, input1, out) => result` whose body is the code produced by
+   * `createResult` for `resultTypeInfo` and `outputFields`.
+   *
+   * The parameter names `input0`, `input1` and `out` are the names the generated field code
+   * refers to, so they must not be changed independently of the base classes.
+   *
+   * @return the evaluated function, cast to `(L, R, O) => O`
+   */
+  override protected def generateInternal(): ((L, R, O) => O) = {
+
+    val leftType = typeTermForTypeInfo(leftTypeInfo)
+    val rightType = typeTermForTypeInfo(rightTypeInfo)
+    val resultType = typeTermForTypeInfo(resultTypeInfo)
+
+    val resultCode = createResult(resultTypeInfo, outputFields)
+
+    val code: Tree =
+      q"""
+        (input0: $leftType, input1: $rightType, out: $resultType) => {
+          ..$resultCode
+        }
+      """
+
+    // String interpolation is eager: only render the generated tree when it will be logged.
+    if (LOG.isDebugEnabled) {
+      LOG.debug(s"Generated binary result-assembler:\n$code")
+    }
+    toolBox.eval(code).asInstanceOf[(L, R, O) => O]
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateResultAssembler.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateResultAssembler.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateResultAssembler.scala
new file mode 100644
index 0000000..42f256f
--- /dev/null
+++ 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateResultAssembler.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.codegen
+
+import org.apache.flink.api.table.tree.Expression
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.typeinfo.RowTypeInfo
+import org.apache.flink.api.java.typeutils.{TupleTypeInfo, PojoTypeInfo}
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+
+/**
+ * Base class for unary and binary result assembler code generators.
+ */
+abstract class GenerateResultAssembler[R](
+    inputs: Seq[(String, CompositeType[_])],
+    cl: ClassLoader)
+  extends ExpressionCodeGenerator[R](inputs, cl = cl) {
+  import scala.reflect.runtime.{universe => ru}
+  import scala.reflect.runtime.universe._
+
+  /**
+   * Generates the code that assembles a result value from the given output expressions.
+   *
+   * The generated code depends on the kind of result type:
+   *  - [[RowTypeInfo]]: every field is written with `out.setField(index, value)` and `out`
+   *    is returned.
+   *  - [[PojoTypeInfo]] and [[TupleTypeInfo]]: every field is assigned to the member of `out`
+   *    named like the expression (`out.name = value`) and `out` is returned.
+   *  - [[CaseClassTypeInfo]]: a new instance is created by passing the field values to the
+   *    constructor in the order of `outputFields`; `out` is not used.
+   *
+   * The generated code refers to a variable called `out`, which the enclosing generated
+   * function has to provide.
+   *
+   * @param resultTypeInfo type information of the result to assemble
+   * @param outputFields expressions that compute the individual result fields
+   * @return the tree that assembles and yields the result
+   * @throws IllegalArgumentException if `resultTypeInfo` is none of the supported kinds
+   */
+  def createResult[T](
+      resultTypeInfo: CompositeType[T],
+      outputFields: Seq[Expression]): Tree = {
+
+    val resultType = typeTermForTypeInfo(resultTypeInfo)
+
+    val fieldsCode = outputFields.map(generateExpression)
+
+    resultTypeInfo match {
+      case _: RowTypeInfo =>
+        val resultSetters: Seq[Tree] = fieldsCode.zipWithIndex map {
+          case (fieldCode, i) =>
+            q"""
+              out.setField($i, { ..${fieldCode.code}; ${fieldCode.resultTerm} })
+            """
+        }
+
+        q"""
+          ..$resultSetters
+          out
+        """
+
+      // POJOs and Java tuples are both filled by assigning to the member named like the field.
+      case _: PojoTypeInfo[_] | _: TupleTypeInfo[_] =>
+        val resultSetters: Seq[Tree] = fieldsCode.zip(outputFields) map {
+          case (fieldCode, expr) =>
+            val fieldName = newTermName(expr.name)
+            q"""
+              out.$fieldName = { ..${fieldCode.code}; ${fieldCode.resultTerm} }
+            """
+        }
+
+        q"""
+          ..$resultSetters
+          out
+        """
+
+      case _: CaseClassTypeInfo[_] =>
+        val resultFields: Seq[Tree] = fieldsCode map {
+          fieldCode =>
+            q"{ ..${fieldCode.code}; ${fieldCode.resultTerm}}"
+        }
+        q"""
+          new $resultType(..$resultFields)
+        """
+
+      // Fail with a descriptive message instead of a bare MatchError.
+      case other =>
+        throw new IllegalArgumentException(
+          s"Cannot generate result assembler for unsupported result type: $other")
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateUnaryPredicate.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateUnaryPredicate.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateUnaryPredicate.scala
new file mode 100644
index 0000000..32af2a9
--- /dev/null
+++ 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateUnaryPredicate.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.codegen
+
+import org.apache.flink.api.table.tree.Expression
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.slf4j.LoggerFactory
+
+/**
+ * Code generator for a unary predicate, i.e. a Filter.
+ */
+class GenerateUnaryPredicate[T](
+    inputType: CompositeType[T],
+    predicate: Expression,
+    cl: ClassLoader) extends ExpressionCodeGenerator[T => Boolean](
+      Seq(("input0", inputType)),
+      cl = cl) {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  import scala.reflect.runtime.{universe => ru}
+  import scala.reflect.runtime.universe._
+
+  /**
+   * Generates a function `input0 => Boolean` that evaluates `predicate` on its argument.
+   *
+   * When `nullCheck` is enabled the generated function returns `false` if the null term of
+   * the predicate is set; otherwise it returns the predicate result directly.
+   *
+   * @return the evaluated function, cast to `T => Boolean`
+   */
+  override protected def generateInternal(): (T => Boolean) = {
+    val pred = generateExpression(predicate)
+
+    val tpe = typeTermForTypeInfo(inputType)
+
+    val code = if (nullCheck) {
+      q"""
+        (input0: $tpe) => {
+          ..${pred.code}
+          if (${pred.nullTerm}) {
+            false
+          } else {
+            ${pred.resultTerm}
+          }
+        }
+      """
+    } else {
+      q"""
+        (input0: $tpe) => {
+          ..${pred.code}
+          ${pred.resultTerm}
+        }
+      """
+    }
+
+    // String interpolation is eager: only render the generated tree when it will be logged.
+    if (LOG.isDebugEnabled) {
+      LOG.debug(s"""Generated unary predicate "$predicate":\n$code""")
+    }
+    toolBox.eval(code).asInstanceOf[(T) => Boolean]
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateUnaryResultAssembler.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateUnaryResultAssembler.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateUnaryResultAssembler.scala
new file mode 100644
index 0000000..38d7109
--- /dev/null
+++ 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateUnaryResultAssembler.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.codegen
+
+import org.apache.flink.api.table.tree.Expression
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.slf4j.LoggerFactory
+
+/**
+ * Code generator for assembling the result of a unary operation.
+ */
+class GenerateUnaryResultAssembler[I, O](
+    inputTypeInfo: CompositeType[I],
+    resultTypeInfo: CompositeType[O],
+    outputFields: Seq[Expression],
+    cl: ClassLoader)
+  extends GenerateResultAssembler[(I, O) => O](
+    Seq(("input0", inputTypeInfo)),
+    cl = cl) {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  import scala.reflect.runtime.universe._
+
+  /**
+   * Generates a function `(input0, out) => result` whose body is the code produced by
+   * `createResult` for `resultTypeInfo` and `outputFields`.
+   *
+   * The parameter names `input0` and `out` are the names the generated field code refers to,
+   * so they must not be changed independently of the base classes.
+   *
+   * @return the evaluated function, cast to `(I, O) => O`
+   */
+  override protected def generateInternal(): ((I, O) => O) = {
+
+    val inputType = typeTermForTypeInfo(inputTypeInfo)
+    val resultType = typeTermForTypeInfo(resultTypeInfo)
+
+    val resultCode = createResult(resultTypeInfo, outputFields)
+
+    val code: Tree =
+      q"""
+        (input0: $inputType, out: $resultType) => {
+          ..$resultCode
+        }
+      """
+
+    // String interpolation is eager: only render the generated tree when it will be logged.
+    if (LOG.isDebugEnabled) {
+      LOG.debug(s"Generated unary result-assembler:\n${show(code)}")
+    }
+    toolBox.eval(code).asInstanceOf[(I, O) => O]
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/package.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/package.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/package.scala
new file mode 100644
index 0000000..b69ac1c
--- /dev/null
+++ 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/package.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table
+
+package object codegen {
+  // Used in ExpressionCodeGenerator because Scala 2.10 reflection is not thread safe. We might
+  // have several parallel expression operators in one TaskManager, therefore we need to guard
+  // these operations.
+  object ReflectionLock
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/ExpandAggregations.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/ExpandAggregations.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/ExpandAggregations.scala
new file mode 100644
index 0000000..894dd22
--- /dev/null
+++ 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/ExpandAggregations.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.operations
+
+import org.apache.flink.api.table.analysis.SelectionAnalyzer
+import org.apache.flink.api.table.tree._
+import org.apache.flink.api.java.aggregation.Aggregations
+
+import scala.collection.mutable
+
+/**
+ * Expands a [[Select]] that contains aggregations. A [[Select]] without aggregations is
+ * returned unchanged.
+ *
+ * This select:
+ * {{{
+ *   in.select('key, 'value.avg)
+ * }}}
+ *
+ * is transformed to this expansion:
+ * {{{
+ *   in
+ *     .select('key, 'value, Literal(1) as 'intermediate.1)
+ *     .aggregate('value.sum, 'intermediate.1.sum)
+ *     .select('key, 'value / 'intermediate.1)
+ * }}}
+ *
+ * If the input of the [[Select]] is a [[GroupBy]], the grouping is kept in front of the
+ * aggregation.
+ */
+object ExpandAggregations {
+  def apply(select: Select): Operation = select match {
+    case Select(input, selection) =>
+
+      // (expression, basic aggregation) -> name of the intermediate field that carries it
+      val aggregations = mutable.HashMap[(Expression, Aggregations), String]()
+      // fields that the first (pre-aggregation) select has to produce
+      val intermediateFields = mutable.HashSet[Expression]()
+      // aggregation -> references to the intermediate fields it is computed from
+      val aggregationIntermediates = mutable.HashMap[Aggregation, Seq[Expression]]()
+
+      var intermediateCount = 0
+      for (field <- selection) {
+        field.transformPre {
+          case agg: Aggregation =>
+            val pairs = agg.getIntermediateFields.zip(agg.getAggregations)
+            val intermediateReferences = pairs map {
+              case (expr, basicAgg) =>
+                // Reuse the intermediate field if this (expression, aggregation) pair was
+                // seen before, otherwise register a freshly numbered one.
+                val intermediateName = aggregations.getOrElseUpdate((expr, basicAgg), {
+                  intermediateCount += 1
+                  val freshName = s"intermediate.$intermediateCount"
+                  intermediateFields += Naming(expr, freshName)
+                  freshName
+                })
+                ResolvedFieldReference(intermediateName, expr.typeInfo)
+            }
+
+            aggregationIntermediates(agg) = intermediateReferences
+            // Return a NOP so that the children of the aggregation are not added to the
+            // intermediate fields. The necessary fields were already added above.
+            NopExpression()
+
+          case fa: ResolvedFieldReference =>
+            if (!fa.name.startsWith("intermediate")) {
+              intermediateFields += Naming(fa, fa.name)
+            }
+            fa
+        }
+      }
+
+      if (aggregations.isEmpty) {
+        // no aggregations, hand the select back untouched
+        select
+      } else {
+        // Also add the grouping keys to the intermediate fields. Because a Set is used they
+        // are only added when not already present.
+        input match {
+          case GroupBy(_, groupingFields) =>
+            groupingFields foreach {
+              case fa: ResolvedFieldReference =>
+                intermediateFields += Naming(fa, fa.name)
+            }
+          case _ => // Nothing to add
+        }
+
+        val basicAggregations = aggregations.map {
+          case ((_, basicAgg), fieldName) => (fieldName, basicAgg)
+        }
+
+        // Replace every aggregation by the expression computing its final value from the
+        // intermediate fields.
+        val finalFields = selection.map(_.transformPre {
+          case agg: Aggregation =>
+            agg.getFinalField(aggregationIntermediates(agg))
+        })
+
+        val intermediateAnalyzer = new SelectionAnalyzer(input.outputFields)
+        val analyzedIntermediates = intermediateFields.toSeq.map(intermediateAnalyzer.analyze)
+
+        val finalAnalyzer =
+          new SelectionAnalyzer(analyzedIntermediates.map(e => (e.name, e.typeInfo)))
+        val analyzedFinals = finalFields.map(finalAnalyzer.analyze)
+
+        val aggregated = input match {
+          case GroupBy(groupByInput, groupingFields) =>
+            Aggregate(
+              GroupBy(
+                Select(groupByInput, analyzedIntermediates),
+                groupingFields),
+              basicAggregations.toSeq)
+
+          case _ =>
+            Aggregate(
+              Select(input, analyzedIntermediates),
+              basicAggregations.toSeq)
+        }
+
+        Select(aggregated, analyzedFinals)
+      }
+
+    case _ => select
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/TableTranslator.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/TableTranslator.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/TableTranslator.scala
new file mode 100644
index 0000000..194edda
--- /dev/null
+++ 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/TableTranslator.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.operations
+
+import java.lang.reflect.Modifier
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.parser.ExpressionParser
+import org.apache.flink.api.table.tree.{Expression, Naming, 
ResolvedFieldReference, UnresolvedFieldReference}
+import org.apache.flink.api.table.typeinfo.RowTypeInfo
+import org.apache.flink.api.table.{ExpressionException, Table}
+
+import scala.language.reflectiveCalls
+
+/**
+ * When an [[org.apache.flink.api.table.Table]] is created, a [[TableTranslator]] corresponding
+ * to the underlying representation (either [[org.apache.flink.api.scala.DataSet]] or
+ * [[org.apache.flink.streaming.api.scala.DataStream]]) is created. This way, the Table API can
+ * be completely agnostic while translation back to the correct API is handled by the API
+ * specific [[TableTranslator]].
+ */
+abstract class TableTranslator {
+
+  type Representation[A] <: { def getType(): TypeInformation[A] }
+
+  /**
+   * Translates the given Table API [[Operation]] back to the underlying representation, i.e.,
+   * a DataSet or a DataStream.
+   */
+  def translate[A](op: Operation)(implicit tpe: TypeInformation[A]): Representation[A]
+
+  /**
+   * Creates a [[Table]] from a DataSet or a DataStream (the underlying representation).
+   */
+  def createTable[A](
+      repr: Representation[A],
+      inputType: CompositeType[A],
+      expressions: Array[Expression],
+      resultFields: Seq[(String, TypeInformation[_])]): Table[this.type]
+
+  /**
+   * Creates a [[Table]] from the given DataSet or DataStream, using the field names of its
+   * composite type.
+   */
+  def createTable[A](repr: Representation[A]): Table[this.type] = {
+
+    val fields: Array[Expression] = repr.getType() match {
+      case c: CompositeType[A] =>
+        c.getFieldNames.map(name => UnresolvedFieldReference(name): Expression)
+
+      case _ => Array.empty[Expression] // createTable will throw an exception for this later
+    }
+    createTable(
+      repr,
+      fields,
+      checkDeterministicFields = false)
+  }
+
+  /**
+   * Creates a [[Table]] from the given DataSet or DataStream while only taking those
+   * fields mentioned in the field expression.
+   */
+  def createTable[A](repr: Representation[A], expression: String): Table[this.type] = {
+
+    val fields = ExpressionParser.parseExpressionList(expression)
+
+    createTable(repr, fields.toArray, checkDeterministicFields = true)
+  }
+
+  /**
+   * Creates a [[Table]] from the given DataSet or DataStream while only taking those
+   * fields mentioned in the fields parameter.
+   *
+   * When checkDeterministicFields is true check whether the fields of the underlying
+   * [[TypeInformation]] have a deterministic ordering. This is only the case for Tuples
+   * and Case classes. For a POJO, the field order is not obvious, this can lead to problems
+   * when a user renames fields and assumes a certain ordering.
+   *
+   * @throws ExpressionException if the type is not a composite type, is a non-static member
+   *                             class, the number of fields does not match, a field is not a
+   *                             plain field reference, or the new field names are ambiguous
+   */
+  def createTable[A](
+      repr: Representation[A],
+      fields: Array[Expression],
+      checkDeterministicFields: Boolean = true): Table[this.type] = {
+
+    // shortcut for DataSet[Row] or DataStream[Row]
+    // NOTE(review): the Table built in the RowTypeInfo case is the value of a match that is
+    // used as a statement, so it is discarded and execution continues below. If a real
+    // shortcut was intended this needs to return the Table - confirm the intended handling
+    // of `fields` for rows before changing it.
+    repr.getType() match {
+      case rowTypeInfo: RowTypeInfo =>
+        val expressions = rowTypeInfo.getFieldNames map {
+          name => (name, rowTypeInfo.getTypeAt(name))
+        }
+        new Table(
+          Root(repr, expressions), this)
+
+      case _: CompositeType[_] => // is ok
+
+      case tpe =>
+        throw new ExpressionException(
+          "Only DataSets or DataStreams of composite type can be transformed to a Table. " +
+          "These would be tuples, case classes and POJOs. Type is: " + tpe)
+
+    }
+
+    val clazz = repr.getType().getTypeClass
+    if (clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) {
+      throw new ExpressionException("Cannot create Table from DataSet or DataStream of type " +
+        clazz.getName + ". Only top-level classes or static member classes are supported.")
+    }
+
+    val inputType = repr.getType().asInstanceOf[CompositeType[A]]
+
+    if (!inputType.hasDeterministicFieldOrder && checkDeterministicFields) {
+      throw new ExpressionException("You cannot rename fields upon Table creation: " +
+        s"Field order of input type $inputType is not deterministic." )
+    }
+
+    if (fields.length != inputType.getFieldNames.length) {
+      throw new ExpressionException("Number of selected fields: '" + fields.mkString(",") +
+        "' and number of fields in input type " + inputType + " do not match.")
+    }
+
+    val newFieldNames = fields map {
+      case UnresolvedFieldReference(name) => name
+      case e =>
+        throw new ExpressionException("Only field references allowed in 'as' operation, " +
+          "offending expression: " + e)
+    }
+
+    if (newFieldNames.toSet.size != newFieldNames.size) {
+      throw new ExpressionException(s"Ambiguous field names in ${fields.mkString(", ")}")
+    }
+
+    // New names are matched to the input fields by position.
+    val resultFields: Seq[(String, TypeInformation[_])] = newFieldNames.zipWithIndex map {
+      case (name, index) => (name, inputType.getTypeAt(index))
+    }
+
+    val inputFields = inputType.getFieldNames
+    val fieldMappings = inputFields.zip(resultFields)
+    val expressions: Array[Expression] = fieldMappings map {
+      case (oldName, (newName, tpe)) => Naming(ResolvedFieldReference(oldName, tpe), newName)
+    }
+
+    createTable(repr, inputType, expressions, resultFields)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/operations.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/operations.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/operations.scala
new file mode 100644
index 0000000..5b80570
--- /dev/null
+++ 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/operations.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.operations
+
+import org.apache.flink.api.table.tree.Expression
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.aggregation.Aggregations
+
+/**
+ * Base class for all Table API operations.
+ */
+sealed abstract class Operation {
+  /** Name and type information of every field this operation produces. */
+  def outputFields: Seq[(String, TypeInformation[_])]
+}
+
+/**
+ * Operation that transforms a [[org.apache.flink.api.scala.DataSet]] or
+ * [[org.apache.flink.streaming.api.scala.DataStream]] into a
+ * [[org.apache.flink.api.table.Table]].
+ */
+case class Root[T](
+    input: T,
+    outputFields: Seq[(String, TypeInformation[_])])
+  extends Operation
+
+/**
+ * Operation that joins two [[org.apache.flink.api.table.Table]]s. A "filter" and a "select"
+ * should be applied after a join operation.
+ */
+case class Join(left: Operation, right: Operation) extends Operation {
+  /** Fields of the left input followed by the fields of the right input. */
+  def outputFields: Seq[(String, TypeInformation[_])] = {
+    left.outputFields ++ right.outputFields
+  }
+
+  override def toString: String = s"Join($left, $right)"
+}
+
+/**
+ * Operation that filters out elements that do not match the predicate expression.
+ */
+case class Filter(input: Operation, predicate: Expression) extends Operation {
+  /** Filtering does not change the fields, so the input fields are passed through. */
+  def outputFields: Seq[(String, TypeInformation[_])] = {
+    input.outputFields
+  }
+
+  override def toString: String = s"Filter($input, $predicate)"
+}
+
+/**
+ * Selection expression. Similar to an SQL SELECT statement. The expressions can select fields
+ * and perform arithmetic or logic operations. The expressions can also perform aggregates
+ * on fields.
+ */
+case class Select(input: Operation, selection: Seq[Expression]) extends Operation {
+  /** One output field per selected expression, carrying its name and type information. */
+  def outputFields: Seq[(String, TypeInformation[_])] = {
+    selection.toSeq.map(e => (e.name, e.typeInfo))
+  }
+
+  override def toString: String = s"Select($input, ${selection.mkString(",")})"
+}
+
+/**
+ * Operation that gives new names to fields. Use this to disambiguate fields before a join
+ * operation.
+ */
+case class As(input: Operation, names: Seq[String]) extends Operation {
+  /** The input fields, paired by position with the new names; the types are kept. */
+  val outputFields: Seq[(String, TypeInformation[_])] = {
+    input.outputFields.zip(names).map {
+      case ((_, tpe), newName) => (newName, tpe)
+    }
+  }
+
+  override def toString: String = s"As($input, ${names.mkString(",")})"
+}
+
+/**
+ * Grouping operation. Keys are specified using field references. A group by operation is only
+ * useful when performing a select with aggregates afterwards.
+ *
+ * @param input the operation whose output is grouped
+ * @param fields the expressions that specify the grouping keys
+ */
+case class GroupBy(input: Operation, fields: Seq[Expression]) extends Operation {
+  /** Grouping does not change the fields, so the input fields are passed through. */
+  def outputFields: Seq[(String, TypeInformation[_])] = {
+    input.outputFields
+  }
+
+  override def toString: String = s"GroupBy($input, ${fields.mkString(",")})"
+}
+
+/**
+ * Internal operation. Selection operations containing aggregates are expanded to an
+ * [[Aggregate]] and a simple [[Select]].
+ */
+case class Aggregate(
+    input: Operation,
+    aggregations: Seq[(String, Aggregations)]) extends Operation {
+
+  /** The fields of the input operation are reported unchanged. */
+  def outputFields: Seq[(String, TypeInformation[_])] = {
+    input.outputFields
+  }
+
+  override def toString: String = s"Aggregate($input, ${aggregations.mkString(",")})"
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/package.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/package.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/package.scala
new file mode 100644
index 0000000..0f75424
--- /dev/null
+++ 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/package.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table
+
+/**
+ * The operations in this package are created by calling methods on [[Table]]; they
+ * should not be manually created by users of the API.
+ */
+package object operations

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/package.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/package.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/package.scala
new file mode 100644
index 0000000..bdcb22c
--- /dev/null
+++ 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/package.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api
+
+/**
+ * == Table API ==
+ *
+ * This package contains the generic part of the Table API. It can be used with Flink Streaming
+ * and Flink Batch, from Scala as well as from Java.
+ *
+ * When using the Table API, a user creates a [[org.apache.flink.api.table.Table]] from
+ * a DataSet or DataStream. Relational operations can then be performed on this table. A table
+ * can also be converted back to a DataSet or DataStream.
+ *
+ * Packages [[org.apache.flink.api.scala.table]] and [[org.apache.flink.api.java.table]] contain
+ * the language specific part of the API. Refer to these packages for documentation on how
+ * the Table API can be used in Java and Scala.
+ */
+package object table

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
new file mode 100644
index 0000000..a0bc2b9
--- /dev/null
+++ 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.parser
+
+import org.apache.flink.api.table.ExpressionException
+import org.apache.flink.api.table.operations.As
+import org.apache.flink.api.table.tree._
+
+import scala.util.parsing.combinator.{PackratParsers, JavaTokenParsers}
+
+/**
+ * Parser for expressions inside a String. This parses exactly the same expressions that
+ * would be accepted by the Scala Expression DSL.
+ *
+ * See [[org.apache.flink.api.scala.expressions.ImplicitExpressionConversions]] and
+ * [[org.apache.flink.api.scala.expressions.ImplicitExpressionOperations]] for the constructs
+ * available in the Scala Expression DSL. This parser must be kept in sync with the Scala DSL
+ * defined in the above files.
+ */
+object ExpressionParser extends JavaTokenParsers with PackratParsers {
+
+  // Literals
+
+  /**
+   * Parses a numeric literal. A whole number followed by `L` or `l` becomes a Long literal.
+   * Any other number becomes an Int (plain digits), a Float (`f`/`F` suffix) or a Double.
+   */
+  lazy val numberLiteral: PackratParser[Expression] = {
+    // `<~` drops the suffix from the parse result, so the Long case must be decided in its
+    // own alternative; it cannot be detected from the matched string afterwards.
+    val longLiteral: Parser[Expression] =
+      (wholeNumber <~ ("L" | "l")) ^^ { digits => Literal(digits.toLong) }
+
+    val otherNumber: Parser[Expression] =
+      (floatingPointNumber | decimalNumber | wholeNumber) ^^ {
+        str =>
+          if (str.matches("""-?\d+""")) {
+            Literal(str.toInt)
+          } else if (str.endsWith("f") || str.endsWith("F")) {
+            Literal(str.toFloat)
+          } else {
+            Literal(str.toDouble)
+          }
+      }
+
+    longLiteral | otherNumber
+  }
+
+  /** String literal delimited by single quotes; the quotes are stripped from the value. */
+  lazy val singleQuoteStringLiteral: Parser[Expression] =
+    ("'" + """([^'\p{Cntrl}\\]|\\[\\'"bfnrt]|\\u[a-fA-F0-9]{4})*""" + "'").r ^^ {
+      str => Literal(str.substring(1, str.length - 1))
+    }
+
+  /** Double-quoted string literal; the quotes are stripped from the value. */
+  lazy val stringLiteralFlink: PackratParser[Expression] = super.stringLiteral ^^ {
+    str => Literal(str.substring(1, str.length - 1))
+  }
+
+  lazy val boolLiteral: PackratParser[Expression] = ("true" | "false") ^^ {
+    str => Literal(str.toBoolean)
+  }
+
+  lazy val literalExpr: PackratParser[Expression] =
+    numberLiteral |
+      stringLiteralFlink | singleQuoteStringLiteral |
+      boolLiteral
+
+  lazy val fieldReference: PackratParser[Expression] = ident ^^ {
+    case sym => UnresolvedFieldReference(sym)
+  }
+
+  /** Parenthesized expression, literal or field reference. */
+  lazy val atom: PackratParser[Expression] =
+    ( "(" ~> expression <~ ")" ) | literalExpr | fieldReference
+
+  // suffix ops
+  lazy val isNull: PackratParser[Expression] = atom <~ ".isNull" ^^ { e => IsNull(e) }
+  lazy val isNotNull: PackratParser[Expression] = atom <~ ".isNotNull" ^^ { e => IsNotNull(e) }
+
+  lazy val abs: PackratParser[Expression] = atom <~ ".abs" ^^ { e => Abs(e) }
+
+  lazy val sum: PackratParser[Expression] = atom <~ ".sum" ^^ { e => Sum(e) }
+  lazy val min: PackratParser[Expression] = atom <~ ".min" ^^ { e => Min(e) }
+  lazy val max: PackratParser[Expression] = atom <~ ".max" ^^ { e => Max(e) }
+  lazy val count: PackratParser[Expression] = atom <~ ".count" ^^ { e => Count(e) }
+  lazy val avg: PackratParser[Expression] = atom <~ ".avg" ^^ { e => Avg(e) }
+
+  // NOTE(review): `as` is not referenced by `suffix` or any other parser in this object, so
+  // the `.as(name)` form is currently unreachable; confirm whether that is intended.
+  lazy val as: PackratParser[Expression] = atom ~ ".as(" ~ fieldReference ~ ")" ^^ {
+    case e ~ _ ~ as ~ _ => Naming(e, as.name)
+  }
+
+  lazy val substring: PackratParser[Expression] =
+    atom ~ ".substring(" ~ expression ~ "," ~ expression ~ ")" ^^ {
+      case e ~ _ ~ from ~ _ ~ to ~ _ => Substring(e, from, to)
+
+    }
+
+  // A substring without an explicit end uses Integer.MAX_VALUE as the end position.
+  lazy val substringWithoutEnd: PackratParser[Expression] =
+    atom ~ ".substring(" ~ expression ~ ")" ^^ {
+      case e ~ _ ~ from ~ _ => Substring(e, from, Literal(Integer.MAX_VALUE))
+
+    }
+
+  lazy val suffix =
+    isNull | isNotNull |
+      abs | sum | min | max | count | avg |
+      substring | substringWithoutEnd | atom
+
+
+  // unary ops
+
+  lazy val unaryNot: PackratParser[Expression] = "!" ~> suffix ^^ { e => Not(e) }
+
+  lazy val unaryMinus: PackratParser[Expression] = "-" ~> suffix ^^ { e => UnaryMinus(e) }
+
+  lazy val unaryBitwiseNot: PackratParser[Expression] = "~" ~> suffix ^^ { e => BitwiseNot(e) }
+
+  lazy val unary = unaryNot | unaryMinus | unaryBitwiseNot | suffix
+
+  // binary bitwise opts
+
+  lazy val binaryBitwise = unary * (
+    "&" ^^^ { (a:Expression, b:Expression) => BitwiseAnd(a,b) } |
+      "|" ^^^ { (a:Expression, b:Expression) => BitwiseOr(a,b) } |
+      "^" ^^^ { (a:Expression, b:Expression) => BitwiseXor(a,b) } )
+
+  // arithmetic
+
+  lazy val product = binaryBitwise * (
+    "*" ^^^ { (a:Expression, b:Expression) => Mul(a,b) } |
+      "/" ^^^ { (a:Expression, b:Expression) => Div(a,b) } |
+      "%" ^^^ { (a:Expression, b:Expression) => Mod(a,b) } )
+
+  lazy val term = product * (
+    "+" ^^^ { (a:Expression, b:Expression) => Plus(a,b) } |
+     "-" ^^^ { (a:Expression, b:Expression) => Minus(a,b) } )
+
+  // Comparison
+
+  lazy val equalTo: PackratParser[Expression] = term ~ "===" ~ term ^^ {
+    case l ~ _ ~ r => EqualTo(l, r)
+  }
+
+  lazy val equalToAlt: PackratParser[Expression] = term ~ "=" ~ term ^^ {
+    case l ~ _ ~ r => EqualTo(l, r)
+  }
+
+  lazy val notEqualTo: PackratParser[Expression] = term ~ "!==" ~ term ^^ {
+    case l ~ _ ~ r => NotEqualTo(l, r)
+  }
+
+  lazy val greaterThan: PackratParser[Expression] = term ~ ">" ~ term ^^ {
+    case l ~ _ ~ r => GreaterThan(l, r)
+  }
+
+  lazy val greaterThanOrEqual: PackratParser[Expression] = term ~ ">=" ~ term ^^ {
+    case l ~ _ ~ r => GreaterThanOrEqual(l, r)
+  }
+
+  lazy val lessThan: PackratParser[Expression] = term ~ "<" ~ term ^^ {
+    case l ~ _ ~ r => LessThan(l, r)
+  }
+
+  lazy val lessThanOrEqual: PackratParser[Expression] = term ~ "<=" ~ term ^^ {
+    case l ~ _ ~ r => LessThanOrEqual(l, r)
+  }
+
+  lazy val comparison: PackratParser[Expression] =
+      equalTo | equalToAlt | notEqualTo |
+      greaterThan | greaterThanOrEqual |
+      lessThan | lessThanOrEqual | term
+
+  // logic
+
+  lazy val logic = comparison * (
+    "&&" ^^^ { (a:Expression, b:Expression) => And(a,b) } |
+      "||" ^^^ { (a:Expression, b:Expression) => Or(a,b) } )
+
+  // alias
+
+  lazy val alias: PackratParser[Expression] = logic ~ "as" ~ fieldReference ^^ {
+    case e ~ _ ~ name => Naming(e, name.name)
+  } | logic
+
+  lazy val expression: PackratParser[Expression] = alias
+
+  lazy val expressionList: Parser[List[Expression]] = rep1sep(expression, ",")
+
+  /**
+   * Parses a comma-separated list of expressions.
+   *
+   * @param expression the text to parse; it must be consumed completely
+   * @return the parsed expressions in input order
+   * @throws ExpressionException if the text cannot be parsed
+   */
+  def parseExpressionList(expression: String): List[Expression] = {
+    parseAll(expressionList, expression) match {
+      case Success(lst, _) => lst
+
+      case Failure(msg, _) => throw new ExpressionException("Could not parse expression: " + msg)
+
+      case Error(msg, _) => throw new ExpressionException("Could not parse expression: " + msg)
+    }
+  }
+
+  /**
+   * Parses a single expression.
+   *
+   * @param exprString the text to parse; it must be consumed completely
+   * @return the parsed expression
+   * @throws ExpressionException if the text cannot be parsed
+   */
+  def parseExpression(exprString: String): Expression = {
+    parseAll(expression, exprString) match {
+      case Success(lst, _) => lst
+
+      case fail =>
+        throw new ExpressionException("Could not parse expression: " + fail.toString)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala
new file mode 100644
index 0000000..7e9bc0d
--- /dev/null
+++ 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime
+
+import org.apache.flink.api.table.Row
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable
+import org.apache.flink.api.java.aggregation.AggregationFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+
+@Combinable
+class ExpressionAggregateFunction(
+    private val fieldPositions: Seq[Int],
+    private val functions: Seq[AggregationFunction[Any]])
+  extends RichGroupReduceFunction[Row, Row] {
+
+  /** Initializes every aggregation function. */
+  override def open(conf: Configuration): Unit = {
+    val count = functions.length
+    var idx = 0
+    while (idx < count) {
+      functions(idx).initializeAggregate()
+      idx += 1
+    }
+  }
+
+  /**
+   * Feeds the aggregated field of every incoming row into its aggregation function, then
+   * writes the aggregates into the last row seen, re-initializes the functions and emits
+   * that row.
+   */
+  override def reduce(in: java.lang.Iterable[Row], out: Collector[Row]): Unit = {
+
+    val positions = this.fieldPositions
+    val aggregators = this.functions
+    val count = aggregators.length
+
+    var last: Row = null
+
+    val rows = in.iterator()
+    while (rows.hasNext) {
+      last = rows.next()
+
+      var idx = 0
+      while (idx < count) {
+        aggregators(idx).aggregate(last.productElement(positions(idx)))
+        idx += 1
+      }
+    }
+
+    // The last row is reused as the output record.
+    var idx = 0
+    while (idx < count) {
+      last.setField(positions(idx), aggregators(idx).getAggregate)
+      aggregators(idx).initializeAggregate()
+      idx += 1
+    }
+
+    out.collect(last)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala
new file mode 100644
index 0000000..b0e2d05
--- /dev/null
+++ 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime
+
+import org.apache.flink.api.table.codegen.GenerateUnaryPredicate
+import org.apache.flink.api.table.tree.{NopExpression, Expression}
+import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.configuration.Configuration
+
+class ExpressionFilterFunction[T](
+    predicate: Expression,
+    inputType: CompositeType[T]) extends RichFilterFunction[T] {
+
+  // Stays null when the predicate is a NopExpression; filter() then accepts every record.
+  var compiledPredicate: (T) => Boolean = null
+
+  /** Generates the predicate function on first use; a NopExpression is not compiled. */
+  override def open(config: Configuration): Unit = {
+    if (compiledPredicate == null) {
+      compiledPredicate = predicate match {
+        case n: NopExpression => null
+        case _ =>
+          val codegen = new GenerateUnaryPredicate[T](
+            inputType,
+            predicate,
+            getRuntimeContext.getUserCodeClassLoader)
+          codegen.generate()
+      }
+    }
+  }
+
+  /**
+   * Evaluates the compiled predicate on the record. Without a compiled predicate (the
+   * NopExpression case) the record is kept instead of failing on a null function.
+   */
+  override def filter(in: T) = compiledPredicate == null || compiledPredicate(in)
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala
new file mode 100644
index 0000000..f0f5636
--- /dev/null
+++ 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime
+
+import org.apache.flink.api.table.tree.{NopExpression, Expression}
+import org.apache.flink.api.common.functions.RichFlatJoinFunction
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.codegen.{GenerateBinaryResultAssembler,
+GenerateBinaryPredicate}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+
+class ExpressionJoinFunction[L, R, O](
+    predicate: Expression,
+    leftType: CompositeType[L],
+    rightType: CompositeType[R],
+    resultType: CompositeType[O],
+    outputFields: Seq[Expression]) extends RichFlatJoinFunction[L, R, O] {
+
+  var compiledPredicate: (L, R) => Boolean = null
+  var resultAssembler: (L, R, O) => O = null
+  var result: O = null.asInstanceOf[O]
+
+  /**
+   * Creates the reusable result instance and generates the join predicate and the result
+   * assembler unless they already exist. A NopExpression predicate is left as null.
+   */
+  override def open(config: Configuration): Unit = {
+    val serializer = resultType.createSerializer(getRuntimeContext.getExecutionConfig)
+    result = serializer.createInstance()
+
+    if (compiledPredicate == null) {
+      compiledPredicate = predicate match {
+        case _: NopExpression => null
+        case _ =>
+          val predicateCodegen = new GenerateBinaryPredicate[L, R](
+            leftType,
+            rightType,
+            predicate,
+            getRuntimeContext.getUserCodeClassLoader)
+          predicateCodegen.generate()
+      }
+    }
+
+    if (resultAssembler == null) {
+      val assemblerCodegen = new GenerateBinaryResultAssembler[L, R, O](
+        leftType,
+        rightType,
+        resultType,
+        outputFields,
+        getRuntimeContext.getUserCodeClassLoader)
+
+      resultAssembler = assemblerCodegen.generate()
+    }
+  }
+
+  /** Emits the assembled result when there is no predicate or the predicate holds. */
+  def join(left: L, right: R, out: Collector[O]) = {
+    val accepted = compiledPredicate == null || compiledPredicate(left, right)
+    if (accepted) {
+      result = resultAssembler(left, right, result)
+      out.collect(result)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala
new file mode 100644
index 0000000..0a2830b
--- /dev/null
+++ 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime
+
+import org.apache.flink.api.table.tree.Expression
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.codegen.GenerateUnaryResultAssembler
+import org.apache.flink.configuration.Configuration
+
+class ExpressionSelectFunction[I, O](
+     inputType: CompositeType[I],
+     resultType: CompositeType[O],
+     outputFields: Seq[Expression]) extends RichMapFunction[I, O] {
+
+  var resultAssembler: (I, O) => O = null
+  var result: O = null.asInstanceOf[O]
+
+  /** Creates the reusable result instance and generates the assembler if it is missing. */
+  override def open(config: Configuration): Unit = {
+    val serializer = resultType.createSerializer(getRuntimeContext.getExecutionConfig)
+    result = serializer.createInstance()
+
+    if (resultAssembler == null) {
+      val assemblerCodegen = new GenerateUnaryResultAssembler[I, O](
+        inputType,
+        resultType,
+        outputFields,
+        getRuntimeContext.getUserCodeClassLoader)
+
+      resultAssembler = assemblerCodegen.generate()
+    }
+  }
+
+  /** Applies the generated assembler to the input, passing the reusable result instance. */
+  def map(in: I): O = resultAssembler(in, result)
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala
new file mode 100644
index 0000000..a1bc4b7
--- /dev/null
+++ 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table
+
+/**
+ * The functions in this package are used for transforming Table API operations to Java API
+ * operations.
+ */
+package object runtime

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/Expression.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/Expression.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/Expression.scala
new file mode 100644
index 0000000..6302572
--- /dev/null
+++ 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/Expression.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.tree
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.flink.api.common.typeinfo.{NothingTypeInfo, TypeInformation}
+
+import scala.language.postfixOps
+
+
+abstract class Expression extends Product {
+  def children: Seq[Expression]
+  // Unless overridden, every call produces a new generated name.
+  def name: String = Expression.freshName("expression")
+  def typeInfo: TypeInformation[_]
+
+  /**
+   * Tests for equality by first testing for reference equality.
+   */
+  def fastEquals(other: Expression): Boolean = this.eq(other) || this == other
+
+  /**
+   * Applies `rule` to this expression first and then recursively to the children of the
+   * (possibly replaced) expression. Expressions the rule is not defined for are kept.
+   */
+  def transformPre(rule: PartialFunction[Expression, Expression]): Expression = {
+    val afterTransform = rule.applyOrElse(this, identity[Expression])
+
+    if (afterTransform fastEquals this) {
+      this.transformChildrenPre(rule)
+    } else {
+      afterTransform.transformChildrenPre(rule)
+    }
+  }
+
+  /**
+   * Runs [[transformPre]] on every constructor argument that is one of `children` and
+   * returns a copy if any child changed, otherwise this expression itself.
+   */
+  def transformChildrenPre(rule: PartialFunction[Expression, Expression]): Expression = {
+    var changed = false
+    val newArgs = productIterator map {
+      case child: Expression if children.contains(child) =>
+        val newChild = child.transformPre(rule)
+        if (newChild fastEquals child) {
+          child
+        } else {
+          changed = true
+          newChild
+        }
+      case other: AnyRef => other
+      case null => null
+    } toArray
+
+    if (changed) makeCopy(newArgs) else this
+  }
+
+  /**
+   * Transforms the children first and then applies `rule` to the resulting expression.
+   */
+  def transformPost(rule: PartialFunction[Expression, Expression]): Expression = {
+    val afterChildren = transformChildrenPost(rule)
+    if (afterChildren fastEquals this) {
+      rule.applyOrElse(this, identity[Expression])
+    } else {
+      rule.applyOrElse(afterChildren, identity[Expression])
+    }
+  }
+
+  /**
+   * Runs [[transformPost]] on every constructor argument that is one of `children` and
+   * returns a copy if any child changed, otherwise this expression itself.
+   */
+  def transformChildrenPost(rule: PartialFunction[Expression, Expression]): Expression = {
+    var changed = false
+    val newArgs = productIterator map {
+      case child: Expression if children.contains(child) =>
+        val newChild = child.transformPost(rule)
+        if (newChild fastEquals child) {
+          child
+        } else {
+          changed = true
+          newChild
+        }
+      case other: AnyRef => other
+      case null => null
+    } toArray
+    // toArray forces evaluation, toSeq does not seem to work here
+
+    if (changed) makeCopy(newArgs) else this
+  }
+
+  /**
+   * Returns true if `predicate` holds for this expression or any expression below it. The
+   * tree is walked with an identity [[transformPre]] that only records matches.
+   */
+  def exists(predicate: Expression => Boolean): Boolean = {
+    var exists = false
+    this.transformPre {
+      case e: Expression => if (predicate(e)) {
+        exists = true
+      }
+        e
+    }
+    exists
+  }
+
+  /**
+   * Creates a new copy of this expression with new children. This is used during transformation
+   * if children change. This must be overridden by Expressions that don't have the Constructor
+   * arguments in the same order as the `children`.
+   *
+   * @param newArgs constructor arguments for the copy, in constructor order
+   * @throws NoSuchElementException if the class has no public constructor taking parameters
+   * @throws RuntimeException if the constructor rejects the given arguments
+   */
+  def makeCopy(newArgs: Seq[AnyRef]): this.type = {
+    // The first public constructor that takes parameters is used for the copy.
+    val defaultCtor = this.getClass.getConstructors
+      .find { _.getParameterTypes.size > 0 }
+      .getOrElse(throw new NoSuchElementException(
+        s"No constructor with parameters found for ${this.getClass.getName}."))
+    try {
+      defaultCtor.newInstance(newArgs.toArray: _*).asInstanceOf[this.type]
+    } catch {
+      case iae: IllegalArgumentException =>
+        // Keep the cause and the offending expression instead of printing to stdout.
+        throw new RuntimeException(
+          s"Could not copy expression $this with the given constructor arguments.",
+          iae)
+    }
+  }
+}
+
+abstract class BinaryExpression() extends Expression { // expression with two operands
+  def left: Expression // first child
+  def right: Expression // second child
+  def children = Seq(left, right) // children are reported in left, right order
+}
+
+abstract class UnaryExpression() extends Expression { // expression with a single operand
+  def child: Expression // the only child
+  def children = Seq(child) // single-element child list
+}
+
+abstract class LeafExpression() extends Expression { // expression without operands
+  val children = Nil // a leaf never has children
+}
+
+case class NopExpression() extends LeafExpression { // placeholder leaf typed as NothingTypeInfo
+  val typeInfo = new NothingTypeInfo()
+  override val name = Expression.freshName("nop") // generated once per instance ("nop-<n>")
+
+}
+
+object Expression {
+  /** Counter backing [[freshName]]; each generated name consumes one value. */
+  val freshNameCounter = new AtomicInteger
+
+  /** Returns `prefix`, a dash and the next counter value, e.g. `expression-0`. */
+  def freshName(prefix: String): String = {
+    val id = freshNameCounter.getAndIncrement
+    prefix + "-" + id
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/aggregations.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/aggregations.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/aggregations.scala
new file mode 100644
index 0000000..e5cdac5
--- /dev/null
+++ 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/aggregations.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.tree
+
+import org.apache.flink.api.table.ExpressionException
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.java.aggregation.Aggregations
+
+
+/**
+ * Base class for aggregation expressions over a single child expression.
+ *
+ * By default an aggregation has the type of its child, which must be one of the basic
+ * types LONG, INT, DOUBLE, FLOAT, BYTE or SHORT.
+ */
+abstract sealed class Aggregation extends UnaryExpression {
+  /**
+   * Returns the type of the child after checking that it is a supported numeric type.
+   *
+   * @throws ExpressionException if the child has any other type
+   */
+  def typeInfo = {
+    child.typeInfo match {
+      case BasicTypeInfo.LONG_TYPE_INFO | BasicTypeInfo.INT_TYPE_INFO |
+           BasicTypeInfo.DOUBLE_TYPE_INFO | BasicTypeInfo.FLOAT_TYPE_INFO |
+           BasicTypeInfo.BYTE_TYPE_INFO | BasicTypeInfo.SHORT_TYPE_INFO =>
+        // supported numeric type, nothing to do
+      case _ =>
+        throw new ExpressionException(s"Unsupported type ${child.typeInfo} for " +
+          s"aggregation $this. Only numeric data types supported.")
+    }
+    child.typeInfo
+  }
+
+  override def toString: String = s"Aggregate($child)"
+
+  /** Expressions that make up the intermediate fields of this aggregation. */
+  def getIntermediateFields: Seq[Expression]
+
+  /** Builds the expression for the final result from the given `inputs`. */
+  def getFinalField(inputs: Seq[Expression]): Expression
+
+  /**
+   * Aggregation functions of this aggregation. The implementations in this file return
+   * one function per intermediate field.
+   */
+  def getAggregations: Seq[Aggregations]
+}
+
+/**
+ * Sum aggregation: the child is the only intermediate field, it is aggregated with
+ * `Aggregations.SUM`, and the first input is the final field.
+ */
+case class Sum(child: Expression) extends Aggregation {
+  override def toString: String = s"($child).sum"
+
+  override def getIntermediateFields: Seq[Expression] = Seq(child)
+
+  override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0)
+
+  override def getAggregations: Seq[Aggregations] = Seq(Aggregations.SUM)
+}
+
+/**
+ * Minimum aggregation: the child is the only intermediate field, it is aggregated with
+ * `Aggregations.MIN`, and the first input is the final field.
+ */
+case class Min(child: Expression) extends Aggregation {
+  override def toString: String = s"($child).min"
+
+  override def getIntermediateFields: Seq[Expression] = Seq(child)
+
+  override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0)
+
+  override def getAggregations: Seq[Aggregations] = Seq(Aggregations.MIN)
+}
+
+/**
+ * Maximum aggregation: the child is the only intermediate field, it is aggregated with
+ * `Aggregations.MAX`, and the first input is the final field.
+ */
+case class Max(child: Expression) extends Aggregation {
+  override def toString: String = s"($child).max"
+
+  override def getIntermediateFields: Seq[Expression] = Seq(child)
+
+  override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0)
+
+  override def getAggregations: Seq[Aggregations] = Seq(Aggregations.MAX)
+}
+
+/**
+ * Count aggregation. The result type is always INT, independent of the type of the child.
+ * Counting is expressed as a `Aggregations.SUM` over the integer literal 1.
+ */
+case class Count(child: Expression) extends Aggregation {
+  override def typeInfo = {
+    // Any child type can be counted. The child's type is still evaluated so that an
+    // exception raised while determining it is not hidden.
+    child.typeInfo
+    BasicTypeInfo.INT_TYPE_INFO
+  }
+
+  override def toString: String = s"($child).count"
+
+  override def getIntermediateFields: Seq[Expression] = Seq(Literal(Integer.valueOf(1)))
+
+  override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0)
+
+  override def getAggregations: Seq[Aggregations] = Seq(Aggregations.SUM)
+}
+
+/**
+ * Average aggregation. It keeps two intermediate fields, the child and the literal 1, both
+ * aggregated with `Aggregations.SUM`, and divides the first input by the second one to
+ * obtain the final field.
+ *
+ * NOTE(review): `Div` requires both operands to have the same type and `Literal(1)` is an
+ * INT literal, so this presumably only type-checks for INT children -- confirm.
+ */
+case class Avg(child: Expression) extends Aggregation {
+  override def toString: String = s"($child).avg"
+
+  override def getIntermediateFields: Seq[Expression] = Seq(child, Literal(1))
+
+  // The division is expressed with the regular AST node, so the code generator takes care
+  // of producing the code for it.
+  override def getFinalField(inputs: Seq[Expression]): Expression = {
+    val sum = inputs(0)
+    val count = inputs(1)
+    Div(sum, count)
+  }
+
+  override def getAggregations: Seq[Aggregations] = Seq(Aggregations.SUM, Aggregations.SUM)
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/arithmetic.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/arithmetic.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/arithmetic.scala
new file mode 100644
index 0000000..84f9b18
--- /dev/null
+++ 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/arithmetic.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.tree
+
+import org.apache.flink.api.table.ExpressionException
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, IntegerTypeInfo, 
NumericTypeInfo, TypeInformation}
+
+/**
+ * Base class for arithmetic expressions with two operands. Both operands must have numeric
+ * type information and the same type; the result has the type of the operands.
+ */
+abstract class BinaryArithmetic extends BinaryExpression {
+  /**
+   * Returns the type of the left operand after checking both operands.
+   *
+   * @throws ExpressionException if an operand is not numeric or the operand types differ
+   */
+  def typeInfo = {
+    left.typeInfo match {
+      case _: NumericTypeInfo[_] => // ok
+      case _ =>
+        throw new ExpressionException(
+          s"""Non-numeric operand $left of type ${left.typeInfo} in $this""")
+    }
+    right.typeInfo match {
+      case _: NumericTypeInfo[_] => // ok
+      case _ =>
+        throw new ExpressionException(
+          s"""Non-numeric operand "$right" of type ${right.typeInfo} in $this""")
+    }
+    if (left.typeInfo != right.typeInfo) {
+      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
+        s"${right.typeInfo} in $this")
+    }
+    left.typeInfo
+  }
+}
+
+/**
+ * Addition. In contrast to the other binary arithmetic expressions, operands of type
+ * STRING are accepted in addition to numeric operands. Both operands must still have the
+ * same type, which is also the result type.
+ */
+case class Plus(left: Expression, right: Expression) extends BinaryArithmetic {
+  /**
+   * Returns the type of the left operand after checking both operands.
+   *
+   * @throws ExpressionException if an operand is neither numeric nor a string, or if the
+   *                             operand types differ
+   */
+  override def typeInfo = {
+    if (!(left.typeInfo.isInstanceOf[NumericTypeInfo[_]] ||
+      left.typeInfo == BasicTypeInfo.STRING_TYPE_INFO)) {
+      throw new ExpressionException(s"Non-numeric operand type ${left.typeInfo} in $this")
+    }
+    if (!(right.typeInfo.isInstanceOf[NumericTypeInfo[_]] ||
+      right.typeInfo == BasicTypeInfo.STRING_TYPE_INFO)) {
+      throw new ExpressionException(s"Non-numeric operand type ${right.typeInfo} in $this")
+    }
+    if (left.typeInfo != right.typeInfo) {
+      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
+        s"${right.typeInfo} in $this")
+    }
+    left.typeInfo
+  }
+
+  override def toString: String = s"($left + $right)"
+}
+
+/**
+ * Arithmetic negation of a numeric child. The result has the type of the child.
+ */
+case class UnaryMinus(child: Expression) extends UnaryExpression {
+  /**
+   * Returns the type of the child.
+   *
+   * @throws ExpressionException if the child is not numeric
+   */
+  def typeInfo = {
+    child.typeInfo match {
+      case _: NumericTypeInfo[_] => // ok
+      case _ =>
+        throw new ExpressionException(
+          s"""Non-numeric operand $child of type ${child.typeInfo} in $this""")
+    }
+    child.typeInfo
+  }
+
+  override def toString: String = s"-($child)"
+}
+
+/** Subtraction of two operands; type checking is inherited from `BinaryArithmetic`. */
+case class Minus(left: Expression, right: Expression) extends BinaryArithmetic {
+  override def toString: String = s"($left - $right)"
+}
+
+/** Division of two operands; type checking is inherited from `BinaryArithmetic`. */
+case class Div(left: Expression, right: Expression) extends BinaryArithmetic {
+  override def toString: String = s"($left / $right)"
+}
+
+/** Multiplication of two operands; type checking is inherited from `BinaryArithmetic`. */
+case class Mul(left: Expression, right: Expression) extends BinaryArithmetic {
+  override def toString: String = s"($left * $right)"
+}
+
+/** Modulo of two operands; type checking is inherited from `BinaryArithmetic`. */
+case class Mod(left: Expression, right: Expression) extends BinaryArithmetic {
+  // Rendered with `%`; the `*` used before made a Mod indistinguishable from a Mul.
+  override def toString: String = s"($left % $right)"
+}
+
+/**
+ * Absolute value of the child. The result has the type of the child; the type of the
+ * child is not checked here.
+ */
+case class Abs(child: Expression) extends UnaryExpression {
+  def typeInfo = child.typeInfo
+
+  override def toString: String = s"abs($child)"
+}
+
+/**
+ * Base class for bitwise expressions with two operands. Both operands must have integer
+ * type information and the same type.
+ */
+abstract class BitwiseBinaryArithmetic extends BinaryExpression {
+  /**
+   * Returns the operand type if the operands are of type LONG, and INT otherwise.
+   *
+   * @throws ExpressionException if an operand is not an integer type or the operand types
+   *                             differ
+   */
+  def typeInfo: TypeInformation[_] = {
+    left.typeInfo match {
+      case _: IntegerTypeInfo[_] => // ok
+      case _ =>
+        throw new ExpressionException(
+          s"""Non-integer operand $left of type ${left.typeInfo} in $this""")
+    }
+    right.typeInfo match {
+      case _: IntegerTypeInfo[_] => // ok
+      case _ =>
+        throw new ExpressionException(
+          s"""Non-integer operand "$right" of type ${right.typeInfo} in $this""")
+    }
+    if (left.typeInfo != right.typeInfo) {
+      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
+        s"${right.typeInfo} in $this")
+    }
+    // Every integer type other than LONG results in INT.
+    if (left.typeInfo == BasicTypeInfo.LONG_TYPE_INFO) left.typeInfo
+    else BasicTypeInfo.INT_TYPE_INFO
+  }
+}
+
+/** Bitwise AND; type checking is inherited from `BitwiseBinaryArithmetic`. */
+case class BitwiseAnd(left: Expression, right: Expression) extends BitwiseBinaryArithmetic {
+  override def toString: String = s"($left & $right)"
+}
+
+/** Bitwise OR; type checking is inherited from `BitwiseBinaryArithmetic`. */
+case class BitwiseOr(left: Expression, right: Expression) extends BitwiseBinaryArithmetic {
+  override def toString: String = s"($left | $right)"
+}
+
+
+/** Bitwise XOR; type checking is inherited from `BitwiseBinaryArithmetic`. */
+case class BitwiseXor(left: Expression, right: Expression) extends BitwiseBinaryArithmetic {
+  override def toString: String = s"($left ^ $right)"
+}
+
+/**
+ * Bitwise complement of a child with integer type information.
+ */
+case class BitwiseNot(child: Expression) extends UnaryExpression {
+  /**
+   * Returns the child type if the child is of type LONG, and INT otherwise.
+   *
+   * @throws ExpressionException if the child is not an integer type
+   */
+  def typeInfo: TypeInformation[_] = {
+    child.typeInfo match {
+      case _: IntegerTypeInfo[_] => // ok
+      case _ =>
+        throw new ExpressionException(
+          s"""Non-integer operand $child of type ${child.typeInfo} in $this""")
+    }
+    // Every integer type other than LONG results in INT.
+    if (child.typeInfo == BasicTypeInfo.LONG_TYPE_INFO) child.typeInfo
+    else BasicTypeInfo.INT_TYPE_INFO
+  }
+
+  override def toString: String = s"~($child)"
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/cast.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/cast.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/cast.scala
new file mode 100644
index 0000000..a3acc35
--- /dev/null
+++ 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/cast.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.tree
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+/**
+ * Cast expression. Its type information is the given target type `tpe`, independent of
+ * the type of `child`.
+ */
+case class Cast(child: Expression, tpe: TypeInformation[_]) extends UnaryExpression {
+  def typeInfo = tpe
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/comparison.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/comparison.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/comparison.scala
new file mode 100644
index 0000000..e0a34a9
--- /dev/null
+++ 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/comparison.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.tree
+
+import org.apache.flink.api.table.ExpressionException
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, NumericTypeInfo}
+
+/**
+ * Base class for comparisons of two operands. Both operands must have numeric type
+ * information and the same type; the result is always of type BOOLEAN.
+ */
+abstract class BinaryComparison extends BinaryExpression {
+  /**
+   * Returns BOOLEAN after checking both operands.
+   *
+   * @throws ExpressionException if an operand is not numeric or the operand types differ
+   */
+  def typeInfo = {
+    left.typeInfo match {
+      case _: NumericTypeInfo[_] => // ok
+      case _ => throw new ExpressionException(s"Non-numeric operand $left in $this")
+    }
+    right.typeInfo match {
+      case _: NumericTypeInfo[_] => // ok
+      case _ => throw new ExpressionException(s"Non-numeric operand $right in $this")
+    }
+    if (left.typeInfo != right.typeInfo) {
+      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
+        s"${right.typeInfo} in $this")
+    }
+    BasicTypeInfo.BOOLEAN_TYPE_INFO
+  }
+}
+
+/**
+ * Equality comparison. In contrast to the other comparisons the operands do not have to
+ * be numeric; they only have to be of the same type.
+ */
+case class EqualTo(left: Expression, right: Expression) extends BinaryComparison {
+  /**
+   * Returns BOOLEAN.
+   *
+   * @throws ExpressionException if the operand types differ
+   */
+  override def typeInfo = {
+    val sameType = left.typeInfo == right.typeInfo
+    if (!sameType) {
+      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
+        s"${right.typeInfo} in $this")
+    }
+    BasicTypeInfo.BOOLEAN_TYPE_INFO
+  }
+
+  override def toString: String = s"$left === $right"
+}
+
+/**
+ * Inequality comparison. In contrast to the other comparisons the operands do not have to
+ * be numeric; they only have to be of the same type.
+ */
+case class NotEqualTo(left: Expression, right: Expression) extends BinaryComparison {
+  /**
+   * Returns BOOLEAN.
+   *
+   * @throws ExpressionException if the operand types differ
+   */
+  override def typeInfo = {
+    val sameType = left.typeInfo == right.typeInfo
+    if (!sameType) {
+      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
+        s"${right.typeInfo} in $this")
+    }
+    BasicTypeInfo.BOOLEAN_TYPE_INFO
+  }
+
+  override def toString: String = s"$left !== $right"
+}
+
+/** Greater-than comparison; type checking is inherited from `BinaryComparison`. */
+case class GreaterThan(left: Expression, right: Expression) extends BinaryComparison {
+  override def toString: String = s"$left > $right"
+}
+
+/** Greater-than-or-equal comparison; type checking is inherited from `BinaryComparison`. */
+case class GreaterThanOrEqual(left: Expression, right: Expression) extends BinaryComparison {
+  override def toString: String = s"$left >= $right"
+}
+
+/** Less-than comparison; type checking is inherited from `BinaryComparison`. */
+case class LessThan(left: Expression, right: Expression) extends BinaryComparison {
+  override def toString: String = s"$left < $right"
+}
+
+/** Less-than-or-equal comparison; type checking is inherited from `BinaryComparison`. */
+case class LessThanOrEqual(left: Expression, right: Expression) extends BinaryComparison {
+  override def toString: String = s"$left <= $right"
+}
+
+/**
+ * Null check on the child. The result is always of type BOOLEAN; the type of the child
+ * is not checked.
+ */
+case class IsNull(child: Expression) extends UnaryExpression {
+  def typeInfo = BasicTypeInfo.BOOLEAN_TYPE_INFO
+
+  override def toString: String = s"($child).isNull"
+}
+
+/**
+ * Not-null check on the child. The result is always of type BOOLEAN; the type of the
+ * child is not checked.
+ */
+case class IsNotNull(child: Expression) extends UnaryExpression {
+  def typeInfo = BasicTypeInfo.BOOLEAN_TYPE_INFO
+
+  override def toString: String = s"($child).isNotNull"
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/fieldExpression.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/fieldExpression.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/fieldExpression.scala
new file mode 100644
index 0000000..cc42148
--- /dev/null
+++ 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/fieldExpression.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.tree
+
+import org.apache.flink.api.table.ExpressionException
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+/**
+ * Reference to a field by name only. It carries no type: asking for its type information
+ * always fails with an [[ExpressionException]].
+ */
+case class UnresolvedFieldReference(override val name: String) extends LeafExpression {
+  def typeInfo = throw new ExpressionException(s"Unresolved field reference: $this")
+
+  // NOTE(review): this renders a single leading double quote before the name, while
+  // ResolvedFieldReference renders a leading single quote -- confirm this is intended.
+  override def toString: String = "\"" + name
+}
+
+/**
+ * Reference to a field by name together with its type information `tpe`.
+ */
+case class ResolvedFieldReference(
+    override val name: String,
+    tpe: TypeInformation[_])
+  extends LeafExpression {
+
+  def typeInfo = tpe
+
+  override def toString: String = s"'$name"
+}
+
+/**
+ * Gives the child expression the given `name`. The type is the type of the child.
+ */
+case class Naming(child: Expression, override val name: String) extends UnaryExpression {
+  def typeInfo = child.typeInfo
+
+  override def toString: String = s"$child as '$name"
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/literals.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/literals.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/literals.scala
new file mode 100644
index 0000000..852d5a1
--- /dev/null
+++ 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/literals.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.tree
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.scala.table.ImplicitExpressionOperations
+
+/**
+ * Factory for literals that derives the type information from the runtime type of the
+ * value.
+ */
+object Literal {
+  /**
+   * Creates a literal for an Int, Long, Double, Float, String or Boolean value. There is
+   * no case for any other value, so such a value fails with a `scala.MatchError`.
+   */
+  def apply(l: Any): Literal = l match {
+    case int: Int => Literal(int, BasicTypeInfo.INT_TYPE_INFO)
+    case long: Long => Literal(long, BasicTypeInfo.LONG_TYPE_INFO)
+    case double: Double => Literal(double, BasicTypeInfo.DOUBLE_TYPE_INFO)
+    case float: Float => Literal(float, BasicTypeInfo.FLOAT_TYPE_INFO)
+    case string: String => Literal(string, BasicTypeInfo.STRING_TYPE_INFO)
+    case boolean: Boolean => Literal(boolean, BasicTypeInfo.BOOLEAN_TYPE_INFO)
+  }
+}
+
+/**
+ * Constant `value` together with its type information `tpe`. It mixes in
+ * `ImplicitExpressionOperations` with itself as the underlying expression.
+ */
+case class Literal(value: Any, tpe: TypeInformation[_])
+  extends LeafExpression
+  with ImplicitExpressionOperations {
+
+  def expr = this
+
+  def typeInfo = tpe
+
+  override def toString: String = s"$value"
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/logic.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/logic.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/logic.scala
new file mode 100644
index 0000000..8ab838d
--- /dev/null
+++ 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/logic.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.tree
+
+import org.apache.flink.api.table.ExpressionException
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+
+/**
+ * Base class for logical expressions with two operands. Both operands must be of type
+ * BOOLEAN, which is also the result type.
+ */
+abstract class BinaryPredicate extends BinaryExpression {
+  /**
+   * Returns BOOLEAN.
+   *
+   * @throws ExpressionException if one of the operands is not of type BOOLEAN
+   */
+  def typeInfo = {
+    val bothBoolean =
+      left.typeInfo == BasicTypeInfo.BOOLEAN_TYPE_INFO &&
+        right.typeInfo == BasicTypeInfo.BOOLEAN_TYPE_INFO
+    if (!bothBoolean) {
+      throw new ExpressionException(s"Non-boolean operand types ${left.typeInfo} and " +
+        s"${right.typeInfo} in $this")
+    }
+    BasicTypeInfo.BOOLEAN_TYPE_INFO
+  }
+}
+
+/**
+ * Logical negation of a child of type BOOLEAN. The name is a fresh name derived from the
+ * name of the child.
+ */
+case class Not(child: Expression) extends UnaryExpression {
+  /**
+   * Returns BOOLEAN.
+   *
+   * @throws ExpressionException if the child is not of type BOOLEAN
+   */
+  def typeInfo = {
+    val childIsBoolean = child.typeInfo == BasicTypeInfo.BOOLEAN_TYPE_INFO
+    if (!childIsBoolean) {
+      throw new ExpressionException(s"Non-boolean operand type ${child.typeInfo} in $this")
+    }
+    BasicTypeInfo.BOOLEAN_TYPE_INFO
+  }
+
+  override val name: String = Expression.freshName("not-" + child.name)
+
+  override def toString: String = s"!($child)"
+}
+
+/**
+ * Logical conjunction; type checking is inherited from `BinaryPredicate`. The name is a
+ * fresh name derived from the names of both operands.
+ */
+case class And(left: Expression, right: Expression) extends BinaryPredicate {
+  override val name: String = Expression.freshName(left.name + "-and-" + right.name)
+
+  override def toString: String = s"$left && $right"
+}
+
+/**
+ * Logical disjunction; type checking is inherited from `BinaryPredicate`. The name is a
+ * fresh name derived from the names of both operands.
+ */
+case class Or(left: Expression, right: Expression) extends BinaryPredicate {
+  override val name: String = Expression.freshName(left.name + "-or-" + right.name)
+
+  override def toString: String = s"$left || $right"
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/package.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/package.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/package.scala
new file mode 100644
index 0000000..caac402
--- /dev/null
+++ 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/package.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table
+
+/**
+ * This package contains the base class of AST nodes and all the expression language AST
+ * classes. Expression trees should not be manually constructed by users. They are
+ * implicitly constructed from the implicit DSL conversions in
+ * [[org.apache.flink.api.scala.table.ImplicitExpressionConversions]] and
+ * [[org.apache.flink.api.scala.table.ImplicitExpressionOperations]]. For the Java API,
+ * expression trees should be generated from a string parser that parses expressions and
+ * creates AST nodes.
+ *
+ * NOTE(review): `ImplicitExpressionOperations` is imported from the package
+ * `org.apache.flink.api.scala.table` by the literal expressions of this package;
+ * `ImplicitExpressionConversions` is assumed to live in the same package -- confirm.
+ */
+package object tree

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/stringExpressions.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/stringExpressions.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/stringExpressions.scala
new file mode 100644
index 0000000..e14374f
--- /dev/null
+++ 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/stringExpressions.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.tree
+
+import org.apache.flink.api.table.ExpressionException
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, IntegerTypeInfo}
+
+/**
+ * Substring of a string expression. `str` must be of type STRING and both indices must
+ * have integer type information; the result is of type STRING. The children are `str`,
+ * `beginIndex` and `endIndex`, in this order.
+ */
+case class Substring(
+    str: Expression,
+    beginIndex: Expression,
+    endIndex: Expression)
+  extends Expression {
+
+  /**
+   * Returns STRING after checking the operands.
+   *
+   * @throws ExpressionException if `str` is not a string or an index is not an integer type
+   */
+  def typeInfo = {
+    if (str.typeInfo != BasicTypeInfo.STRING_TYPE_INFO) {
+      throw new ExpressionException(
+        s"""Operand must be of type String in $this, is ${str.typeInfo}.""")
+    }
+    beginIndex.typeInfo match {
+      case _: IntegerTypeInfo[_] => // ok
+      case _ =>
+        throw new ExpressionException(
+          s"""Begin index must be an integer type in $this, is ${beginIndex.typeInfo}.""")
+    }
+    endIndex.typeInfo match {
+      case _: IntegerTypeInfo[_] => // ok
+      case _ =>
+        throw new ExpressionException(
+          s"""End index must be an integer type in $this, is ${endIndex.typeInfo}.""")
+    }
+
+    BasicTypeInfo.STRING_TYPE_INFO
+  }
+
+  override def children: Seq[Expression] = Seq(str, beginIndex, endIndex)
+
+  override def toString: String = s"($str).substring($beginIndex, $endIndex)"
+}

Reply via email to