http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenameOperator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenameOperator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenameOperator.scala
new file mode 100644
index 0000000..3b5459b
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenameOperator.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.typeinfo
+
+import org.apache.flink.api.common.operators.Operator
+import org.apache.flink.api.java.operators.SingleInputOperator
+import org.apache.flink.api.java.{DataSet => JavaDataSet}
+
+/**
+ * Logical operator whose only job is to carry a [[RenamingProxyTypeInfo]], i.e. to rename
+ * some fields of a [[org.apache.flink.api.common.typeutils.CompositeType]].
+ *
+ * It has no runtime counterpart: translation hands back the input operator unchanged.
+ *
+ * @param input the data set whose fields are renamed
+ * @param renamingTypeInformation result type information exposing the new field names
+ * @tparam T element type of both the input and the result
+ */
+class RenameOperator[T](
+    input: JavaDataSet[T],
+    renamingTypeInformation: RenamingProxyTypeInfo[T])
+  extends SingleInputOperator[T, T, RenameOperator[T]](input, renamingTypeInformation) {
+
+  /** Returns the already translated input as is, so nothing is added to the data flow. */
+  override protected def translateToDataFlow(input: Operator[T]): Operator[T] = {
+    input
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfo.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfo.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfo.scala
new file mode 100644
index 0000000..dd598ab
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfo.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.typeinfo
+
+import java.util
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import 
org.apache.flink.api.common.typeutils.CompositeType.{TypeComparatorBuilder,
+FlatFieldDescriptor}
+import org.apache.flink.api.common.typeutils.{CompositeType, TypeSerializer}
+
+/**
+ * A TypeInformation that is used to rename fields of an underlying CompositeType. This
+ * allows the system to translate "as" Table API operations to a [[RenameOperator]]
+ * that does not get translated to a runtime operator.
+ *
+ * Everything except field naming is delegated to the wrapped type. Field expressions whose
+ * first segment is one of the new names are mapped back to the corresponding original name
+ * before they are handed to the wrapped type.
+ *
+ * @param tpe the wrapped composite type
+ * @param fieldNames the new field names, one per field of `tpe`, all distinct
+ * @throws IllegalArgumentException if the number of names differs from the arity of `tpe`
+ *                                  or if the names are not unique
+ */
+class RenamingProxyTypeInfo[T](
+    val tpe: CompositeType[T],
+    val fieldNames: Array[String])
+  extends CompositeType[T](tpe.getTypeClass) {
+
+  /** Returns the wrapped type. */
+  def getUnderlyingType: CompositeType[T] = tpe
+
+  if (tpe.getArity != fieldNames.length) {
+    throw new IllegalArgumentException(
+      s"Number of field names '${fieldNames.mkString(",")}' and " +
+      s"number of fields in underlying type $tpe do not match.")
+  }
+
+  if (fieldNames.toSet.size != fieldNames.length) {
+    throw new IllegalArgumentException(s"New field names must be unique. " +
+      s"Names: ${fieldNames.mkString(",")}.")
+  }
+
+  /**
+   * Returns the position of `fieldName` among the new names, -1 if it is not one of them,
+   * or -2 if it occurs more than once (the constructor rejects duplicates, so this can only
+   * happen if the exposed array is modified afterwards).
+   */
+  override def getFieldIndex(fieldName: String): Int = {
+    val result = fieldNames.indexOf(fieldName)
+    if (result != fieldNames.lastIndexOf(fieldName)) {
+      -2
+    } else {
+      result
+    }
+  }
+
+  override def getFieldNames: Array[String] = fieldNames
+
+  override def isBasicType: Boolean = tpe.isBasicType
+
+  override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T] =
+    tpe.createSerializer(executionConfig)
+
+  override def getArity: Int = tpe.getArity
+
+  override def isKeyType: Boolean = tpe.isKeyType
+
+  override def getTypeClass: Class[T] = tpe.getTypeClass
+
+  override def getGenericParameters: java.util.List[TypeInformation[_]] =
+    tpe.getGenericParameters
+
+  override def isTupleType: Boolean = tpe.isTupleType
+
+  override def toString: String = {
+    s"RenamingType(type: ${tpe.getTypeClass.getSimpleName}; " +
+      s"fields: ${fieldNames.mkString(", ")})"
+  }
+
+  override def getTypeAt[X](pos: Int): TypeInformation[X] = tpe.getTypeAt(pos)
+
+  override def getTotalFields: Int = tpe.getTotalFields
+
+  override def createComparator(
+      logicalKeyFields: Array[Int],
+      orders: Array[Boolean],
+      logicalFieldOffset: Int,
+      executionConfig: ExecutionConfig) =
+    tpe.createComparator(logicalKeyFields, orders, logicalFieldOffset, executionConfig)
+
+  /**
+   * Resolves `fieldExpression` against the wrapped type after translating a leading new
+   * field name back to the original one.
+   */
+  override def getFlatFields(
+      fieldExpression: String,
+      offset: Int,
+      result: util.List[FlatFieldDescriptor]): Unit = {
+    tpe.getFlatFields(resolveFieldExpression(fieldExpression), offset, result)
+  }
+
+  /**
+   * Returns the type at `fieldExpression`, translating a leading new field name back to the
+   * original one before asking the wrapped type.
+   */
+  override def getTypeAt[X](fieldExpression: String): TypeInformation[X] = {
+    tpe.getTypeAt(resolveFieldExpression(fieldExpression))
+  }
+
+  /**
+   * Replaces the first segment of `fieldExpression` (everything before the first '.') by the
+   * wrapped type's name for that field when the segment is one of the new names. Any other
+   * expression is returned unchanged so that the wrapped type interprets it as before.
+   */
+  private def resolveFieldExpression(fieldExpression: String): String = {
+    val dot = fieldExpression.indexOf('.')
+    val (head, tail) =
+      if (dot >= 0) fieldExpression.splitAt(dot) else (fieldExpression, "")
+    val pos = getFieldIndex(head)
+    if (pos >= 0) {
+      // tail still starts with the '.', so plain concatenation rebuilds the expression
+      tpe.getFieldNames()(pos) + tail
+    } else {
+      fieldExpression
+    }
+  }
+
+  override protected def createTypeComparatorBuilder(): TypeComparatorBuilder[T] = {
+    throw new RuntimeException("This method should never be called because createComparator is " +
+      "overwritten.")
+  }
+
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case renamingProxy: RenamingProxyTypeInfo[_] =>
+        renamingProxy.canEqual(this) &&
+        tpe.equals(renamingProxy.tpe) &&
+        fieldNames.sameElements(renamingProxy.fieldNames)
+      case _ => false
+    }
+  }
+
+  override def hashCode(): Int = {
+    31 * tpe.hashCode() + util.Arrays.hashCode(fieldNames.asInstanceOf[Array[AnyRef]])
+  }
+
+  override def canEqual(obj: Any): Boolean = {
+    obj.isInstanceOf[RenamingProxyTypeInfo[_]]
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala
new file mode 100644
index 0000000..5e9613d
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.typeinfo
+
+import org.apache.flink.api.table.Row
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.core.memory.{DataOutputView, DataInputView}
+
+/**
+ * Serializer for [[Row]].
+ *
+ * Fields are handled one after another, each by the serializer at the same position in
+ * `fieldSerializers`; the serializer itself writes nothing besides the field values.
+ *
+ * @param fieldSerializers serializers of the row fields, in field order
+ */
+class RowSerializer(val fieldSerializers: Array[TypeSerializer[Any]])
+  extends TypeSerializer[Row] {
+
+  override def isImmutableType: Boolean = false
+
+  override def getLength: Int = -1
+
+  /**
+   * Returns a serializer built from duplicates of the field serializers, so that field
+   * serializers which are not safe to share are not shared between the copies.
+   */
+  override def duplicate: RowSerializer = {
+    new RowSerializer(fieldSerializers.map(_.duplicate()))
+  }
+
+  override def createInstance: Row = {
+    new Row(fieldSerializers.length)
+  }
+
+  /**
+   * Copies every field of `from` into `reuse` and returns `reuse`.
+   *
+   * @throws RuntimeException if `from` or `reuse` does not have one field per serializer
+   */
+  override def copy(from: Row, reuse: Row): Row = {
+    val len = fieldSerializers.length
+
+    if (from.productArity != len || reuse.productArity != len) {
+      throw new RuntimeException(
+        "Row arity of reuse or from is incompatible with this RowSerializer.")
+    }
+    var i = 0
+    while (i < len) {
+      val reuseField = reuse.productElement(i)
+      val fromField = from.productElement(i).asInstanceOf[AnyRef]
+      val fieldCopy = fieldSerializers(i).copy(fromField, reuseField)
+      reuse.setField(i, fieldCopy)
+      i += 1
+    }
+    reuse
+  }
+
+  /**
+   * Returns a new row holding copies of the fields of `from`.
+   *
+   * @throws RuntimeException if `from` does not have one field per serializer
+   */
+  override def copy(from: Row): Row = {
+    val len = fieldSerializers.length
+
+    if (from.productArity != len) {
+      throw new RuntimeException("Row arity of from does not match serializers.")
+    }
+    val result = new Row(len)
+    var i = 0
+    while (i < len) {
+      val fromField = from.productElement(i).asInstanceOf[AnyRef]
+      val fieldCopy = fieldSerializers(i).copy(fromField)
+      result.setField(i, fieldCopy)
+      i += 1
+    }
+    result
+  }
+
+  /**
+   * Writes the fields of `value` to `target` in field order.
+   *
+   * @throws RuntimeException if `value` does not have one field per serializer
+   */
+  override def serialize(value: Row, target: DataOutputView): Unit = {
+    val len = fieldSerializers.length
+
+    // without this check a row with extra fields would be written truncated
+    if (value.productArity != len) {
+      throw new RuntimeException("Row arity of value does not match serializers.")
+    }
+    var i = 0
+    while (i < len) {
+      val serializer = fieldSerializers(i)
+      serializer.serialize(value.productElement(i), target)
+      i += 1
+    }
+  }
+
+  /**
+   * Reads one row from `source` into `reuse`, passing the current field values to the field
+   * serializers as reuse objects, and returns `reuse`.
+   *
+   * @throws RuntimeException if `reuse` does not have one field per serializer
+   */
+  override def deserialize(reuse: Row, source: DataInputView): Row = {
+    val len = fieldSerializers.length
+
+    if (reuse.productArity != len) {
+      throw new RuntimeException("Row arity of reuse and fields do not match.")
+    }
+
+    var i = 0
+    while (i < len) {
+      val field = reuse.productElement(i).asInstanceOf[AnyRef]
+      reuse.setField(i, fieldSerializers(i).deserialize(field, source))
+      i += 1
+    }
+    reuse
+  }
+
+  /** Reads one row from `source` into a newly created row. */
+  override def deserialize(source: DataInputView): Row = {
+    val len = fieldSerializers.length
+
+    val result = new Row(len)
+    var i = 0
+    while (i < len) {
+      result.setField(i, fieldSerializers(i).deserialize(source))
+      i += 1
+    }
+    result
+  }
+
+  /** Copies one serialized row from `source` to `target`, field by field. */
+  override def copy(source: DataInputView, target: DataOutputView): Unit = {
+    val len = fieldSerializers.length
+    var i = 0
+    while (i < len) {
+      fieldSerializers(i).copy(source, target)
+      i += 1
+    }
+  }
+
+  override def equals(any: scala.Any): Boolean = {
+    any match {
+      case otherRS: RowSerializer =>
+        otherRS.canEqual(this) &&
+          fieldSerializers.sameElements(otherRS.fieldSerializers)
+      case _ => false
+    }
+  }
+
+  override def canEqual(obj: scala.Any): Boolean = {
+    obj.isInstanceOf[RowSerializer]
+  }
+
+  override def hashCode(): Int = {
+    java.util.Arrays.hashCode(fieldSerializers.asInstanceOf[Array[AnyRef]])
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala
new file mode 100644
index 0000000..db3c881
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.typeinfo
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.table.Row
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.table.expressions.Expression
+import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo}
+
+/**
+ * TypeInformation for [[Row]].
+ *
+ * @param fieldTypes type information of the row fields, in field order
+ * @param fieldNames names of the row fields; must not contain duplicates
+ * @throws IllegalArgumentException if a field name occurs more than once
+ */
+class RowTypeInfo(
+    fieldTypes: Seq[TypeInformation[_]],
+    fieldNames: Seq[String])
+  extends CaseClassTypeInfo[Row](classOf[Row], Array(), fieldTypes, fieldNames) {
+
+  /** Builds the row type from the type and name of every expression in `fields`. */
+  def this(fields: Seq[Expression]) = this(fields.map(_.typeInfo), fields.map(_.name))
+
+  if (fieldNames.distinct.size != fieldNames.size) {
+    throw new IllegalArgumentException("Field names must be unique.")
+  }
+
+  /** Creates a [[RowSerializer]] made of one serializer per field type. */
+  override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[Row] = {
+    val perField = Array.tabulate[TypeSerializer[Any]](getArity) { pos =>
+      this.types(pos).createSerializer(executionConfig).asInstanceOf[TypeSerializer[Any]]
+    }
+    new RowSerializer(perField)
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala
new file mode 100644
index 0000000..dda6265
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.scala
+
+import org.apache.flink.api.table.expressions.Literal
+import org.apache.flink.api.common.functions.GroupReduceFunction
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.examples.java.graph.util.PageRankData
+import org.apache.flink.util.Collector
+
+import _root_.scala.collection.JavaConverters._
+
+/**
+* A basic implementation of the Page Rank algorithm using a bulk iteration.
+*
+* This implementation requires a set of pages and a set of directed links as input and works as
+* follows.
+*
+* In each iteration, the rank of every page is evenly distributed to all pages it points to. Each
+* page collects the partial ranks of all pages that point to it, sums them up, and applies a
+* dampening factor to the sum. The result is the new rank of the page. A new iteration is started
+* with the new ranks of all pages. The iteration is started with a maximum number of iterations
+* and a termination data set that holds the pages whose rank changed by more than `EPSILON`.
+* This is the Wikipedia entry for the
+* [[http://en.wikipedia.org/wiki/Page_rank Page Rank algorithm]]
+*
+* Input files are plain text files and must be formatted as follows:
+*
+*  - Pages represented as an (long) ID separated by new-line characters.
+*    For example `"1\n2\n12\n42\n63"` gives five pages with IDs 1, 2, 12, 42, and 63.
+*  - Links are represented as pairs of page IDs which are separated by space characters. Links
+*    are separated by new-line characters.
+*    For example `"1 2\n2 12\n1 12\n42 63"` gives four (directed) links (1)->(2), (2)->(12),
+*    (1)->(12), and (42)->(63). For this simple implementation it is required that each page has
+*    at least one incoming and one outgoing link (a page can point to itself).
+*
+* Usage:
+* {{{
+*   PageRankBasic <pages path> <links path> <output path> <num pages> <num iterations>
+* }}}
+*
+* If no parameters are provided, the program is run with default data from
+* [[org.apache.flink.examples.java.graph.util.PageRankData]] and 10 iterations.
+*
+* This example shows how to use:
+*
+*  - Bulk Iterations
+*  - Table API expressions
+*/
+object PageRankTable {
+
+  private final val DAMPENING_FACTOR: Double = 0.85
+  private final val EPSILON: Double = 0.0001
+
+  /**
+   * Runs the example. With five arguments the input is read from files and the result is
+   * written as CSV; without arguments the built-in data is used and the result is printed.
+   * With any other number of arguments the usage is printed and nothing is executed.
+   */
+  def main(args: Array[String]): Unit = {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    // set up execution environment
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    // read input data
+    val pagesWithRanks = getPagesDataSet(env).map { p => (p, 1.0 / numPages) }
+      .as('pageId, 'rank)
+
+    val links = getLinksDataSet(env)
+
+    // build adjacency list from link input
+    val adjacencyLists = links
+      .groupBy("sourceId").reduceGroup( new GroupReduceFunction[Link, AdjacencyList] {
+
+        override def reduce(
+            values: _root_.java.lang.Iterable[Link],
+            out: Collector[AdjacencyList]): Unit = {
+          var outputId = -1L
+          val outputList = values.asScala map { t => outputId = t.sourceId; t.targetId }
+          out.collect(new AdjacencyList(outputId, outputList.toArray))
+        }
+
+      }).as('sourceId, 'targetIds)
+
+    // start iteration
+    val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
+      currentRanks =>
+        val newRanks = currentRanks.toTable
+          // distribute ranks to target pages
+          .join(adjacencyLists).where('pageId === 'sourceId)
+          .select('rank, 'targetIds).toDataSet[RankOutput]
+          .flatMap {
+            (in, out: Collector[(Long, Double)]) =>
+              val targets = in.targetIds
+              val len = targets.length
+              targets foreach { t => out.collect((t, in.rank / len )) }
+          }
+          .as('pageId, 'rank)
+          // collect ranks and sum them up
+          .groupBy('pageId).select('pageId, 'rank.sum as 'rank)
+          // apply dampening factor
+          .select(
+            'pageId,
+            ('rank * DAMPENING_FACTOR) + (Literal(1) - DAMPENING_FACTOR) / numPages as 'rank)
+
+
+        // pages whose rank still changed by more than EPSILON in this iteration
+        val termination = currentRanks.toTable
+          .as('curId, 'curRank).join(newRanks.as('newId, 'newRank))
+          .where('curId === 'newId && ('curRank - 'newRank).abs > EPSILON)
+
+        (newRanks, termination)
+    }
+
+    val result = finalRanks
+
+    // emit result
+    if (fileOutput) {
+      result.writeAsCsv(outputPath, "\n", " ")
+      // execute program
+      env.execute("Expression PageRank Example")
+    } else {
+      // execute program and print result
+      result.print()
+    }
+  }
+
+  // *************************************************************************
+  //     USER TYPES
+  // *************************************************************************
+
+  case class Link(sourceId: Long, targetId: Long)
+
+  case class Page(pageId: Long, rank: Double)
+
+  case class AdjacencyList(sourceId: Long, targetIds: Array[Long])
+
+  case class RankOutput(rank: Double, targetIds: Array[Long])
+
+  // *************************************************************************
+  //     UTIL METHODS
+  // *************************************************************************
+
+  /**
+   * Reads the program arguments into the fields of this object.
+   *
+   * @return `true` if the program can run (no arguments or exactly five), `false` if the
+   *         usage message was printed because of a wrong number of arguments
+   */
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length > 0) {
+      fileOutput = true
+      if (args.length == 5) {
+        pagesInputPath = args(0)
+        linksInputPath = args(1)
+        outputPath = args(2)
+        numPages = args(3).toLong
+        maxIterations = args(4).toInt
+        true
+      } else {
+        System.err.println("Usage: PageRankBasic <pages path> <links path> <output path> <num " +
+          "pages> <num iterations>")
+        // must be the value of this branch; otherwise the job would start with null paths
+        false
+      }
+    } else {
+      System.out.println("Executing PageRank Basic example with default parameters and built-in " +
+        "default data.")
+      System.out.println("  Provide parameters to read input data from files.")
+      System.out.println("  See the documentation for the correct format of input files.")
+      System.out.println("  Usage: PageRankBasic <pages path> <links path> <output path> <num " +
+        "pages> <num iterations>")
+
+      numPages = PageRankData.getNumberOfPages
+      true
+    }
+  }
+
+  /** Reads the page IDs from `pagesInputPath`, or generates the IDs 1 to 15 by default. */
+  private def getPagesDataSet(env: ExecutionEnvironment): DataSet[Long] = {
+    if (fileOutput) {
+      env.readCsvFile[Tuple1[Long]](pagesInputPath, fieldDelimiter = " ", lineDelimiter = "\n")
+        .map(x => x._1)
+    } else {
+      env.generateSequence(1, 15)
+    }
+  }
+
+  /** Reads the links from `linksInputPath`, or takes them from `PageRankData.EDGES`. */
+  private def getLinksDataSet(env: ExecutionEnvironment): DataSet[Link] = {
+    if (fileOutput) {
+      env.readCsvFile[Link](linksInputPath, fieldDelimiter = " ",
+        includedFields = Array(0, 1))
+    } else {
+      val edges = PageRankData.EDGES.map { case Array(v1, v2) => Link(v1.asInstanceOf[Long],
+        v2.asInstanceOf[Long])}
+      env.fromCollection(edges)
+    }
+  }
+
+  private var fileOutput: Boolean = false
+  private var pagesInputPath: String = null
+  private var linksInputPath: String = null
+  private var outputPath: String = null
+  private var numPages: Double = 0
+  private var maxIterations: Int = 10
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala
new file mode 100644
index 0000000..63dddc9
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.scala
+
+import org.apache.flink.streaming.api.scala._
+
+import org.apache.flink.api.scala.table._
+
+import scala.Stream._
+import scala.math._
+import scala.language.postfixOps
+import scala.util.Random
+
+/**
+ * Simple example for demonstrating the use of the Table API with Flink Streaming: a stream
+ * of generated car events is filtered and projected with Table API expressions and printed.
+ */
+object StreamingTableFilter {
+
+  /** One reading of a car: its id, current speed, travelled distance and a timestamp. */
+  case class CarEvent(carId: Int, speed: Int, distance: Double, time: Long)
+    extends Serializable
+
+  /** Parses the arguments and, if they are acceptable, builds and executes the job. */
+  def main(args: Array[String]): Unit = {
+    if (parseParameters(args)) {
+      val filtered = genCarStream().toTable
+        .filter('carId === 0)
+        .select('carId, 'speed, 'distance + 1000 as 'distance, 'time % 5 as 'time)
+        .toDataStream[CarEvent]
+
+      filtered.print()
+
+      StreamExecutionEnvironment.getExecutionEnvironment.execute("TopSpeedWindowing")
+    }
+  }
+
+  /**
+   * Builds the stream of car events: `numOfCars` cars start at speed 50 and distance 0, and
+   * every further round changes each car's speed by 5 up or down, kept within 0 to 100.
+   */
+  def genCarStream(): DataStream[CarEvent] = {
+
+    // next reading of a car: random speed change, distance advanced by the new speed
+    def nextSpeed(carEvent: CarEvent): CarEvent = {
+      val newSpeed =
+        if (Random.nextBoolean()) {
+          min(100, carEvent.speed + 5)
+        } else {
+          max(0, carEvent.speed - 5)
+        }
+      CarEvent(
+        carEvent.carId,
+        newSpeed,
+        carEvent.distance + newSpeed/3.6d,
+        System.currentTimeMillis)
+    }
+
+    // current round followed by all later rounds, pausing one second per round
+    def carStream(speeds: Stream[CarEvent]): Stream[CarEvent] = {
+      Thread.sleep(1000)
+      speeds.append(carStream(speeds.map(nextSpeed)))
+    }
+
+    carStream(range(0, numOfCars).map(id => CarEvent(id, 50, 0, System.currentTimeMillis())))
+  }
+
+  /**
+   * Accepts either no arguments or exactly three (number of cars, eviction seconds, trigger
+   * meters), which are stored in the fields below; otherwise prints the usage.
+   *
+   * @return `false` only when the usage was printed
+   */
+  def parseParameters(args: Array[String]): Boolean = {
+    args.length match {
+      case 0 =>
+        true
+      case 3 =>
+        numOfCars = args(0).toInt
+        evictionSec = args(1).toInt
+        triggerMeters = args(2).toDouble
+        true
+      case _ =>
+        System.err.println("Usage: TopSpeedWindowing <numCars> <evictSec> <triggerMeters>")
+        false
+    }
+  }
+
+  var numOfCars = 2
+  var evictionSec = 10
+  var triggerMeters = 50d
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala
new file mode 100644
index 0000000..f527a3c
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.scala
+
+import org.apache.flink.api.table.expressions.Literal
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+
+/**
+ * This program implements a modified version of the TPC-H query 3. The
+ * example demonstrates how to assign names to fields by extending the Tuple class.
+ * The original query can be found in the
+ * [[http://www.tpc.org/tpch/spec/tpch2.16.0.pdf TPC-H specification]]
+ * (page 29).
+ *
+ * This program implements the following SQL equivalent:
+ *
+ * {{{
+ * SELECT
+ *      l_orderkey,
+ *      SUM(l_extendedprice*(1-l_discount)) AS revenue,
+ *      o_orderdate,
+ *      o_shippriority
+ * FROM customer,
+ *      orders,
+ *      lineitem
+ * WHERE
+ *      c_mktsegment = '[SEGMENT]'
+ *      AND c_custkey = o_custkey
+ *      AND l_orderkey = o_orderkey
+ *      AND o_orderdate < date '[DATE]'
+ *      AND l_shipdate > date '[DATE]'
+ * GROUP BY
+ *      l_orderkey,
+ *      o_orderdate,
+ *      o_shippriority;
+ * }}}
+ *
+ * Compared to the original TPC-H query this version does not sort the result by revenue
+ * and orderdate.
+ *
+ * Input files are plain text CSV files using the pipe character ('|') as field separator
+ * as generated by the TPC-H data generator which is available at
+ * [[http://www.tpc.org/tpch/]].
+ *
+ * Usage:
+ * {{{
+ * TPCHQuery3Expression <lineitem-csv path> <customer-csv path> <orders-csv path> <result path>
+ * }}}
+ *
+ * This example shows how to use:
+ *  - Table API expressions
+ *
+ */
+object TPCHQuery3Table {
+
+  /**
+   * Runs the query with the segment "AUTOMOBILE" and the date 1995-03-12 and writes the
+   * result as '|'-separated CSV. Prints the usage and does nothing unless exactly four
+   * arguments are given.
+   */
+  def main(args: Array[String]): Unit = {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    // set filter date
+    val dateFormat = new _root_.java.text.SimpleDateFormat("yyyy-MM-dd")
+    val date = dateFormat.parse("1995-03-12")
+
+    // get execution environment
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val lineitems = getLineitemDataSet(env)
+      .filter( l => dateFormat.parse(l.shipDate).after(date) )
+      .as('id, 'extdPrice, 'discount, 'shipDate)
+
+    val customers = getCustomerDataSet(env)
+      .as('id, 'mktSegment)
+      .filter( 'mktSegment === "AUTOMOBILE" )
+
+    val orders = getOrdersDataSet(env)
+      .filter( o => dateFormat.parse(o.orderDate).before(date) )
+      .as('orderId, 'custId, 'orderDate, 'shipPrio)
+
+    val items =
+      orders.join(customers)
+        .where('custId === 'id)
+        .select('orderId, 'orderDate, 'shipPrio)
+      .join(lineitems)
+        .where('orderId === 'id)
+        .select(
+          'orderId,
+          'extdPrice * (Literal(1.0f) - 'discount) as 'revenue,
+          'orderDate,
+          'shipPrio)
+
+    val result = items
+      .groupBy('orderId, 'orderDate, 'shipPrio)
+      .select('orderId, 'revenue.sum, 'orderDate, 'shipPrio)
+
+    // emit result
+    result.writeAsCsv(outputPath, "\n", "|")
+
+    // execute program
+    env.execute("Scala TPCH Query 3 (Expression) Example")
+  }
+
+  // *************************************************************************
+  //     USER DATA TYPES
+  // *************************************************************************
+
+  case class Lineitem(id: Long, extdPrice: Double, discount: Double, shipDate: String)
+  case class Customer(id: Long, mktSegment: String)
+  case class Order(orderId: Long, custId: Long, orderDate: String, shipPrio: Long)
+
+  // *************************************************************************
+  //     UTIL METHODS
+  // *************************************************************************
+
+  private var lineitemPath: String = null
+  private var customerPath: String = null
+  private var ordersPath: String = null
+  private var outputPath: String = null
+
+  /**
+   * Stores the four path arguments (lineitem, customer, orders, result) in the fields above.
+   *
+   * @return `true` if exactly four arguments were given, otherwise `false` after printing
+   *         the usage to standard error
+   */
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length == 4) {
+      lineitemPath = args(0)
+      customerPath = args(1)
+      ordersPath = args(2)
+      outputPath = args(3)
+      true
+    } else {
+      System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
+          " Due to legal restrictions, we can not ship generated data.\n" +
+          " You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +
+          " Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path>" +
+          " <orders-csv path> <result path>")
+      false
+    }
+  }
+
+  /** Reads columns 0, 5, 6 and 10 of the '|'-separated lineitem file. */
+  private def getLineitemDataSet(env: ExecutionEnvironment): DataSet[Lineitem] = {
+    env.readCsvFile[Lineitem](
+        lineitemPath,
+        fieldDelimiter = "|",
+        includedFields = Array(0, 5, 6, 10) )
+  }
+
+  /** Reads columns 0 and 6 of the '|'-separated customer file. */
+  private def getCustomerDataSet(env: ExecutionEnvironment): DataSet[Customer] = {
+    env.readCsvFile[Customer](
+        customerPath,
+        fieldDelimiter = "|",
+        includedFields = Array(0, 6) )
+  }
+
+  /** Reads columns 0, 1, 4 and 7 of the '|'-separated orders file. */
+  private def getOrdersDataSet(env: ExecutionEnvironment): DataSet[Order] = {
+    env.readCsvFile[Order](
+        ordersPath,
+        fieldDelimiter = "|",
+        includedFields = Array(0, 1, 4, 7) )
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala
new file mode 100644
index 0000000..cac9590
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.scala
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+
+/**
+ * Minimal Table API example: sums the counts per word of a small, hard-coded data set
+ * and prints the totals.
+ */
+object WordCountTable {
+
+  /** A word together with a count. */
+  case class WC(word: String, count: Int)
+
+  def main(args: Array[String]): Unit = {
+
+    // obtain the execution environment
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    // three sample records, two of them for the same word
+    val words = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
+
+    // group by word, sum the counts, and convert the table back into WC records
+    val totals = words.toTable
+      .groupBy('word)
+      .select('word, 'count.sum as 'count)
+      .toDataSet[WC]
+
+    totals.print()
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/explain/Node.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/explain/Node.java 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/explain/Node.java
new file mode 100644
index 0000000..9152260
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/explain/Node.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.explain;
+
+import java.util.List;
+
+/**
+ * One entry of the {@code nodes} list held by {@code PlanTree}.
+ *
+ * <p>The class exposes getters only and declares neither setters nor a constructor. Instances
+ * are presumably populated reflectively when {@code PlanJsonParser} maps the plan JSON with
+ * Jackson, so field and getter names should stay in sync with the JSON keys (verify before
+ * renaming anything).
+ */
+public class Node {
+       private int id;
+       private String type;
+       private String pact;
+       private String contents;
+       private int parallelism;
+       private String driver_strategy;
+       private List<Predecessors> predecessors;
+       private List<Global_properties> global_properties;
+       private List<LocalProperty> local_properties;
+       private List<Estimates> estimates;
+       private List<Costs> costs;
+       private List<Compiler_hints> compiler_hints;
+
+       /** @return the value of the {@code id} field */
+       public int getId() {
+               return this.id;
+       }
+
+       /** @return the value of the {@code type} field */
+       public String getType() {
+               return this.type;
+       }
+
+       /** @return the value of the {@code pact} field */
+       public String getPact() {
+               return this.pact;
+       }
+
+       /** @return the value of the {@code contents} field */
+       public String getContents() {
+               return this.contents;
+       }
+
+       /** @return the value of the {@code parallelism} field */
+       public int getParallelism() {
+               return this.parallelism;
+       }
+
+       /** @return the value of the {@code driver_strategy} field, possibly {@code null} */
+       public String getDriver_strategy() {
+               return this.driver_strategy;
+       }
+
+       /** @return the value of the {@code predecessors} field, possibly {@code null} */
+       public List<Predecessors> getPredecessors() {
+               return this.predecessors;
+       }
+
+       /** @return the value of the {@code global_properties} field */
+       public List<Global_properties> getGlobal_properties() {
+               return this.global_properties;
+       }
+
+       /** @return the value of the {@code local_properties} field */
+       public List<LocalProperty> getLocal_properties() {
+               return this.local_properties;
+       }
+
+       /** @return the value of the {@code estimates} field */
+       public List<Estimates> getEstimates() {
+               return this.estimates;
+       }
+
+       /** @return the value of the {@code costs} field */
+       public List<Costs> getCosts() {
+               return this.costs;
+       }
+
+       /** @return the value of the {@code compiler_hints} field */
+       public List<Compiler_hints> getCompiler_hints() {
+               return this.compiler_hints;
+       }
+}
+
+/**
+ * Element type of the {@code predecessors} list of a {@code Node}; carries a ship strategy
+ * and an exchange mode as plain strings. Getter-only, with no setters or constructor.
+ */
+class Predecessors {
+       private String ship_strategy;
+       private String exchange_mode;
+
+       /** @return the value of the {@code ship_strategy} field */
+       public String getShip_strategy() {
+               return this.ship_strategy;
+       }
+
+       /** @return the value of the {@code exchange_mode} field */
+       public String getExchange_mode() {
+               return this.exchange_mode;
+       }
+}
+
+/**
+ * Element type of the {@code global_properties} list of a {@code Node}: a name/value pair
+ * of strings. Getter-only, with no setters or constructor.
+ */
+class Global_properties {
+       private String name;
+       private String value;
+
+       /** @return the value of the {@code name} field */
+       public String getName() {
+               return this.name;
+       }
+
+       /** @return the value of the {@code value} field */
+       public String getValue() {
+               return this.value;
+       }
+}
+
+/**
+ * Element type of the {@code local_properties} list of a {@code Node}: a name/value pair
+ * of strings. Getter-only, with no setters or constructor.
+ */
+class LocalProperty {
+       private String name;
+       private String value;
+
+       /** @return the value of the {@code name} field */
+       public String getName() {
+               return this.name;
+       }
+
+       /** @return the value of the {@code value} field */
+       public String getValue() {
+               return this.value;
+       }
+}
+
+/**
+ * Element type of the {@code estimates} list of a {@code Node}: a name/value pair of
+ * strings. Getter-only, with no setters or constructor.
+ */
+class Estimates {
+       private String name;
+       private String value;
+
+       /** @return the value of the {@code name} field */
+       public String getName() {
+               return this.name;
+       }
+
+       /** @return the value of the {@code value} field */
+       public String getValue() {
+               return this.value;
+       }
+}
+
+/**
+ * Element type of the {@code costs} list of a {@code Node}: a name/value pair of strings.
+ * Getter-only, with no setters or constructor.
+ */
+class Costs {
+       private String name;
+       private String value;
+
+       /** @return the value of the {@code name} field */
+       public String getName() {
+               return this.name;
+       }
+
+       /** @return the value of the {@code value} field */
+       public String getValue() {
+               return this.value;
+       }
+}
+
+/**
+ * Element type of the {@code compiler_hints} list of a {@code Node}: a name/value pair of
+ * strings. Getter-only, with no setters or constructor.
+ */
+class Compiler_hints {
+       private String name;
+       private String value;
+
+       /** @return the value of the {@code name} field */
+       public String getName() {
+               return this.name;
+       }
+
+       /** @return the value of the {@code value} field */
+       public String getValue() {
+               return this.value;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/explain/PlanJsonParser.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/explain/PlanJsonParser.java
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/explain/PlanJsonParser.java
new file mode 100644
index 0000000..31a7cd68
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/explain/PlanJsonParser.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.explain;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+/**
+ * Renders a JSON execution plan as indented, human-readable text.
+ */
+public class PlanJsonParser {
+
+       /** Pact name that marks a node as a data source. */
+       private static final String DATA_SOURCE_PACT = "Data Source";
+
+       /**
+        * Parses the given JSON into a {@code PlanTree} and prints one "Stage" section per node.
+        *
+        * <p>Each stage lists its content, the ship strategy and exchange mode of its first
+        * predecessor (if any), its driver strategy (if set) and its first global property.
+        * In extended mode the remaining global properties, the local properties, estimates,
+        * costs and compiler hints are printed as well. Every stage is indented one tab deeper
+        * than the previous one, except that repeated data sources share the indent of the
+        * first data source.
+        *
+        * <p>Fields that are absent from a node (null or empty lists, null contents) are
+        * skipped instead of raising an exception.
+        *
+        * @param t the plan as a JSON string
+        * @param extended whether to print the additional sections; {@code null} is treated
+        *                 as {@code false}
+        * @return the formatted plan
+        * @throws Exception if the JSON cannot be read
+        */
+       public static String getSqlExecutionPlan(String t, Boolean extended) throws Exception {
+               ObjectMapper objectMapper = new ObjectMapper();
+
+               // not every node carries the same fields, so unknown fields must not fail the parse
+               objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+               PlanTree tree = objectMapper.readValue(t, PlanTree.class);
+
+               // avoid auto-unboxing a null flag
+               boolean printAll = Boolean.TRUE.equals(extended);
+
+               LinkedHashMap<String, Integer> indentByPact = new LinkedHashMap<>();
+               StringWriter sw = new StringWriter();
+               PrintWriter pw = new PrintWriter(sw);
+               int tabCount = 0;
+
+               List<Node> nodes = tree == null ? null : tree.getNodes();
+               if (nodes != null) {
+                       for (Node node : nodes) {
+                               String pact = node.getPact();
+                               boolean isDataSource = DATA_SOURCE_PACT.equals(pact);
+
+                               // inputs of an operation such as join or union are siblings,
+                               // so they keep the indent of the first data source
+                               if (isDataSource && indentByPact.containsKey(pact)) {
+                                       tabCount = indentByPact.get(pact);
+                               } else {
+                                       indentByPact.put(pact, tabCount);
+                               }
+
+                               printTab(tabCount, pw);
+                               pw.print("Stage " + node.getId() + " : " + pact + "\n");
+
+                               int indent = tabCount + 1;
+
+                               String content = node.getContents();
+                               if (isDataSource) {
+                                       // fixed text for data sources so that unit tests pass, because
+                                       // java and scala use different apis to get input elements
+                                       content = "collect elements with CollectionInputFormat";
+                               } else if (content != null) {
+                                       // drop the hashcode of the object instance
+                                       int at = content.indexOf("@");
+                                       if (at > -1) {
+                                               content = content.substring(0, at);
+                                       }
+                               }
+                               printProperty(pw, indent, "content", content);
+
+                               List<Predecessors> predecessors = node.getPredecessors();
+                               if (predecessors != null && !predecessors.isEmpty()) {
+                                       Predecessors first = predecessors.get(0);
+                                       printProperty(pw, indent, "ship_strategy", first.getShip_strategy());
+                                       printProperty(pw, indent, "exchange_mode", first.getExchange_mode());
+                               }
+
+                               if (node.getDriver_strategy() != null) {
+                                       printProperty(pw, indent, "driver_strategy", node.getDriver_strategy());
+                               }
+
+                               List<Global_properties> globalProperties = node.getGlobal_properties();
+                               if (globalProperties != null && !globalProperties.isEmpty()) {
+                                       Global_properties first = globalProperties.get(0);
+                                       printProperty(pw, indent, first.getName(), first.getValue());
+                               }
+
+                               if (printAll) {
+                                       if (globalProperties != null) {
+                                               // the first global property was already printed above
+                                               for (int i = 1; i < globalProperties.size(); i++) {
+                                                       Global_properties p = globalProperties.get(i);
+                                                       printProperty(pw, indent, p.getName(), p.getValue());
+                                               }
+                                       }
+
+                                       List<LocalProperty> localProperties = node.getLocal_properties();
+                                       if (localProperties != null) {
+                                               for (LocalProperty p : localProperties) {
+                                                       printProperty(pw, indent, p.getName(), p.getValue());
+                                               }
+                                       }
+
+                                       List<Estimates> estimates = node.getEstimates();
+                                       if (estimates != null) {
+                                               for (Estimates e : estimates) {
+                                                       printProperty(pw, indent, e.getName(), e.getValue());
+                                               }
+                                       }
+
+                                       List<Costs> costs = node.getCosts();
+                                       if (costs != null) {
+                                               for (Costs c : costs) {
+                                                       printProperty(pw, indent, c.getName(), c.getValue());
+                                               }
+                                       }
+
+                                       List<Compiler_hints> compilerHints = node.getCompiler_hints();
+                                       if (compilerHints != null) {
+                                               for (Compiler_hints h : compilerHints) {
+                                                       printProperty(pw, indent, h.getName(), h.getValue());
+                                               }
+                                       }
+                               }
+                               tabCount++;
+                               pw.print("\n");
+                       }
+               }
+               pw.close();
+               return sw.toString();
+       }
+
+       /** Prints one indented {@code name : value} line. */
+       private static void printProperty(PrintWriter pw, int tabCount, String name, String value) {
+               printTab(tabCount, pw);
+               pw.print(name + " : " + value + "\n");
+       }
+
+       /** Prints {@code tabCount} tab characters. */
+       private static void printTab(int tabCount, PrintWriter pw) {
+               for (int i = 0; i < tabCount; i++) {
+                       pw.print("\t");
+               }
+       }
+}
+
+/**
+ * Root object that {@code PlanJsonParser} reads the plan JSON into; it holds the list of
+ * plan nodes. Getter-only, with no setters or constructor.
+ */
+class PlanTree {
+       private List<Node> nodes;
+
+       /** @return the value of the {@code nodes} field */
+       public List<Node> getNodes() {
+               return this.nodes;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
new file mode 100644
index 0000000..bdebfb1
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.table.test;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.table.ExpressionException;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.TableEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+/**
+ * Tests for aggregation expressions (sum, min, max, count, avg) given as strings to
+ * {@code Table.select}, parameterized over {@code TestExecutionMode}.
+ */
+@RunWith(Parameterized.class)
+public class AggregationsITCase extends MultipleProgramsTestBase {
+
+       public AggregationsITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+
+       /** All five aggregations over the first field of the 3-tuple data set. */
+       @Test
+       public void testAggregationTypes() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               final TableEnvironment tableEnv = new TableEnvironment();
+
+               final Table aggregated = tableEnv
+                               .fromDataSet(CollectionDataSets.get3TupleDataSet(env))
+                               .select("f0.sum, f0.min, f0.max, f0.count, f0.avg");
+
+               final List<Row> rows = tableEnv.toDataSet(aggregated, Row.class).collect();
+               compareResultAsText(rows, "231,1,21,21,11");
+       }
+
+       /** Aggregating a field that does not exist must raise an ExpressionException. */
+       @Test(expected = ExpressionException.class)
+       public void testAggregationOnNonExistingField() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               final TableEnvironment tableEnv = new TableEnvironment();
+
+               final Table aggregated = tableEnv
+                               .fromDataSet(CollectionDataSets.get3TupleDataSet(env))
+                               .select("foo.avg");
+
+               final List<Row> rows = tableEnv.toDataSet(aggregated, Row.class).collect();
+               compareResultAsText(rows, "");
+       }
+
+       /** Averages over every numeric field type, plus a count over the String field. */
+       @Test
+       public void testWorkingAggregationDataTypes() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               final TableEnvironment tableEnv = new TableEnvironment();
+
+               final DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, String>> input =
+                               env.fromElements(
+                                               new Tuple7<>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello"),
+                                               new Tuple7<>((byte) 2, (short) 2, 2, 2L, 2.0f, 2.0d, "Ciao"));
+
+               final Table aggregated = tableEnv
+                               .fromDataSet(input)
+                               .select("f0.avg, f1.avg, f2.avg, f3.avg, f4.avg, f5.avg, f6.count");
+
+               final List<Row> rows = tableEnv.toDataSet(aggregated, Row.class).collect();
+               compareResultAsText(rows, "1,1,1,1,1.5,1.5,2");
+       }
+
+       /** Arithmetic and string concatenation around aggregations. */
+       @Test
+       public void testAggregationWithArithmetic() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               final TableEnvironment tableEnv = new TableEnvironment();
+
+               final DataSource<Tuple2<Float, String>> input =
+                               env.fromElements(
+                                               new Tuple2<>(1f, "Hello"),
+                                               new Tuple2<>(2f, "Ciao"));
+
+               final Table aggregated = tableEnv
+                               .fromDataSet(input)
+                               .select("(f0 + 2).avg + 2, f1.count + \" THE COUNT\"");
+
+               final List<Row> rows = tableEnv.toDataSet(aggregated, Row.class).collect();
+               compareResultAsText(rows, "5.5,2 THE COUNT");
+       }
+
+       /** Two count aggregations in the same select. */
+       @Test
+       public void testAggregationWithTwoCount() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               final TableEnvironment tableEnv = new TableEnvironment();
+
+               final DataSource<Tuple2<Float, String>> input =
+                               env.fromElements(
+                                               new Tuple2<>(1f, "Hello"),
+                                               new Tuple2<>(2f, "Ciao"));
+
+               final Table aggregated = tableEnv
+                               .fromDataSet(input)
+                               .select("f0.count, f1.count");
+
+               final List<Row> rows = tableEnv.toDataSet(aggregated, Row.class).collect();
+               compareResultAsText(rows, "2,2");
+       }
+
+       /** Summing a String field must raise an ExpressionException. */
+       @Test(expected = ExpressionException.class)
+       public void testNonWorkingDataTypes() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               final TableEnvironment tableEnv = new TableEnvironment();
+
+               final DataSource<Tuple2<Float, String>> input =
+                               env.fromElements(new Tuple2<>(1f, "Hello"));
+
+               final Table aggregated = tableEnv
+                               .fromDataSet(input)
+                               .select("f1.sum");
+
+               final List<Row> rows = tableEnv.toDataSet(aggregated, Row.class).collect();
+               compareResultAsText(rows, "");
+       }
+
+       /** An aggregation of an aggregation must raise an ExpressionException. */
+       @Test(expected = ExpressionException.class)
+       public void testNoNestedAggregation() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               final TableEnvironment tableEnv = new TableEnvironment();
+
+               final DataSource<Tuple2<Float, String>> input =
+                               env.fromElements(new Tuple2<>(1f, "Hello"));
+
+               final Table aggregated = tableEnv
+                               .fromDataSet(input)
+                               .select("f0.sum.sum");
+
+               final List<Row> rows = tableEnv.toDataSet(aggregated, Row.class).collect();
+               compareResultAsText(rows, "");
+       }
+
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
new file mode 100644
index 0000000..f6ab54e
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table.test;
+
+import org.apache.flink.api.table.ExpressionException;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.TableEnvironment;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class AsITCase extends MultipleProgramsTestBase {
+
+
+       public AsITCase(TestExecutionMode mode){
+               super(mode);
+       }
+
+       @Test
+       public void testAs() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               Table table =
+                               
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c");
+
+               DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello 
world\n" + "4,3,Hello world, " +
+                               "how are you?\n" + "5,3,I am fine.\n" + 
"6,3,Luke Skywalker\n" + "7,4," +
+                               "Comment#1\n" + "8,4,Comment#2\n" + 
"9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
+                               "Comment#5\n" + "12,5,Comment#6\n" + 
"13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
+                               "Comment#9\n" + "16,6,Comment#10\n" + 
"17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
+                               "6,Comment#13\n" + "20,6,Comment#14\n" + 
"21,6,Comment#15\n";
+               compareResultAsText(results, expected);
+       }
+
+       @Test(expected = ExpressionException.class)
+       public void testAsWithToFewFields() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               Table table =
+                               
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b");
+
+               DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "";
+               compareResultAsText(results, expected);
+       }
+
+       @Test(expected = ExpressionException.class)
+       public void testAsWithToManyFields() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               Table table =
+                               
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c, d");
+
+               DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "";
+               compareResultAsText(results, expected);
+       }
+
+       @Test(expected = ExpressionException.class)
+       public void testAsWithAmbiguousFields() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               Table table =
+                               
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, b");
+
+               DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "";
+               compareResultAsText(results, expected);
+       }
+
+       @Test(expected = ExpressionException.class)
+       public void testAsWithNonFieldReference1() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               Table table =
+                               
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a + 1, b, c");
+
+               DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "";
+               compareResultAsText(results, expected);
+       }
+
+       @Test(expected = ExpressionException.class)
+       public void testAsWithNonFieldReference2() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               Table table =
+                               
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a as foo, b," +
+                                               " c");
+
+               DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "";
+               compareResultAsText(results, expected);
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
new file mode 100644
index 0000000..7e9e3dc
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table.test;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.TableEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class CastingITCase extends MultipleProgramsTestBase {
+
+
+       public CastingITCase(TestExecutionMode mode){
+               super(mode);
+       }
+
+       @Test
+       public void testAutoCastToString() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, 
String>> input =
+                               env.fromElements(new Tuple7<>((byte) 1, (short) 
1, 1, 1L, 1.0f, 1.0d, "Hello"));
+
+               Table table =
+                               tableEnv.fromDataSet(input);
+
+               Table result = table.select(
+                               "f0 + 'b', f1 + 's', f2 + 'i', f3 + 'L', f4 + 
'f', f5 + \"d\"");
+
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "1b,1s,1i,1L,1.0f,1.0d";
+               compareResultAsText(results, expected);
+       }
+
+       @Test
+       public void testNumericAutocastInArithmetic() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, 
String>> input =
+                               env.fromElements(new Tuple7<>((byte) 1, (short) 
1, 1, 1L, 1.0f, 1.0d, "Hello"));
+
+               Table table =
+                               tableEnv.fromDataSet(input);
+
+               Table result = table.select("f0 + 1, f1 +" +
+                               " 1, f2 + 1L, f3 + 1.0f, f4 + 1.0d, f5 + 1");
+
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "2,2,2,2.0,2.0,2.0";
+               compareResultAsText(results, expected);
+       }
+
+       @Test
+       public void testNumericAutocastInComparison() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, 
String>> input =
+                               env.fromElements(
+                                               new Tuple7<>((byte) 1, (short) 
1, 1, 1L, 1.0f, 1.0d, "Hello"),
+                                               new Tuple7<>((byte) 2, (short) 
2, 2, 2L, 2.0f, 2.0d, "Hello"));
+
+               Table table =
+                               tableEnv.fromDataSet(input, "a,b,c,d,e,f,g");
+
+               Table result = table
+                               .filter("a > 1 && b > 1 && c > 1L && d > 1.0f 
&& e > 1.0d && f > 1");
+
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "2,2,2,2,2.0,2.0,Hello";
+               compareResultAsText(results, expected);
+       }
+
+       @Test
+       public void testCastFromString() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSource<Tuple3<String, String, String>> input =
+                               env.fromElements(new Tuple3<>("1", "true", 
"2.0"));
+
+               Table table =
+                               tableEnv.fromDataSet(input);
+
+               Table result = table.select(
+                               "f0.cast(BYTE), f0.cast(SHORT), f0.cast(INT), 
f0.cast(LONG), f2.cast(DOUBLE), f2.cast(FLOAT), f1.cast(BOOL)");
+
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "1,1,1,1,2.0,2.0,true\n";
+               compareResultAsText(results, expected);
+       }
+
+       @Test
+       public void testCastDateFromString() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSource<Tuple4<String, String, String, String>> input =
+                               env.fromElements(new Tuple4<>("2011-05-03", 
"15:51:36", "2011-05-03 15:51:36.000", "1446473775"));
+
+               Table table =
+                               tableEnv.fromDataSet(input);
+
+               Table result = table
+                               .select("f0.cast(DATE) AS f0, f1.cast(DATE) AS 
f1, f2.cast(DATE) AS f2, f3.cast(DATE) AS f3")
+                               .select("f0.cast(STRING), f1.cast(STRING), 
f2.cast(STRING), f3.cast(STRING)");
+
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "2011-05-03 00:00:00.000,1970-01-01 
15:51:36.000,2011-05-03 15:51:36.000," +
+                               "1970-01-17 17:47:53.775\n";
+               compareResultAsText(results, expected);
+       }
+
+       @Test
+       public void testCastDateToStringAndLong() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSource<Tuple2<String, String>> input =
+                       env.fromElements(new Tuple2<>("2011-05-03 
15:51:36.000", "1304437896000"));
+
+               Table table =
+                       tableEnv.fromDataSet(input);
+
+               Table result = table
+                       .select("f0.cast(DATE) AS f0, f1.cast(DATE) AS f1")
+                       .select("f0.cast(STRING), f0.cast(LONG), 
f1.cast(STRING), f1.cast(LONG)");
+
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "2011-05-03 
15:51:36.000,1304437896000,2011-05-03 15:51:36.000,1304437896000\n";
+               compareResultAsText(results, expected);
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
new file mode 100644
index 0000000..c9bba62
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table.test;
+
+import org.apache.flink.api.table.ExpressionException;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.TableEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class ExpressionsITCase extends MultipleProgramsTestBase {
+
+
+       public ExpressionsITCase(TestExecutionMode mode){
+               super(mode);
+       }
+
+       @Test
+       public void testArithmetic() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSource<Tuple2<Integer, Integer>> input =
+                               env.fromElements(new Tuple2<>(5, 10));
+
+               Table table =
+                               tableEnv.fromDataSet(input, "a, b");
+
+               Table result = table.select(
+                               "a - 5, a + 5, a / 2, a * 2, a % 2, -a");
+
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "0,10,2,10,1,-5";
+               compareResultAsText(results, expected);
+       }
+
+       @Test
+       public void testLogic() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSource<Tuple2<Integer, Boolean>> input =
+                               env.fromElements(new Tuple2<>(5, true));
+
+               Table table =
+                               tableEnv.fromDataSet(input, "a, b");
+
+               Table result = table.select(
+                               "b && true, b && false, b || false, !b");
+
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "true,false,true,false";
+               compareResultAsText(results, expected);
+       }
+
+       @Test
+       public void testComparisons() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSource<Tuple3<Integer, Integer, Integer>> input =
+                               env.fromElements(new Tuple3<>(5, 5, 4));
+
+               Table table =
+                               tableEnv.fromDataSet(input, "a, b, c");
+
+               Table result = table.select(
+                               "a > c, a >= b, a < c, a.isNull, a.isNotNull");
+
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "true,true,false,false,true";
+               compareResultAsText(results, expected);
+       }
+
+       @Test
+       public void testBitwiseOperation() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSource<Tuple2<Byte, Byte>> input =
+                               env.fromElements(new Tuple2<>((byte) 3, (byte) 
5));
+
+               Table table =
+                               tableEnv.fromDataSet(input, "a, b");
+
+               Table result = table.select(
+                               "a & b, a | b, a ^ b, ~a");
+
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "1,7,6,-4";
+               compareResultAsText(results, expected);
+       }
+
+       @Test
+       public void testBitwiseWithAutocast() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSource<Tuple2<Integer, Byte>> input =
+                               env.fromElements(new Tuple2<>(3, (byte) 5));
+
+               Table table =
+                               tableEnv.fromDataSet(input, "a, b");
+
+               Table result = table.select(
+                               "a & b, a | b, a ^ b, ~a");
+
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "1,7,6,-4";
+               compareResultAsText(results, expected);
+       }
+
+       @Test(expected = ExpressionException.class)
+       public void testBitwiseWithNonWorkingAutocast() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSource<Tuple2<Float, Byte>> input =
+                               env.fromElements(new Tuple2<>(3.0f, (byte) 5));
+
+               Table table =
+                               tableEnv.fromDataSet(input, "a, b");
+
+               Table result =
+                               table.select("a & b, a | b, a ^ b, ~a");
+
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "";
+               compareResultAsText(results, expected);
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
new file mode 100644
index 0000000..44e0def
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table.test;
+
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.TableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class FilterITCase extends MultipleProgramsTestBase {
+
+
+       public FilterITCase(TestExecutionMode mode){
+               super(mode);
+       }
+
+       @Test
+       public void testAllRejectingFilter() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env);
+
+               Table table =
+                               tableEnv.fromDataSet(input, "a, b, c");
+
+               Table result = table
+                               .filter("false");
+
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "\n";
+               compareResultAsText(results, expected);
+       }
+
+       @Test
+       public void testAllPassingFilter() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env);
+
+               Table table =
+                               tableEnv.fromDataSet(input, "a, b, c");
+
+               Table result = table
+                               .filter("true");
+
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello 
world\n" + "4,3,Hello world, " +
+                               "how are you?\n" + "5,3,I am fine.\n" + 
"6,3,Luke Skywalker\n" + "7,4," +
+                               "Comment#1\n" + "8,4,Comment#2\n" + 
"9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
+                               "Comment#5\n" + "12,5,Comment#6\n" + 
"13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
+                               "Comment#9\n" + "16,6,Comment#10\n" + 
"17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
+                               "6,Comment#13\n" + "20,6,Comment#14\n" + 
"21,6,Comment#15\n";
+               compareResultAsText(results, expected);
+       }
+
+       @Test
+       public void testFilterOnIntegerTupleField() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env);
+
+               Table table =
+                               tableEnv.fromDataSet(input, "a, b, c");
+
+               Table result = table
+                               .filter(" a % 2 = 0 ");
+
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "2,2,Hello\n" + "4,3,Hello world, how are 
you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
+                               "Comment#2\n" + "10,4,Comment#4\n" + 
"12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
+                               "Comment#10\n" + "18,6,Comment#12\n" + 
"20,6,Comment#14\n";
+               compareResultAsText(results, expected);
+       }
+
+       @Test
+       public void testNotEquals() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env);
+
+               Table table =
+                               tableEnv.fromDataSet(input, "a, b, c");
+
+               Table result = table
+                               .filter("!( a % 2 <> 0 ) ");
+
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "2,2,Hello\n" + "4,3,Hello world, how are 
you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
+                               "Comment#2\n" + "10,4,Comment#4\n" + 
"12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
+                               "Comment#10\n" + "18,6,Comment#12\n" + 
"20,6,Comment#14\n";
+               compareResultAsText(results, expected);
+       }
+
+       @Test
+       public void testIntegerBiggerThan128() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> input = 
env.fromElements(new Tuple3<>(300, 1L, "Hello"));
+
+               Table table = tableEnv.fromDataSet(input, "a, b, c");
+
+               Table result = table.filter("a = 300 ");
+
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "300,1,Hello\n";
+               compareResultAsText(results, expected);
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
new file mode 100644
index 0000000..f5c9185
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table.test;
+
+import org.apache.flink.api.table.ExpressionException;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.TableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class GroupedAggregationsITCase extends MultipleProgramsTestBase {
+
+
+       public GroupedAggregationsITCase(TestExecutionMode mode){
+               super(mode);
+       }
+
+       @Test(expected = ExpressionException.class)
+       public void testGroupingOnNonExistentField() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env);
+
+               Table table =
+                               tableEnv.fromDataSet(input, "a, b, c");
+
+               Table result = table
+                               .groupBy("foo").select("a.avg");
+
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "";
+               compareResultAsText(results, expected);
+       }
+
+       @Test
+       public void testGroupedAggregate() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env);
+
+               Table table =
+                               tableEnv.fromDataSet(input, "a, b, c");
+
+               Table result = table
+                               .groupBy("b").select("b, a.sum");
+
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + 
"5,65\n" + "6,111\n";
+               compareResultAsText(results, expected);
+       }
+
+       @Test
+       public void testGroupingKeyForwardIfNotUsed() throws Exception {
+
+               // the grouping key needs to be forwarded to the intermediate DataSet, even
+               // if we don't want the key in the output
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env);
+
+               Table table =
+                               tableEnv.fromDataSet(input, "a, b, c");
+
+               Table result = table
+                               .groupBy("b").select("a.sum");
+
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+               List<Row> results = ds.collect();
+               String expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + 
"111\n";
+               compareResultAsText(results, expected);
+       }
+
+       @Test
+       public void testGroupNoAggregation() throws Exception {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env);
+
+               Table table =
+                       tableEnv.fromDataSet(input, "a, b, c");
+
+               Table result = table
+                       .groupBy("b").select("a.sum as d, b").groupBy("b, 
d").select("b");
+
+               DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+
+               String expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n";
+               List<Row> results = ds.collect();
+               compareResultAsText(results, expected);
+       }
+}
+

Reply via email to