http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/AggregationsITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/AggregationsITCase.scala
deleted file mode 100644
index 4a358bc..0000000
--- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/AggregationsITCase.scala
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.expressions.test
-
-import org.apache.flink.api.expressions.ExpressionException
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.expressions._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit._
-import org.junit.rules.TemporaryFolder
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-@RunWith(classOf[Parameterized])
-class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-  private var resultPath: String = null
-  private var expected: String = ""
-  private val _tempFolder = new TemporaryFolder()
-
-  @Rule
-  def tempFolder = _tempFolder
-
-  @Before
-  def before(): Unit = {
-    resultPath = tempFolder.newFile().toURI.toString
-  }
-
-  @After
-  def after: Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
-  }
-
-  @Test
-  def testAggregationTypes: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).toExpression
-      .select('_1.sum, '_1.min, '_1.max, '_1.count, '_1.avg)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "231,1,21,21,11"
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testAggregationOnNonExistingField: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).toExpression
-      .select('foo.avg)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = ""
-  }
-
-  @Test
-  def testWorkingAggregationDataTypes: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements(
-      (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
-      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toExpression
-      .select('_1.avg, '_2.avg, '_3.avg, '_4.avg, '_5.avg, '_6.avg, '_7.count)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "1,1,1,1,1.5,1.5,2"
-  }
-
-  @Test
-  def testAggregationWithArithmetic: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toExpression
-      .select(('_1 + 2).avg + 2, '_2.count + " THE COUNT")
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "5.5,2 THE COUNT"
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testNonWorkingAggregationDataTypes: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements(("Hello", 1)).toExpression
-      .select('_1.sum)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = ""
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testNoNestedAggregations: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements(("Hello", 1)).toExpression
-      .select('_2.sum.sum)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = ""
-  }
-
-
-}

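For reference, the aggregation usage these deleted tests covered, condensed into one runnable sketch (the three-element input, object name, and output path are illustrative, not from the commit):

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.expressions._
import org.apache.flink.core.fs.FileSystem.WriteMode

object AggregationSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // All five aggregations over the first field of a three-element input:
    // sum = 6, min = 1, max = 3, count = 3, avg = 2 (integer average).
    val result = env.fromElements((1, "a"), (2, "b"), (3, "c")).toExpression
      .select('_1.sum, '_1.min, '_1.max, '_1.count, '_1.avg)
    result.writeAsText("/tmp/agg-sketch", WriteMode.OVERWRITE) // illustrative path
    env.execute()
  }
}
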
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/AsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/AsITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/AsITCase.scala
deleted file mode 100644
index 18d7b09..0000000
--- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/AsITCase.scala
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.expressions.test
-
-import org.apache.flink.api.expressions.ExpressionException
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.expressions._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit._
-import org.junit.rules.TemporaryFolder
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-@RunWith(classOf[Parameterized])
-class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-  private var resultPath: String = null
-  private var expected: String = ""
-  private val _tempFolder = new TemporaryFolder()
-
-  @Rule
-  def tempFolder = _tempFolder
-
-  @Before
-  def before(): Unit = {
-    resultPath = tempFolder.newFile().toURI.toString
-  }
-
-  @After
-  def after: Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
-  }
-
-  @Test
-  def testAs: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello 
world, " +
-      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
-      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + 
"10,4,Comment#4\n" + "11,5," +
-      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + 
"14,5,Comment#8\n" + "15,5," +
-      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + 
"18,6,Comment#12\n" + "19," +
-      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testAsWithToFewFields: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "no"
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testAsWithToManyFields: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "no"
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testAsWithAmbiguousFields: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'b)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "no"
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testAsWithNonFieldReference1: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    // as can only have field references
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a + 1, 'b, 'b)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "no"
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testAsWithNonFieldReference2: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    // as can only have field references
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a as 'foo, 'b, 'b)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "no"
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/CastingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/CastingITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/CastingITCase.scala
deleted file mode 100644
index 599ef6b..0000000
--- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/CastingITCase.scala
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.expressions.test
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.expressions._
-import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit._
-import org.junit.rules.TemporaryFolder
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-@RunWith(classOf[Parameterized])
-class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-  private var resultPath: String = null
-  private var expected: String = ""
-  private val _tempFolder = new TemporaryFolder()
-
-  @Rule
-  def tempFolder = _tempFolder
-
-  @Before
-  def before(): Unit = {
-    resultPath = tempFolder.newFile().toURI.toString
-  }
-
-  @After
-  def after: Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
-  }
-
-  @Test
-  def testAutoCastToString: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d)).toExpression
-      .select('_1 + "b", '_2 + "s", '_3 + "i", '_4 + "L", '_5 + "f", '_6 + "d")
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "1b,1s,1i,1L,1.0f,1.0d"
-  }
-
-  @Test
-  def testNumericAutoCastInArithmetic: Unit = {
-
-    // don't test everything, just some common cast directions
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d)).toExpression
-      .select('_1 + 1, '_2 + 1, '_3 + 1L, '_4 + 1.0f, '_5 + 1.0d, '_6 + 1)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "2,2,2,2.0,2.0,2.0"
-  }
-
-  @Test
-  def testNumericAutoCastInComparison: Unit = {
-
-    // don't test everything, just some common cast directions
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements(
-      (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d),
-      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d)).as('a, 'b, 'c, 'd, 'e, 'f)
-      .filter('a > 1 && 'b > 1 && 'c > 1L && 'd > 1.0f && 'e > 1.0d  && 'f > 1)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "2,2,2,2,2.0,2.0"
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/ExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/ExpressionsITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/ExpressionsITCase.scala
deleted file mode 100644
index 9d37f70..0000000
--- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/ExpressionsITCase.scala
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.expressions.test
-
-import org.apache.flink.api.expressions.ExpressionException
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.expressions._
-import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit._
-import org.junit.rules.TemporaryFolder
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-@RunWith(classOf[Parameterized])
-class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-  private var resultPath: String = null
-  private var expected: String = ""
-  private val _tempFolder = new TemporaryFolder()
-
-  @Rule
-  def tempFolder = _tempFolder
-
-  @Before
-  def before(): Unit = {
-    resultPath = tempFolder.newFile().toURI.toString
-  }
-
-  @After
-  def after: Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
-  }
-
-  @Test
-  def testArithmetic: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements((5, 10)).as('a, 'b)
-      .select('a - 5, 'a + 5, 'a / 2, 'a * 2, 'a % 2, -'a)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "0,10,2,10,1,-5"
-  }
-
-  @Test
-  def testLogic: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements((5, true)).as('a, 'b)
-      .select('b && true, 'b && false, 'b || false, !'b)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "true,false,true,false"
-  }
-
-  @Test
-  def testComparisons: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements((5, 5, 4)).as('a, 'b, 'c)
-      .select('a > 'c, 'a >= 'b, 'a < 'c, 'a.isNull, 'a.isNotNull)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "true,true,false,false,true"
-  }
-
-  @Test
-  def testBitwiseOperations: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    val ds = env.fromElements((3.toByte, 5.toByte)).as('a, 'b)
-      .select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "1,7,6,-4"
-  }
-
-  @Test
-  def testBitwiseWithAutocast: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    val ds = env.fromElements((3, 5.toByte)).as('a, 'b)
-      .select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "1,7,6,-4"
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testBitwiseWithNonWorkingAutocast: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    val ds = env.fromElements((3.0, 5)).as('a, 'b)
-      .select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "1,7,6,-4"
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/FilterITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/FilterITCase.scala
deleted file mode 100644
index 2841534..0000000
--- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/FilterITCase.scala
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.expressions.test
-
-import org.apache.flink.api.expressions.tree.Literal
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.expressions._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit._
-import org.junit.rules.TemporaryFolder
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-
-@RunWith(classOf[Parameterized])
-class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-  private var resultPath: String = null
-  private var expected: String = null
-  private val _tempFolder = new TemporaryFolder()
-
-  @Rule
-  def tempFolder = _tempFolder
-
-  @Before
-  def before(): Unit = {
-    resultPath = tempFolder.newFile().toURI.toString
-  }
-
-  @After
-  def after: Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
-  }
-
-  @Test
-  def testAllRejectingFilter: Unit = {
-    /*
-     * Test all-rejecting filter.
-     */
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
-
-    val filterDs = ds.filter( Literal(false) )
-
-    filterDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
-    env.execute()
-    expected = "\n"
-  }
-
-  @Test
-  def testAllPassingFilter: Unit = {
-    /*
-     * Test all-passing filter.
-     */
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
-
-    val filterDs = ds.filter( Literal(true) )
-
-    filterDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
-    env.execute()
-    expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello 
world, " +
-      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
-      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + 
"10,4,Comment#4\n" + "11,5," +
-      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + 
"14,5,Comment#8\n" + "15,5," +
-      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + 
"18,6,Comment#12\n" + "19," +
-      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-  }
-
-  @Test
-  def testFilterOnStringTupleField: Unit = {
-    /*
-     * Test filter on String tuple field.
-     */
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env)
-    val filterDs = ds.filter( _._3.contains("world") )
-    filterDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
-    env.execute()
-    expected = "3,2,Hello world\n" + "4,3,Hello world, how are you?\n"
-  }
-
-  @Test
-  def testFilterOnIntegerTupleField: Unit = {
-    /*
-     * Test filter on Integer tuple field.
-     */
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
-
-    val filterDs = ds.filter( 'a % 2 === 0 )
-
-    filterDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
-    env.execute()
-    expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke 
Skywalker\n" + "8,4," +
-      "Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + 
"14,5,Comment#8\n" + "16,6," +
-      "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"
-  }
-
-  // These two not yet done, but are planned
-
-  @Ignore
-  @Test
-  def testFilterBasicType: Unit = {
-    /*
-     * Test filter on basic type
-     */
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.getStringDataSet(env)
-
-    val filterDs = ds.filter( _.startsWith("H") )
-
-    filterDs.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how are 
you?\n"
-  }
-
-  @Ignore
-  @Test
-  def testFilterOnCustomType: Unit = {
-    /*
-     * Test filter on custom type
-     */
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.getCustomTypeDataSet(env)
-    val filterDs = ds.filter( _.myString.contains("a") )
-    filterDs.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "3,3,Hello world, how are you?\n" + "3,4,I am fine.\n" + 
"3,5,Luke Skywalker\n"
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/GroupedAggreagationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/GroupedAggreagationsITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/GroupedAggreagationsITCase.scala
deleted file mode 100644
index f2e0286..0000000
--- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/GroupedAggreagationsITCase.scala
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.expressions.test
-
-import org.apache.flink.api.expressions.ExpressionException
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.expressions._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit._
-import org.junit.rules.TemporaryFolder
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-@RunWith(classOf[Parameterized])
-class GroupedAggreagationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-  private var resultPath: String = null
-  private var expected: String = ""
-  private val _tempFolder = new TemporaryFolder()
-
-  @Rule
-  def tempFolder = _tempFolder
-
-  @Before
-  def before(): Unit = {
-    resultPath = tempFolder.newFile().toURI.toString
-  }
-
-  @After
-  def after: Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testGroupingOnNonExistentField: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
-      .groupBy('_foo)
-      .select('a.avg)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = ""
-  }
-
-  @Test
-  def testGroupedAggregate: Unit = {
-
-    // the grouping key needs to be forwarded to the intermediate DataSet, even
-    // if we don't want the key in the output
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
-      .groupBy('b)
-      .select('b, 'a.sum)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n"
-  }
-
-  @Test
-  def testGroupingKeyForwardIfNotUsed: Unit = {
-
-    // the grouping key needs to be forwarded to the intermediate DataSet, even
-    // if we don't want the key in the output
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
-      .groupBy('b)
-      .select('a.sum)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n"
-  }
-}

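The comment repeated in the two passing tests above — the grouping key must be forwarded to the intermediate DataSet even when it is not selected — is the interesting behavior here. A condensed sketch (input data, object name, and output path are illustrative, not from the commit):

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.expressions._
import org.apache.flink.core.fs.FileSystem.WriteMode

object GroupedAggregationSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val input = env.fromElements((1, "a"), (2, "a"), (3, "b")).as('v, 'k)
    // 'k drives the grouping and survives into the intermediate DataSet,
    // even though only 'v.sum is selected: the output is 3 and 3 (one sum per key).
    val sums = input.groupBy('k).select('v.sum)
    sums.writeAsText("/tmp/grouped-sketch", WriteMode.OVERWRITE) // illustrative path
    env.execute()
  }
}
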
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/JoinITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/JoinITCase.scala
deleted file mode 100644
index 91b3b19..0000000
--- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/JoinITCase.scala
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.expressions.test
-
-import org.apache.flink.api.expressions.ExpressionException
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.expressions._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit._
-import org.junit.rules.TemporaryFolder
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-@RunWith(classOf[Parameterized])
-class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-  private var resultPath: String = null
-  private var expected: String = ""
-  private val _tempFolder = new TemporaryFolder()
-
-  @Rule
-  def tempFolder = _tempFolder
-
-  @Before
-  def before(): Unit = {
-    resultPath = tempFolder.newFile().toURI.toString
-  }
-
-  @After
-  def after: Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
-  }
-
-  @Test
-  def testJoin: Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
-
-    val joinDs = ds1.join(ds2).where('b === 'e).select('c, 'g)
-
-    joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
-    env.execute()
-    expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n"
-  }
-
-  @Test
-  def testJoinWithFilter: Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
-
-    val joinDs = ds1.join(ds2).where('b === 'e && 'b < 2).select('c, 'g)
-
-    joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
-    env.execute()
-    expected = "Hi,Hallo\n"
-  }
-
-  @Test
-  def testJoinWithMultipleKeys: Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
-
-    val joinDs = ds1.join(ds2).filter('a === 'd && 'b === 'h).select('c, 'g)
-
-    joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
-    env.execute()
-    expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt 
wie gehts?\n" +
-      "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n"
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testJoinNonExistingKey: Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
-
-    val joinDs = ds1.join(ds2).where('foo === 'e).select('c, 'g)
-
-    joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
-    env.execute()
-    expected = ""
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testJoinWithNonMatchingKeyTypes: Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
-
-    val joinDs = ds1.join(ds2).where('a === 'g).select('c, 'g)
-
-    joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
-    env.execute()
-    expected = ""
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testJoinWithAmbiguousFields: Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'c)
-
-    val joinDs = ds1.join(ds2).where('a === 'd).select('c, 'g)
-
-    joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
-    env.execute()
-    expected = ""
-  }
-
-  @Test
-  def testJoinWithAggregation: Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
-
-    val joinDs = ds1.join(ds2).where('a === 'd).select('g.count)
-
-    joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
-    env.execute()
-    expected = "6"
-  }
-
-
-}

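The join tests above all follow one shape: alias both inputs with disjoint field names, join on === predicates, then select. A condensed sketch (input data, object name, and output path are illustrative, not from the commit):

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.expressions._
import org.apache.flink.core.fs.FileSystem.WriteMode

object JoinSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val left = env.fromElements((1, "Hi"), (2, "Hello")).as('a, 'c)
    val right = env.fromElements((1, "Hallo"), (2, "Hallo Welt")).as('d, 'g)
    // Field names must be unambiguous across both inputs; joining on a
    // non-existing or type-mismatched field raises an ExpressionException.
    val joined = left.join(right).where('a === 'd).select('c, 'g)
    joined.writeAsCsv("/tmp/join-sketch", writeMode = WriteMode.OVERWRITE) // illustrative path
    env.execute()
  }
}
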
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/SelectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/SelectITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/SelectITCase.scala
deleted file mode 100644
index a799b60..0000000
--- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/SelectITCase.scala
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.expressions.test
-
-import org.apache.flink.api.expressions.ExpressionException
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.expressions._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit._
-import org.junit.rules.TemporaryFolder
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-@RunWith(classOf[Parameterized])
-class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-  private var resultPath: String = null
-  private var expected: String = ""
-  private val _tempFolder = new TemporaryFolder()
-
-  @Rule
-  def tempFolder = _tempFolder
-
-  @Before
-  def before(): Unit = {
-    resultPath = tempFolder.newFile().toURI.toString
-  }
-
-  @After
-  def after: Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
-  }
-
-  @Test
-  def testSimpleSelectAll: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).toExpression.select('_1, '_2, '_3)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello 
world, " +
-      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
-      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + 
"10,4,Comment#4\n" + "11,5," +
-      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + 
"14,5,Comment#8\n" + "15,5," +
-      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + 
"18,6,Comment#12\n" + "19," +
-      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-  }
-
-  @Test
-  def testSimpleSelectAllWithAs: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c).select('a, 'b, 'c)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello 
world, " +
-      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
-      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + 
"10,4,Comment#4\n" + "11,5," +
-      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + 
"14,5,Comment#8\n" + "15,5," +
-      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + 
"18,6,Comment#12\n" + "19," +
-      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-  }
-
-  @Test
-  def testSimpleSelectWithNaming: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).toExpression
-      .select('_1 as 'a, '_2 as 'b)
-      .select('a, 'b)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + 
"7,4\n" +
-      "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" 
+ "15,5\n" +
-      "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testAsWithToFewFields: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "no"
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testAsWithToManyFields: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "no"
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testAsWithAmbiguousFields: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'b)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "no"
-  }
-
-
-  @Test(expected = classOf[ExpressionException])
-  def testOnlyFieldRefInAs: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b as 'c, 'd)
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "no"
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/StringExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/StringExpressionsITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/StringExpressionsITCase.scala
deleted file mode 100644
index c6c1113..0000000
--- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/StringExpressionsITCase.scala
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.expressions.test
-
-import org.apache.flink.api.expressions.ExpressionException
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.expressions._
-import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit._
-import org.junit.rules.TemporaryFolder
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-@RunWith(classOf[Parameterized])
-class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-  private var resultPath: String = null
-  private var expected: String = ""
-  private val _tempFolder = new TemporaryFolder()
-
-  @Rule
-  def tempFolder = _tempFolder
-
-  @Before
-  def before(): Unit = {
-    resultPath = tempFolder.newFile().toURI.toString
-  }
-
-  @After
-  def after: Unit = {
-    compareResultsByLinesInMemory(expected, resultPath)
-  }
-
-  @Test
-  def testSubstring: Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements(("AAAA", 2), ("BBBB", 1)).as('a, 'b)
-      .select('a.substring(0, 'b))
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "AA\nB"
-  }
-
-  @Test
-  def testSubstringWithMaxEnd: Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements(("ABCD", 2), ("ABCD", 1)).as('a, 'b)
-      .select('a.substring('b))
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "CD\nBCD"
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testNonWorkingSubstring1: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements(("AAAA", 2.0), ("BBBB", 1.0)).as('a, 'b)
-      .select('a.substring(0, 'b))
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "AAA\nBB"
-  }
-
-  @Test(expected = classOf[ExpressionException])
-  def testNonWorkingSubstring2: Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val ds = env.fromElements(("AAAA", "c"), ("BBBB", "d")).as('a, 'b)
-      .select('a.substring('b, 15))
-
-    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
-    env.execute()
-    expected = "AAA\nBB"
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 3db6531..c869a18 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -57,6 +57,11 @@ class DataStream[T](javaStream: JavaStream[T]) {
   def getJavaStream: JavaStream[T] = javaStream
 
   /**
+   * Returns the TypeInformation for the elements of this DataStream.
+   */
+  def getType(): TypeInformation[T] = javaStream.getType
+
+  /**
    * Sets the parallelism of this operation. This must be greater than 1.
    */
   def setParallelism(parallelism: Int): DataStream[T] = {

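Usage of the new accessor, which simply exposes the element type of the wrapped JavaStream (stream contents and object name here are illustrative, not from the commit):

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._

object GetTypeSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream: DataStream[(Int, String)] = env.fromElements((1, "a"), (2, "b"))
    // getType() delegates to javaStream.getType and yields the
    // TypeInformation for the stream's elements.
    val info: TypeInformation[(Int, String)] = stream.getType()
    println(info)
  }
}
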
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/pom.xml b/flink-staging/flink-table/pom.xml
new file mode 100644
index 0000000..dcdbe83
--- /dev/null
+++ b/flink-staging/flink-table/pom.xml
@@ -0,0 +1,246 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-staging</artifactId>
+               <version>0.9-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-table</artifactId>
+       <name>flink-table</name>
+
+       <packaging>jar</packaging>
+
+       <dependencies>
+
+               <dependency>
+                       <groupId>com.google.guava</groupId>
+                       <artifactId>guava</artifactId>
+                       <version>${guava.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-scala</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-scala</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-scala-examples</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.scala-lang</groupId>
+                       <artifactId>scala-reflect</artifactId>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.scala-lang</groupId>
+                       <artifactId>scala-library</artifactId>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.scala-lang</groupId>
+                       <artifactId>scala-compiler</artifactId>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-tests</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <!-- Scala Compiler -->
+                       <plugin>
+                               <groupId>net.alchim31.maven</groupId>
+                               <artifactId>scala-maven-plugin</artifactId>
+                               <version>3.1.4</version>
+                               <executions>
+                                       <!-- Run scala compiler in the process-resources phase, so that dependencies on
+                                               scala classes can be resolved later in the (Java) compile phase -->
+                                       <execution>
+                                               <id>scala-compile-first</id>
+                                               <phase>process-resources</phase>
+                                               <goals>
+                                                       <goal>compile</goal>
+                                               </goals>
+                                       </execution>
+
+                                       <!-- Run scala compiler in the process-test-resources phase, so that dependencies on
+                                                scala classes can be resolved later in the (Java) test-compile phase -->
+                                       <execution>
+                                               <id>scala-test-compile</id>
+                                               <phase>process-test-resources</phase>
+                                               <goals>
+                                                       <goal>testCompile</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                               <configuration>
+                                       <jvmArgs>
+                                               <jvmArg>-Xms128m</jvmArg>
+                                               <jvmArg>-Xmx512m</jvmArg>
+                                       </jvmArgs>
+                                       <compilerPlugins combine.children="append">
+                                               <compilerPlugin>
+                                                       <groupId>org.scalamacros</groupId>
+                                                       <artifactId>paradise_${scala.version}</artifactId>
+                                                       <version>${scala.macros.version}</version>
+                                               </compilerPlugin>
+                                       </compilerPlugins>
+                               </configuration>
+                       </plugin>
+
+                       <!-- Eclipse Integration -->
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-eclipse-plugin</artifactId>
+                               <version>2.8</version>
+                               <configuration>
+                                       <downloadSources>true</downloadSources>
+                                       <projectnatures>
+                                               <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+                                               <projectnature>org.eclipse.jdt.core.javanature</projectnature>
+                                       </projectnatures>
+                                       <buildcommands>
+                                               <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+                                       </buildcommands>
+                                       <classpathContainers>
+                                               <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+                                               <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+                                       </classpathContainers>
+                                       <excludes>
+                                               <exclude>org.scala-lang:scala-library</exclude>
+                                               <exclude>org.scala-lang:scala-compiler</exclude>
+                                       </excludes>
+                                       <sourceIncludes>
+                                               <sourceInclude>**/*.scala</sourceInclude>
+                                               <sourceInclude>**/*.java</sourceInclude>
+                                       </sourceIncludes>
+                               </configuration>
+                       </plugin>
+
+                       <!-- Adding scala source directories to build path -->
+                       <plugin>
+                               <groupId>org.codehaus.mojo</groupId>
+                               <artifactId>build-helper-maven-plugin</artifactId>
+                               <version>1.7</version>
+                               <executions>
+                                       <!-- Add src/main/scala to eclipse build path -->
+                                       <execution>
+                                               <id>add-source</id>
+                                               <phase>generate-sources</phase>
+                                               <goals>
+                                                       <goal>add-source</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <sources>
+                                                               <source>src/main/scala</source>
+                                                       </sources>
+                                               </configuration>
+                                       </execution>
+                                       <!-- Add src/test/scala to eclipse build path -->
+                                       <execution>
+                                               <id>add-test-source</id>
+                                               <phase>generate-test-sources</phase>
+                                               <goals>
+                                                       <goal>add-test-source</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <sources>
+                                                               <source>src/test/scala</source>
+                                                       </sources>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+
+                       <plugin>
+                               <groupId>org.scalastyle</groupId>
+                               <artifactId>scalastyle-maven-plugin</artifactId>
+                               <version>0.5.0</version>
+                               <executions>
+                                       <execution>
+                                               <goals>
+                                                       <goal>check</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                               <configuration>
+                                       <verbose>false</verbose>
+                                       <failOnViolation>true</failOnViolation>
+					<includeTestSourceDirectory>true</includeTestSourceDirectory>
+					<failOnWarning>false</failOnWarning>
+					<sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
+					<testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
+					<configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
+					<outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
+                                       <outputEncoding>UTF-8</outputEncoding>
+                               </configuration>
+                       </plugin>
+
+               </plugins>
+       </build>
+
+       <profiles>
+               <profile>
+                       <id>scala-2.10</id>
+                       <activation>
+                               <property>
+					<!-- this is the default scala profile -->
+					<name>!scala-2.11</name>
+                               </property>
+                       </activation>
+                       <dependencies>
+                               <dependency>
+                                       <groupId>org.scalamacros</groupId>
+					<artifactId>quasiquotes_${scala.binary.version}</artifactId>
+					<version>${scala.macros.version}</version>
+                               </dependency>
+                       </dependencies>
+               </profile>
+       </profiles>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/java/org/apache/flink/api/java/table/package-info.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/java/org/apache/flink/api/java/table/package-info.java b/flink-staging/flink-table/src/main/java/org/apache/flink/api/java/table/package-info.java
new file mode 100644
index 0000000..6f9f0a3
--- /dev/null
+++ b/flink-staging/flink-table/src/main/java/org/apache/flink/api/java/table/package-info.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <strong>Table API (Java)</strong><br>
+ *
+ * {@link org.apache.flink.api.java.table.TableEnvironment} can be used to create a
+ * {@link org.apache.flink.api.table.Table} from a {@link org.apache.flink.api.java.DataSet}
+ * or {@link org.apache.flink.streaming.api.datastream.DataStream}.
+ *
+ * <p>
+ * This can be used to perform SQL-like queries on data. Please have
+ * a look at {@link org.apache.flink.api.table.Table} to see which operations are supported and
+ * how query strings are written.
+ *
+ * <p>
+ * Example:
+ *
+ * <code>
+ * ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
+ *
+ * DataSet<WC> input = env.fromElements(
+ *   new WC("Hello", 1),
+ *   new WC("Ciao", 1),
+ *   new WC("Hello", 1));
+ *
+ * TableEnvironment tableEnv = new TableEnvironment();
+ * Table table = tableEnv.toTable(input);
+ * Table filtered = table
+ *     .groupBy("word")
+ *     .select("word.count as count, word")
+ *     .filter("count = 2");
+ *
+ * DataSet<WC> result = tableEnv.toSet(filtered, WC.class);
+ *
+ * result.print();
+ * env.execute();
+ * </code>
+ *
+ * <p>
+ * As seen above, a {@link org.apache.flink.api.table.Table} can be converted back to the
+ * underlying API representation using {@link org.apache.flink.api.java.table.TableEnvironment#toSet}
+ * or {@link org.apache.flink.api.java.table.TableEnvironment#toStream}.
+ */
+package org.apache.flink.api.java.table;

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/java/org/apache/flink/api/table/package-info.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/java/org/apache/flink/api/table/package-info.java b/flink-staging/flink-table/src/main/java/org/apache/flink/api/table/package-info.java
new file mode 100644
index 0000000..d7fbc8e
--- /dev/null
+++ b/flink-staging/flink-table/src/main/java/org/apache/flink/api/table/package-info.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <strong>Table API</strong><br>
+ *
+ * This package contains the generic part of the Table API. It can be used with Flink Streaming
+ * and Flink Batch, from Scala as well as from Java.
+ *
+ * When using the Table API, a user creates a {@link org.apache.flink.api.table.Table} from
+ * a DataSet or DataStream. Relational operations can then be performed on it. A table can also
+ * be converted back to a DataSet or DataStream.
+ *
+ * Packages {@link org.apache.flink.api.scala.table} and {@link org.apache.flink.api.java.table}
+ * contain the language-specific parts of the API. Refer to these packages for documentation on
+ * how the Table API can be used in Java and Scala.
+ */
+package org.apache.flink.api.table;
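
To make the round trip described above concrete, here is a minimal Java sketch; it uses only the classes added in this commit (TableEnvironment, Table) plus the WC POJO defined in JavaTableExample below, with the collection environment and the input values as illustrative assumptions:

    // DataSet -> Table -> DataSet round trip (sketch, not part of the commit)
    ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
    TableEnvironment tableEnv = new TableEnvironment();

    DataSet<WC> input = env.fromElements(new WC("Hello", 1), new WC("Ciao", 1));

    // create a Table from the DataSet
    Table table = tableEnv.toTable(input);

    // perform a relational operation on the Table
    Table filtered = table.filter("count = 1");

    // convert back; the target class must have exactly the Table's fields
    DataSet<WC> result = tableEnv.toSet(filtered, WC.class);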

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/java/org/apache/flink/examples/java/JavaTableExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/java/org/apache/flink/examples/java/JavaTableExample.java b/flink-staging/flink-table/src/main/java/org/apache/flink/examples/java/JavaTableExample.java
new file mode 100644
index 0000000..1c1fdca
--- /dev/null
+++ b/flink-staging/flink-table/src/main/java/org/apache/flink/examples/java/JavaTableExample.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.java;
+
+
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.TableEnvironment;
+
+/**
+ * Very simple example that shows how the Java Table API can be used.
+ */
+public class JavaTableExample {
+
+       public static class WC {
+               public String word;
+               public int count;
+
+		// Public default constructor to make it a Flink POJO
+               public WC() {
+
+               }
+
+               public WC(String word, int count) {
+                       this.word = word;
+                       this.count = count;
+               }
+
+               @Override
+               public String toString() {
+                       return "WC " + word + " " + count;
+               }
+       }
+       public static void main(String[] args) throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSet<WC> input = env.fromElements(
+                               new WC("Hello", 1),
+                               new WC("Ciao", 1),
+                               new WC("Hello", 1));
+
+               Table table = tableEnv.toTable(input);
+
+               Table filtered = table
+                               .groupBy("word")
+                               .select("word.count as count, word")
+                               .filter("count = 2");
+
+               DataSet<WC> result = tableEnv.toSet(filtered, WC.class);
+
+               result.print();
+               env.execute();
+       }
+}
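
For reference, the expected output: of the three input elements, "Hello" aggregates to a count of 2 and "Ciao" to 1, so only the "Hello" row passes the "count = 2" filter. Given the toString() defined on WC, result.print() should emit a single line:

    WC Hello 2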

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
new file mode 100644
index 0000000..17e4823
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table
+
+import java.lang.reflect.Modifier
+
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.aggregation.AggregationFunction
+import org.apache.flink.api.java.operators.JoinOperator.EquiJoin
+import org.apache.flink.api.java.operators.Keys.ExpressionKeys
+import org.apache.flink.api.java.operators.{GroupReduceOperator, Keys, MapOperator, UnsortedGrouping}
+import org.apache.flink.api.java.{DataSet => JavaDataSet}
+import org.apache.flink.api.table.analysis.ExtractEquiJoinFields
+import org.apache.flink.api.table.operations._
+import org.apache.flink.api.table.runtime.{ExpressionAggregateFunction, ExpressionFilterFunction, ExpressionJoinFunction, ExpressionSelectFunction}
+import org.apache.flink.api.table.tree._
+import org.apache.flink.api.table.typeinfo.{RenameOperator, RenamingProxyTypeInfo, RowTypeInfo}
+import org.apache.flink.api.table.{ExpressionException, Row, Table}
+
+/**
+ * [[TableTranslator]] for creating [[Table]]s from Java [[org.apache.flink.api.java.DataSet]]s and
+ * translating them back to Java [[org.apache.flink.api.java.DataSet]]s.
+ */
+class JavaBatchTranslator extends TableTranslator {
+
+  type Representation[A] = JavaDataSet[A]
+
+  override def createTable[A](
+      repr: Representation[A],
+      inputType: CompositeType[A],
+      expressions: Array[Expression],
+      resultFields: Seq[(String, TypeInformation[_])]): Table[this.type] = {
+
+    val rowDataSet = createSelect(expressions, repr, inputType)
+
+    Table(Root(rowDataSet, resultFields), this)
+  }
+
+  override def translate[A](op: Operation)(implicit tpe: TypeInformation[A]): JavaDataSet[A] = {
+
+    if (tpe.getTypeClass == classOf[Row]) {
+      // shortcut for DataSet[Row]
+      return translateInternal(op).asInstanceOf[JavaDataSet[A]]
+    }
+
+    val clazz = tpe.getTypeClass
+    if (clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) {
+      throw new ExpressionException("Cannot create DataSet of type " +
+        clazz.getName + ". Only top-level classes or static member classes are supported.")
+    }
+
+
+    if (!implicitly[TypeInformation[A]].isInstanceOf[CompositeType[A]]) {
+      throw new ExpressionException(
+        "A Table can only be converted to composite types, type is: " +
+          implicitly[TypeInformation[A]] +
+          ". Composite types would be tuples, case classes and POJOs.")
+    }
+
+    val resultSet = translateInternal(op)
+
+    val resultType = resultSet.getType.asInstanceOf[RowTypeInfo]
+
+    val outputType = implicitly[TypeInformation[A]].asInstanceOf[CompositeType[A]]
+
+    val resultNames = resultType.getFieldNames
+    val outputNames = outputType.getFieldNames.toSeq
+
+    if (resultNames.toSet != outputNames.toSet) {
+      throw new ExpressionException(s"Expression result type $resultType does 
not have the same" +
+        s"fields as output type $outputType")
+    }
+
+    for (f <- outputNames) {
+      val in = resultType.getTypeAt(resultType.getFieldIndex(f))
+      val out = outputType.getTypeAt(outputType.getFieldIndex(f))
+      if (!in.equals(out)) {
+        throw new ExpressionException(s"Types for field $f differ on input 
$resultType and " +
+          s"output $outputType.")
+      }
+    }
+
+    val outputFields = outputNames map {
+      f => ResolvedFieldReference(f, resultType.getTypeAt(f))
+    }
+
+    val function = new ExpressionSelectFunction(
+      resultSet.getType.asInstanceOf[RowTypeInfo],
+      outputType,
+      outputFields)
+
+    val opName = s"select(${outputFields.mkString(",")})"
+    val operator = new MapOperator(resultSet, outputType, function, opName)
+
+    operator
+  }
+
+  private def translateInternal(op: Operation): JavaDataSet[Row] = {
+    op match {
+      case Root(dataSet: JavaDataSet[Row], resultFields) =>
+        dataSet
+
+      case Root(_, _) =>
+        throw new ExpressionException("Invalid Root for JavaBatchTranslator: " 
+ op)
+
+      case GroupBy(_, fields) =>
+        throw new ExpressionException("Dangling GroupBy operation. Did you 
forget a " +
+          "SELECT statement?")
+
+      case As(input, newNames) =>
+        val translatedInput = translateInternal(input)
+        val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
+        val proxyType = new RenamingProxyTypeInfo[Row](inType, newNames.toArray)
+        new RenameOperator(translatedInput, proxyType)
+
+      case sel@Select(Filter(Join(leftInput, rightInput), predicate), selection) =>
+
+        val expandedInput = ExpandAggregations(sel)
+
+        if (expandedInput.eq(sel)) {
+          val translatedLeftInput = translateInternal(leftInput)
+          val translatedRightInput = translateInternal(rightInput)
+          val leftInType = translatedLeftInput.getType.asInstanceOf[CompositeType[Row]]
+          val rightInType = translatedRightInput.getType.asInstanceOf[CompositeType[Row]]
+
+          createJoin(
+            predicate,
+            selection,
+            translatedLeftInput,
+            translatedRightInput,
+            leftInType,
+            rightInType,
+            JoinHint.OPTIMIZER_CHOOSES)
+        } else {
+          translateInternal(expandedInput)
+        }
+
+      case Filter(Join(leftInput, rightInput), predicate) =>
+        val translatedLeftInput = translateInternal(leftInput)
+        val translatedRightInput = translateInternal(rightInput)
+        val leftInType = translatedLeftInput.getType.asInstanceOf[CompositeType[Row]]
+        val rightInType = translatedRightInput.getType.asInstanceOf[CompositeType[Row]]
+
+        createJoin(
+          predicate,
+          leftInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)) ++
+            rightInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)),
+          translatedLeftInput,
+          translatedRightInput,
+          leftInType,
+          rightInType,
+          JoinHint.OPTIMIZER_CHOOSES)
+
+      case Join(leftInput, rightInput) =>
+        throw new ExpressionException("Join without filter condition 
encountered. " +
+          "Did you forget to add .where(...) ?")
+
+      case sel@Select(input, selection) =>
+
+        val expandedInput = ExpandAggregations(sel)
+
+        if (expandedInput.eq(sel)) {
+          // no expansions took place
+          val translatedInput = translateInternal(input)
+          val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
+          val inputFields = inType.getFieldNames
+          createSelect(
+            selection,
+            translatedInput,
+            inType)
+        } else {
+          translateInternal(expandedInput)
+        }
+
+      case agg@Aggregate(GroupBy(input, groupExpressions), aggregations) =>
+        val translatedInput = translateInternal(input)
+        val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
+
+        val keyIndices = groupExpressions map {
+          case fe: ResolvedFieldReference => inType.getFieldIndex(fe.name)
+          case e => throw new ExpressionException(s"Expression $e is not a valid key expression.")
+        }
+
+        val keys = new Keys.ExpressionKeys(keyIndices.toArray, inType, false)
+
+        val grouping = new UnsortedGrouping(translatedInput, keys)
+
+        val aggFunctions: Seq[AggregationFunction[Any]] = aggregations map {
+          case (fieldName, fun) =>
+            fun.getFactory.createAggregationFunction[Any](
+              inType.getTypeAt[Any](inType.getFieldIndex(fieldName)).getTypeClass)
+        }
+
+        val aggIndices = aggregations map {
+          case (fieldName, _) =>
+            inType.getFieldIndex(fieldName)
+        }
+
+        val result = new GroupReduceOperator(
+          grouping,
+          inType,
+          new ExpressionAggregateFunction(aggIndices, aggFunctions),
+          "Expression Aggregation: " + agg)
+
+        result
+
+      case agg@Aggregate(input, aggregations) =>
+        val translatedInput = translateInternal(input)
+        val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
+
+        val aggFunctions: Seq[AggregationFunction[Any]] = aggregations map {
+          case (fieldName, fun) =>
+            fun.getFactory.createAggregationFunction[Any](
+              inType.getTypeAt[Any](inType.getFieldIndex(fieldName)).getTypeClass)
+        }
+
+        val aggIndices = aggregations map {
+          case (fieldName, _) =>
+            inType.getFieldIndex(fieldName)
+        }
+
+        val result = new GroupReduceOperator(
+          translatedInput,
+          inType,
+          new ExpressionAggregateFunction(aggIndices, aggFunctions),
+          "Expression Aggregation: " + agg)
+
+        result
+
+
+      case Filter(input, predicate) =>
+        val translatedInput = translateInternal(input)
+        val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
+        val filter = new ExpressionFilterFunction[Row](predicate, inType)
+        translatedInput.filter(filter)
+    }
+  }
+
+  private def createSelect[I](
+      fields: Seq[Expression],
+      input: JavaDataSet[I],
+      inputType: CompositeType[I]): JavaDataSet[Row] = {
+
+    fields foreach {
+      f =>
+        if (f.exists(_.isInstanceOf[Aggregation])) {
+          throw new ExpressionException("Found aggregate in " + 
fields.mkString(", ") + ".")
+        }
+
+    }
+
+    val resultType = new RowTypeInfo(fields)
+
+    val function = new ExpressionSelectFunction(inputType, resultType, fields)
+
+    val opName = s"select(${fields.mkString(",")})"
+    val operator = new MapOperator(input, resultType, function, opName)
+
+    operator
+  }
+
+  private def createJoin[L, R](
+      predicate: Expression,
+      fields: Seq[Expression],
+      leftInput: JavaDataSet[L],
+      rightInput: JavaDataSet[R],
+      leftType: CompositeType[L],
+      rightType: CompositeType[R],
+      joinHint: JoinHint): JavaDataSet[Row] = {
+
+    val resultType = new RowTypeInfo(fields)
+
+    val (reducedPredicate, leftFields, rightFields) =
+      ExtractEquiJoinFields(leftType, rightType, predicate)
+
+    if (leftFields.isEmpty || rightFields.isEmpty) {
+      throw new ExpressionException("Could not derive equi-join predicates " +
+        "for predicate " + predicate + ".")
+    }
+
+    val leftKey = new ExpressionKeys[L](leftFields, leftType)
+    val rightKey = new ExpressionKeys[R](rightFields, rightType)
+
+    val joiner = new ExpressionJoinFunction[L, R, Row](
+      reducedPredicate,
+      leftType,
+      rightType,
+      resultType,
+      fields)
+
+    new EquiJoin[L, R, Row](
+      leftInput,
+      rightInput,
+      leftKey,
+      rightKey,
+      joiner,
+      resultType,
+      joinHint,
+      predicate.toString)
+  }
+}
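
A note on the conversion contract enforced in translate() above: the target type must be a composite type (tuple, case class, or POJO) whose field names and field types both match the translated Row result exactly. A hedged Java sketch, where WC is the POJO from JavaTableExample and WrongWC is a hypothetical class used only to illustrate the failure mode:

    Table table = tableEnv.toTable(input);             // input: DataSet<WC>, fields word and count

    DataSet<WC> ok = tableEnv.toSet(table, WC.class);  // names and types match

    // A hypothetical class WrongWC { public String term; public int n; } has
    // matching types but different names, so resultNames.toSet would no longer
    // equal outputNames.toSet and translate() would throw an ExpressionException:
    // DataSet<WrongWC> bad = tableEnv.toSet(table, WrongWC.class);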

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaStreamingTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaStreamingTranslator.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaStreamingTranslator.scala
new file mode 100644
index 0000000..4fd79e4
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaStreamingTranslator.scala
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table
+
+import java.lang.reflect.Modifier
+
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.operations._
+import org.apache.flink.api.table.runtime.{ExpressionFilterFunction, ExpressionSelectFunction}
+import org.apache.flink.api.table.tree._
+import org.apache.flink.api.table.typeinfo.RowTypeInfo
+import org.apache.flink.api.table.{ExpressionException, Row, Table}
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.invokable.operator.MapInvokable
+
+/**
+ * [[TableTranslator]] for creating [[Table]]s from Java [[DataStream]]s and
+ * translating them back to Java [[DataStream]]s.
+ *
+ * This is very limited right now. Only select and filter are implemented. Also, the expression
+ * operations must be extended to allow windowing operations.
+ */
+class JavaStreamingTranslator extends TableTranslator {
+
+  type Representation[A] = DataStream[A]
+
+  override def createTable[A](
+      repr: Representation[A],
+      inputType: CompositeType[A],
+      expressions: Array[Expression],
+      resultFields: Seq[(String, TypeInformation[_])]): Table[this.type] = {
+
+    val rowDataStream = createSelect(expressions, repr, inputType)
+
+    new Table(Root(rowDataStream, resultFields), this)
+  }
+
+  override def translate[A](op: Operation)(implicit tpe: TypeInformation[A]): DataStream[A] = {
+
+    if (tpe.getTypeClass == classOf[Row]) {
+      // shortcut for DataStream[Row]
+      return translateInternal(op).asInstanceOf[DataStream[A]]
+    }
+
+    val clazz = tpe.getTypeClass
+    if (clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) {
+      throw new ExpressionException("Cannot create DataStream of type " +
+        clazz.getName + ". Only top-level classes or static member classes are supported.")
+    }
+
+    if (!implicitly[TypeInformation[A]].isInstanceOf[CompositeType[A]]) {
+      throw new ExpressionException(
+        "A Table can only be converted to composite types, type is: " +
+          implicitly[TypeInformation[A]] +
+          ". Composite types would be tuples, case classes and POJOs.")
+
+    }
+
+    val resultSet = translateInternal(op)
+
+    val resultType = resultSet.getType.asInstanceOf[RowTypeInfo]
+
+    val outputType = implicitly[TypeInformation[A]].asInstanceOf[CompositeType[A]]
+
+    val resultNames = resultType.getFieldNames
+    val outputNames = outputType.getFieldNames.toSeq
+
+    if (resultNames.toSet != outputNames.toSet) {
+      throw new ExpressionException(s"Expression result type $resultType does 
not have the same" +
+        s"fields as output type $outputType")
+    }
+
+    for (f <- outputNames) {
+      val in = resultType.getTypeAt(resultType.getFieldIndex(f))
+      val out = outputType.getTypeAt(outputType.getFieldIndex(f))
+      if (!in.equals(out)) {
+        throw new ExpressionException(s"Types for field $f differ on input 
$resultType and " +
+          s"output $outputType.")
+      }
+    }
+
+    val outputFields = outputNames map {
+      f => ResolvedFieldReference(f, resultType.getTypeAt(f))
+    }
+
+    val function = new ExpressionSelectFunction(
+      resultSet.getType.asInstanceOf[RowTypeInfo],
+      outputType,
+      outputFields)
+
+    val opName = s"select(${outputFields.mkString(",")})"
+
+    resultSet.transform(opName, outputType, new MapInvokable[Row, A](function))
+  }
+
+  private def translateInternal(op: Operation): DataStream[Row] = {
+    op match {
+      case Root(dataSet: DataStream[Row], resultFields) =>
+        dataSet
+
+      case Root(_, _) =>
+        throw new ExpressionException("Invalid Root for 
JavaStreamingTranslator: " + op)
+
+      case GroupBy(_, fields) =>
+        throw new ExpressionException("Dangling GroupBy operation. Did you 
forget a " +
+          "SELECT statement?")
+
+      case As(input, newNames) =>
+        throw new ExpressionException("As operation for Streams not yet 
implemented.")
+
+      case sel@Select(Filter(Join(leftInput, rightInput), predicate), selection) =>
+
+        val expandedInput = ExpandAggregations(sel)
+
+        if (expandedInput.eq(sel)) {
+          val translatedLeftInput = translateInternal(leftInput)
+          val translatedRightInput = translateInternal(rightInput)
+          val leftInType = translatedLeftInput.getType.asInstanceOf[CompositeType[Row]]
+          val rightInType = translatedRightInput.getType.asInstanceOf[CompositeType[Row]]
+
+          createJoin(
+            predicate,
+            selection,
+            translatedLeftInput,
+            translatedRightInput,
+            leftInType,
+            rightInType,
+            JoinHint.OPTIMIZER_CHOOSES)
+        } else {
+          translateInternal(expandedInput)
+        }
+
+      case Filter(Join(leftInput, rightInput), predicate) =>
+        val translatedLeftInput = translateInternal(leftInput)
+        val translatedRightInput = translateInternal(rightInput)
+        val leftInType = translatedLeftInput.getType.asInstanceOf[CompositeType[Row]]
+        val rightInType = translatedRightInput.getType.asInstanceOf[CompositeType[Row]]
+
+        createJoin(
+          predicate,
+          leftInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)) ++
+            rightInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)),
+          translatedLeftInput,
+          translatedRightInput,
+          leftInType,
+          rightInType,
+          JoinHint.OPTIMIZER_CHOOSES)
+
+      case Join(leftInput, rightInput) =>
+        throw new ExpressionException("Join without filter condition 
encountered. " +
+          "Did you forget to add .where(...) ?")
+
+      case sel@Select(input, selection) =>
+
+        val expandedInput = ExpandAggregations(sel)
+
+        if (expandedInput.eq(sel)) {
+          // no expansions took place
+          val translatedInput = translateInternal(input)
+          val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
+          val inputFields = inType.getFieldNames
+          createSelect(
+            selection,
+            translatedInput,
+            inType)
+        } else {
+          translateInternal(expandedInput)
+        }
+
+      case agg@Aggregate(GroupBy(input, groupExpressions), aggregations) =>
+        throw new ExpressionException("Aggregate operation for Streams not yet 
implemented.")
+
+      case agg@Aggregate(input, aggregations) =>
+        throw new ExpressionException("Aggregate operation for Streams not yet 
implemented.")
+
+      case Filter(input, predicate) =>
+        val translatedInput = translateInternal(input)
+        val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
+        val filter = new ExpressionFilterFunction[Row](predicate, inType)
+        translatedInput.filter(filter)
+    }
+  }
+
+  private def createSelect[I](
+      fields: Seq[Expression],
+      input: DataStream[I],
+      inputType: CompositeType[I]): DataStream[Row] = {
+
+    fields foreach {
+      f =>
+        if (f.exists(_.isInstanceOf[Aggregation])) {
+          throw new ExpressionException("Found aggregate in " + 
fields.mkString(", ") + ".")
+        }
+
+    }
+
+    val resultType = new RowTypeInfo(fields)
+
+    val function = new ExpressionSelectFunction(inputType, resultType, fields)
+
+    val opName = s"select(${fields.mkString(",")})"
+
+    input.transform(opName, resultType, new MapInvokable[I, Row](function))
+  }
+
+  private def createJoin[L, R](
+      predicate: Expression,
+      fields: Seq[Expression],
+      leftInput: DataStream[L],
+      rightInput: DataStream[R],
+      leftType: CompositeType[L],
+      rightType: CompositeType[R],
+      joinHint: JoinHint): DataStream[Row] = {
+
+    throw new ExpressionException("Join operation for Streams not yet 
implemented.")
+  }
+}
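
Because only select and filter are translated for streams, a working streaming pipeline is currently limited to projections and predicates. A minimal Java sketch, assuming a StreamExecutionEnvironment with a fromElements source (illustrative only) and the WC POJO from JavaTableExample:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    TableEnvironment tableEnv = new TableEnvironment();

    DataStream<WC> input = env.fromElements(new WC("Hello", 1), new WC("Ciao", 2));

    Table table = tableEnv.toTable(input);
    // select and filter are supported; as, groupBy, aggregate and join
    // throw ExpressionException on this translator
    Table filtered = table.filter("count = 2").select("word, count");

    DataStream<WC> result = tableEnv.toStream(filtered, WC.class);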

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala
new file mode 100644
index 0000000..030fa12
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.table
+
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.table.Table
+import org.apache.flink.streaming.api.datastream.DataStream
+
+/**
+ * Environment for working with the Table API.
+ *
+ * This can be used to convert [[DataSet]] or [[DataStream]] to a [[Table]] and back again. You
+ * can also use the provided methods to create a [[Table]] directly from a data source.
+ */
+class TableEnvironment {
+
+  /**
+   * Transforms the given DataSet to a [[org.apache.flink.api.table.Table]].
+   * The fields of the DataSet type are renamed to the given set of fields:
+   *
+   * Example:
+   *
+   * {{{
+   *   tableEnv.toTable(set, "a, b")
+   * }}}
+   *
+   * This will transform the set containing elements of two fields to a table where the fields
+   * are named a and b.
+   */
+  def toTable[T](set: DataSet[T], fields: String): Table[JavaBatchTranslator] = {
+    new JavaBatchTranslator().createTable(set, fields).asInstanceOf[Table[JavaBatchTranslator]]
+  }
+
+  /**
+   * Transforms the given DataSet to a [[org.apache.flink.api.table.Table]].
+   * The fields of the DataSet type are used to name the
+   * [[org.apache.flink.api.table.Table]] fields.
+   */
+  def toTable[T](set: DataSet[T]): Table[JavaBatchTranslator] = {
+    new JavaBatchTranslator().createTable(set).asInstanceOf[Table[JavaBatchTranslator]]
+  }
+
+  /**
+   * Transforms the given DataStream to a [[org.apache.flink.api.table.Table]].
+   * The fields of the DataStream type are renamed to the given set of fields:
+   *
+   * Example:
+   *
+   * {{{
+   *   tableEnv.toTable(set, "a, b")
+   * }}}
+   *
+   * This will transform the stream containing elements of two fields to a table where the fields
+   * are named a and b.
+   */
+  def toTable[T](set: DataStream[T], fields: String): Table[JavaStreamingTranslator] = {
+    new JavaStreamingTranslator().createTable(set, fields)
+      .asInstanceOf[Table[JavaStreamingTranslator]]
+  }
+
+  /**
+   * Transforms the given DataStream to a [[org.apache.flink.api.table.Table]].
+   * The fields of the DataStream type are used to name the
+   * [[org.apache.flink.api.table.Table]] fields.
+   */
+  def toTable[T](set: DataStream[T]): Table[JavaStreamingTranslator] = {
+    new JavaStreamingTranslator().createTable(set).asInstanceOf[Table[JavaStreamingTranslator]]
+  }
+
+  /**
+   * Converts the given [[org.apache.flink.api.table.Table]] to
+   * a DataSet. The given type must have exactly the same fields as the
+   * [[org.apache.flink.api.table.Table]]. That is, the names of the
+   * fields and the types must match.
+   */
+  @SuppressWarnings(Array("unchecked"))
+  def toSet[T](
+      table: Table[JavaBatchTranslator],
+      clazz: Class[T]): DataSet[T] = {
+    table.as(TypeExtractor.createTypeInfo(clazz)).asInstanceOf[DataSet[T]]
+  }
+
+  /**
+   * Converts the given [[org.apache.flink.api.table.Table]] to
+   * a DataStream. The given type must have exactly the same fields as the
+   * [[org.apache.flink.api.table.Table]]. That is, the names of the
+   * fields and the types must match.
+   */
+  @SuppressWarnings(Array("unchecked"))
+  def toStream[T](
+      table: Table[JavaStreamingTranslator],
+      clazz: Class[T]): DataStream[T] = {
+    table.as(TypeExtractor.createTypeInfo(clazz)).asInstanceOf[DataStream[T]]
+  }
+}
+
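
A short usage sketch of the renaming variant documented above, assuming a tuple-typed DataSet (the Tuple2 import and the env/tableEnv instances are assumptions for illustration); "a, b" assigns names to the two tuple fields so that expressions can refer to them:

    DataSet<Tuple2<String, Integer>> set = env.fromElements(
        new Tuple2<String, Integer>("Hello", 1), new Tuple2<String, Integer>("Ciao", 2));

    // rename f0 -> a, f1 -> b
    Table t = tableEnv.toTable(set, "a, b");

    Table result = t.filter("b = 1").select("a");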
