http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaStreamingTranslator.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaStreamingTranslator.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaStreamingTranslator.scala
deleted file mode 100644
index a37c892..0000000
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaStreamingTranslator.scala
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.table
-
-import java.lang.reflect.Modifier
-import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.table.plan._
-import org.apache.flink.api.table.runtime.{ExpressionFilterFunction, 
ExpressionSelectFunction}
-import org.apache.flink.api.table.expressions._
-import org.apache.flink.api.table.typeinfo.RowTypeInfo
-import org.apache.flink.api.table.{ExpressionException, Row, Table}
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.api.operators.StreamMap
-
-/**
- * [[PlanTranslator]] for creating [[Table]]s from Java [[DataStream]]s and
- * translating them back to Java [[DataStream]]s.
- *
- * This is very limited right now. Only select and filter are implemented. 
Also, the expression
- * operations must be extended to allow windowing operations.
- */
-
-class JavaStreamingTranslator extends PlanTranslator {
-
-  type Representation[A] = DataStream[A]
-
-  override def createTable[A](
-      repr: Representation[A],
-      inputType: CompositeType[A],
-      expressions: Array[Expression],
-      resultFields: Seq[(String, TypeInformation[_])]): Table = {
-
-    val rowDataStream = createSelect(expressions, repr, inputType)
-
-    new Table(Root(rowDataStream, resultFields))
-  }
-
-  override def translate[A](op: PlanNode)(implicit tpe: TypeInformation[A]): 
DataStream[A] = {
-
-    if (tpe.getTypeClass == classOf[Row]) {
-      // shortcut for DataSet[Row]
-      return translateInternal(op).asInstanceOf[DataStream[A]]
-    }
-
-    val clazz = tpe.getTypeClass
-    if (clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) {
-      throw new ExpressionException("Cannot create DataStream of type " +
-        clazz.getName + ". Only top-level classes or static member classes are 
supported.")
-    }
-
-    if (!implicitly[TypeInformation[A]].isInstanceOf[CompositeType[A]]) {
-      throw new ExpressionException(
-        "A Table can only be converted to composite types, type is: " +
-          implicitly[TypeInformation[A]] +
-          ". Composite types would be tuples, case classes and POJOs.")
-
-    }
-
-    val resultSet = translateInternal(op)
-
-    val resultType = resultSet.getType.asInstanceOf[RowTypeInfo]
-
-    val outputType = 
implicitly[TypeInformation[A]].asInstanceOf[CompositeType[A]]
-
-    val resultNames = resultType.getFieldNames
-    val outputNames = outputType.getFieldNames.toSeq
-
-    if (resultNames.toSet != outputNames.toSet) {
-      throw new ExpressionException(s"Expression result type $resultType does 
not have the same" +
-        s"fields as output type $outputType")
-    }
-
-    for (f <- outputNames) {
-      val in = resultType.getTypeAt(resultType.getFieldIndex(f))
-      val out = outputType.getTypeAt(outputType.getFieldIndex(f))
-      if (!in.equals(out)) {
-        throw new ExpressionException(s"Types for field $f differ on input 
$resultType and " +
-          s"output $outputType.")
-      }
-    }
-
-    val outputFields = outputNames map {
-      f => ResolvedFieldReference(f, resultType.getTypeAt(f))
-    }
-
-    val function = new ExpressionSelectFunction(
-      resultSet.getType.asInstanceOf[RowTypeInfo],
-      outputType,
-      outputFields)
-
-    val opName = s"select(${outputFields.mkString(",")})"
-
-    resultSet.transform(opName, outputType, new StreamMap[Row, A](function))
-  }
-
-  private def translateInternal(op: PlanNode): DataStream[Row] = {
-    op match {
-      case Root(dataSet: DataStream[Row], resultFields) =>
-        dataSet
-
-      case Root(_, _) =>
-        throw new ExpressionException("Invalid Root for 
JavaStreamingTranslator: " + op + ". " +
-          "Did you try converting a Table based on a DataSet to a DataStream 
or vice-versa?")
-
-      case GroupBy(_, fields) =>
-        throw new ExpressionException("Dangling GroupBy operation. Did you 
forget a " +
-          "SELECT statement?")
-
-      case As(input, newNames) =>
-        throw new ExpressionException("As operation for Streams not yet 
implemented.")
-
-      case sel@Select(Filter(Join(leftInput, rightInput), predicate), 
selection) =>
-
-        val expandedInput = ExpandAggregations(sel)
-
-        if (expandedInput.eq(sel)) {
-          val translatedLeftInput = translateInternal(leftInput)
-          val translatedRightInput = translateInternal(rightInput)
-          val leftInType = 
translatedLeftInput.getType.asInstanceOf[CompositeType[Row]]
-          val rightInType = 
translatedRightInput.getType.asInstanceOf[CompositeType[Row]]
-
-          createJoin(
-            predicate,
-            selection,
-            translatedLeftInput,
-            translatedRightInput,
-            leftInType,
-            rightInType,
-            JoinHint.OPTIMIZER_CHOOSES)
-        } else {
-          translateInternal(expandedInput)
-        }
-
-      case Filter(Join(leftInput, rightInput), predicate) =>
-        val translatedLeftInput = translateInternal(leftInput)
-        val translatedRightInput = translateInternal(rightInput)
-        val leftInType = 
translatedLeftInput.getType.asInstanceOf[CompositeType[Row]]
-        val rightInType = 
translatedRightInput.getType.asInstanceOf[CompositeType[Row]]
-
-        createJoin(
-          predicate,
-          leftInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)) 
++
-            rightInput.outputFields.map( f => ResolvedFieldReference(f._1, 
f._2)),
-          translatedLeftInput,
-          translatedRightInput,
-          leftInType,
-          rightInType,
-          JoinHint.OPTIMIZER_CHOOSES)
-
-      case Join(leftInput, rightInput) =>
-        throw new ExpressionException("Join without filter condition 
encountered. " +
-          "Did you forget to add .where(...) ?")
-
-      case sel@Select(input, selection) =>
-
-        val expandedInput = ExpandAggregations(sel)
-
-        if (expandedInput.eq(sel)) {
-          // no expansions took place
-          val translatedInput = translateInternal(input)
-          val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
-          val inputFields = inType.getFieldNames
-          createSelect(
-            selection,
-            translatedInput,
-            inType)
-        } else {
-          translateInternal(expandedInput)
-        }
-
-      case agg@Aggregate(GroupBy(input, groupExpressions), aggregations) =>
-        throw new ExpressionException("Aggregate operation for Streams not yet 
implemented.")
-
-      case agg@Aggregate(input, aggregations) =>
-        throw new ExpressionException("Aggregate operation for Streams not yet 
implemented.")
-
-      case Filter(input, predicate) =>
-        val translatedInput = translateInternal(input)
-        val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
-        val filter = new ExpressionFilterFunction[Row](predicate, inType)
-        translatedInput.filter(filter)
-
-      case UnionAll(left, right) =>
-        val translatedLeft = translateInternal(left)
-        val translatedRight = translateInternal(right)
-        translatedLeft.union(translatedRight)
-    }
-  }
-
-  private def createSelect[I](
-      fields: Seq[Expression],
-      input: DataStream[I],
-      inputType: CompositeType[I]): DataStream[Row] = {
-
-    fields foreach {
-      f =>
-        if (f.exists(_.isInstanceOf[Aggregation])) {
-          throw new ExpressionException("Found aggregate in " + 
fields.mkString(", ") + ".")
-        }
-
-    }
-
-    val resultType = new RowTypeInfo(fields)
-
-    val function = new ExpressionSelectFunction(inputType, resultType, fields)
-
-    val opName = s"select(${fields.mkString(",")})"
-
-    input.transform(opName, resultType, new StreamMap[I, Row](function))
-  }
-
-  private def createJoin[L, R](
-      predicate: Expression,
-      fields: Seq[Expression],
-      leftInput: DataStream[L],
-      rightInput: DataStream[R],
-      leftType: CompositeType[L],
-      rightType: CompositeType[R],
-      joinHint: JoinHint): DataStream[Row] = {
-
-    throw new ExpressionException("Join operation for Streams not yet 
implemented.")
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala
deleted file mode 100644
index 5614031..0000000
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.table
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.java.typeutils.TypeExtractor
-import org.apache.flink.api.table.Table
-import org.apache.flink.streaming.api.datastream.DataStream
-
-/**
- * Environment for working with the Table API.
- *
- * This can be used to convert [[DataSet]] or [[DataStream]] to a [[Table]] 
and back again. You
- * can also use the provided methods to create a [[Table]] directly from a 
data source.
- */
-class TableEnvironment {
-
-  /**
-   * Transforms the given DataSet to a [[org.apache.flink.api.table.Table]].
-   * The fields of the DataSet type are renamed to the given set of fields:
-   *
-   * Example:
-   *
-   * {{{
-   *   tableEnv.fromDataSet(set, "a, b")
-   * }}}
-   *
-   * This will transform the set containing elements of two fields to a table 
where the fields
-   * are named a and b.
-   */
-  def fromDataSet[T](set: DataSet[T], fields: String): Table = {
-    new JavaBatchTranslator().createTable(set, fields)
-  }
-
-  /**
-   * Transforms the given DataSet to a [[org.apache.flink.api.table.Table]].
-   * The fields of the DataSet type are used to name the
-   * [[org.apache.flink.api.table.Table]] fields.
-   */
-  def fromDataSet[T](set: DataSet[T]): Table = {
-    new JavaBatchTranslator().createTable(set)
-  }
-
-  /**
-   * Transforms the given DataStream to a [[org.apache.flink.api.table.Table]].
-   * The fields of the DataStream type are renamed to the given set of fields:
-   *
-   * Example:
-   *
-   * {{{
-   *   tableEnv.fromDataStream(set, "a, b")
-   * }}}
-   *
-   * This will transform the set containing elements of two fields to a table 
where the fields
-   * are named a and b.
-   */
-  def fromDataStream[T](set: DataStream[T], fields: String): Table = {
-    new JavaStreamingTranslator().createTable(set, fields)
-  }
-
-  /**
-   * Transforms the given DataStream to a [[org.apache.flink.api.table.Table]].
-   * The fields of the DataStream type are used to name the
-   * [[org.apache.flink.api.table.Table]] fields.
-   */
-  def fromDataStream[T](set: DataStream[T]): Table = {
-    new JavaStreamingTranslator().createTable(set)
-  }
-
-  /**
-   * Converts the given [[org.apache.flink.api.table.Table]] to
-   * a DataSet. The given type must have exactly the same fields as the
-   * [[org.apache.flink.api.table.Table]]. That is, the names of the
-   * fields and the types must match.
-   */
-  @SuppressWarnings(Array("unchecked"))
-  def toDataSet[T](table: Table, clazz: Class[T]): DataSet[T] = {
-    new JavaBatchTranslator().translate[T](table.operation)(
-      TypeExtractor.createTypeInfo(clazz).asInstanceOf[TypeInformation[T]])
-  }
-
-  /**
-   * Converts the given [[org.apache.flink.api.table.Table]] to
-   * a DataStream. The given type must have exactly the same fields as the
-   * [[org.apache.flink.api.table.Table]]. That is, the names of the
-   * fields and the types must match.
-   */
-  @SuppressWarnings(Array("unchecked"))
-  def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = {
-    new JavaStreamingTranslator().translate[T](table.operation)(
-      TypeExtractor.createTypeInfo(clazz).asInstanceOf[TypeInformation[T]])
-
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala
deleted file mode 100644
index 2508a3d..0000000
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.table
-
-import org.apache.flink.api.table._
-import org.apache.flink.api.table.expressions.{UnresolvedFieldReference, 
Expression}
-import org.apache.flink.api.common.typeutils.CompositeType
-
-import org.apache.flink.api.scala._
-
-/**
- * Methods for converting a [[DataSet]] to a [[Table]]. A [[DataSet]] is
- * wrapped in this by the implicit conversions in 
[[org.apache.flink.api.scala.table]].
- */
-class DataSetConversions[T](set: DataSet[T], inputType: CompositeType[T]) {
-
-  /**
-   * Converts the [[DataSet]] to a [[Table]]. The field names can be specified 
like this:
-   *
-   * {{{
-   *   val in: DataSet[(String, Int)] = ...
-   *   val table = in.as('a, 'b)
-   * }}}
-   *
-   * This results in a [[Table]] that has field `a` of type `String` and field 
`b`
-   * of type `Int`.
-   */
-  def as(fields: Expression*): Table = {
-     new ScalaBatchTranslator().createTable(set, fields.toArray)
-  }
-
-  /**
-   * Converts the [[DataSet]] to a [[Table]]. The field names will be taken 
from the field names
-   * of the input type.
-   *
-   * Example:
-   *
-   * {{{
-   *   val in: DataSet[(String, Int)] = ...
-   *   val table = in.toTable
-   * }}}
-   *
-   * Here, the result is a [[Table]] that has field `_1` of type `String` and 
field `_2`
-   * of type `Int`.
-   */
-  def toTable: Table = {
-    val resultFields = inputType.getFieldNames.map(UnresolvedFieldReference)
-    as(resultFields: _*)
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala
deleted file mode 100644
index 47bd100..0000000
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table
-
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.table._
-import org.apache.flink.api.table.expressions.{Expression, 
UnresolvedFieldReference}
-import org.apache.flink.streaming.api.scala.DataStream
-
-class DataStreamConversions[T](stream: DataStream[T], inputType: 
CompositeType[T]) {
-
-  /**
-   * Converts the [[DataStream]] to a [[Table]]. The field names can be 
specified like this:
-   *
-   * {{{
-   *   val in: DataSet[(String, Int)] = ...
-   *   val table = in.as('a, 'b)
-   * }}}
-   *
-   * This results in a [[Table]] that has field `a` of type `String` and field 
`b`
-   * of type `Int`.
-   */
-
-  def as(fields: Expression*): Table = {
-     new ScalaStreamingTranslator().createTable(
-       stream,
-       fields.toArray,
-       checkDeterministicFields = true)
-  }
-
-  /**
-   * Converts the [[DataStream]] to a [[Table]]. The field names will be taken 
from the field
-   * names of the input type.
-   *
-   * Example:
-   *
-   * {{{
-   *   val in: DataSet[(String, Int)] = ...
-   *   val table = in.toTable
-   * }}}
-   *
-   * This results in a [[Table]] that has field `_1` of type `String` and 
field `_2`
-   * of type `Int`.
-   */
-
-  def toTable: Table = {
-    val resultFields = inputType.getFieldNames.map(UnresolvedFieldReference)
-    as(resultFields: _*)
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala
deleted file mode 100644
index cdcf53e..0000000
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table
-
-
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.table.JavaBatchTranslator
-import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.api.scala.wrap
-import org.apache.flink.api.table.plan._
-import org.apache.flink.api.table.Table
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala.DataSet
-
-import scala.reflect.ClassTag
-
-
-/**
- * [[PlanTranslator]] for creating [[Table]]s from Scala [[DataSet]]s and
- * translating them back to Scala [[DataSet]]s.
- */
-class ScalaBatchTranslator extends PlanTranslator {
-
-  private val javaTranslator = new JavaBatchTranslator
-
-  type Representation[A] = DataSet[A]
-
-  def createTable[A](
-      repr: DataSet[A],
-      fields: Array[Expression]): Table = {
-
-    val result = javaTranslator.createTable(repr.javaSet, fields)
-
-    new Table(result.operation)
-  }
-
-  override def translate[O](op: PlanNode)(implicit tpe: TypeInformation[O]): 
DataSet[O] = {
-    // fake it till you make it ...
-    
wrap(javaTranslator.translate(op))(ClassTag.AnyRef.asInstanceOf[ClassTag[O]])
-  }
-
-  override def createTable[A](
-      repr: Representation[A],
-      inputType: CompositeType[A],
-      expressions: Array[Expression],
-      resultFields: Seq[(String, TypeInformation[_])]): Table = {
-
-    val result = javaTranslator.createTable(repr.javaSet, inputType, 
expressions, resultFields)
-
-    Table(result.operation)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala
deleted file mode 100644
index 88f1b83..0000000
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.table.JavaStreamingTranslator
-import org.apache.flink.api.table.Table
-import org.apache.flink.api.table.plan._
-import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.streaming.api.scala.{DataStream, javaToScalaStream}
-
-/**
- * [[PlanTranslator]] for creating [[Table]]s from Scala [[DataStream]]s and
- * translating them back to Scala [[DataStream]]s.
- *
- * This is very limited right now. Only select and filter are implemented. 
Also, the expression
- * operations must be extended to allow windowing operations.
- */
-class ScalaStreamingTranslator extends PlanTranslator {
-
-  private val javaTranslator = new JavaStreamingTranslator
-
-  override type Representation[A] = DataStream[A]
-
-  override def translate[O](op: PlanNode)(implicit tpe: TypeInformation[O]): 
DataStream[O] = {
-    // fake it till you make it ...
-    javaToScalaStream(javaTranslator.translate(op))
-  }
-
-  override def createTable[A](
-      repr: Representation[A],
-      inputType: CompositeType[A],
-      expressions: Array[Expression],
-      resultFields: Seq[(String, TypeInformation[_])]): Table = {
-
-    val result =
-      javaTranslator.createTable(repr.getJavaStream, inputType, expressions, 
resultFields)
-
-    new Table(result.operation)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala
deleted file mode 100644
index 4f2172e..0000000
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.table
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala._
-import org.apache.flink.api.table._
-
-import org.apache.flink.streaming.api.scala.DataStream
-
-/**
- * Methods for converting a [[Table]] to a [[DataSet]] or [[DataStream]]. A 
[[Table]] is
- * wrapped in this by the implicit conversions in 
[[org.apache.flink.api.scala.table]].
- */
-class TableConversions(table: Table) {
-
-  /**
-   * Converts the [[Table]] to a [[DataSet]].
-   */
-  def toDataSet[T: TypeInformation]: DataSet[T] = {
-     new ScalaBatchTranslator().translate[T](table.operation)
-  }
-
-  /**
-   * Converts the [[Table]] to a [[DataStream]].
-   */
-  def toDataStream[T: TypeInformation]: DataStream[T] = {
-    new ScalaStreamingTranslator().translate[T](table.operation)
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
deleted file mode 100644
index 0be6be2..0000000
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.table
-
-import org.apache.flink.api.table.expressions._
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-import scala.language.implicitConversions
-
-/**
- * These are all the operations that can be used to construct an 
[[Expression]] AST for expression
- * operations.
- *
- * These operations must be kept in sync with the parser in
- * [[org.apache.flink.api.table.parser.ExpressionParser]].
- */
-trait ImplicitExpressionOperations {
-  def expr: Expression
-
-  def && (other: Expression) = And(expr, other)
-  def || (other: Expression) = Or(expr, other)
-
-  def > (other: Expression) = GreaterThan(expr, other)
-  def >= (other: Expression) = GreaterThanOrEqual(expr, other)
-  def < (other: Expression) = LessThan(expr, other)
-  def <= (other: Expression) = LessThanOrEqual(expr, other)
-
-  def === (other: Expression) = EqualTo(expr, other)
-  def !== (other: Expression) = NotEqualTo(expr, other)
-
-  def unary_! = Not(expr)
-  def unary_- = UnaryMinus(expr)
-
-  def isNull = IsNull(expr)
-  def isNotNull = IsNotNull(expr)
-
-  def + (other: Expression) = Plus(expr, other)
-  def - (other: Expression) = Minus(expr, other)
-  def / (other: Expression) = Div(expr, other)
-  def * (other: Expression) = Mul(expr, other)
-  def % (other: Expression) = Mod(expr, other)
-
-  def & (other: Expression) = BitwiseAnd(expr, other)
-  def | (other: Expression) = BitwiseOr(expr, other)
-  def ^ (other: Expression) = BitwiseXor(expr, other)
-  def unary_~ = BitwiseNot(expr)
-
-  def abs = Abs(expr)
-
-  def sum = Sum(expr)
-  def min = Min(expr)
-  def max = Max(expr)
-  def count = Count(expr)
-  def avg = Avg(expr)
-
-  def substring(beginIndex: Expression, endIndex: Expression = 
Literal(Int.MaxValue)) = {
-    Substring(expr, beginIndex, endIndex)
-  }
-
-  def cast(toType: TypeInformation[_]) = Cast(expr, toType)
-
-  def as(name: Symbol) = Naming(expr, name.name)
-}
-
-/**
- * Implicit conversions from Scala Literals to Expression [[Literal]] and from 
[[Expression]]
- * to [[ImplicitExpressionOperations]].
- */
-trait ImplicitExpressionConversions {
-  implicit class WithOperations(e: Expression) extends 
ImplicitExpressionOperations {
-    def expr = e
-  }
-
-  implicit class SymbolExpression(s: Symbol) extends 
ImplicitExpressionOperations {
-    def expr = UnresolvedFieldReference(s.name)
-  }
-
-  implicit class LiteralLongExpression(l: Long) extends 
ImplicitExpressionOperations {
-    def expr = Literal(l)
-  }
-
-  implicit class LiteralIntExpression(i: Int) extends 
ImplicitExpressionOperations {
-    def expr = Literal(i)
-  }
-
-  implicit class LiteralFloatExpression(f: Float) extends 
ImplicitExpressionOperations {
-    def expr = Literal(f)
-  }
-
-  implicit class LiteralDoubleExpression(d: Double) extends 
ImplicitExpressionOperations {
-    def expr = Literal(d)
-  }
-
-  implicit class LiteralStringExpression(str: String) extends 
ImplicitExpressionOperations {
-    def expr = Literal(str)
-  }
-
-  implicit class LiteralBooleanExpression(bool: Boolean) extends 
ImplicitExpressionOperations {
-    def expr = Literal(bool)
-  }
-
-  implicit def symbol2FieldExpression(sym: Symbol): Expression = 
UnresolvedFieldReference(sym.name)
-  implicit def int2Literal(i: Int): Expression = Literal(i)
-  implicit def long2Literal(l: Long): Expression = Literal(l)
-  implicit def double2Literal(d: Double): Expression = Literal(d)
-  implicit def float2Literal(d: Float): Expression = Literal(d)
-  implicit def string2Literal(str: String): Expression = Literal(str)
-  implicit def boolean2Literal(bool: Boolean): Expression = Literal(bool)
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
deleted file mode 100644
index e74651b..0000000
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala
-
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.table.{Row, Table}
-import org.apache.flink.streaming.api.scala.DataStream
-
-import scala.language.implicitConversions
-
-/**
- * == Table API (Scala) ==
- *
- * Importing this package with:
- *
- * {{{
- *   import org.apache.flink.api.scala.table._
- * }}}
- *
- * imports implicit conversions for converting a [[DataSet]] or [[DataStream]] 
to a
- * [[Table]]. This can be used to perform SQL-like queries on data. Please have
- * a look at [[Table]] to see which operations are supported and
- * [[org.apache.flink.api.scala.table.ImplicitExpressionOperations]] to see 
how an
- * expression can be specified.
- *
- * When writing a query you can use Scala Symbols to refer to field names. One 
would
- * refer to field `a` by writing `'a`. Sometimes it is necessary to manually convert a
- * Scala literal to an Expression Literal, in those cases use `Literal`, as in 
`Literal(3)`.
- *
- * Example:
- *
- * {{{
- *   import org.apache.flink.api.scala._
- *   import org.apache.flink.api.scala.table._
- *
- *   val env = ExecutionEnvironment.getExecutionEnvironment
- *   val input = env.fromElements(("Hello", 2), ("Hello", 5), ("Ciao", 3))
- *   val result = input.as('word, 'count).groupBy('word).select('word, 
'count.avg)
- *   result.print()
- *
- *   env.execute()
- * }}}
- *
- * A [[Table]] can be converted back to the underlying API
- * representation using `as`:
- *
- * {{{
- *   case class Word(word: String, count: Int)
- *
- *   val result = in.select(...).as('word, 'count)
- *   val set = result.as[Word]
- * }}}
- */
-package object table extends ImplicitExpressionConversions {
-
-  /** Wraps a [[Table]] in a `TableConversions`. */
-  implicit def table2TableConversions(table: Table): TableConversions =
-    new TableConversions(table)
-
-  /**
-   * Wraps a `DataSet` in a `DataSetConversions`. The type information of the data set is
-   * cast to `CompositeType`, so a data set of any other kind of type fails at this cast.
-   */
-  implicit def dataSet2DataSetConversions[T](set: DataSet[T]): DataSetConversions[T] = {
-    val compositeType = set.getType.asInstanceOf[CompositeType[T]]
-    new DataSetConversions[T](set, compositeType)
-  }
-
-  /** Translates the operation of a [[Table]] into a `DataSet` of [[Row]]. */
-  implicit def table2RowDataSet(table: Table): DataSet[Row] = {
-    val translator = new ScalaBatchTranslator()
-    translator.translate[Row](table.operation)
-  }
-
-  /** Turns a `DataSet` of [[Row]] into a [[Table]] through `toTable`. */
-  implicit def rowDataSet2Table(rowDataSet: DataSet[Row]): Table =
-    rowDataSet.toTable
-
-  /**
-   * Wraps a [[DataStream]] in a `DataStreamConversions`. The type information of the
-   * underlying Java stream is cast to `CompositeType`, so a stream of any other kind of
-   * type fails at this cast.
-   */
-  implicit def dataStream2DataSetConversions[T](
-      stream: DataStream[T]): DataStreamConversions[T] = {
-    val compositeType = stream.getJavaStream.getType.asInstanceOf[CompositeType[T]]
-    new DataStreamConversions[T](stream, compositeType)
-  }
-
-  /** Translates the operation of a [[Table]] into a [[DataStream]] of [[Row]]. */
-  implicit def table2RowDataStream(table: Table): DataStream[Row] = {
-    val translator = new ScalaStreamingTranslator()
-    translator.translate[Row](table.operation)
-  }
-
-  /** Turns a [[DataStream]] of [[Row]] into a [[Table]] through `toTable`. */
-  implicit def rowDataStream2Table(rowDataStream: DataStream[Row]): Table =
-    rowDataStream.toTable
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/ExpressionException.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/ExpressionException.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/ExpressionException.scala
deleted file mode 100644
index 51c0a4d..0000000
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/ExpressionException.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table
-
-/**
- * Exception type for errors in Table API expressions.
- *
- * @param msg the detail message, handed to [[RuntimeException]]
- */
-class ExpressionException(msg: String)
-  extends RuntimeException(msg)

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Row.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Row.scala 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Row.scala
deleted file mode 100644
index e3baab3..0000000
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Row.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table
-
-/**
- * This is used for executing Table API operations. We use manually generated
- * TypeInfo to check the field types and create serializers and comparators.
- *
- * @param arity the number of fields of the row
- */
-class Row(arity: Int) extends Product {
-
-  // Backing storage; a slot holds null until setField stores a value in it.
-  private val fields = new Array[Any](arity)
-
-  /** Returns the number of fields. */
-  def productArity: Int = fields.length
-
-  /** Returns the value stored at the zero-based position `i`. */
-  def productElement(i: Int): Any = fields(i)
-
-  /** Stores `value` at the zero-based position `i`. */
-  def setField(i: Int, value: Any): Unit = fields(i) = value
-
-  /**
-   * Accepts other rows only. The previous implementation rejected every argument, including
-   * the row itself, which contradicts the `canEqual` contract of `scala.Equals`.
-   */
-  def canEqual(that: Any): Boolean = that.isInstanceOf[Row]
-
-  /** Returns the field values in order, separated by commas. */
-  override def toString: String = fields.mkString(",")
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala
deleted file mode 100644
index 641f2fa..0000000
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala
+++ /dev/null
@@ -1,294 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table
-
-import org.apache.flink.api.java.io.DiscardingOutputFormat
-import org.apache.flink.api.table.explain.PlanJsonParser
-import org.apache.flink.api.table.expressions.analysis.{GroupByAnalyzer, 
PredicateAnalyzer, SelectionAnalyzer}
-import org.apache.flink.api.table.expressions.{Expression, 
ResolvedFieldReference, UnresolvedFieldReference}
-import org.apache.flink.api.table.parser.ExpressionParser
-import org.apache.flink.api.table.plan._
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-
-/**
- * The abstraction for writing Table API programs. Similar to how the batch 
and streaming APIs
- * have [[org.apache.flink.api.scala.DataSet]] and
- * [[org.apache.flink.streaming.api.scala.DataStream]].
- *
- * Use the methods of [[Table]] to transform data. Use
- * [[org.apache.flink.api.java.table.TableEnvironment]] to convert a [[Table]] 
back to a DataSet
- * or DataStream.
- *
- * When using Scala a [[Table]] can also be converted using implicit 
conversions.
- *
- * Example:
- *
- * {{{
- *   val table = set.toTable('a, 'b)
- *   ...
- *   val table2 = ...
- *   val set = table2.toDataSet[MyType]
- * }}}
- *
- * Operations such as [[join]], [[select]], [[where]] and [[groupBy]] either 
take arguments
- * in a Scala DSL or as an expression String. Please refer to the 
documentation for the expression
- * syntax.
- */
-case class Table(private[flink] val operation: PlanNode) {
-
-  /**
-   * Performs a selection operation. Similar to an SQL SELECT statement. The 
field expressions
-   * can contain complex expressions and aggregations.
-   *
-   * Example:
-   *
-   * {{{
-   *   in.select('key, 'value.avg + " The average" as 'average, 
'other.substring(0, 10))
-   * }}}
-   */
-  def select(fields: Expression*): Table = {
-    val selectionAnalyzer = new SelectionAnalyzer(operation.outputFields)
-    val resolved = fields.map(selectionAnalyzer.analyze)
-    val names = resolved.map(_.name)
-    // Every resulting field needs a name of its own.
-    val hasDuplicates = names.distinct.size != names.size
-    if (hasDuplicates) {
-      throw new ExpressionException(
-        s"""Resulting fields names are not unique in expression "${fields.mkString(", ")}".""")
-    }
-    copy(operation = Select(operation, resolved))
-  }
-
-  /**
-   * Performs a selection operation with the fields given as an expression string. Similar to
-   * an SQL SELECT statement. The field expressions can contain complex expressions and
-   * aggregations.
-   *
-   * Example:
-   *
-   * {{{
-   *   in.select("key, value.avg + ' The average' as average, other.substring(0, 10)")
-   * }}}
-   */
-  def select(fields: String): Table =
-    select(ExpressionParser.parseExpressionList(fields): _*)
-
-  /**
-   * Renames the fields of the expression result. Use this to disambiguate 
fields before
-   * joining to operations.
-   *
-   * Example:
-   *
-   * {{{
-   *   in.as('a, 'b)
-   * }}}
-   */
-  def as(fields: Expression*): Table = {
-    // Anything other than a plain (unresolved) field reference is rejected.
-    val onlyFieldReferences = fields.forall(_.isInstanceOf[UnresolvedFieldReference])
-    if (!onlyFieldReferences) {
-      throw new ExpressionException("Only field expression allowed in as().")
-    }
-    val newNames = fields.toArray.map(_.name)
-    copy(operation = As(operation, newNames))
-  }
-
-  /**
-   * Renames the fields of the expression result. Use this to disambiguate 
fields before
-   * joining to operations.
-   *
-   * Example:
-   *
-   * {{{
-   *   in.as("a, b")
-   * }}}
-   */
-  def as(fields: String): Table =
-    as(ExpressionParser.parseExpressionList(fields): _*)
-
-  /**
-   * Filters out elements that don't pass the filter predicate. Similar to a 
SQL WHERE
-   * clause.
-   *
-   * Example:
-   *
-   * {{{
-   *   in.filter('name === "Fred")
-   * }}}
-   */
-  def filter(predicate: Expression): Table = {
-    // The predicate is analyzed against the output fields of the current operation.
-    val resolvedPredicate = new PredicateAnalyzer(operation.outputFields).analyze(predicate)
-    copy(operation = Filter(operation, resolvedPredicate))
-  }
-
-  /**
-   * Filters out elements that don't pass the filter predicate. Similar to a 
SQL WHERE
-   * clause.
-   *
-   * Example:
-   *
-   * {{{
-   *   in.filter("name = 'Fred'")
-   * }}}
-   */
-  def filter(predicate: String): Table =
-    filter(ExpressionParser.parseExpression(predicate))
-
-  /**
-   * Filters out elements that don't pass the filter predicate. Similar to a 
SQL WHERE
-   * clause.
-   *
-   * Example:
-   *
-   * {{{
-   *   in.where('name === "Fred")
-   * }}}
-   */
-  // Alias of filter(Expression).
-  def where(predicate: Expression): Table = filter(predicate)
-
-  /**
-   * Filters out elements that don't pass the filter predicate. Similar to a 
SQL WHERE
-   * clause.
-   *
-   * Example:
-   *
-   * {{{
-   *   in.where("name = 'Fred'")
-   * }}}
-   */
-  // Alias of filter(String).
-  def where(predicate: String): Table = filter(predicate)
-
-  /**
-   * Groups the elements on some grouping keys. Use this before a selection 
with aggregations
-   * to perform the aggregation on a per-group basis. Similar to a SQL GROUP 
BY statement.
-   *
-   * Example:
-   *
-   * {{{
-   *   in.groupBy('key).select('key, 'value.avg)
-   * }}}
-   */
-  def groupBy(fields: Expression*): Table = {
-    val keyAnalyzer = new GroupByAnalyzer(operation.outputFields)
-    val keys = fields.map(keyAnalyzer.analyze)
-
-    // Only resolved field references are accepted as grouping keys.
-    val rejected = keys.filterNot(_.isInstanceOf[ResolvedFieldReference])
-    if (rejected.nonEmpty) {
-      throw new ExpressionException("Illegal key expressions: " + rejected.mkString(", "))
-    }
-
-    copy(operation = GroupBy(operation, keys))
-  }
-
-  /**
-   * Groups the elements on some grouping keys. Use this before a selection 
with aggregations
-   * to perform the aggregation on a per-group basis. Similar to a SQL GROUP 
BY statement.
-   *
-   * Example:
-   *
-   * {{{
-   *   in.groupBy("key").select("key, value.avg")
-   * }}}
-   */
-  def groupBy(fields: String): Table =
-    groupBy(ExpressionParser.parseExpressionList(fields): _*)
-
-  /**
-   * Joins two [[Table]]s. Similar to an SQL join. The fields of the two joined
-   * operations must not overlap, use [[as]] to rename fields if necessary. 
You can use
-   * where and select clauses after a join to further specify the behaviour of 
the join.
-   *
-   * Example:
-   *
-   * {{{
-   *   left.join(right).where('a === 'b && 'c > 3).select('a, 'b, 'd)
-   * }}}
-   */
-  def join(right: Table): Table = {
-    val leftFields = operation.outputFields
-    val rightFields = right.operation.outputFields
-    // A field name present on both sides could not be resolved unambiguously afterwards.
-    val sharedNames = leftFields.map(_._1).toSet & rightFields.map(_._1).toSet
-    if (sharedNames.nonEmpty) {
-      throw new ExpressionException(
-        "Overlapping fields names on join input, result would be ambiguous: " +
-          leftFields.mkString(", ") + " and " + rightFields.mkString(", "))
-    }
-    copy(operation = Join(operation, right.operation))
-  }
-
-  /**
-   * Unions two [[Table]]s. Similar to an SQL UNION ALL. The output fields of the two union
-   * operations must be equal.
-   *
-   * Example:
-   *
-   * {{{
-   *   left.unionAll(right)
-   * }}}
-   *
-   * @param right the table whose operation is combined with the operation of this table
-   * @throws ExpressionException if the output fields of the two operations are not equal
-   */
-  def unionAll(right: Table): Table = {
-    val leftInputFields = operation.outputFields
-    val rightInputFields = right.operation.outputFields
-    if (leftInputFields != rightInputFields) {
-      // The message used to speak of "join inputs" and ran the field lists into the text.
-      throw new ExpressionException(
-        "The fields names of union inputs should be fully overlapped, left inputs fields: " +
-          leftInputFields.mkString(", ") +
-          " and right inputs fields: " +
-          rightInputFields.mkString(", ")
-      )
-    }
-    copy(operation = UnionAll(operation, right.operation))
-  }
-
-  /**
-   * Returns a description of this table as text: first the abstract syntax tree, i.e. the
-   * operation of this table, then the physical execution plan obtained from the execution
-   * environment.
-   *
-   * @param extended handed to `PlanJsonParser.getSqlExecutionPlan` together with the JSON plan
-   */
-  def explain(extended: Boolean): String = {
-    val rows = this.toDataSet[Row]
-    val environment = rows.getExecutionEnvironment
-    // A discarding output is attached before the plan is requested from the environment.
-    rows.output(new DiscardingOutputFormat[Row])
-    val jsonPlan = environment.getExecutionPlan()
-    val physicalPlan = PlanJsonParser.getSqlExecutionPlan(jsonPlan, extended)
-    s"== Abstract Syntax Tree ==\n$operation\n\n== Physical Execution Plan ==\n$physicalPlan"
-  }
-  
-  // Same as explain(extended = false).
-  def explain(): String = explain(extended = false)
-  
-  // Renders the wrapped operation; the "Expression(...)" wrapper text is kept as is.
-  override def toString: String = "Expression(" + operation + ")"
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/TableConfig.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/TableConfig.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/TableConfig.scala
deleted file mode 100644
index ffa2bec..0000000
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/TableConfig.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table
-
-import java.util.TimeZone
-
-/**
- * Settings that define the runtime behavior of the Table API.
- */
-class TableConfig extends Serializable {
-
-  // Timezone for date/time/timestamp conversions; starts out as UTC.
-  private var timeZone: TimeZone = TimeZone.getTimeZone("UTC")
-
-  // Whether all fields need to be checked for NULL first; starts out disabled.
-  private var nullCheck: Boolean = false
-
-  /**
-   * Returns the timezone for date/time/timestamp conversions.
-   */
-  def getTimeZone: TimeZone = timeZone
-
-  /**
-   * Sets the timezone for date/time/timestamp conversions. A `null` argument is rejected.
-   */
-  def setTimeZone(timeZone: TimeZone): Unit = {
-    require(timeZone != null, "timeZone must not be null.")
-    this.timeZone = timeZone
-  }
-
-  /**
-   * Returns the NULL check. If enabled, all fields need to be checked for NULL first.
-   */
-  def getNullCheck: Boolean = nullCheck
-
-  /**
-   * Sets the NULL check. If enabled, all fields need to be checked for NULL first.
-   */
-  def setNullCheck(nullCheck: Boolean): Unit = {
-    this.nullCheck = nullCheck
-  }
-
-}
-
-object TableConfig {
-  /** An instance created with the initial settings. */
-  val DEFAULT: TableConfig = new TableConfig()
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala
deleted file mode 100644
index a03ba61..0000000
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala
+++ /dev/null
@@ -1,829 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.codegen
-
-import java.util.Date
-import java.util.concurrent.atomic.AtomicInteger
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
PrimitiveArrayTypeInfo, TypeInformation}
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
-import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.apache.flink.api.table.expressions._
-import org.apache.flink.api.table.typeinfo.{RenamingProxyTypeInfo, RowTypeInfo}
-import org.apache.flink.api.table.{ExpressionException, TableConfig, 
expressions}
-import org.codehaus.janino.SimpleCompiler
-import org.slf4j.LoggerFactory
-
-import scala.collection.mutable
-
-/** Base class for all code generation classes. This provides the 
functionality for generating
-  * code from an [[Expression]] tree. Derived classes must embed this in a 
lambda function
-  * to form an executable code block.
-  *
-  * @param inputs List of input variable names with corresponding 
[[TypeInformation]].
-  * @param cl The ClassLoader that is set as the parent class loader of the
-  *           compiler used for the generated code. Must not be null.
-  * @param config General configuration specifying runtime behaviour.
-  * @tparam R The type of the generated code block. In most cases a lambda 
function such
-  *           as "(IN1, IN2) => OUT".
-  */
-abstract class ExpressionCodeGenerator[R](
-    inputs: Seq[(String, CompositeType[_])],
-    cl: ClassLoader,
-    config: TableConfig) {
-  protected val log = 
LoggerFactory.getLogger(classOf[ExpressionCodeGenerator[_]])
-
-  import scala.reflect.runtime.universe._
-  import scala.reflect.runtime.{universe => ru}
-
-  if (cl == null) {
-    throw new IllegalArgumentException("ClassLoader must not be null.")
-  }
-
-  val compiler = new SimpleCompiler()
-  compiler.setParentClassLoader(cl)
-
-  protected val reusableMemberStatements = mutable.Set[String]()
-
-  protected val reusableInitStatements = mutable.Set[String]()
-
-  // Collected member statements, one per line, always followed by a final newline.
-  protected def reuseMemberCode(): String =
-    reusableMemberStatements.mkString("\n") + "\n"
-
-  // Collected init statements, one per line, always followed by a final newline.
-  protected def reuseInitCode(): String =
-    reusableInitStatements.mkString("\n") + "\n"
-
-  // Whether null checks are generated; read from the TableConfig on every call.
-  protected def nullCheck: Boolean = { config.getNullCheck }
-
-  // Implemented by subclasses; generate() is the public entry point that delegates here.
-  // NOTE(review): generate() as written takes no Scala Reflection Lock - confirm it is unneeded.
-  protected def generateInternal(): R
-
-  // Public entry point; hands over to the generateInternal() of the subclass.
-  final def generate(): R = generateInternal()
-
-  // Generates the code for `expr` by delegating to generateExpressionInternal.
-  protected def generateExpression(expr: Expression): GeneratedExpression =
-    generateExpressionInternal(expr)
-
-  protected def generateExpressionInternal(expr: Expression): 
GeneratedExpression = {
-    //  protected def generateExpression(expr: Expression): 
GeneratedExpression = {
-    val nullTerm = freshName("isNull")
-    val resultTerm = freshName("result")
-
-    // For binary predicates that must only be evaluated when both operands 
are non-null.
-    // This will write to nullTerm and resultTerm, so don't use those term 
names
-    // after using this function
-    def generateIfNonNull(left: Expression, right: Expression, resultType: 
TypeInformation[_])
-                         (expr: (String, String) => String): String = {
-      val leftCode = generateExpression(left)
-      val rightCode = generateExpression(right)
-
-      val leftTpe = typeTermForTypeInfo(left.typeInfo)
-      val rightTpe = typeTermForTypeInfo(right.typeInfo)
-      val resultTpe = typeTermForTypeInfo(resultType)
-
-      if (nullCheck) {
-        leftCode.code + "\n" + 
-          rightCode.code + "\n" +
-          s"""
-            |boolean $nullTerm = ${leftCode.nullTerm} || ${rightCode.nullTerm};
-            |$resultTpe $resultTerm;
-            |if ($nullTerm) {
-            |  $resultTerm = ${defaultPrimitive(resultType)};
-            |} else {
-            |  $resultTerm = ${expr(leftCode.resultTerm, 
rightCode.resultTerm)};
-            |}
-          """.stripMargin
-      } else {
-        leftCode.code + "\n" +
-          rightCode.code + "\n" +
-          s"""
-            |$resultTpe $resultTerm = ${expr(leftCode.resultTerm, 
rightCode.resultTerm)};
-          """.stripMargin
-      }
-    }
-
-    def cleanedExpr(e: Expression): Expression =  {
-      e match {
-        case expressions.Naming(namedExpr, _) => cleanedExpr(namedExpr)
-        case _ => e
-      }
-    }
-
-    val cleanedExpression = cleanedExpr(expr)
-    val resultTpe = typeTermForTypeInfo(cleanedExpression.typeInfo)
-
-    val code: String = cleanedExpression match {
-
-      case expressions.Literal(null, typeInfo) =>
-        if (nullCheck) {
-          s"""
-            |boolean $nullTerm = true;
-            |$resultTpe resultTerm = null;
-          """.stripMargin
-        } else {
-          s"""
-            |$resultTpe resultTerm = null;
-          """.stripMargin
-        }
-
-      case expressions.Literal(intValue: Int, INT_TYPE_INFO) =>
-        if (nullCheck) {
-          s"""
-            |boolean $nullTerm = false;
-            |$resultTpe $resultTerm = $intValue;
-          """.stripMargin
-        } else {
-          s"""
-            |$resultTpe $resultTerm = $intValue;
-          """.stripMargin
-        }
-
-      case expressions.Literal(longValue: Long, LONG_TYPE_INFO) =>
-        if (nullCheck) {
-          s"""
-            |boolean $nullTerm = false;
-            |$resultTpe $resultTerm = ${longValue}L;
-          """.stripMargin
-        } else {
-          s"""
-            |$resultTpe $resultTerm = ${longValue}L;
-          """.stripMargin
-        }
-
-
-      case expressions.Literal(doubleValue: Double, DOUBLE_TYPE_INFO) =>
-        if (nullCheck) {
-          s"""
-            |boolean $nullTerm = false;
-            |$resultTpe $resultTerm = $doubleValue;
-          """.stripMargin
-        } else {
-          s"""
-            |$resultTpe $resultTerm = $doubleValue;
-          """.stripMargin
-        }
-
-      case expressions.Literal(floatValue: Float, FLOAT_TYPE_INFO) =>
-        if (nullCheck) {
-          s"""
-            |boolean $nullTerm = false;
-            |$resultTpe $resultTerm = ${floatValue}f;
-          """.stripMargin
-        } else {
-          s"""
-            |$resultTpe $resultTerm = ${floatValue}f;
-          """.stripMargin
-        }
-
-      case expressions.Literal(strValue: String, STRING_TYPE_INFO) =>
-        if (nullCheck) {
-          s"""
-            |boolean $nullTerm = false;
-            |$resultTpe $resultTerm = "$strValue";
-          """.stripMargin
-        } else {
-          s"""
-            |$resultTpe $resultTerm = "$strValue";
-          """.stripMargin
-        }
-
-      case expressions.Literal(boolValue: Boolean, BOOLEAN_TYPE_INFO) =>
-        if (nullCheck) {
-          s"""
-            |boolean $nullTerm = false;
-            |$resultTpe $resultTerm = $boolValue;
-          """.stripMargin
-        } else {
-          s"""
-            $resultTpe $resultTerm = $boolValue;
-          """.stripMargin
-        }
-
-      case expressions.Literal(dateValue: Date, DATE_TYPE_INFO) =>
-        val dateName = s"""date_${dateValue.getTime}"""
-        val dateStmt = s"""static java.util.Date $dateName
-             |= new java.util.Date(${dateValue.getTime});""".stripMargin
-        reusableMemberStatements.add(dateStmt)
-
-        if (nullCheck) {
-          s"""
-            |boolean $nullTerm = false;
-            |$resultTpe $resultTerm = $dateName;
-          """.stripMargin
-        } else {
-          s"""
-            |$resultTpe $resultTerm = $dateName;
-          """.stripMargin
-        }
-
-      case Substring(str, beginIndex, endIndex) =>
-        val strCode = generateExpression(str)
-        val beginIndexCode = generateExpression(beginIndex)
-        val endIndexCode = generateExpression(endIndex)
-        if (nullCheck) {
-          strCode.code +
-            beginIndexCode.code +
-            endIndexCode.code +
-            s"""
-              boolean $nullTerm =
-                ${strCode.nullTerm} || ${beginIndexCode.nullTerm} || 
${endIndexCode.nullTerm};
-              $resultTpe $resultTerm;
-              if ($nullTerm) {
-                $resultTerm = ${defaultPrimitive(str.typeInfo)};
-              } else {
-                if (${endIndexCode.resultTerm} == Int.MaxValue) {
-                   $resultTerm = 
(${strCode.resultTerm}).substring(${beginIndexCode.resultTerm});
-                } else {
-                  $resultTerm = (${strCode.resultTerm}).substring(
-                    ${beginIndexCode.resultTerm},
-                    ${endIndexCode.resultTerm});
-                }
-              }
-            """.stripMargin
-        } else {
-          strCode.code +
-            beginIndexCode.code +
-            endIndexCode.code +
-            s"""
-              $resultTpe $resultTerm;
-
-              if (${endIndexCode.resultTerm} == Integer.MAX_VALUE) {
-                $resultTerm = 
(${strCode.resultTerm}).substring(${beginIndexCode.resultTerm});
-              } else {
-                $resultTerm = (${strCode.resultTerm}).substring(
-                  ${beginIndexCode.resultTerm},
-                  ${endIndexCode.resultTerm});
-              }
-            """
-        }
-
-      case expressions.Cast(child: Expression, STRING_TYPE_INFO)
-        if child.typeInfo == BasicTypeInfo.DATE_TYPE_INFO =>
-        val childGen = generateExpression(child)
-
-        addTimestampFormatter()
-
-        val castCode = if (nullCheck) {
-          s"""
-            |boolean $nullTerm = ${childGen.nullTerm};
-            |$resultTpe $resultTerm;
-            |if ($nullTerm) {
-            |  $resultTerm = null;
-            |} else {
-            |  $resultTerm = timestampFormatter.format(${childGen.resultTerm});
-            |}
-          """.stripMargin
-        } else {
-          s"""
-            |$resultTpe $resultTerm = 
timestampFormatter.format(${childGen.resultTerm});
-          """.stripMargin
-        }
-        childGen.code + castCode
-
-      case expressions.Cast(child: Expression, STRING_TYPE_INFO) =>
-        val childGen = generateExpression(child)
-        val castCode = if (nullCheck) {
-          s"""
-            |boolean $nullTerm = ${childGen.nullTerm};
-            |$resultTpe $resultTerm;
-            |if ($nullTerm) {
-            |  $resultTerm = null;
-            |} else {
-            |  $resultTerm = "" + ${childGen.resultTerm};
-            |}
-          """.stripMargin
-        } else {
-          s"""
-            |$resultTpe $resultTerm = "" + ${childGen.resultTerm};
-          """.stripMargin
-        }
-        childGen.code + castCode
-
-      case expressions.Cast(child: Expression, DATE_TYPE_INFO)
-        if child.typeInfo == BasicTypeInfo.LONG_TYPE_INFO =>
-        val childGen = generateExpression(child)
-        val castCode = if (nullCheck) {
-          s"""
-            |boolean $nullTerm = ${childGen.nullTerm};
-            |$resultTpe $resultTerm;
-            |if ($nullTerm) {
-            |  $resultTerm = null;
-            |} else {
-            |  $resultTerm = new java.util.Date(${childGen.resultTerm});
-            |}
-          """.stripMargin
-        } else {
-          s"""
-            |$resultTpe $resultTerm = new java.util.Date(${childGen.resultTerm});
-          """.stripMargin
-        }
-        childGen.code + castCode
-
-      case expressions.Cast(child: Expression, DATE_TYPE_INFO)
-        if child.typeInfo == BasicTypeInfo.STRING_TYPE_INFO =>
-        val childGen = generateExpression(child)
-
-        addDateFormatter()
-        addTimeFormatter()
-        addTimestampFormatter()
-
-        // tries to parse
-        // "2011-05-03 15:51:36.234"
-        // then "2011-05-03"
-        // then "15:51:36"
-        // then "1446473775"
-        val parsedName = freshName("parsed")
-        val parsingCode =
-          s"""
-            |java.util.Date $parsedName = null;
-            |try {
-            |  $parsedName = timestampFormatter.parse(${childGen.resultTerm});
-            |} catch (java.text.ParseException e1) {
-            |  try {
-            |    $parsedName = dateFormatter.parse(${childGen.resultTerm});
-            |  } catch (java.text.ParseException e2) {
-            |    try {
-            |      $parsedName = timeFormatter.parse(${childGen.resultTerm});
-            |    } catch (java.text.ParseException e3) {
-            |      $parsedName = new java.util.Date(Long.valueOf(${childGen.resultTerm}));
-            |    }
-            |  }
-            |}
-           """.stripMargin
-
-        val castCode = if (nullCheck) {
-          s"""
-            |boolean $nullTerm = ${childGen.nullTerm};
-            |$resultTpe $resultTerm;
-            |if ($nullTerm) {
-            |  $resultTerm = null;
-            |} else {
-            |  $parsingCode
-            |  $resultTerm = $parsedName;
-            |}
-          """.stripMargin
-        } else {
-          s"""
-            |$parsingCode
-            |$resultTpe $resultTerm = $parsedName;
-          """.stripMargin
-        }
-        childGen.code + castCode
-
-      case expressions.Cast(child: Expression, DATE_TYPE_INFO) =>
-        throw new ExpressionException("Only Long and String can be casted to Date.")
-
-      case expressions.Cast(child: Expression, LONG_TYPE_INFO)
-        if child.typeInfo == BasicTypeInfo.DATE_TYPE_INFO =>
-        val childGen = generateExpression(child)
-        val castCode = if (nullCheck) {
-          s"""
-            |boolean $nullTerm = ${childGen.nullTerm};
-            |$resultTpe $resultTerm;
-            |if ($nullTerm) {
-            |  $resultTerm = null;
-            |} else {
-            |  $resultTerm = ${childGen.resultTerm}.getTime();
-            |}
-          """.stripMargin
-        } else {
-          s"""
-            |$resultTpe $resultTerm = ${childGen.resultTerm}.getTime();
-          """.stripMargin
-        }
-        childGen.code + castCode
-
-      case expressions.Cast(child: Expression, tpe: BasicTypeInfo[_])
-        if child.typeInfo == BasicTypeInfo.DATE_TYPE_INFO =>
-        throw new ExpressionException("Date can only be casted to Long or String.")
-
-      case expressions.Cast(child: Expression, tpe: BasicTypeInfo[_])
-        if child.typeInfo == BasicTypeInfo.STRING_TYPE_INFO =>
-        val childGen = generateExpression(child)
-        val fromTpe = typeTermForTypeInfoForCast(child.typeInfo)
-        val toTpe = typeTermForTypeInfoForCast(tpe)
-
-        val castCode = if (nullCheck) {
-          s"""
-            |boolean $nullTerm = ${childGen.nullTerm};
-            |$resultTpe $resultTerm =
-            |  ${tpe.getTypeClass.getCanonicalName}.valueOf(${childGen.resultTerm});
-          """.stripMargin
-        } else {
-          s"""
-            |$resultTpe $resultTerm =
-            |  ${tpe.getTypeClass.getCanonicalName}.valueOf(${childGen.resultTerm});
-          """.stripMargin
-        }
-
-        childGen.code + castCode
-
-      case expressions.Cast(child: Expression, tpe: BasicTypeInfo[_])
-          if child.typeInfo.isBasicType =>
-        val childGen = generateExpression(child)
-        val fromTpe = typeTermForTypeInfoForCast(child.typeInfo)
-        val toTpe = typeTermForTypeInfoForCast(tpe)
-        val castCode = if (nullCheck) {
-          s"""
-            |boolean $nullTerm = ${childGen.nullTerm};
-            |$resultTpe $resultTerm;
-            |if ($nullTerm) {
-            |  $resultTerm = null;
-            |} else {
-            |  $resultTerm = ($toTpe)($fromTpe) ${childGen.resultTerm};
-            |}
-          """.stripMargin
-        } else {
-          s"$resultTpe $resultTerm = ($toTpe)($fromTpe) ${childGen.resultTerm};\n"
-        }
-        childGen.code + castCode
-
-      case ResolvedFieldReference(fieldName, fieldTpe: TypeInformation[_]) =>
-        inputs find { i => i._2.hasField(fieldName)} match {
-          case Some((inputName, inputTpe)) =>
-            val fieldCode = getField(newTermName(inputName), inputTpe, fieldName, fieldTpe)
-            if (nullCheck) {
-              s"""
-                |$resultTpe $resultTerm = $fieldCode;
-                |boolean $nullTerm = $resultTerm == null;
-              """.stripMargin
-            } else {
-              s"""$resultTpe $resultTerm = $fieldCode;"""
-            }
-
-          case None => throw new ExpressionException("Could not get accessor for " + fieldName
-            + " in inputs " + inputs.mkString(", ") + ".")
-        }
-
-      case GreaterThan(left, right) =>
-        generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) {
-          (leftTerm, rightTerm) => s"$leftTerm > $rightTerm"
-        }
-
-      case GreaterThanOrEqual(left, right) =>
-        generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) {
-          (leftTerm, rightTerm) => s"$leftTerm >= $rightTerm"
-        }
-
-      case LessThan(left, right) =>
-        generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) {
-          (leftTerm, rightTerm) => s"$leftTerm < $rightTerm"
-        }
-
-      case LessThanOrEqual(left, right) =>
-        generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) {
-          (leftTerm, rightTerm) => s"$leftTerm <= $rightTerm"
-        }
-
-      case EqualTo(left, right) =>
-        generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) {
-          (leftTerm, rightTerm) => s"$leftTerm.equals($rightTerm)"
-        }
-
-      case NotEqualTo(left, right) =>
-        generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) {
-          (leftTerm, rightTerm) => s"!($leftTerm.equals($rightTerm))"
-        }
-
-      case And(left, right) =>
-        generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) {
-          (leftTerm, rightTerm) => s"$leftTerm && $rightTerm"
-        }
-
-      case Or(left, right) =>
-        generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) {
-          (leftTerm, rightTerm) => s"$leftTerm || $rightTerm"
-        }
-
-      case Plus(left, right) =>
-        generateIfNonNull(left, right, expr.typeInfo) {
-          (leftTerm, rightTerm) => s"$leftTerm + $rightTerm"
-        }
-
-      case Minus(left, right) =>
-        generateIfNonNull(left, right, expr.typeInfo) {
-          (leftTerm, rightTerm) => s"$leftTerm - $rightTerm"
-        }
-
-      case Div(left, right) =>
-        generateIfNonNull(left, right, expr.typeInfo) {
-          (leftTerm, rightTerm) => s"$leftTerm / $rightTerm"
-        }
-
-      case Mul(left, right) =>
-        generateIfNonNull(left, right, expr.typeInfo) {
-          (leftTerm, rightTerm) => s"$leftTerm * $rightTerm"
-        }
-
-      case Mod(left, right) =>
-        generateIfNonNull(left, right, expr.typeInfo) {
-          (leftTerm, rightTerm) => s"$leftTerm % $rightTerm"
-        }
-
-      case UnaryMinus(child) =>
-        val childCode = generateExpression(child)
-        if (nullCheck) {
-          childCode.code +
-            s"""
-              |boolean $nullTerm = ${childCode.nullTerm};
-              |$resultTpe $resultTerm;
-              |if ($nullTerm) {
-              |  $resultTerm = ${defaultPrimitive(child.typeInfo)};
-              |} else {
-              |  $resultTerm = -(${childCode.resultTerm});
-              |}
-            """.stripMargin
-        } else {
-          childCode.code +
-            s"""
-              |$resultTpe $resultTerm = -(${childCode.resultTerm});
-            """.stripMargin
-        }
-
-      case BitwiseAnd(left, right) =>
-        generateIfNonNull(left, right, expr.typeInfo) {
-          (leftTerm, rightTerm) => s"(int) $leftTerm & (int) $rightTerm"
-        }
-
-      case BitwiseOr(left, right) =>
-        generateIfNonNull(left, right, expr.typeInfo) {
-          (leftTerm, rightTerm) => s"(int) $leftTerm | (int) $rightTerm"
-        }
-
-      case BitwiseXor(left, right) =>
-        generateIfNonNull(left, right, expr.typeInfo) {
-          (leftTerm, rightTerm) => s"(int) $leftTerm ^ (int) $rightTerm"
-        }
-
-      case BitwiseNot(child) =>
-        val childCode = generateExpression(child)
-        if (nullCheck) {
-          childCode.code +
-            s"""
-              |boolean $nullTerm = ${childCode.nullTerm};
-              |$resultTpe $resultTerm;
-              |if ($nullTerm) {
-              |  $resultTerm = ${defaultPrimitive(child.typeInfo)};
-              |} else {
-              |  $resultTerm = ~((int) ${childCode.resultTerm});
-              |}
-            """.stripMargin
-        } else {
-          childCode.code +
-            s"""
-              |$resultTpe $resultTerm = ~((int) ${childCode.resultTerm});
-            """.stripMargin
-        }
-
-      case Not(child) =>
-        val childCode = generateExpression(child)
-        if (nullCheck) {
-          childCode.code +
-            s"""
-              |boolean $nullTerm = ${childCode.nullTerm};
-              |$resultTpe $resultTerm;
-              |if ($nullTerm) {
-              |  $resultTerm = ${defaultPrimitive(child.typeInfo)};
-              |} else {
-              |  $resultTerm = !(${childCode.resultTerm});
-              |}
-            """.stripMargin
-        } else {
-          childCode.code +
-            s"""
-              |$resultTpe $resultTerm = !(${childCode.resultTerm});
-            """.stripMargin
-        }
-
-      case IsNull(child) =>
-        val childCode = generateExpression(child)
-        if (nullCheck) {
-          childCode.code +
-            s"""
-              |$resultTpe $resultTerm = ${childCode.nullTerm};
-            """.stripMargin
-        } else {
-          childCode.code +
-            s"""
-              |$resultTpe $resultTerm = (${childCode.resultTerm}) == null;
-            """.stripMargin
-        }
-
-      case IsNotNull(child) =>
-        val childCode = generateExpression(child)
-        if (nullCheck) {
-          childCode.code +
-            s"""
-              |$resultTpe $resultTerm = !${childCode.nullTerm};
-            """.stripMargin
-        } else {
-          childCode.code +
-            s"""
-              |$resultTpe $resultTerm = (${childCode.resultTerm}) != null;
-            """.stripMargin
-        }
-
-      case Abs(child) =>
-        val childCode = generateExpression(child)
-        if (nullCheck) {
-          childCode.code +
-            s"""
-              |boolean $nullTerm = ${childCode.nullTerm};
-              |$resultTpe $resultTerm;
-              |if ($nullTerm) {
-              |  $resultTerm = ${defaultPrimitive(child.typeInfo)};
-              |} else {
-              |  $resultTerm = Math.abs(${childCode.resultTerm});
-              |}
-            """.stripMargin
-        } else {
-          childCode.code +
-            s"""
-              |$resultTpe $resultTerm = Math.abs(${childCode.resultTerm});
-            """.stripMargin
-        }
-
-      case _ => throw new ExpressionException("Could not generate code for expression " + expr)
-    }
-
-    GeneratedExpression(code, resultTerm, nullTerm)
-  }
-
-  case class GeneratedExpression(code: String, resultTerm: String, nullTerm: String)
-
-  def freshName(name: String): String = {
-    s"$name$$${freshNameCounter.getAndIncrement}"
-  }
-
-  val freshNameCounter = new AtomicInteger
-
-  protected def getField(
-    inputTerm: TermName,
-    inputType: CompositeType[_],
-    fieldName: String,
-    fieldType: TypeInformation[_]): String = {
-    val accessor = fieldAccessorFor(inputType, fieldName)
-    val fieldTpe = typeTermForTypeInfo(fieldType)
-
-    accessor match {
-      case ObjectFieldAccessor(fieldName) =>
-        val fieldTerm = newTermName(fieldName)
-        s"($fieldTpe) $inputTerm.$fieldTerm"
-
-      case ObjectMethodAccessor(methodName) =>
-        val methodTerm = newTermName(methodName)
-        s"($fieldTpe) $inputTerm.$methodTerm()"
-
-      case ProductAccessor(i) =>
-        s"($fieldTpe) $inputTerm.productElement($i)"
-
-    }
-  }
-
-  sealed abstract class FieldAccessor
-
-  case class ObjectFieldAccessor(fieldName: String) extends FieldAccessor
-
-  case class ObjectMethodAccessor(methodName: String) extends FieldAccessor
-
-  case class ProductAccessor(i: Int) extends FieldAccessor
-
-  def fieldAccessorFor(elementType: CompositeType[_], fieldName: String): FieldAccessor = {
-    elementType match {
-      case ri: RowTypeInfo =>
-        ProductAccessor(elementType.getFieldIndex(fieldName))
-
-      case cc: CaseClassTypeInfo[_] =>
-        ObjectMethodAccessor(fieldName)
-
-      case javaTup: TupleTypeInfo[_] =>
-        ObjectFieldAccessor(fieldName)
-
-      case pj: PojoTypeInfo[_] =>
-        ObjectFieldAccessor(fieldName)
-
-      case proxy: RenamingProxyTypeInfo[_] =>
-        val underlying = proxy.getUnderlyingType
-        val fieldIndex = proxy.getFieldIndex(fieldName)
-        fieldAccessorFor(underlying, underlying.getFieldNames()(fieldIndex))
-    }
-  }
-
-  protected def defaultPrimitive(tpe: TypeInformation[_]): String = tpe match {
-    case BasicTypeInfo.INT_TYPE_INFO => "-1"
-    case BasicTypeInfo.LONG_TYPE_INFO => "-1"
-    case BasicTypeInfo.SHORT_TYPE_INFO => "-1"
-    case BasicTypeInfo.BYTE_TYPE_INFO => "-1"
-    case BasicTypeInfo.FLOAT_TYPE_INFO => "-1.0f"
-    case BasicTypeInfo.DOUBLE_TYPE_INFO => "-1.0d"
-    case BasicTypeInfo.BOOLEAN_TYPE_INFO => "false"
-    case BasicTypeInfo.STRING_TYPE_INFO => "\"<empty>\""
-    case BasicTypeInfo.CHAR_TYPE_INFO => "'\\0'"
-    case _ => "null"
-  }
-
-  protected def typeTermForTypeInfo(tpe: TypeInformation[_]): String = tpe match {
-
-    // From PrimitiveArrayTypeInfo we would get class "int[]", scala reflections
-    // does not seem to like this, so we manually give the correct type here.
-    case PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO => "int[]"
-    case PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO => "long[]"
-    case PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO => "short[]"
-    case PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO => "byte[]"
-    case PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO => "float[]"
-    case PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO => "double[]"
-    case PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO => "boolean[]"
-    case PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO => "char[]"
-
-    case _ =>
-      tpe.getTypeClass.getCanonicalName
-
-  }
-
-  // when casting we first need to unbox Primitives, for example,
-  // float a = 1.0f;
-  // byte b = (byte) a;
-  // works, but for boxed types we need this:
-  // Float a = 1.0f;
-  // Byte b = (byte)(float) a;
-  protected def typeTermForTypeInfoForCast(tpe: TypeInformation[_]): String = tpe match {
-
-    case BasicTypeInfo.INT_TYPE_INFO => "int"
-    case BasicTypeInfo.LONG_TYPE_INFO => "long"
-    case BasicTypeInfo.SHORT_TYPE_INFO => "short"
-    case BasicTypeInfo.BYTE_TYPE_INFO => "byte"
-    case BasicTypeInfo.FLOAT_TYPE_INFO => "float"
-    case BasicTypeInfo.DOUBLE_TYPE_INFO => "double"
-    case BasicTypeInfo.BOOLEAN_TYPE_INFO => "boolean"
-    case BasicTypeInfo.CHAR_TYPE_INFO => "char"
-
-    // From PrimitiveArrayTypeInfo we would get class "int[]", scala reflections
-    // does not seem to like this, so we manually give the correct type here.
-    case PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO => "int[]"
-    case PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO => "long[]"
-    case PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO => "short[]"
-    case PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO => "byte[]"
-    case PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO => "float[]"
-    case PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO => "double[]"
-    case PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO => "boolean[]"
-    case PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO => "char[]"
-
-    case _ =>
-      tpe.getTypeClass.getCanonicalName
-
-  }
-
-  def addDateFormatter(): Unit = {
-    reusableMemberStatements.add(s"""
-    |java.text.SimpleDateFormat dateFormatter =
-    |  new java.text.SimpleDateFormat("yyyy-MM-dd");
-    |""".stripMargin)
-
-    reusableInitStatements.add(s"""
-    |dateFormatter.setTimeZone(config.getTimeZone());
-    |""".stripMargin)
-  }
-
-  def addTimeFormatter(): Unit = {
-    reusableMemberStatements.add(s"""
-    |java.text.SimpleDateFormat timeFormatter =
-    |  new java.text.SimpleDateFormat("HH:mm:ss");
-    |""".stripMargin)
-
-    reusableInitStatements.add(s"""
-    |timeFormatter.setTimeZone(config.getTimeZone());
-    |""".stripMargin)
-  }
-
-  def addTimestampFormatter(): Unit = {
-    reusableMemberStatements.add(s"""
-    |java.text.SimpleDateFormat timestampFormatter =
-    |  new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
-    |""".stripMargin)
-
-    reusableInitStatements.add(s"""
-    |timestampFormatter.setTimeZone(config.getTimeZone());
-    |""".stripMargin)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateFilter.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateFilter.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateFilter.scala
deleted file mode 100644
index 50b8c69..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateFilter.scala
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.codegen
-
-import java.io.StringReader
-
-import org.apache.flink.api.common.functions.FilterFunction
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.table.TableConfig
-import org.apache.flink.api.table.codegen.Indenter._
-import org.apache.flink.api.table.expressions.Expression
-import org.slf4j.LoggerFactory
-
-/**
- * Code generator for a unary predicate, i.e. a Filter.
- */
-class GenerateFilter[T](
-    inputType: CompositeType[T],
-    predicate: Expression,
-    cl: ClassLoader,
-    config: TableConfig) extends ExpressionCodeGenerator[FilterFunction[T]](
-      Seq(("in0", inputType)),
-      cl = cl,
-      config) {
-
-  val LOG = LoggerFactory.getLogger(this.getClass)
-
-  override protected def generateInternal(): FilterFunction[T] = {
-    val pred = generateExpression(predicate)
-
-    val tpe = typeTermForTypeInfo(inputType)
-
-    val generatedName = freshName("GeneratedFilter")
-
-    // Janino does not support generics, so we need to cast by hand
-    val code = if (nullCheck) {
-      j"""
-        public class $generatedName
-            implements org.apache.flink.api.common.functions.FilterFunction<$tpe> {
-
-          org.apache.flink.api.table.TableConfig config = null;
-
-          public $generatedName(org.apache.flink.api.table.TableConfig config) {
-            this.config = config;
-          }
-
-          public boolean filter(Object _in0) {
-            $tpe in0 = ($tpe) _in0;
-            ${pred.code}
-            if (${pred.nullTerm}) {
-              return false;
-            } else {
-              return ${pred.resultTerm};
-            }
-          }
-        }
-      """
-    } else {
-      j"""
-        public class $generatedName
-            implements org.apache.flink.api.common.functions.FilterFunction<$tpe> {
-
-          org.apache.flink.api.table.TableConfig config = null;
-
-          public $generatedName(org.apache.flink.api.table.TableConfig config) {
-            this.config = config;
-          }
-
-          public boolean filter(Object _in0) {
-            $tpe in0 = ($tpe) _in0;
-            ${pred.code}
-            return ${pred.resultTerm};
-          }
-        }
-      """
-    }
-
-    LOG.debug(s"""Generated unary predicate "$predicate":\n$code""")
-    compiler.cook(new StringReader(code))
-    val clazz = compiler.getClassLoader().loadClass(generatedName)
-    val constructor = clazz.getConstructor(classOf[TableConfig])
-    constructor.newInstance(config).asInstanceOf[FilterFunction[T]]
-  }
-}

Reply via email to