http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/RegressionData.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/RegressionData.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/RegressionData.scala
new file mode 100644
index 0000000..062f510
--- /dev/null
+++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/RegressionData.scala
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.regression
+
+import org.apache.flink.ml.common.LabeledVector
+import org.apache.flink.ml.math.DenseVector
+
+object RegressionData {
+
+  val expectedWeights = Array[Double](3.0094)
+  val expectedWeight0: Double = 9.8158
+  val expectedSquaredResidualSum: Double = 49.7596/2
+
+  val data: Seq[LabeledVector] = Seq(
+    LabeledVector(10.7949, DenseVector(0.2714)),
+    LabeledVector(10.6426, DenseVector(0.1008)),
+    LabeledVector(10.5603, DenseVector(0.5078)),
+    LabeledVector(12.8707, DenseVector(0.5856)),
+    LabeledVector(10.7026, DenseVector(0.7629)),
+    LabeledVector(9.8571, DenseVector(0.0830)),
+    LabeledVector(10.5001, DenseVector(0.6616)),
+    LabeledVector(11.2063, DenseVector(0.5170)),
+    LabeledVector(9.1892, DenseVector(0.1710)),
+    LabeledVector(12.2408, DenseVector(0.9386)),
+    LabeledVector(11.0307, DenseVector(0.5905)),
+    LabeledVector(10.1369, DenseVector(0.4406)),
+    LabeledVector(10.7609, DenseVector(0.9419)),
+    LabeledVector(12.5328, DenseVector(0.6559)),
+    LabeledVector(13.3560, DenseVector(0.4519)),
+    LabeledVector(14.7424, DenseVector(0.8397)),
+    LabeledVector(11.1057, DenseVector(0.5326)),
+    LabeledVector(11.6157, DenseVector(0.5539)),
+    LabeledVector(11.5744, DenseVector(0.6801)),
+    LabeledVector(11.1775, DenseVector(0.3672)),
+    LabeledVector(9.7991, DenseVector(0.2393)),
+    LabeledVector(9.8173, DenseVector(0.5789)),
+    LabeledVector(12.5642, DenseVector(0.8669)),
+    LabeledVector(9.9952, DenseVector(0.4068)),
+    LabeledVector(8.4354, DenseVector(0.1126)),
+    LabeledVector(13.7058, DenseVector(0.4438)),
+    LabeledVector(10.6672, DenseVector(0.3002)),
+    LabeledVector(11.6080, DenseVector(0.4014)),
+    LabeledVector(13.6926, DenseVector(0.8334)),
+    LabeledVector(9.5261, DenseVector(0.4036)),
+    LabeledVector(11.5837, DenseVector(0.3902)),
+    LabeledVector(11.5831, DenseVector(0.3604)),
+    LabeledVector(10.5038, DenseVector(0.1403)),
+    LabeledVector(10.9382, DenseVector(0.2601)),
+    LabeledVector(9.7325, DenseVector(0.0868)),
+    LabeledVector(12.0113, DenseVector(0.4294)),
+    LabeledVector(9.9219, DenseVector(0.2573)),
+    LabeledVector(10.0963, DenseVector(0.2976)),
+    LabeledVector(11.9999, DenseVector(0.4249)),
+    LabeledVector(12.0442, DenseVector(0.1192))
+  )
+
+  val expectedNoInterceptWeights = Array[Double](5.0)
+  val expectedNoInterceptWeight0: Double = 0.0
+
+  val noInterceptData: Seq[LabeledVector] = Seq(
+    LabeledVector(217.228709, DenseVector(43.4457419)),
+    LabeledVector(450.037048, DenseVector(90.0074095)),
+    LabeledVector( 67.553478, DenseVector(13.5106955)),
+    LabeledVector( 26.976958, DenseVector( 5.3953916)),
+    LabeledVector(403.808709, DenseVector(80.7617418)),
+    LabeledVector(203.932158, DenseVector(40.7864316)),
+    LabeledVector(146.974958, DenseVector(29.3949916)),
+    LabeledVector( 46.869291, DenseVector( 9.3738582)),
+    LabeledVector(450.780834, DenseVector(90.1561667)),
+    LabeledVector(386.535619, DenseVector(77.3071239)),
+    LabeledVector(202.644342, DenseVector(40.5288684)),
+    LabeledVector(227.586507, DenseVector(45.5173013)),
+    LabeledVector(408.801080, DenseVector(81.7602161)),
+    LabeledVector(146.118550, DenseVector(29.2237100)),
+    LabeledVector(156.475382, DenseVector(31.2950763)),
+    LabeledVector(291.822515, DenseVector(58.3645030)),
+    LabeledVector( 61.506887, DenseVector(12.3013775)),
+    LabeledVector(363.949913, DenseVector(72.7899827)),
+    LabeledVector(398.050744, DenseVector(79.6101487)),
+    LabeledVector(246.053111, DenseVector(49.2106221)),
+    LabeledVector(225.494661, DenseVector(45.0989323)),
+    LabeledVector(265.986844, DenseVector(53.1973689)),
+    LabeledVector(110.459912, DenseVector(22.0919823)),
+    LabeledVector(122.716974, DenseVector(24.5433947)),
+    LabeledVector(128.014314, DenseVector(25.6028628)),
+    LabeledVector(252.538913, DenseVector(50.5077825)),
+    LabeledVector(393.632082, DenseVector(78.7264163)),
+    LabeledVector( 77.698941, DenseVector(15.5397881)),
+    LabeledVector(206.187568, DenseVector(41.2375135)),
+    LabeledVector(244.073426, DenseVector(48.8146851)),
+    LabeledVector(364.946890, DenseVector(72.9893780)),
+    LabeledVector(  4.627494, DenseVector( 0.9254987)),
+    LabeledVector(485.359565, DenseVector(97.0719130)),
+    LabeledVector(347.359190, DenseVector(69.4718380)),
+    LabeledVector(419.663211, DenseVector(83.9326422)),
+    LabeledVector(488.518318, DenseVector(97.7036635)),
+    LabeledVector( 28.082962, DenseVector( 5.6165925)),
+    LabeledVector(211.002441, DenseVector(42.2004881)),
+    LabeledVector(250.624124, DenseVector(50.1248248)),
+    LabeledVector(489.776669, DenseVector(97.9553337))
+  )
+
+
+  val expectedPolynomialWeights = Seq(0.2375, -0.3493, -0.1674)
+  val expectedPolynomialWeight0 = 0.0233
+  val expectedPolynomialSquaredResidualSum = 1.5389e+03/2
+
+  val polynomialData: Seq[LabeledVector] = Seq(
+    LabeledVector(2.1415, DenseVector(3.6663)),
+    LabeledVector(10.9835, DenseVector(4.0761)),
+    LabeledVector(7.2507, DenseVector(0.5714)),
+    LabeledVector(11.9274, DenseVector(4.1102)),
+    LabeledVector(-4.2798, DenseVector(2.8456)),
+    LabeledVector(7.1929, DenseVector(0.4389)),
+    LabeledVector(4.5097, DenseVector(1.2532)),
+    LabeledVector(-3.6059, DenseVector(2.4610)),
+    LabeledVector(18.1132, DenseVector(4.3088)),
+    LabeledVector(19.2674, DenseVector(4.3420)),
+    LabeledVector(7.0664, DenseVector(0.7093)),
+    LabeledVector(20.1836, DenseVector(4.3677)),
+    LabeledVector(18.0609, DenseVector(4.3073)),
+    LabeledVector(-2.2090, DenseVector(2.1842)),
+    LabeledVector(1.1306, DenseVector(3.6013)),
+    LabeledVector(7.1903, DenseVector(0.6385)),
+    LabeledVector(-0.2668, DenseVector(1.8979)),
+    LabeledVector(12.2281, DenseVector(4.1208)),
+    LabeledVector(0.6086, DenseVector(3.5649)),
+    LabeledVector(18.4202, DenseVector(4.3177)),
+    LabeledVector(-4.1284, DenseVector(2.9508)),
+    LabeledVector(6.1964, DenseVector(0.1607)),
+    LabeledVector(4.9638, DenseVector(3.8211)),
+    LabeledVector(14.6677, DenseVector(4.2030)),
+    LabeledVector(-3.8132, DenseVector(3.0543)),
+    LabeledVector(-1.2891, DenseVector(3.4098)),
+    LabeledVector(-1.9390, DenseVector(3.3441)),
+    LabeledVector(0.7293, DenseVector(1.7650)),
+    LabeledVector(-4.1310, DenseVector(2.9497)),
+    LabeledVector(6.9131, DenseVector(0.7703)),
+    LabeledVector(-3.2060, DenseVector(3.1772)),
+    LabeledVector(6.0899, DenseVector(0.1432)),
+    LabeledVector(4.5567, DenseVector(1.2462)),
+    LabeledVector(6.4562, DenseVector(0.2078)),
+    LabeledVector(7.1903, DenseVector(0.4371)),
+    LabeledVector(2.8017, DenseVector(3.7056)),
+    LabeledVector(-3.4873, DenseVector(3.1267)),
+    LabeledVector(3.2918, DenseVector(1.4269)),
+    LabeledVector(17.0085, DenseVector(4.2760)),
+    LabeledVector(6.1622, DenseVector(0.1550)),
+    LabeledVector(-0.8192, DenseVector(1.9743)),
+    LabeledVector(1.0957, DenseVector(1.7170)),
+    LabeledVector(-0.9065, DenseVector(3.4448)),
+    LabeledVector(0.7986, DenseVector(3.5784)),
+    LabeledVector(6.6861, DenseVector(0.8409)),
+    LabeledVector(-2.3274, DenseVector(2.2039)),
+    LabeledVector(-1.0359, DenseVector(2.0051)),
+    LabeledVector(-4.2092, DenseVector(2.9084)),
+    LabeledVector(-3.1140, DenseVector(3.1921)),
+    LabeledVector(-1.4323, DenseVector(3.3961))
+  )
+
+  val expectedRegWeights = Array[Double](0.0, 0.0, 0.0, 0.18, 0.2, 0.24)
+  val expectedRegWeight0 = 0.74
+
+  // Example values from scikit-learn L1 test: http://git.io/vf4V2
+  val regularizationData: Seq[LabeledVector] = Seq(
+    LabeledVector(1.0, DenseVector(1.0,0.9 ,0.8 ,0.0 ,0.0 ,0.0)),
+    LabeledVector(1.0, DenseVector(1.0,0.84,0.98,0.0 ,0.0 ,0.0)),
+    LabeledVector(1.0, DenseVector(1.0,0.96,0.88,0.0 ,0.0 ,0.0)),
+    LabeledVector(1.0, DenseVector(1.0,0.91,0.99,0.0 ,0.0 ,0.0)),
+    LabeledVector(2.0, DenseVector(0.0,0.0 ,0.0 ,0.89,0.91,1.0)),
+    LabeledVector(2.0, DenseVector(0.0,0.0 ,0.0 ,0.79,0.84,1.0)),
+    LabeledVector(2.0, DenseVector(0.0,0.0 ,0.0 ,0.91,0.95,1.0)),
+    LabeledVector(2.0, DenseVector(0.0,0.0 ,0.0 ,0.93,1.0 ,1.0))
+  )
+}
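
A note on the convention above: expectedSquaredResidualSum is written as 49.7596/2 because the
fixture targets a loss of the form 0.5 * sum((y - (w0 + w * x))^2). Below is a minimal sketch of
how a test could recompute that quantity from this fixture; the ResidualCheck object and the
loose tolerance are illustrative only, not part of the commit:

import org.apache.flink.ml.regression.RegressionData

object ResidualCheck {
  def main(args: Array[String]): Unit = {
    val w0 = RegressionData.expectedWeight0    // intercept
    val w = RegressionData.expectedWeights(0)  // single feature weight

    // 0.5 * sum((y - (w0 + w * x))^2), matching expectedSquaredResidualSum
    val srs = RegressionData.data.map { lv =>
      val err = lv.label - (w0 + w * lv.vector(0))
      err * err
    }.sum / 2

    // The stored weights are rounded to four decimals, so compare loosely.
    assert(math.abs(srs - RegressionData.expectedSquaredResidualSum) < 1.0)
  }
}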

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/pom.xml b/flink-libraries/flink-table/pom.xml
new file mode 100644
index 0000000..19951cb
--- /dev/null
+++ b/flink-libraries/flink-table/pom.xml
@@ -0,0 +1,258 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-libraries</artifactId>
+               <version>1.0-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-table</artifactId>
+       <name>flink-table</name>
+
+       <packaging>jar</packaging>
+
+       <dependencies>
+
+               <dependency>
+                       <groupId>com.google.guava</groupId>
+                       <artifactId>guava</artifactId>
+                       <version>${guava.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-scala</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-scala</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-examples-batch</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.scala-lang</groupId>
+                       <artifactId>scala-reflect</artifactId>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.scala-lang</groupId>
+                       <artifactId>scala-library</artifactId>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.scala-lang</groupId>
+                       <artifactId>scala-compiler</artifactId>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.codehaus.janino</groupId>
+                       <artifactId>janino</artifactId>
+                       <version>2.7.5</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-tests</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>com.fasterxml.jackson.core</groupId>
+                       <artifactId>jackson-databind</artifactId>
+                       <version>${jackson.version}</version>
+               </dependency>
+
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <!-- Scala Compiler -->
+                       <plugin>
+                               <groupId>net.alchim31.maven</groupId>
+                               <artifactId>scala-maven-plugin</artifactId>
+                               <version>3.1.4</version>
+                               <executions>
+                                       <!-- Run scala compiler in the process-resources phase, so that dependencies on
+                                               scala classes can be resolved later in the (Java) compile phase -->
+                                       <execution>
+                                               <id>scala-compile-first</id>
+                                               <phase>process-resources</phase>
+                                               <goals>
+                                                       <goal>compile</goal>
+                                               </goals>
+                                       </execution>
+
+                                       <!-- Run scala compiler in the process-test-resources phase, so that dependencies on
+                                                scala classes can be resolved later in the (Java) test-compile phase -->
+                                       <execution>
+                                               <id>scala-test-compile</id>
+                                               <phase>process-test-resources</phase>
+                                               <goals>
+                                                       <goal>testCompile</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                               <configuration>
+                                       <jvmArgs>
+                                               <jvmArg>-Xms128m</jvmArg>
+                                               <jvmArg>-Xmx512m</jvmArg>
+                                       </jvmArgs>
+                                       <compilerPlugins combine.children="append">
+                                               <compilerPlugin>
+                                                       <groupId>org.scalamacros</groupId>
+                                                       <artifactId>paradise_${scala.version}</artifactId>
+                                                       <version>${scala.macros.version}</version>
+                                               </compilerPlugin>
+                                       </compilerPlugins>
+                               </configuration>
+                       </plugin>
+
+                       <!-- Eclipse Integration -->
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-eclipse-plugin</artifactId>
+                               <version>2.8</version>
+                               <configuration>
+                                       <downloadSources>true</downloadSources>
+                                       <projectnatures>
+                                               <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+                                               <projectnature>org.eclipse.jdt.core.javanature</projectnature>
+                                       </projectnatures>
+                                       <buildcommands>
+                                               <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+                                       </buildcommands>
+                                       <classpathContainers>
+                                               <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+                                               <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+                                       </classpathContainers>
+                                       <excludes>
+                                               <exclude>org.scala-lang:scala-library</exclude>
+                                               <exclude>org.scala-lang:scala-compiler</exclude>
+                                       </excludes>
+                                       <sourceIncludes>
+                                               <sourceInclude>**/*.scala</sourceInclude>
+                                               <sourceInclude>**/*.java</sourceInclude>
+                                       </sourceIncludes>
+                               </configuration>
+                       </plugin>
+
+                       <!-- Adding scala source directories to build path -->
+                       <plugin>
+                               <groupId>org.codehaus.mojo</groupId>
+                               <artifactId>build-helper-maven-plugin</artifactId>
+                               <version>1.7</version>
+                               <executions>
+                                       <!-- Add src/main/scala to eclipse build path -->
+                                       <execution>
+                                               <id>add-source</id>
+                                               <phase>generate-sources</phase>
+                                               <goals>
+                                                       <goal>add-source</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <sources>
+                                                               <source>src/main/scala</source>
+                                                       </sources>
+                                               </configuration>
+                                       </execution>
+                                       <!-- Add src/test/scala to eclipse build path -->
+                                       <execution>
+                                               <id>add-test-source</id>
+                                               <phase>generate-test-sources</phase>
+                                               <goals>
+                                                       <goal>add-test-source</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <sources>
+                                                               <source>src/test/scala</source>
+                                                       </sources>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+
+                       <plugin>
+                               <groupId>org.scalastyle</groupId>
+                               <artifactId>scalastyle-maven-plugin</artifactId>
+                               <version>0.5.0</version>
+                               <executions>
+                                       <execution>
+                                               <goals>
+                                                       <goal>check</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                               <configuration>
+                                       <verbose>false</verbose>
+                                       <failOnViolation>true</failOnViolation>
+                                       <includeTestSourceDirectory>true</includeTestSourceDirectory>
+                                       <failOnWarning>false</failOnWarning>
+                                       <sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
+                                       <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
+                                       <configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
+                                       <outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
+                                       <outputEncoding>UTF-8</outputEncoding>
+                               </configuration>
+                       </plugin>
+
+               </plugins>
+       </build>
+
+       <profiles>
+               <profile>
+                       <id>scala-2.10</id>
+                       <activation>
+                               <property>
+                                       <!-- this is the default scala profile -->
+                                       <name>!scala-2.11</name>
+                               </property>
+                       </activation>
+                       <dependencies>
+                               <dependency>
+                                       <groupId>org.scalamacros</groupId>
+                                       <artifactId>quasiquotes_${scala.binary.version}</artifactId>
+                                       <version>${scala.macros.version}</version>
+                               </dependency>
+                       </dependencies>
+               </profile>
+       </profiles>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/table/package-info.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/table/package-info.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/table/package-info.java
new file mode 100644
index 0000000..97113bb
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/table/package-info.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <strong>Table API (Java)</strong><br>
+ *
+ * {@link org.apache.flink.api.java.table.TableEnvironment} can be used to create a
+ * {@link org.apache.flink.api.table.Table} from a {@link org.apache.flink.api.java.DataSet}
+ * or {@link org.apache.flink.streaming.api.datastream.DataStream}.
+ *
+ * <p>
+ * This can be used to perform SQL-like queries on data. Please have
+ * a look at {@link org.apache.flink.api.table.Table} to see which operations are supported and
+ * how query strings are written.
+ *
+ * <p>
+ * Example:
+ *
+ * <pre>{@code
+ * ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
+ *
+ * DataSet<WC> input = env.fromElements(
+ *   new WC("Hello", 1),
+ *   new WC("Ciao", 1),
+ *   new WC("Hello", 1));
+ *
+ * TableEnvironment tableEnv = new TableEnvironment();
+ * Table table = tableEnv.fromDataSet(input);
+ *
+ * Table filtered = table
+ *     .groupBy("word")
+ *     .select("word.count as count, word")
+ *     .filter("count = 2");
+ *
+ * DataSet<WC> result = tableEnv.toDataSet(filtered, WC.class);
+ *
+ * result.print();
+ * env.execute();
+ * }</pre>
+ *
+ * <p>
+ * As seen above, a {@link org.apache.flink.api.table.Table} can be converted back to the
+ * underlying API representation using {@link org.apache.flink.api.java.table.TableEnvironment#toDataSet(org.apache.flink.api.table.Table, java.lang.Class)}
+ * or {@link org.apache.flink.api.java.table.TableEnvironment#toDataStream(org.apache.flink.api.table.Table, java.lang.Class)}.
+ */
+package org.apache.flink.api.java.table;

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/package-info.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/package-info.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/package-info.java
new file mode 100644
index 0000000..d7fbc8e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/table/package-info.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <strong>Table API</strong><br>
+ *
+ * This package contains the generic part of the Table API. It can be used with Flink Streaming
+ * and Flink Batch, from Scala as well as from Java.
+ *
+ * When using the Table API, a user creates a [[org.apache.flink.api.table.Table]] from
+ * a DataSet or DataStream. On this, relational operations can be performed. A table can also
+ * be converted back to a DataSet or DataStream.
+ *
+ * Packages [[org.apache.flink.api.scala.table]] and [[org.apache.flink.api.java.table]] contain
+ * the language-specific parts of the API. Refer to these packages for documentation on how
+ * the Table API can be used in Java and Scala.
+ */
+package org.apache.flink.api.table;

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaTableExample.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaTableExample.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaTableExample.java
new file mode 100644
index 0000000..c043508
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/examples/java/JavaTableExample.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.java;
+
+
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.TableEnvironment;
+
+/**
+ * Very simple example that shows how the Java Table API can be used.
+ */
+public class JavaTableExample {
+
+       public static class WC {
+               public String word;
+               public int count;
+
+               // Public constructor to make it a Flink POJO
+               public WC() {
+
+               }
+
+               public WC(String word, int count) {
+                       this.word = word;
+                       this.count = count;
+               }
+
+               @Override
+               public String toString() {
+                       return "WC " + word + " " + count;
+               }
+       }
+
+       public static void main(String[] args) throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
+               TableEnvironment tableEnv = new TableEnvironment();
+
+               DataSet<WC> input = env.fromElements(
+                               new WC("Hello", 1),
+                               new WC("Ciao", 1),
+                               new WC("Hello", 1));
+
+               Table table = tableEnv.fromDataSet(input);
+
+               Table filtered = table
+                               .groupBy("word")
+                               .select("word.count as count, word")
+                               .filter("count = 2");
+
+               DataSet<WC> result = tableEnv.toDataSet(filtered, WC.class);
+
+               result.print();
+       }
+}
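
Run with the collections environment, this example should print a single element, "WC Hello 2":
the aggregation sums the count field per word ("Hello" -> 2, "Ciao" -> 1), and the filter keeps
only the groups whose aggregated count equals 2.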

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
new file mode 100644
index 0000000..9dc9297
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table
+
+import java.lang.reflect.Modifier
+
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.aggregation.AggregationFunction
+import org.apache.flink.api.java.operators.JoinOperator.EquiJoin
+import org.apache.flink.api.java.operators.Keys.ExpressionKeys
+import org.apache.flink.api.java.operators.{GroupReduceOperator, Keys, MapOperator, UnsortedGrouping}
+import org.apache.flink.api.java.{DataSet => JavaDataSet}
+import org.apache.flink.api.table.expressions.analysis.ExtractEquiJoinFields
+import org.apache.flink.api.table.plan._
+import org.apache.flink.api.table.runtime._
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.typeinfo.{RenameOperator, RenamingProxyTypeInfo, RowTypeInfo}
+import org.apache.flink.api.table.{ExpressionException, Row, Table}
+
+/**
+ * [[PlanTranslator]] for creating [[Table]]s from Java [[org.apache.flink.api.java.DataSet]]s and
+ * translating them back to Java [[org.apache.flink.api.java.DataSet]]s.
+ */
+class JavaBatchTranslator extends PlanTranslator {
+
+  type Representation[A] = JavaDataSet[A]
+
+  override def createTable[A](
+      repr: Representation[A],
+      inputType: CompositeType[A],
+      expressions: Array[Expression],
+      resultFields: Seq[(String, TypeInformation[_])]): Table = {
+
+    val rowDataSet = createSelect(expressions, repr, inputType)
+
+    Table(Root(rowDataSet, resultFields))
+  }
+
+  override def translate[A](op: PlanNode)(implicit tpe: TypeInformation[A]): JavaDataSet[A] = {
+
+    if (tpe.getTypeClass == classOf[Row]) {
+      // shortcut for DataSet[Row]
+      return translateInternal(op).asInstanceOf[JavaDataSet[A]]
+    }
+
+    val clazz = tpe.getTypeClass
+    if (clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) {
+      throw new ExpressionException("Cannot create DataSet of type " +
+        clazz.getName + ". Only top-level classes or static member classes are supported.")
+    }
+
+
+    if (!implicitly[TypeInformation[A]].isInstanceOf[CompositeType[A]]) {
+      throw new ExpressionException(
+        "A Table can only be converted to composite types, type is: " +
+          implicitly[TypeInformation[A]] +
+          ". Composite types would be tuples, case classes and POJOs.")
+    }
+
+    val resultSet = translateInternal(op)
+
+    val resultType = resultSet.getType.asInstanceOf[RowTypeInfo]
+
+    val outputType = implicitly[TypeInformation[A]].asInstanceOf[CompositeType[A]]
+
+    val resultNames = resultType.getFieldNames
+    val outputNames = outputType.getFieldNames.toSeq
+
+    if (resultNames.toSet != outputNames.toSet) {
+      throw new ExpressionException(s"Expression result type $resultType does not have the same " +
+        s"fields as output type $outputType")
+    }
+
+    for (f <- outputNames) {
+      val in = resultType.getTypeAt(resultType.getFieldIndex(f))
+      val out = outputType.getTypeAt(outputType.getFieldIndex(f))
+      if (!in.equals(out)) {
+        throw new ExpressionException(s"Types for field $f differ on input $resultType and " +
+          s"output $outputType.")
+      }
+    }
+
+    val outputFields = outputNames map {
+      f => ResolvedFieldReference(f, resultType.getTypeAt(f))
+    }
+
+    val function = new ExpressionSelectFunction(
+      resultSet.getType.asInstanceOf[RowTypeInfo],
+      outputType,
+      outputFields)
+
+    val opName = s"select(${outputFields.mkString(",")})"
+    val operator = new MapOperator(resultSet, outputType, function, opName)
+
+    operator
+  }
+
+  private def translateInternal(op: PlanNode): JavaDataSet[Row] = {
+    op match {
+      case Root(dataSet: JavaDataSet[Row], resultFields) =>
+        dataSet
+
+      case Root(_, _) =>
+        throw new ExpressionException("Invalid Root for JavaBatchTranslator: " + op + ". " +
+          "Did you try converting a Table based on a DataSet to a DataStream or vice-versa?")
+
+      case GroupBy(_, fields) =>
+        throw new ExpressionException("Dangling GroupBy operation. Did you forget a " +
+          "SELECT statement?")
+
+      case As(input, newNames) =>
+        val translatedInput = translateInternal(input)
+        val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
+        val proxyType = new RenamingProxyTypeInfo[Row](inType, newNames.toArray)
+        new RenameOperator(translatedInput, proxyType)
+
+      case sel@Select(Filter(Join(leftInput, rightInput), predicate), selection) =>
+
+        val expandedInput = ExpandAggregations(sel)
+
+        if (expandedInput.eq(sel)) {
+          val translatedLeftInput = translateInternal(leftInput)
+          val translatedRightInput = translateInternal(rightInput)
+          val leftInType = translatedLeftInput.getType.asInstanceOf[CompositeType[Row]]
+          val rightInType = translatedRightInput.getType.asInstanceOf[CompositeType[Row]]
+
+          createJoin(
+            predicate,
+            selection,
+            translatedLeftInput,
+            translatedRightInput,
+            leftInType,
+            rightInType,
+            JoinHint.OPTIMIZER_CHOOSES)
+        } else {
+          translateInternal(expandedInput)
+        }
+
+      case Filter(Join(leftInput, rightInput), predicate) =>
+        val translatedLeftInput = translateInternal(leftInput)
+        val translatedRightInput = translateInternal(rightInput)
+        val leftInType = translatedLeftInput.getType.asInstanceOf[CompositeType[Row]]
+        val rightInType = translatedRightInput.getType.asInstanceOf[CompositeType[Row]]
+
+        createJoin(
+          predicate,
+          leftInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)) ++
+            rightInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)),
+          translatedLeftInput,
+          translatedRightInput,
+          leftInType,
+          rightInType,
+          JoinHint.OPTIMIZER_CHOOSES)
+
+      case Join(leftInput, rightInput) =>
+        throw new ExpressionException("Join without filter condition encountered. " +
+          "Did you forget to add .where(...) ?")
+
+      case sel@Select(input, selection) =>
+
+        val expandedInput = ExpandAggregations(sel)
+
+        if (expandedInput.eq(sel)) {
+          val translatedInput = input match {
+            case GroupBy(groupByInput, groupExpressions) =>
+              val translatedGroupByInput = translateInternal(groupByInput)
+              val inType = translatedGroupByInput.getType.asInstanceOf[CompositeType[Row]]
+
+              val keyIndices = groupExpressions map {
+                case fe: ResolvedFieldReference => inType.getFieldIndex(fe.name)
+                case e =>
+                  throw new ExpressionException(s"Expression $e is not a valid key expression.")
+              }
+
+              val keys = new Keys.ExpressionKeys(keyIndices.toArray, inType, false)
+              val grouping = new UnsortedGrouping(translatedGroupByInput, keys)
+
+              new GroupReduceOperator(
+                grouping,
+                inType,
+                new NoExpressionAggregateFunction(),
+                "Nop Expression Aggregation")
+
+            case _ => translateInternal(input)
+          }
+
+          val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
+          val inputFields = inType.getFieldNames
+          createSelect(
+            selection,
+            translatedInput,
+            inType)
+        } else {
+          translateInternal(expandedInput)
+        }
+
+      case agg@Aggregate(GroupBy(input, groupExpressions), aggregations) =>
+        val translatedInput = translateInternal(input)
+        val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
+
+        val keyIndices = groupExpressions map {
+          case fe: ResolvedFieldReference => inType.getFieldIndex(fe.name)
+          case e => throw new ExpressionException(s"Expression $e is not a valid key expression.")
+        }
+
+        val keys = new Keys.ExpressionKeys(keyIndices.toArray, inType, false)
+
+        val grouping = new UnsortedGrouping(translatedInput, keys)
+
+        val aggFunctions: Seq[AggregationFunction[Any]] = aggregations map {
+          case (fieldName, fun) =>
+            fun.getFactory.createAggregationFunction[Any](
+              inType.getTypeAt[Any](inType.getFieldIndex(fieldName)).getTypeClass)
+        }
+
+        val aggIndices = aggregations map {
+          case (fieldName, _) =>
+            inType.getFieldIndex(fieldName)
+        }
+
+        val result = new GroupReduceOperator(
+          grouping,
+          inType,
+          new ExpressionAggregateFunction(aggIndices, aggFunctions),
+          "Expression Aggregation: " + agg)
+
+        result
+
+      case agg@Aggregate(input, aggregations) =>
+        val translatedInput = translateInternal(input)
+        val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
+
+        val aggFunctions: Seq[AggregationFunction[Any]] = aggregations map {
+          case (fieldName, fun) =>
+            fun.getFactory.createAggregationFunction[Any](
+              inType.getTypeAt[Any](inType.getFieldIndex(fieldName)).getTypeClass)
+        }
+
+        val aggIndices = aggregations map {
+          case (fieldName, _) =>
+            inType.getFieldIndex(fieldName)
+        }
+
+        val result = new GroupReduceOperator(
+          translatedInput,
+          inType,
+          new ExpressionAggregateFunction(aggIndices, aggFunctions),
+          "Expression Aggregation: " + agg)
+
+        result
+
+
+      case Filter(input, predicate) =>
+        val translatedInput = translateInternal(input)
+        val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
+        val filter = new ExpressionFilterFunction[Row](predicate, inType)
+        translatedInput.filter(filter).name(predicate.toString)
+
+      case uni@UnionAll(left, right) =>
+        val translatedLeft = translateInternal(left)
+        val translatedRight = translateInternal(right)
+        translatedLeft.union(translatedRight).name("Union: " + uni)
+    }
+  }
+
+  private def createSelect[I](
+      fields: Seq[Expression],
+      input: JavaDataSet[I],
+      inputType: CompositeType[I]): JavaDataSet[Row] = {
+
+    fields foreach {
+      f =>
+        if (f.exists(_.isInstanceOf[Aggregation])) {
+          throw new ExpressionException("Found aggregate in " + fields.mkString(", ") + ".")
+        }
+
+    }
+
+    val resultType = new RowTypeInfo(fields)
+
+    val function = new ExpressionSelectFunction(inputType, resultType, fields)
+
+    val opName = s"select(${fields.mkString(",")})"
+    val operator = new MapOperator(input, resultType, function, opName)
+
+    operator
+  }
+
+  private def createJoin[L, R](
+      predicate: Expression,
+      fields: Seq[Expression],
+      leftInput: JavaDataSet[L],
+      rightInput: JavaDataSet[R],
+      leftType: CompositeType[L],
+      rightType: CompositeType[R],
+      joinHint: JoinHint): JavaDataSet[Row] = {
+
+    val resultType = new RowTypeInfo(fields)
+
+    val (reducedPredicate, leftFields, rightFields) =
+      ExtractEquiJoinFields(leftType, rightType, predicate)
+
+    if (leftFields.isEmpty || rightFields.isEmpty) {
+      throw new ExpressionException("Could not derive equi-join predicates " +
+        "for predicate " + predicate + ".")
+    }
+
+    val leftKey = new ExpressionKeys[L](leftFields, leftType)
+    val rightKey = new ExpressionKeys[R](rightFields, rightType)
+
+    val joiner = new ExpressionJoinFunction[L, R, Row](
+      reducedPredicate,
+      leftType,
+      rightType,
+      resultType,
+      fields)
+
+    new EquiJoin[L, R, Row](
+      leftInput,
+      rightInput,
+      leftKey,
+      rightKey,
+      joiner,
+      resultType,
+      joinHint,
+      predicate.toString)
+  }
+}
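
The batch translator above only accepts joins whose predicate lets ExtractEquiJoinFields derive
at least one field-equality pair between the two inputs; anything else fails with "Could not
derive equi-join predicates". Below is a sketch of a query it can translate, assuming the Table
class (added elsewhere in this commit, not shown here) exposes join/where/select with string
expressions, as the Filter(Join(...)) plan nodes and error messages suggest; the input tables
and field names are made up for illustration:

import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.api.java.table.TableEnvironment
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.api.table.Row

object JoinSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.createCollectionsEnvironment()
    val tableEnv = new TableEnvironment()

    // Two small inputs with distinct field names across the tables.
    val orders = tableEnv.fromDataSet(
      env.fromElements(new JTuple2[Integer, Integer](1, 10)), "uid, amount")
    val users = tableEnv.fromDataSet(
      env.fromElements(new JTuple2[Integer, String](1, "Alice")), "userId, name")

    // ExtractEquiJoinFields can derive the key pair (uid, userId) from this
    // predicate; a join without .where(...) throws an ExpressionException.
    val joined = orders.join(users).where("uid = userId").select("name, amount")

    tableEnv.toDataSet(joined, classOf[Row]).print()
  }
}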

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaStreamingTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaStreamingTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaStreamingTranslator.scala
new file mode 100644
index 0000000..a37c892
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaStreamingTranslator.scala
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table
+
+import java.lang.reflect.Modifier
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.plan._
+import org.apache.flink.api.table.runtime.{ExpressionFilterFunction, ExpressionSelectFunction}
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.typeinfo.RowTypeInfo
+import org.apache.flink.api.table.{ExpressionException, Row, Table}
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.operators.StreamMap
+
+/**
+ * [[PlanTranslator]] for creating [[Table]]s from Java [[DataStream]]s and
+ * translating them back to Java [[DataStream]]s.
+ *
+ * This is very limited right now. Only select and filter are implemented. Also, the expression
+ * operations must be extended to allow windowing operations.
+ */
+
+class JavaStreamingTranslator extends PlanTranslator {
+
+  type Representation[A] = DataStream[A]
+
+  override def createTable[A](
+      repr: Representation[A],
+      inputType: CompositeType[A],
+      expressions: Array[Expression],
+      resultFields: Seq[(String, TypeInformation[_])]): Table = {
+
+    val rowDataStream = createSelect(expressions, repr, inputType)
+
+    new Table(Root(rowDataStream, resultFields))
+  }
+
+  override def translate[A](op: PlanNode)(implicit tpe: TypeInformation[A]): DataStream[A] = {
+
+    if (tpe.getTypeClass == classOf[Row]) {
+      // shortcut for DataStream[Row]
+      return translateInternal(op).asInstanceOf[DataStream[A]]
+    }
+
+    val clazz = tpe.getTypeClass
+    if (clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) {
+      throw new ExpressionException("Cannot create DataStream of type " +
+        clazz.getName + ". Only top-level classes or static member classes are supported.")
+    }
+
+    if (!implicitly[TypeInformation[A]].isInstanceOf[CompositeType[A]]) {
+      throw new ExpressionException(
+        "A Table can only be converted to composite types, type is: " +
+          implicitly[TypeInformation[A]] +
+          ". Composite types would be tuples, case classes and POJOs.")
+
+    }
+
+    val resultSet = translateInternal(op)
+
+    val resultType = resultSet.getType.asInstanceOf[RowTypeInfo]
+
+    val outputType = implicitly[TypeInformation[A]].asInstanceOf[CompositeType[A]]
+
+    val resultNames = resultType.getFieldNames
+    val outputNames = outputType.getFieldNames.toSeq
+
+    if (resultNames.toSet != outputNames.toSet) {
+      throw new ExpressionException(s"Expression result type $resultType does not have the same " +
+        s"fields as output type $outputType")
+    }
+
+    for (f <- outputNames) {
+      val in = resultType.getTypeAt(resultType.getFieldIndex(f))
+      val out = outputType.getTypeAt(outputType.getFieldIndex(f))
+      if (!in.equals(out)) {
+        throw new ExpressionException(s"Types for field $f differ on input $resultType and " +
+          s"output $outputType.")
+      }
+    }
+
+    val outputFields = outputNames map {
+      f => ResolvedFieldReference(f, resultType.getTypeAt(f))
+    }
+
+    val function = new ExpressionSelectFunction(
+      resultSet.getType.asInstanceOf[RowTypeInfo],
+      outputType,
+      outputFields)
+
+    val opName = s"select(${outputFields.mkString(",")})"
+
+    resultSet.transform(opName, outputType, new StreamMap[Row, A](function))
+  }
+
+  private def translateInternal(op: PlanNode): DataStream[Row] = {
+    op match {
+      case Root(dataSet: DataStream[Row], resultFields) =>
+        dataSet
+
+      case Root(_, _) =>
+        throw new ExpressionException("Invalid Root for JavaStreamingTranslator: " + op + ". " +
+          "Did you try converting a Table based on a DataSet to a DataStream or vice-versa?")
+
+      case GroupBy(_, fields) =>
+        throw new ExpressionException("Dangling GroupBy operation. Did you forget a " +
+          "SELECT statement?")
+
+      case As(input, newNames) =>
+        throw new ExpressionException("As operation for Streams not yet implemented.")
+
+      case sel@Select(Filter(Join(leftInput, rightInput), predicate), selection) =>
+
+        val expandedInput = ExpandAggregations(sel)
+
+        if (expandedInput.eq(sel)) {
+          val translatedLeftInput = translateInternal(leftInput)
+          val translatedRightInput = translateInternal(rightInput)
+          val leftInType = translatedLeftInput.getType.asInstanceOf[CompositeType[Row]]
+          val rightInType = translatedRightInput.getType.asInstanceOf[CompositeType[Row]]
+
+          createJoin(
+            predicate,
+            selection,
+            translatedLeftInput,
+            translatedRightInput,
+            leftInType,
+            rightInType,
+            JoinHint.OPTIMIZER_CHOOSES)
+        } else {
+          translateInternal(expandedInput)
+        }
+
+      case Filter(Join(leftInput, rightInput), predicate) =>
+        val translatedLeftInput = translateInternal(leftInput)
+        val translatedRightInput = translateInternal(rightInput)
+        val leftInType = translatedLeftInput.getType.asInstanceOf[CompositeType[Row]]
+        val rightInType = translatedRightInput.getType.asInstanceOf[CompositeType[Row]]
+
+        createJoin(
+          predicate,
+          leftInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)) ++
+            rightInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)),
+          translatedLeftInput,
+          translatedRightInput,
+          leftInType,
+          rightInType,
+          JoinHint.OPTIMIZER_CHOOSES)
+
+      case Join(leftInput, rightInput) =>
+        throw new ExpressionException("Join without filter condition encountered. " +
+          "Did you forget to add .where(...) ?")
+
+      case sel@Select(input, selection) =>
+
+        val expandedInput = ExpandAggregations(sel)
+
+        if (expandedInput.eq(sel)) {
+          // no expansions took place
+          val translatedInput = translateInternal(input)
+          val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
+          val inputFields = inType.getFieldNames
+          createSelect(
+            selection,
+            translatedInput,
+            inType)
+        } else {
+          translateInternal(expandedInput)
+        }
+
+      case agg@Aggregate(GroupBy(input, groupExpressions), aggregations) =>
+        throw new ExpressionException("Aggregate operation for Streams not yet implemented.")
+
+      case agg@Aggregate(input, aggregations) =>
+        throw new ExpressionException("Aggregate operation for Streams not yet implemented.")
+
+      case Filter(input, predicate) =>
+        val translatedInput = translateInternal(input)
+        val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
+        val filter = new ExpressionFilterFunction[Row](predicate, inType)
+        translatedInput.filter(filter)
+
+      case UnionAll(left, right) =>
+        val translatedLeft = translateInternal(left)
+        val translatedRight = translateInternal(right)
+        translatedLeft.union(translatedRight)
+    }
+  }
+
+  private def createSelect[I](
+      fields: Seq[Expression],
+      input: DataStream[I],
+      inputType: CompositeType[I]): DataStream[Row] = {
+
+    fields foreach {
+      f =>
+        if (f.exists(_.isInstanceOf[Aggregation])) {
+          throw new ExpressionException("Found aggregate in " + fields.mkString(", ") + ".")
+        }
+
+    }
+
+    val resultType = new RowTypeInfo(fields)
+
+    val function = new ExpressionSelectFunction(inputType, resultType, fields)
+
+    val opName = s"select(${fields.mkString(",")})"
+
+    input.transform(opName, resultType, new StreamMap[I, Row](function))
+  }
+
+  private def createJoin[L, R](
+      predicate: Expression,
+      fields: Seq[Expression],
+      leftInput: DataStream[L],
+      rightInput: DataStream[R],
+      leftType: CompositeType[L],
+      rightType: CompositeType[R],
+      joinHint: JoinHint): DataStream[Row] = {
+
+    throw new ExpressionException("Join operation for Streams not yet implemented.")
+  }
+}
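
Since only select and filter are translated for streams, a round trip through the streaming
translator currently looks like the following sketch; it relies on the TableEnvironment added
in the next file, and the input stream and field names are illustrative assumptions:

import org.apache.flink.api.java.table.TableEnvironment
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.api.table.Row
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

object StreamingTableSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = new TableEnvironment()

    val words = env.fromElements(
      new JTuple2[String, Integer]("Hello", 1),
      new JTuple2[String, Integer]("Ciao", 1))

    // Only select and filter are supported here; groupBy, join and as throw
    // "not yet implemented" ExpressionExceptions on streams.
    val result = tableEnv
      .fromDataStream(words, "word, count")
      .filter("count = 1")
      .select("word")

    tableEnv.toDataStream(result, classOf[Row]).print()
    env.execute("streaming table sketch")
  }
}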

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala
new file mode 100644
index 0000000..5614031
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.table
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.table.Table
+import org.apache.flink.streaming.api.datastream.DataStream
+
+/**
+ * Environment for working with the Table API.
+ *
+ * This can be used to convert [[DataSet]] or [[DataStream]] to a [[Table]] and back again.
+ * You can also use the provided methods to create a [[Table]] directly from a data source.
+ */
+class TableEnvironment {
+
+  /**
+   * Transforms the given DataSet to a [[org.apache.flink.api.table.Table]].
+   * The fields of the DataSet type are renamed to the given set of fields:
+   *
+   * Example:
+   *
+   * {{{
+   *   tableEnv.fromDataSet(set, "a, b")
+   * }}}
+   *
+   * This will transform the set containing elements of two fields into a table where
+   * the fields are named `a` and `b`.
+   */
+  def fromDataSet[T](set: DataSet[T], fields: String): Table = {
+    new JavaBatchTranslator().createTable(set, fields)
+  }
+
+  /**
+   * Transforms the given DataSet to a [[org.apache.flink.api.table.Table]].
+   * The fields of the DataSet type are used to name the
+   * [[org.apache.flink.api.table.Table]] fields.
+   */
+  def fromDataSet[T](set: DataSet[T]): Table = {
+    new JavaBatchTranslator().createTable(set)
+  }
+
+  /**
+   * Transforms the given DataStream to a [[org.apache.flink.api.table.Table]].
+   * The fields of the DataStream type are renamed to the given set of fields:
+   *
+   * Example:
+   *
+   * {{{
+   *   tableEnv.fromDataStream(set, "a, b")
+   * }}}
+   *
+   * This will transform the stream containing elements of two fields into a table where
+   * the fields are named `a` and `b`.
+   */
+  def fromDataStream[T](set: DataStream[T], fields: String): Table = {
+    new JavaStreamingTranslator().createTable(set, fields)
+  }
+
+  /**
+   * Transforms the given DataStream to a [[org.apache.flink.api.table.Table]].
+   * The fields of the DataStream type are used to name the
+   * [[org.apache.flink.api.table.Table]] fields.
+   */
+  def fromDataStream[T](set: DataStream[T]): Table = {
+    new JavaStreamingTranslator().createTable(set)
+  }
+
+  /**
+   * Converts the given [[org.apache.flink.api.table.Table]] to
+   * a DataSet. The given type must have exactly the same fields as the
+   * [[org.apache.flink.api.table.Table]]. That is, the names of the
+   * fields and the types must match.
+   */
+  @SuppressWarnings(Array("unchecked"))
+  def toDataSet[T](table: Table, clazz: Class[T]): DataSet[T] = {
+    new JavaBatchTranslator().translate[T](table.operation)(
+      TypeExtractor.createTypeInfo(clazz).asInstanceOf[TypeInformation[T]])
+  }
+
+  /**
+   * Converts the given [[org.apache.flink.api.table.Table]] to
+   * a DataStream. The given type must have exactly the same fields as the
+   * [[org.apache.flink.api.table.Table]]. That is, the names of the
+   * fields and the types must match.
+   */
+  @SuppressWarnings(Array("unchecked"))
+  def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = {
+    new JavaStreamingTranslator().translate[T](table.operation)(
+      TypeExtractor.createTypeInfo(clazz).asInstanceOf[TypeInformation[T]])
+  }
+}
+
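
For reference, a rough round trip through this Java TableEnvironment, written in
Scala against the Java batch API. `WC` is a hypothetical POJO with public fields
`word` (String) and `count` (Int); it is not part of this commit:

    import org.apache.flink.api.java.ExecutionEnvironment
    import org.apache.flink.api.java.table.TableEnvironment

    val env = ExecutionEnvironment.getExecutionEnvironment
    val input = env.fromElements(new WC("Hello", 1), new WC("Ciao", 2))

    val tableEnv = new TableEnvironment
    // rename the POJO fields to table fields "word, count"
    val table = tableEnv.fromDataSet(input, "word, count")
    // the target class must match the table fields in name and type
    val back = tableEnv.toDataSet(table.select("word, count"), classOf[WC])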

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala
new file mode 100644
index 0000000..2508a3d
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.table
+
+import org.apache.flink.api.table._
+import org.apache.flink.api.table.expressions.{UnresolvedFieldReference, Expression}
+import org.apache.flink.api.common.typeutils.CompositeType
+
+import org.apache.flink.api.scala._
+
+/**
+ * Methods for converting a [[DataSet]] to a [[Table]]. A [[DataSet]] is
+ * wrapped in this by the implicit conversions in [[org.apache.flink.api.scala.table]].
+ */
+class DataSetConversions[T](set: DataSet[T], inputType: CompositeType[T]) {
+
+  /**
+   * Converts the [[DataSet]] to a [[Table]]. The field names can be specified like this:
+   *
+   * {{{
+   *   val in: DataSet[(String, Int)] = ...
+   *   val table = in.as('a, 'b)
+   * }}}
+   *
+   * This results in a [[Table]] that has field `a` of type `String` and field `b`
+   * of type `Int`.
+   */
+  def as(fields: Expression*): Table = {
+     new ScalaBatchTranslator().createTable(set, fields.toArray)
+  }
+
+  /**
+   * Converts the [[DataSet]] to a [[Table]]. The field names will be taken from
+   * the field names of the input type.
+   *
+   * Example:
+   *
+   * {{{
+   *   val in: DataSet[(String, Int)] = ...
+   *   val table = in.toTable
+   * }}}
+   *
+   * Here, the result is a [[Table]] that has field `_1` of type `String` and field `_2`
+   * of type `Int`.
+   */
+  def toTable: Table = {
+    val resultFields = inputType.getFieldNames.map(UnresolvedFieldReference)
+    as(resultFields: _*)
+  }
+
+}
+
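
Putting the two conversion methods together, a small sketch (assuming the usual
batch Scala API imports):

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._

    val env = ExecutionEnvironment.getExecutionEnvironment
    val in = env.fromElements(("Hello", 1), ("Ciao", 2))

    val named = in.as('word, 'count)   // explicit field names via as(...)
    val defaulted = in.toTable         // tuple fields _1 and _2 become the names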

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala
new file mode 100644
index 0000000..47bd100
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table
+
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table._
+import org.apache.flink.api.table.expressions.{Expression, UnresolvedFieldReference}
+import org.apache.flink.streaming.api.scala.DataStream
+
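+/**
+ * Methods for converting a [[DataStream]] to a [[Table]]. A [[DataStream]] is
+ * wrapped in this by the implicit conversions in [[org.apache.flink.api.scala.table]].
+ */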
+class DataStreamConversions[T](stream: DataStream[T], inputType: CompositeType[T]) {
+
+  /**
+   * Converts the [[DataStream]] to a [[Table]]. The field names can be specified like this:
+   *
+   * {{{
+   *   val in: DataStream[(String, Int)] = ...
+   *   val table = in.as('a, 'b)
+   * }}}
+   *
+   * This results in a [[Table]] that has field `a` of type `String` and field `b`
+   * of type `Int`.
+   */
+  def as(fields: Expression*): Table = {
+     new ScalaStreamingTranslator().createTable(
+       stream,
+       fields.toArray,
+       checkDeterministicFields = true)
+  }
+
+  /**
+   * Converts the [[DataStream]] to a [[Table]]. The field names will be taken from
+   * the field names of the input type.
+   *
+   * Example:
+   *
+   * {{{
+   *   val in: DataStream[(String, Int)] = ...
+   *   val table = in.toTable
+   * }}}
+   *
+   * This results in a [[Table]] that has field `_1` of type `String` and field `_2`
+   * of type `Int`.
+   */
+  def toTable: Table = {
+    val resultFields = inputType.getFieldNames.map(UnresolvedFieldReference)
+    as(resultFields: _*)
+  }
+
+}
+
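
The streaming counterpart mirrors the DataSet conversions; a sketch:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.api.scala.table._

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val in = env.fromElements(("Hello", 1), ("Ciao", 2))

    val named = in.as('word, 'count)   // explicit field names via as(...)
    val defaulted = in.toTable         // tuple fields _1 and _2 become the names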

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala
new file mode 100644
index 0000000..cdcf53e
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table
+
+
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.table.JavaBatchTranslator
+import org.apache.flink.api.table.expressions.Expression
+import org.apache.flink.api.scala.wrap
+import org.apache.flink.api.table.plan._
+import org.apache.flink.api.table.Table
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.DataSet
+
+import scala.reflect.ClassTag
+
+
+/**
+ * [[PlanTranslator]] for creating [[Table]]s from Scala [[DataSet]]s and
+ * translating them back to Scala [[DataSet]]s.
+ */
+class ScalaBatchTranslator extends PlanTranslator {
+
+  private val javaTranslator = new JavaBatchTranslator
+
+  type Representation[A] = DataSet[A]
+
+  def createTable[A](
+      repr: DataSet[A],
+      fields: Array[Expression]): Table = {
+
+    val result = javaTranslator.createTable(repr.javaSet, fields)
+
+    new Table(result.operation)
+  }
+
+  override def translate[O](op: PlanNode)(implicit tpe: TypeInformation[O]): DataSet[O] = {
+    // fake it till you make it ...
+    wrap(javaTranslator.translate(op))(ClassTag.AnyRef.asInstanceOf[ClassTag[O]])
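+    // (The Java translator returns a Java DataSet; wrap(...) builds the Scala wrapper,
+    // which needs a ClassTag. The element type is not known at this point, so a
+    // ClassTag[AnyRef] is cast to ClassTag[O], which the comment above alludes to.)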
+  }
+
+  override def createTable[A](
+      repr: Representation[A],
+      inputType: CompositeType[A],
+      expressions: Array[Expression],
+      resultFields: Seq[(String, TypeInformation[_])]): Table = {
+
+    val result = javaTranslator.createTable(repr.javaSet, inputType, expressions, resultFields)
+
+    Table(result.operation)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala
new file mode 100644
index 0000000..88f1b83
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.table.JavaStreamingTranslator
+import org.apache.flink.api.table.Table
+import org.apache.flink.api.table.plan._
+import org.apache.flink.api.table.expressions.Expression
+import org.apache.flink.streaming.api.scala.{DataStream, javaToScalaStream}
+
+/**
+ * [[PlanTranslator]] for creating [[Table]]s from Scala [[DataStream]]s and
+ * translating them back to Scala [[DataStream]]s.
+ *
+ * This is very limited right now. Only select and filter are implemented. Also, the
+ * expression operations must be extended to allow windowing operations.
+ */
+class ScalaStreamingTranslator extends PlanTranslator {
+
+  private val javaTranslator = new JavaStreamingTranslator
+
+  override type Representation[A] = DataStream[A]
+
+  override def translate[O](op: PlanNode)(implicit tpe: TypeInformation[O]): DataStream[O] = {
+    // fake it till you make it ...
+    javaToScalaStream(javaTranslator.translate(op))
+  }
+
+  override def createTable[A](
+      repr: Representation[A],
+      inputType: CompositeType[A],
+      expressions: Array[Expression],
+      resultFields: Seq[(String, TypeInformation[_])]): Table = {
+
+    val result =
+      javaTranslator.createTable(repr.getJavaStream, inputType, expressions, resultFields)
+
+    new Table(result.operation)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala
new file mode 100644
index 0000000..4f2172e
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.api.table._
+
+import org.apache.flink.streaming.api.scala.DataStream
+
+/**
+ * Methods for converting a [[Table]] to a [[DataSet]] or [[DataStream]]. A [[Table]] is
+ * wrapped in this by the implicit conversions in [[org.apache.flink.api.scala.table]].
+ */
+class TableConversions(table: Table) {
+
+  /**
+   * Converts the [[Table]] to a [[DataSet]].
+   */
+  def toDataSet[T: TypeInformation]: DataSet[T] = {
+     new ScalaBatchTranslator().translate[T](table.operation)
+  }
+
+  /**
+   * Converts the [[Table]] to a [[DataStream]].
+   */
+  def toDataStream[T: TypeInformation]: DataStream[T] = {
+    new ScalaStreamingTranslator().translate[T](table.operation)
+  }
+}
+
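
Both conversions require a target type whose field names and types match the Table
exactly; a sketch with a hypothetical case class (env setup as in the earlier
examples):

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._

    // hypothetical target type; its fields must match the table schema
    case class WordCount(word: String, count: Int)

    val table = env.fromElements(("Hello", 1), ("Ciao", 2)).as('word, 'count)
    val set: DataSet[WordCount] = table.toDataSet[WordCount]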

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
new file mode 100644
index 0000000..0be6be2
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.table
+
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+import scala.language.implicitConversions
+
+/**
+ * These are all the operations that can be used to construct an [[Expression]] AST
+ * for expression operations.
+ *
+ * These operations must be kept in sync with the parser in
+ * [[org.apache.flink.api.table.parser.ExpressionParser]].
+ */
+trait ImplicitExpressionOperations {
+  def expr: Expression
+
+  def && (other: Expression) = And(expr, other)
+  def || (other: Expression) = Or(expr, other)
+
+  def > (other: Expression) = GreaterThan(expr, other)
+  def >= (other: Expression) = GreaterThanOrEqual(expr, other)
+  def < (other: Expression) = LessThan(expr, other)
+  def <= (other: Expression) = LessThanOrEqual(expr, other)
+
+  def === (other: Expression) = EqualTo(expr, other)
+  def !== (other: Expression) = NotEqualTo(expr, other)
+
+  def unary_! = Not(expr)
+  def unary_- = UnaryMinus(expr)
+
+  def isNull = IsNull(expr)
+  def isNotNull = IsNotNull(expr)
+
+  def + (other: Expression) = Plus(expr, other)
+  def - (other: Expression) = Minus(expr, other)
+  def / (other: Expression) = Div(expr, other)
+  def * (other: Expression) = Mul(expr, other)
+  def % (other: Expression) = Mod(expr, other)
+
+  def & (other: Expression) = BitwiseAnd(expr, other)
+  def | (other: Expression) = BitwiseOr(expr, other)
+  def ^ (other: Expression) = BitwiseXor(expr, other)
+  def unary_~ = BitwiseNot(expr)
+
+  def abs = Abs(expr)
+
+  def sum = Sum(expr)
+  def min = Min(expr)
+  def max = Max(expr)
+  def count = Count(expr)
+  def avg = Avg(expr)
+
+  def substring(beginIndex: Expression, endIndex: Expression = Literal(Int.MaxValue)) = {
+    Substring(expr, beginIndex, endIndex)
+  }
+
+  def cast(toType: TypeInformation[_]) = Cast(expr, toType)
+
+  def as(name: Symbol) = Naming(expr, name.name)
+}
+
+/**
+ * Implicit conversions from Scala literals to Expression [[Literal]] and from
+ * [[Expression]] to [[ImplicitExpressionOperations]].
+ */
+trait ImplicitExpressionConversions {
+  implicit class WithOperations(e: Expression) extends ImplicitExpressionOperations {
+    def expr = e
+  }
+
+  implicit class SymbolExpression(s: Symbol) extends ImplicitExpressionOperations {
+    def expr = UnresolvedFieldReference(s.name)
+  }
+
+  implicit class LiteralLongExpression(l: Long) extends ImplicitExpressionOperations {
+    def expr = Literal(l)
+  }
+
+  implicit class LiteralIntExpression(i: Int) extends ImplicitExpressionOperations {
+    def expr = Literal(i)
+  }
+
+  implicit class LiteralFloatExpression(f: Float) extends ImplicitExpressionOperations {
+    def expr = Literal(f)
+  }
+
+  implicit class LiteralDoubleExpression(d: Double) extends ImplicitExpressionOperations {
+    def expr = Literal(d)
+  }
+
+  implicit class LiteralStringExpression(str: String) extends ImplicitExpressionOperations {
+    def expr = Literal(str)
+  }
+
+  implicit class LiteralBooleanExpression(bool: Boolean) extends ImplicitExpressionOperations {
+    def expr = Literal(bool)
+  }
+
+  implicit def symbol2FieldExpression(sym: Symbol): Expression = UnresolvedFieldReference(sym.name)
+  implicit def int2Literal(i: Int): Expression = Literal(i)
+  implicit def long2Literal(l: Long): Expression = Literal(l)
+  implicit def double2Literal(d: Double): Expression = Literal(d)
+  implicit def float2Literal(f: Float): Expression = Literal(f)
+  implicit def string2Literal(str: String): Expression = Literal(str)
+  implicit def boolean2Literal(bool: Boolean): Expression = Literal(bool)
+}
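
Because these operators only build an Expression AST (nothing is evaluated), they
compose like ordinary Scala expressions. A few illustrative combinations, with
hypothetical field names:

    import org.apache.flink.api.scala.table._

    // And(GreaterThan('count, 0), NotEqualTo('word, "banana"))
    val predicate = 'count > 0 && 'word !== "banana"

    // arithmetic, renaming, aggregation and substring nodes
    val doubled   = ('count * 2) as 'twice
    val average   = 'count.avg
    val firstChar = 'word.substring(0, 1)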

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
new file mode 100644
index 0000000..e74651b
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.{Row, Table}
+import org.apache.flink.streaming.api.scala.DataStream
+
+import scala.language.implicitConversions
+
+/**
+ * == Table API (Scala) ==
+ *
+ * Importing this package with:
+ *
+ * {{{
+ *   import org.apache.flink.api.scala.table._
+ * }}}
+ *
+ * imports implicit conversions for converting a [[DataSet]] or [[DataStream]] to a
+ * [[Table]]. This can be used to perform SQL-like queries on data. Please have
+ * a look at [[Table]] to see which operations are supported and
+ * [[org.apache.flink.api.scala.table.ImplicitExpressionOperations]] to see how an
+ * expression can be specified.
+ *
+ * When writing a query you can use Scala Symbols to refer to field names. One would
+ * refer to field `a` by writing `'a`. Sometimes it is necessary to manually convert a
+ * Scala literal to an Expression Literal; in those cases use `Literal`, as in `Literal(3)`.
+ *
+ * Example:
+ *
+ * {{{
+ *   import org.apache.flink.api.scala._
+ *   import org.apache.flink.api.scala.table._
+ *
+ *   val env = ExecutionEnvironment.getExecutionEnvironment
+ *   val input = env.fromElements(("Hello", 2), ("Hello", 5), ("Ciao", 3))
+ *   val result = input.as('word, 'count).groupBy('word).select('word, 'count.avg)
+ *   result.print()
+ *
+ *   env.execute()
+ * }}}
+ *
+ * A [[Table]] can be converted back to the underlying API
+ * representation using `toDataSet` or `toDataStream`:
+ *
+ * {{{
+ *   case class Word(word: String, count: Int)
+ *
+ *   val result = in.select(...).as('word, 'count)
+ *   val set = result.toDataSet[Word]
+ * }}}
+ */
+package object table extends ImplicitExpressionConversions {
+
+  implicit def table2TableConversions(table: Table): TableConversions = {
+    new TableConversions(table)
+  }
+
+  implicit def dataSet2DataSetConversions[T](set: DataSet[T]): DataSetConversions[T] = {
+    new DataSetConversions[T](set, set.getType.asInstanceOf[CompositeType[T]])
+  }
+
+  implicit def table2RowDataSet(
+      table: Table): DataSet[Row] = {
+    new ScalaBatchTranslator().translate[Row](table.operation)
+  }
+
+  implicit def rowDataSet2Table(
+      rowDataSet: DataSet[Row]): Table = {
+    rowDataSet.toTable
+  }
+
+  implicit def dataStream2DataStreamConversions[T](
+      stream: DataStream[T]): DataStreamConversions[T] = {
+    new DataStreamConversions[T](
+      stream,
+      stream.getJavaStream.getType.asInstanceOf[CompositeType[T]])
+  }
+
+  implicit def table2RowDataStream(
+      table: Table): DataStream[Row] = {
+    new ScalaStreamingTranslator().translate[Row](table.operation)
+  }
+
+  implicit def rowDataStream2Table(
+      rowDataStream: DataStream[Row]): Table = {
+    rowDataStream.toTable
+  }
+}
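
With these implicits in scope, the round trip from the scaladoc above needs no
explicit wrapping; for instance (a sketch):

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._
    import org.apache.flink.api.table.Row

    val env = ExecutionEnvironment.getExecutionEnvironment
    val words = env.fromElements(("Hello", 2), ("Ciao", 3))

    // dataSet2DataSetConversions supplies as(...); table2RowDataSet converts the
    // resulting Table back to a DataSet[Row] implicitly
    val rows: DataSet[Row] = words.as('word, 'count).select('word, 'count + 1)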

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/ExpressionException.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/ExpressionException.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/ExpressionException.scala
new file mode 100644
index 0000000..51c0a4d
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/ExpressionException.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table
+
+/**
+ * Exception for all errors occurring during expression evaluation.
+ */
+class ExpressionException(msg: String) extends RuntimeException(msg)

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Row.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Row.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Row.scala
new file mode 100644
index 0000000..e3baab3
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/Row.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table
+
+/**
+ * This is used for executing Table API operations. We use manually generated
+ * TypeInfo to check the field types and create serializers and comparators.
+ */
+class Row(arity: Int) extends Product {
+
+  private val fields = new Array[Any](arity)
+
+  def productArity = fields.length
+
+  def productElement(i: Int): Any = fields(i)
+
+  def setField(i: Int, value: Any): Unit = fields(i) = value
+
+  def canEqual(that: Any) = false
+
+  override def toString = fields.mkString(",")
+
+}
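
Row is a mutable, untyped Product: the arity is fixed at construction and fields
are set positionally. A tiny sketch:

    val row = new Row(2)
    row.setField(0, "Hello")
    row.setField(1, 42)

    row.productArity   // 2
    row.toString       // "Hello,42"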
