[FLINK-947] Add parser to Expression API for exposing it to Java

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d7d9b639
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d7d9b639
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d7d9b639

Branch: refs/heads/master
Commit: d7d9b639025a74b322b51e5a35f656c57fa28fc6
Parents: 659ddc0
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Fri Mar 6 11:53:48 2015 +0100
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Sun Mar 29 12:27:53 2015 +0200

----------------------------------------------------------------------
 .../api/common/typeutils/CompositeType.java     |  12 ++
 .../api/java/typeutils/TupleTypeInfoBase.java   |   5 +
 .../api/java/expressions/package-info.java      |  22 ++
 .../examples/java/JavaExpressionExample.java    |  27 +--
 .../api/expressions/ExpressionOperation.scala   |  79 ++++++-
 .../analysis/PredicateAnalyzer.scala            |   2 +-
 .../analysis/VerifyNoAggregates.scala           |  51 +++++
 .../expressions/parser/ExpressionParser.scala   | 209 ++++++++++++++++++
 .../api/java/expressions/ExpressionUtil.scala   | 112 ++++++++++
 .../scala/expressions/JavaBatchTranslator.scala |  40 +++-
 .../expressions/JavaStreamingTranslator.scala   |  27 ++-
 .../api/scala/expressions/expressionDsl.scala   |   7 +
 .../expressions/test/AggregationsITCase.java    | 210 +++++++++++++++++++
 .../api/java/expressions/test/AsITCase.java     | 158 ++++++++++++++
 .../java/expressions/test/CastingITCase.java    | 130 ++++++++++++
 .../expressions/test/ExpressionsITCase.java     | 192 +++++++++++++++++
 .../api/java/expressions/test/FilterITCase.java | 130 ++++++++++++
 .../test/GroupedAggregationsITCase.java         | 126 +++++++++++
 .../api/java/expressions/test/JoinITCase.java   | 202 ++++++++++++++++++
 .../api/java/expressions/test/SelectITCase.java | 169 +++++++++++++++
 .../test/StringExpressionsITCase.java           | 144 +++++++++++++
 .../test/PageRankExpressionITCase.java          | 100 +++++++++
 .../scala/expressions/AggregationsITCase.scala  | 126 -----------
 .../flink/api/scala/expressions/AsITCase.scala  | 126 -----------
 .../api/scala/expressions/CastingITCase.scala   |  93 --------
 .../scala/expressions/ExpressionsITCase.scala   | 126 -----------
 .../api/scala/expressions/FilterITCase.scala    | 150 -------------
 .../GroupedAggreagationsITCase.scala            |  99 ---------
 .../api/scala/expressions/JoinITCase.scala      | 132 ------------
 .../expressions/PageRankExpressionITCase.java   | 100 ---------
 .../api/scala/expressions/SelectITCase.scala    | 130 ------------
 .../expressions/StringExpressionsITCase.scala   |  97 ---------
 .../expressions/test/AggregationsITCase.scala   | 127 +++++++++++
 .../api/scala/expressions/test/AsITCase.scala   | 124 +++++++++++
 .../scala/expressions/test/CastingITCase.scala  |  92 ++++++++
 .../expressions/test/ExpressionsITCase.scala    | 127 +++++++++++
 .../scala/expressions/test/FilterITCase.scala   | 151 +++++++++++++
 .../test/GroupedAggreagationsITCase.scala       |  96 +++++++++
 .../api/scala/expressions/test/JoinITCase.scala | 145 +++++++++++++
 .../scala/expressions/test/SelectITCase.scala   | 143 +++++++++++++
 .../test/StringExpressionsITCase.scala          |  98 +++++++++
 41 files changed, 3233 insertions(+), 1203 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
index 36778d6..54a1e13 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
@@ -176,6 +176,18 @@ public abstract class CompositeType<T> extends 
TypeInformation<T> {
        public abstract String[] getFieldNames();
 
        /**
+        * True if this type has an inherent ordering of the fields, such that 
a user can
+        * always be sure which order the fields will be in. This is true 
for Tuples and
+        * Case Classes. It is not true for Regular Java Objects, since there, 
the ordering of
+        * the fields can be arbitrary.
+        *
+        * This is used when translating a DataSet or DataStream to an 
Expression Table, when
+        * initially renaming the fields of the underlying type.
+        */
+       public boolean hasDeterministicFieldOrder() {
+               return false;
+       }
+       /**
         * Returns the field index of the composite field of the given name.
         *
         * @return The field index or -1 if this type does not have a field of 
the given name.

http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
index ce075a7..d1c2c9d 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
@@ -272,4 +272,9 @@ public abstract class TupleTypeInfoBase<T> extends 
CompositeType<T> {
                bld.append('>');
                return bld.toString();
        }
+
+       @Override
+       public boolean hasDeterministicFieldOrder() {
+               return true;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/main/java/org/apache/flink/api/java/expressions/package-info.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/java/org/apache/flink/api/java/expressions/package-info.java
 
b/flink-staging/flink-expressions/src/main/java/org/apache/flink/api/java/expressions/package-info.java
new file mode 100644
index 0000000..07e18b2
--- /dev/null
+++ 
b/flink-staging/flink-expressions/src/main/java/org/apache/flink/api/java/expressions/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Java API of the Expression API; ExpressionUtil is the entry point.
+ */
+package org.apache.flink.api.java.expressions;

http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/main/java/org/apache/flink/examples/java/JavaExpressionExample.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/java/org/apache/flink/examples/java/JavaExpressionExample.java
 
b/flink-staging/flink-expressions/src/main/java/org/apache/flink/examples/java/JavaExpressionExample.java
index 055eaee..42632f9 100644
--- 
a/flink-staging/flink-expressions/src/main/java/org/apache/flink/examples/java/JavaExpressionExample.java
+++ 
b/flink-staging/flink-expressions/src/main/java/org/apache/flink/examples/java/JavaExpressionExample.java
@@ -19,20 +19,12 @@ package org.apache.flink.examples.java;
 
 
 import org.apache.flink.api.expressions.ExpressionOperation;
-import org.apache.flink.api.expressions.tree.EqualTo$;
-import org.apache.flink.api.expressions.tree.Expression;
-import org.apache.flink.api.expressions.tree.Literal$;
-import org.apache.flink.api.expressions.tree.UnresolvedFieldReference$;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.scala.expressions.JavaBatchTranslator;
+import org.apache.flink.api.java.expressions.ExpressionUtil;
 
 /**
- * This is extremely bare-bones. We need a parser that can parse expressions 
in a String
- * and create the correct expression AST. Then we can use expressions like 
this:
- *
- * {@code in.select("'field0.avg, 'field1.count") }
+ * Very simple example that shows how the Java Expression API can be used.
  */
 public class JavaExpressionExample {
 
@@ -60,17 +52,16 @@ public class JavaExpressionExample {
                DataSet<WC> input = env.fromElements(
                                new WC("Hello", 1),
                                new WC("Ciao", 1),
-                               new WC("Hello", 1)
-               );
+                               new WC("Hello", 1));
 
-               ExpressionOperation<JavaBatchTranslator> expr = new 
JavaBatchTranslator().createExpressionOperation(
-                               input,
-                               new Expression[] { 
UnresolvedFieldReference$.MODULE$.apply("count"), 
UnresolvedFieldReference$.MODULE$.apply("word")});
+               ExpressionOperation expr = ExpressionUtil.from(input);
 
-               ExpressionOperation<JavaBatchTranslator> filtered = expr.filter(
-                               
EqualTo$.MODULE$.apply(UnresolvedFieldReference$.MODULE$.apply("word"), 
Literal$.MODULE$.apply("Hello")));
+               ExpressionOperation filtered = expr
+                               .groupBy("word")
+                               .select("word.count as count, word")
+                               .filter("count = 2");
 
-               DataSet<WC> result = (DataSet<WC>) 
filtered.as(TypeExtractor.createTypeInfo(WC.class));
+               DataSet<WC> result = ExpressionUtil.toSet(filtered, WC.class);
 
                result.print();
                env.execute();

http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/ExpressionOperation.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/ExpressionOperation.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/ExpressionOperation.scala
index d843c5b..38417b2 100644
--- 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/ExpressionOperation.scala
+++ 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/ExpressionOperation.scala
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.expressions.analysis.{GroupByAnalyzer, 
SelectionAnalyzer,
 PredicateAnalyzer}
 import org.apache.flink.api.expressions.operations._
+import org.apache.flink.api.expressions.parser.ExpressionParser
 import org.apache.flink.api.expressions.tree.{ResolvedFieldReference,
 UnresolvedFieldReference, Expression}
 
@@ -67,6 +68,21 @@ case class ExpressionOperation[A <: OperationTranslator](
   }
 
   /**
+   * Performs a selection operation. Similar to an SQL SELECT statement. The 
field expressions
+   * can contain complex expressions and aggregations.
+   *
+   * Example:
+   *
+   * {{{
+   *   in.select("key, value.avg + ' The average' as average, 
other.substring(0, 10)")
+   * }}}
+   */
+  def select(fields: String): ExpressionOperation[A] = {
+    val fieldExprs = ExpressionParser.parseExpressionList(fields)
+    select(fieldExprs: _*)
+  }
+
+  /**
    * Renames the fields of the expression result. Use this to disambiguate 
fields before
    * joining to operations.
    *
@@ -84,7 +100,21 @@ case class ExpressionOperation[A <: OperationTranslator](
       case false => throw new ExpressionException("Only field expression 
allowed in as().")
     }
     this.copy(operation = As(operation, fields.toArray map { _.name }))
+  }
 
+  /**
+   * Renames the fields of the expression result. Use this to disambiguate 
fields before
+   * joining two operations.
+   *
+   * Example:
+   *
+   * {{{
+   *   in.as("a, b")
+   * }}}
+   */
+  def as(fields: String): ExpressionOperation[A] = {
+    val fieldExprs = ExpressionParser.parseExpressionList(fields)
+    as(fieldExprs: _*)
   }
 
   /**
@@ -110,7 +140,22 @@ case class ExpressionOperation[A <: OperationTranslator](
    * Example:
    *
    * {{{
-   *   in.filter('name === "Fred")
+   *   in.filter("name === 'Fred'")
+   * }}}
+   */
+  def filter(predicate: String): ExpressionOperation[A] = {
+    val predicateExpr = ExpressionParser.parseExpression(predicate)
+    filter(predicateExpr)
+  }
+
+  /**
+   * Filters out elements that don't pass the filter predicate. Similar to a 
SQL WHERE
+   * clause.
+   *
+   * Example:
+   *
+   * {{{
+   *   in.where('name === "Fred")
    * }}}
    */
   def where(predicate: Expression): ExpressionOperation[A] = {
@@ -118,6 +163,20 @@ case class ExpressionOperation[A <: OperationTranslator](
   }
 
   /**
+   * Filters out elements that don't pass the filter predicate. Similar to a 
SQL WHERE
+   * clause.
+   *
+   * Example:
+   *
+   * {{{
+   *   in.where("name === 'Fred'")
+   * }}}
+   */
+  def where(predicate: String): ExpressionOperation[A] = {
+    filter(predicate)
+  }
+
+  /**
    * Groups the elements on some grouping keys. Use this before a selection 
with aggregations
    * to perform the aggregation on a per-group basis. Similar to a SQL GROUP 
BY statement.
    *
@@ -144,8 +203,24 @@ case class ExpressionOperation[A <: OperationTranslator](
   }
 
   /**
+   * Groups the elements on some grouping keys. Use this before a selection 
with aggregations
+   * to perform the aggregation on a per-group basis. Similar to a SQL GROUP 
BY statement.
+   *
+   * Example:
+   *
+   * {{{
+   *   in.groupBy("key").select("key, value.avg")
+   * }}}
+   */
+  def groupBy(fields: String): ExpressionOperation[A] = {
+    val fieldsExpr = ExpressionParser.parseExpressionList(fields)
+    groupBy(fieldsExpr: _*)
+  }
+
+  /**
    * Joins to expression operations. Similar to an SQL join. The fields of the 
two joined
-   * operations must not overlap, use [[as]] to rename fields if necessary.
+   * operations must not overlap, use [[as]] to rename fields if necessary. 
You can use
+   * where and select clauses after a join to further specify the behaviour of 
the join.
    *
    * Example:
    *

http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/PredicateAnalyzer.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/PredicateAnalyzer.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/PredicateAnalyzer.scala
index 2531fff..f108f5c 100644
--- 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/PredicateAnalyzer.scala
+++ 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/PredicateAnalyzer.scala
@@ -18,7 +18,6 @@
 package org.apache.flink.api.expressions.analysis
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.CompositeType
 
 /**
  * Analyzer for unary predicates, i.e. filter operations.
@@ -28,5 +27,6 @@ class PredicateAnalyzer(inputFields: Seq[(String, 
TypeInformation[_])]) extends
     new ResolveFieldReferences(inputFields),
     new InsertAutoCasts,
     new TypeCheck,
+    new VerifyNoAggregates,
     new VerifyBoolean)
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyNoAggregates.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyNoAggregates.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyNoAggregates.scala
new file mode 100644
index 0000000..e9f8788
--- /dev/null
+++ 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/analysis/VerifyNoAggregates.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.expressions.analysis
+
+import org.apache.flink.api.expressions.ExpressionException
+import org.apache.flink.api.expressions.tree.{Aggregation, Expression}
+
+import scala.collection.mutable
+
+/**
+ * Rule that verifies that an expression does not contain aggregate 
operations. Right now, join
+ * predicates and filter predicates cannot contain aggregates.
+ */
+class VerifyNoAggregates extends Rule {
+
+  def apply(expr: Expression) = {
+    val errors = mutable.MutableList[String]()
+
+    val result = expr.transformPre {
+      case agg: Aggregation=> {
+        errors +=
+          s"""Aggregations are not allowed in join/filter predicates."""
+        agg
+      }
+    }
+
+    if (errors.length > 0) {
+      throw new ExpressionException(
+        s"""Invalid expression "$expr": ${errors.mkString(" ")}""")
+    }
+
+    result
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/parser/ExpressionParser.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/parser/ExpressionParser.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/parser/ExpressionParser.scala
new file mode 100644
index 0000000..da53ded
--- /dev/null
+++ 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/parser/ExpressionParser.scala
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.expressions.parser
+
+import org.apache.flink.api.expressions.ExpressionException
+import org.apache.flink.api.expressions.operations.As
+import org.apache.flink.api.expressions.tree._
+
+import scala.util.parsing.combinator.{PackratParsers, JavaTokenParsers}
+
+/**
+ * Parser for expressions inside a String. This parses exactly the same 
expressions that
+ * would be accepted by the Scala Expression DSL.
+ *
+ * See 
[[org.apache.flink.api.scala.expressions.ImplicitExpressionConversions]] and
+ * [[org.apache.flink.api.scala.expressions.ImplicitExpressionOperations]] for 
the constructs
+ * available in the Scala Expression DSL. This parser must be kept in sync 
with the Scala DSL
+ * defined in the above files.
+ */
+object ExpressionParser extends JavaTokenParsers with PackratParsers {
+
+  // Literals
+
+  lazy val numberLiteral: PackratParser[Expression] =
+    ((wholeNumber <~ ("L" | "l")) | floatingPointNumber | decimalNumber | 
wholeNumber) ^^ {
+      str =>
+        if (str.endsWith("L") || str.endsWith("l")) {
+          Literal(str.toLong)
+        } else if (str.matches("""-?\d+""")) {
+          Literal(str.toInt)
+        } else if (str.endsWith("f") | str.endsWith("F")) {
+          Literal(str.toFloat)
+        } else {
+          Literal(str.toDouble)
+        }
+    }
+
+  lazy val singleQuoteStringLiteral: Parser[Expression] =
+    ("'" + """([^'\p{Cntrl}\\]|\\[\\'"bfnrt]|\\u[a-fA-F0-9]{4})*""" + "'").r 
^^ {
+      str => Literal(str.substring(1, str.length - 1))
+    }
+
+  lazy val stringLiteralFlink: PackratParser[Expression] = super.stringLiteral 
^^ {
+    str => Literal(str.substring(1, str.length - 1))
+  }
+
+  lazy val boolLiteral: PackratParser[Expression] = ("true" | "false") ^^ {
+    str => Literal(str.toBoolean)
+  }
+
+  lazy val literalExpr: PackratParser[Expression] =
+    numberLiteral |
+      stringLiteralFlink | singleQuoteStringLiteral |
+      boolLiteral
+
+  lazy val fieldReference: PackratParser[Expression] = ident ^^ {
+    case sym => UnresolvedFieldReference(sym)
+  }
+
+  lazy val atom: PackratParser[Expression] =
+    ( "(" ~> expression <~ ")" ) | literalExpr | fieldReference
+
+  // suffix ops
+  lazy val isNull: PackratParser[Expression] = atom <~ ".isNull" ^^ { e => 
IsNull(e) }
+  lazy val isNotNull: PackratParser[Expression] = atom <~ ".isNotNull" ^^ { e 
=> IsNotNull(e) }
+
+  lazy val abs: PackratParser[Expression] = atom <~ ".abs" ^^ { e => Abs(e) }
+
+  lazy val sum: PackratParser[Expression] = atom <~ ".sum" ^^ { e => Sum(e) }
+  lazy val min: PackratParser[Expression] = atom <~ ".min" ^^ { e => Min(e) }
+  lazy val max: PackratParser[Expression] = atom <~ ".max" ^^ { e => Max(e) }
+  lazy val count: PackratParser[Expression] = atom <~ ".count" ^^ { e => 
Count(e) }
+  lazy val avg: PackratParser[Expression] = atom <~ ".avg" ^^ { e => Avg(e) }
+
+  lazy val as: PackratParser[Expression] = atom ~ ".as(" ~ fieldReference ~ 
")" ^^ {
+    case e ~ _ ~ as ~ _ => Naming(e, as.name)
+  }
+
+  lazy val substring: PackratParser[Expression] =
+    atom ~ ".substring(" ~ expression ~ "," ~ expression ~ ")" ^^ {
+      case e ~ _ ~ from ~ _ ~ to ~ _ => Substring(e, from, to)
+
+    }
+
+  lazy val substringWithoutEnd: PackratParser[Expression] =
+    atom ~ ".substring(" ~ expression ~ ")" ^^ {
+      case e ~ _ ~ from ~ _ => Substring(e, from, Literal(Integer.MAX_VALUE))
+
+    }
+
+  lazy val suffix =
+    isNull | isNotNull |
+      abs | sum | min | max | count | avg |
+      substring | substringWithoutEnd | atom
+
+
+  // unary ops
+
+  lazy val unaryNot: PackratParser[Expression] = "!" ~> suffix ^^ { e => 
Not(e) }
+
+  lazy val unaryMinus: PackratParser[Expression] = "-" ~> suffix ^^ { e => 
UnaryMinus(e) }
+
+  lazy val unaryBitwiseNot: PackratParser[Expression] = "~" ~> suffix ^^ { e 
=> BitwiseNot(e) }
+
+  lazy val unary = unaryNot | unaryMinus | unaryBitwiseNot | suffix
+
+  // binary bitwise ops
+
+  lazy val binaryBitwise = unary * (
+    "&" ^^^ { (a:Expression, b:Expression) => BitwiseAnd(a,b) } |
+      "|" ^^^ { (a:Expression, b:Expression) => BitwiseOr(a,b) } |
+      "^" ^^^ { (a:Expression, b:Expression) => BitwiseXor(a,b) } )
+
+  // arithmetic
+
+  lazy val product = binaryBitwise * (
+    "*" ^^^ { (a:Expression, b:Expression) => Mul(a,b) } |
+      "/" ^^^ { (a:Expression, b:Expression) => Div(a,b) } |
+      "%" ^^^ { (a:Expression, b:Expression) => Mod(a,b) } )
+
+  lazy val term = product * (
+    "+" ^^^ { (a:Expression, b:Expression) => Plus(a,b) } |
+     "-" ^^^ { (a:Expression, b:Expression) => Minus(a,b) } )
+
+  // Comparison
+
+  lazy val equalTo: PackratParser[Expression] = term ~ "===" ~ term ^^ {
+    case l ~ _ ~ r => EqualTo(l, r)
+  }
+
+  lazy val equalToAlt: PackratParser[Expression] = term ~ "=" ~ term ^^ {
+    case l ~ _ ~ r => EqualTo(l, r)
+  }
+
+  lazy val notEqualTo: PackratParser[Expression] = term ~ "!==" ~ term ^^ {
+    case l ~ _ ~ r => NotEqualTo(l, r)
+  }
+
+  lazy val greaterThan: PackratParser[Expression] = term ~ ">" ~ term ^^ {
+    case l ~ _ ~ r => GreaterThan(l, r)
+  }
+
+  lazy val greaterThanOrEqual: PackratParser[Expression] = term ~ ">=" ~ term 
^^ {
+    case l ~ _ ~ r => GreaterThanOrEqual(l, r)
+  }
+
+  lazy val lessThan: PackratParser[Expression] = term ~ "<" ~ term ^^ {
+    case l ~ _ ~ r => LessThan(l, r)
+  }
+
+  lazy val lessThanOrEqual: PackratParser[Expression] = term ~ "<=" ~ term ^^ {
+    case l ~ _ ~ r => LessThanOrEqual(l, r)
+  }
+
+  lazy val comparison: PackratParser[Expression] =
+      equalTo | equalToAlt | notEqualTo |
+      greaterThan | greaterThanOrEqual |
+      lessThan | lessThanOrEqual | term
+
+  // logic
+
+  lazy val logic = comparison * (
+    "&&" ^^^ { (a:Expression, b:Expression) => And(a,b) } |
+      "||" ^^^ { (a:Expression, b:Expression) => Or(a,b) } )
+
+  // alias
+
+  lazy val alias: PackratParser[Expression] = logic ~ "as" ~ fieldReference ^^ 
{
+    case e ~ _ ~ name => Naming(e, name.name)
+  } | logic
+
+  lazy val expression: PackratParser[Expression] = alias
+
+  lazy val expressionList: Parser[List[Expression]] = rep1sep(expression, ",")
+
+  def parseExpressionList(expression: String): List[Expression] = {
+    parseAll(expressionList, expression) match {
+      case Success(lst, _) => lst
+
+      case Failure(msg, _) => throw new ExpressionException("Could not parse 
expression: " + msg)
+
+      case Error(msg, _) => throw new ExpressionException("Could not parse 
expression: " + msg)
+    }
+  }
+
+  def parseExpression(exprString: String): Expression = {
+    parseAll(expression, exprString) match {
+      case Success(lst, _) => lst
+
+      case fail =>
+        throw new ExpressionException("Could not parse expression: " + 
fail.toString)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/java/expressions/ExpressionUtil.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/java/expressions/ExpressionUtil.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/java/expressions/ExpressionUtil.scala
new file mode 100644
index 0000000..ad7cfe4
--- /dev/null
+++ 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/java/expressions/ExpressionUtil.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.expressions
+
+import org.apache.flink.api.expressions.ExpressionOperation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.scala.expressions.JavaBatchTranslator
+import org.apache.flink.api.scala.expressions.JavaStreamingTranslator
+import org.apache.flink.streaming.api.datastream.DataStream
+
+/**
+ * Convenience methods for creating an 
[[org.apache.flink.api.expressions.ExpressionOperation]]
+ * and for converting an 
[[org.apache.flink.api.expressions.ExpressionOperation]] back
+ * to a [[org.apache.flink.api.java.DataSet]] or
+ * [[org.apache.flink.streaming.api.datastream.DataStream]].
+ */
+object ExpressionUtil {
+  
+  /**
+   * Transforms the given DataSet to a 
[[org.apache.flink.api.expressions.ExpressionOperation]].
+   * The fields of the DataSet type are renamed to the given set of fields:
+   *
+   * Example:
+   *
+   * {{{
+   *   ExpressionUtil.from(set, "a, b")
+   * }}}
+   *
+   * This will transform the set containing elements of two fields to a table 
where the fields
+   * are named a and b.
+   */
+  def from[T](set: DataSet[T], fields: String): 
ExpressionOperation[JavaBatchTranslator] = {
+    new JavaBatchTranslator().createExpressionOperation(set, fields)
+  }
+
+  /**
+   * Transforms the given DataSet to a 
[[org.apache.flink.api.expressions.ExpressionOperation]].
+   * The fields of the DataSet type are used to name the
+   * [[org.apache.flink.api.expressions.ExpressionOperation]] fields.
+   */
+  def from[T](set: DataSet[T]): ExpressionOperation[JavaBatchTranslator] = {
+    new JavaBatchTranslator().createExpressionOperation(set)
+  }
+
+  /**
+   * Transforms the given DataStream to a 
[[org.apache.flink.api.expressions.ExpressionOperation]].
+   * The fields of the DataStream type are renamed to the given set of fields:
+   *
+   * Example:
+   *
+   * {{{
+   *   ExpressionUtil.from(set, "a, b")
+   * }}}
+   *
+   * This will transform the set containing elements of two fields to a table 
where the fields
+   * are named a and b.
+   */
+  def from[T](set: DataStream[T], fields: String): 
ExpressionOperation[JavaStreamingTranslator] = {
+    new JavaStreamingTranslator().createExpressionOperation(set, fields)
+  }
+
+  /**
+   * Transforms the given DataStream to a 
[[org.apache.flink.api.expressions.ExpressionOperation]].
+   * The fields of the DataStream type are used to name the
+   * [[org.apache.flink.api.expressions.ExpressionOperation]] fields.
+   */
+  def from[T](set: DataStream[T]): 
ExpressionOperation[JavaStreamingTranslator] = {
+    new JavaStreamingTranslator().createExpressionOperation(set)
+  }
+
+  /**
+   * Converts the given 
[[org.apache.flink.api.expressions.ExpressionOperation]] to
+   * a DataSet. The given type must have exactly the same fields as the
+   * [[org.apache.flink.api.expressions.ExpressionOperation]]. That is, the 
names of the
+   * fields and the types must match.
+   */
+  @SuppressWarnings(Array("unchecked"))
+  def toSet[T](
+      op: ExpressionOperation[JavaBatchTranslator],
+      clazz: Class[T]): DataSet[T] = {
+    op.as(TypeExtractor.createTypeInfo(clazz)).asInstanceOf[DataSet[T]]
+  }
+
+  /**
+   * Converts the given 
[[org.apache.flink.api.expressions.ExpressionOperation]] to
+   * a DataStream. The given type must have exactly the same fields as the
+   * [[org.apache.flink.api.expressions.ExpressionOperation]]. That is, the 
names of the
+   * fields and the types must match.
+   */
+  @SuppressWarnings(Array("unchecked"))
+  def toStream[T](
+      op: ExpressionOperation[JavaStreamingTranslator], clazz: Class[T]): 
DataStream[T] = {
+    op.as(TypeExtractor.createTypeInfo(clazz)).asInstanceOf[DataStream[T]]
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaBatchTranslator.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaBatchTranslator.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaBatchTranslator.scala
index 037efd4..ae41ceb 100644
--- 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaBatchTranslator.scala
+++ 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaBatchTranslator.scala
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.expressions.analysis.ExtractEquiJoinFields
 import org.apache.flink.api.expressions.operations._
+import org.apache.flink.api.expressions.parser.ExpressionParser
 import org.apache.flink.api.expressions.runtime.{ExpressionAggregateFunction, 
ExpressionFilterFunction, ExpressionJoinFunction, ExpressionSelectFunction}
 import org.apache.flink.api.expressions.tree._
 import org.apache.flink.api.expressions.typeinfo.{RenameOperator, 
RenamingProxyTypeInfo, RowTypeInfo}
@@ -43,9 +44,27 @@ class JavaBatchTranslator extends OperationTranslator {
 
   type Representation[A] = JavaDataSet[A]
 
+
+  /**
+   * Creates an expression operation over `repr` whose fields are references to the field
+   * names reported by the input's composite type.
+   */
+  def createExpressionOperation[A](
+      repr: JavaDataSet[A]): ExpressionOperation[JavaBatchTranslator] = {
+    val compositeType = repr.getType.asInstanceOf[CompositeType[A]]
+    val fieldRefs = compositeType.getFieldNames.map(name => UnresolvedFieldReference(name))
+
+    // The field names are taken from the input type itself, and the deterministic field
+    // order check is explicitly disabled for this call.
+    createExpressionOperation(
+      repr,
+      fieldRefs.toArray.asInstanceOf[Array[Expression]],
+      checkDeterministicFields = false)
+  }
+
+  /**
+   * Creates an expression operation over `repr`, taking the field expressions from a string
+   * that is parsed with `ExpressionParser.parseExpressionList`.
+   */
+  def createExpressionOperation[A](
+      repr: JavaDataSet[A],
+      expression: String): ExpressionOperation[JavaBatchTranslator] = {
+    val parsedFields = ExpressionParser.parseExpressionList(expression).toArray
+    createExpressionOperation(repr, parsedFields)
+  }
+
   def createExpressionOperation[A](
       repr: JavaDataSet[A],
-      fields: Array[Expression]): ExpressionOperation[JavaBatchTranslator] = {
+      fields: Array[Expression],
+      checkDeterministicFields: Boolean = true): 
ExpressionOperation[JavaBatchTranslator] = {
 
     // shortcut for DataSet[Row]
     repr.getType match {
@@ -72,6 +91,11 @@ class JavaBatchTranslator extends OperationTranslator {
 
     val inputType = repr.getType.asInstanceOf[CompositeType[A]]
 
+    // Reject the call when the deterministic-field-order check is enabled and the input
+    // type does not report a deterministic field order.
+    if (!inputType.hasDeterministicFieldOrder && checkDeterministicFields) {
+      throw new ExpressionException(s"You cannot rename fields upon Table creation: " +
+          s"Field order of input type $inputType is not deterministic." )
+    }
+
     if (fields.length != inputType.getFieldNames.length) {
       throw new ExpressionException("Number of selected fields: '" + 
fields.mkString(",") +
         "' and number of fields in input type " + inputType + " do not match.")
@@ -80,7 +104,7 @@ class JavaBatchTranslator extends OperationTranslator {
     val newFieldNames = fields map {
       case UnresolvedFieldReference(name) => name
       case e =>
-        throw new ExpressionException("Only field expressions allowed in 'as' 
operation, " +
+        throw new ExpressionException("Only field references allowed in 'as' 
operation, " +
           " offending expression: " + e)
     }
 
@@ -167,6 +191,13 @@ class JavaBatchTranslator extends OperationTranslator {
       case Root(dataSet: JavaDataSet[Row], resultFields) =>
         dataSet
 
+      case Root(_, _) =>
+        throw new ExpressionException("Invalid Root for JavaBatchTranslator: " 
+ op)
+
+      case GroupBy(_, fields) =>
+        throw new ExpressionException("Dangling GroupBy operation. Did you 
forget a " +
+          "SELECT statement?")
+
       case As(input, newNames) =>
         val translatedInput = translateInternal(input)
         val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
@@ -333,6 +364,11 @@ class JavaBatchTranslator extends OperationTranslator {
     val (reducedPredicate, leftFields, rightFields) =
       ExtractEquiJoinFields(leftType, rightType, predicate)
 
+    if (leftFields.isEmpty || rightFields.isEmpty) {
+      throw new ExpressionException("Could not derive equi-join predicates " +
+        "for predicate " + predicate + ".")
+    }
+
     val leftKey = new ExpressionKeys[L](leftFields, leftType)
     val rightKey = new ExpressionKeys[R](rightFields, rightType)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaStreamingTranslator.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaStreamingTranslator.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaStreamingTranslator.scala
index 095823e..56c38af 100644
--- 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaStreamingTranslator.scala
+++ 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaStreamingTranslator.scala
@@ -24,6 +24,7 @@ import 
org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.expressions.operations._
+import org.apache.flink.api.expressions.parser.ExpressionParser
 import org.apache.flink.api.expressions.runtime.{ExpressionFilterFunction, 
ExpressionSelectFunction}
 import org.apache.flink.api.expressions.tree._
 import org.apache.flink.api.expressions.typeinfo.RowTypeInfo
@@ -43,6 +44,23 @@ class JavaStreamingTranslator extends OperationTranslator {
 
   type Representation[A] = DataStream[A]
 
+
+  /**
+   * Creates an expression operation over `repr` whose fields are references to the field
+   * names reported by the input's composite type.
+   */
+  def createExpressionOperation[A](
+      repr: DataStream[A]): ExpressionOperation[JavaStreamingTranslator] = {
+    val compositeType = repr.getType.asInstanceOf[CompositeType[A]]
+    val fieldRefs = compositeType.getFieldNames.map(name => UnresolvedFieldReference(name))
+
+    createExpressionOperation(repr, fieldRefs.toArray.asInstanceOf[Array[Expression]])
+  }
+
+  /**
+   * Creates an expression operation over `repr`, taking the field expressions from a string
+   * that is parsed with `ExpressionParser.parseExpressionList`.
+   */
+  def createExpressionOperation[A](
+      repr: DataStream[A],
+      expression: String): ExpressionOperation[JavaStreamingTranslator] = {
+    val parsedFields = ExpressionParser.parseExpressionList(expression).toArray
+    createExpressionOperation(repr, parsedFields)
+  }
+
   def createExpressionOperation[A](
       repr: DataStream[A],
       fields: Array[Expression]): ExpressionOperation[JavaStreamingTranslator] 
= {
@@ -80,7 +98,7 @@ class JavaStreamingTranslator extends OperationTranslator {
     val newFieldNames = fields map {
       case UnresolvedFieldReference(name) => name
       case e =>
-        throw new ExpressionException("Only field expressions allowed in 'as' 
operation, " +
+        throw new ExpressionException("Only field references allowed in 'as' 
operation, " +
           " offending expression: " + e)
     }
 
@@ -166,6 +184,13 @@ class JavaStreamingTranslator extends OperationTranslator {
       case Root(dataSet: DataStream[Row], resultFields) =>
         dataSet
 
+      case Root(_, _) =>
+        throw new ExpressionException("Invalid Root for 
JavaStreamingTranslator: " + op)
+
+      case GroupBy(_, fields) =>
+        throw new ExpressionException("Dangling GroupBy operation. Did you 
forget a " +
+          "SELECT statement?")
+
       case As(input, newNames) =>
         throw new ExpressionException("As operation for Streams not yet 
implemented.")
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/expressionDsl.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/expressionDsl.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/expressionDsl.scala
index ef25b5b..1f6c397 100644
--- 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/expressionDsl.scala
+++ 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/expressionDsl.scala
@@ -25,6 +25,9 @@ import scala.language.implicitConversions
 /**
  * These are all the operations that can be used to construct an 
[[Expression]] AST for expression
  * operations.
+ *
+ * These operations must be kept in sync with the parser in
+ * [[org.apache.flink.api.expressions.parser.ExpressionParser]].
  */
 trait ImplicitExpressionOperations {
   def expr: Expression
@@ -87,6 +90,10 @@ trait ImplicitExpressionConversions {
     def expr = UnresolvedFieldReference(s.name)
   }
 
+  /**
+   * Implicit wrapper that makes the expression operations available on a `Long` value;
+   * `expr` wraps the value in a `Literal`.
+   */
+  implicit class LiteralLongExpression(l: Long) extends 
ImplicitExpressionOperations {
+    def expr = Literal(l)
+  }
+
   implicit class LiteralIntExpression(i: Int) extends 
ImplicitExpressionOperations {
     def expr = Literal(i)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/AggregationsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/AggregationsITCase.java
 
b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/AggregationsITCase.java
new file mode 100644
index 0000000..76c7fed
--- /dev/null
+++ 
b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/AggregationsITCase.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.expressions.test;
+
+import org.apache.flink.api.expressions.ExpressionException;
+import org.apache.flink.api.expressions.ExpressionOperation;
+import org.apache.flink.api.expressions.Row;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.expressions.ExpressionUtil;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.scala.expressions.JavaBatchTranslator;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Tests for aggregation expressions given as strings to the Java expression API. Every test
+ * writes its result as text to a temporary file, which is compared against {@code expected}
+ * after the test has run.
+ */
+@RunWith(Parameterized.class)
+public class AggregationsITCase extends MultipleProgramsTestBase {
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       private String resultPath;
+       private String expected = "";
+
+       public AggregationsITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+
+       @Before
+       public void before() throws Exception {
+               resultPath = tempFolder.newFile().toURI().toString();
+       }
+
+       @After
+       public void after() throws Exception {
+               compareResultsByLinesInMemory(expected, resultPath);
+       }
+
+       /** Converts the operation to a DataSet of rows and writes it as text to the result path. */
+       private void writeRows(ExpressionOperation<JavaBatchTranslator> operation) {
+               DataSet<Row> rows = ExpressionUtil.toSet(operation, Row.class);
+               rows.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+       }
+
+       @Test
+       public void testAggregationTypes() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               ExpressionOperation<JavaBatchTranslator> table =
+                               ExpressionUtil.from(CollectionDataSets.get3TupleDataSet(env));
+               ExpressionOperation<JavaBatchTranslator> aggregated =
+                               table.select("f0.sum, f0.min, f0.max, f0.count, f0.avg");
+
+               writeRows(aggregated);
+               env.execute();
+
+               expected = "231,1,21,21,11";
+       }
+
+       @Test(expected = ExpressionException.class)
+       public void testAggregationOnNonExistingField() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               ExpressionOperation<JavaBatchTranslator> table =
+                               ExpressionUtil.from(CollectionDataSets.get3TupleDataSet(env));
+               ExpressionOperation<JavaBatchTranslator> aggregated = table.select("'foo.avg");
+
+               writeRows(aggregated);
+               env.execute();
+
+               expected = "";
+       }
+
+       @Test
+       public void testWorkingAggregationDataTypes() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, String>> source =
+                               env.fromElements(
+                                               new Tuple7<Byte, Short, Integer, Long, Float, Double, String>(
+                                                               (byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello"),
+                                               new Tuple7<Byte, Short, Integer, Long, Float, Double, String>(
+                                                               (byte) 2, (short) 2, 2, 2L, 2.0f, 2.0d, "Ciao"));
+
+               ExpressionOperation<JavaBatchTranslator> table = ExpressionUtil.from(source);
+               ExpressionOperation<JavaBatchTranslator> aggregated =
+                               table.select("f0.avg, f1.avg, f2.avg, f3.avg, f4.avg, f5.avg, f6.count");
+
+               writeRows(aggregated);
+               env.execute();
+
+               expected = "1,1,1,1,1.5,1.5,2";
+       }
+
+       @Test
+       public void testAggregationWithArithmetic() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSource<Tuple2<Float, String>> source =
+                               env.fromElements(
+                                               new Tuple2<Float, String>(1f, "Hello"),
+                                               new Tuple2<Float, String>(2f, "Ciao"));
+
+               ExpressionOperation<JavaBatchTranslator> table = ExpressionUtil.from(source);
+               ExpressionOperation<JavaBatchTranslator> aggregated =
+                               table.select("(f0 + 2).avg + 2, f1.count + \" THE COUNT\"");
+
+               writeRows(aggregated);
+               env.execute();
+
+               expected = "5.5,2 THE COUNT";
+       }
+
+       @Test(expected = ExpressionException.class)
+       public void testNonWorkingDataTypes() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSource<Tuple2<Float, String>> source =
+                               env.fromElements(new Tuple2<Float, String>(1f, "Hello"));
+
+               ExpressionOperation<JavaBatchTranslator> table = ExpressionUtil.from(source);
+               ExpressionOperation<JavaBatchTranslator> aggregated = table.select("f1.sum");
+
+               writeRows(aggregated);
+               env.execute();
+
+               expected = "";
+       }
+
+       @Test(expected = ExpressionException.class)
+       public void testNoNestedAggregation() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSource<Tuple2<Float, String>> source =
+                               env.fromElements(new Tuple2<Float, String>(1f, "Hello"));
+
+               ExpressionOperation<JavaBatchTranslator> table = ExpressionUtil.from(source);
+               ExpressionOperation<JavaBatchTranslator> aggregated = table.select("f0.sum.sum");
+
+               writeRows(aggregated);
+               env.execute();
+
+               expected = "";
+       }
+
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/AsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/AsITCase.java
 
b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/AsITCase.java
new file mode 100644
index 0000000..3b69be0
--- /dev/null
+++ 
b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/AsITCase.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.expressions.test;
+
+import org.apache.flink.api.expressions.ExpressionException;
+import org.apache.flink.api.expressions.ExpressionOperation;
+import org.apache.flink.api.expressions.Row;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.expressions.ExpressionUtil;
+import org.apache.flink.api.scala.expressions.JavaBatchTranslator;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Tests for naming the fields of a DataSet when it is converted to an expression operation
+ * through {@code ExpressionUtil.from(dataSet, fieldNames)}. Every test writes its result as
+ * text to a temporary file, which is compared against {@code expected} after the test has run.
+ */
+@RunWith(Parameterized.class)
+public class AsITCase extends MultipleProgramsTestBase {
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       private String resultPath;
+       private String expected = "";
+
+       public AsITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+
+       @Before
+       public void before() throws Exception {
+               resultPath = tempFolder.newFile().toURI().toString();
+       }
+
+       @After
+       public void after() throws Exception {
+               compareResultsByLinesInMemory(expected, resultPath);
+       }
+
+       /**
+        * Builds an expression operation from the 3-tuple test data set using the given field
+        * expression string, writes the rows as text to the result path and executes the program.
+        */
+       private void runWithFieldNames(String fieldNames) throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               ExpressionOperation<JavaBatchTranslator> table =
+                               ExpressionUtil.from(CollectionDataSets.get3TupleDataSet(env), fieldNames);
+
+               DataSet<Row> rows = ExpressionUtil.toSet(table, Row.class);
+               rows.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+               env.execute();
+       }
+
+       @Test
+       public void testAs() throws Exception {
+               runWithFieldNames("a, b, c");
+
+               expected = "1,1,Hi\n"
+                               + "2,2,Hello\n"
+                               + "3,2,Hello world\n"
+                               + "4,3,Hello world, how are you?\n"
+                               + "5,3,I am fine.\n"
+                               + "6,3,Luke Skywalker\n"
+                               + "7,4,Comment#1\n"
+                               + "8,4,Comment#2\n"
+                               + "9,4,Comment#3\n"
+                               + "10,4,Comment#4\n"
+                               + "11,5,Comment#5\n"
+                               + "12,5,Comment#6\n"
+                               + "13,5,Comment#7\n"
+                               + "14,5,Comment#8\n"
+                               + "15,5,Comment#9\n"
+                               + "16,6,Comment#10\n"
+                               + "17,6,Comment#11\n"
+                               + "18,6,Comment#12\n"
+                               + "19,6,Comment#13\n"
+                               + "20,6,Comment#14\n"
+                               + "21,6,Comment#15\n";
+       }
+
+       @Test(expected = ExpressionException.class)
+       public void testAsWithToFewFields() throws Exception {
+               runWithFieldNames("a, b");
+
+               expected = "";
+       }
+
+       @Test(expected = ExpressionException.class)
+       public void testAsWithToManyFields() throws Exception {
+               runWithFieldNames("a, b, c, d");
+
+               expected = "";
+       }
+
+       @Test(expected = ExpressionException.class)
+       public void testAsWithAmbiguousFields() throws Exception {
+               runWithFieldNames("a, b, b");
+
+               expected = "";
+       }
+
+       @Test(expected = ExpressionException.class)
+       public void testAsWithNonFieldReference1() throws Exception {
+               runWithFieldNames("a + 1, b, c");
+
+               expected = "";
+       }
+
+       @Test(expected = ExpressionException.class)
+       public void testAsWithNonFieldReference2() throws Exception {
+               runWithFieldNames("a as foo, b, c");
+
+               expected = "";
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/CastingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/CastingITCase.java
 
b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/CastingITCase.java
new file mode 100644
index 0000000..b4d1159
--- /dev/null
+++ 
b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/CastingITCase.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.expressions.test;
+
+import org.apache.flink.api.expressions.ExpressionOperation;
+import org.apache.flink.api.expressions.Row;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.expressions.ExpressionUtil;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.scala.expressions.JavaBatchTranslator;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class CastingITCase extends MultipleProgramsTestBase {
+
+
+       public CastingITCase(TestExecutionMode mode){
+               super(mode);
+       }
+
+       private String resultPath;
+       private String expected = "";
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Before
+       public void before() throws Exception{
+               resultPath = tempFolder.newFile().toURI().toString();
+       }
+
+       @After
+       public void after() throws Exception{
+               compareResultsByLinesInMemory(expected, resultPath);
+       }
+
+       @Test
+       public void testAutoCastToString() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, 
String>> input =
+                               env.fromElements(new Tuple7<Byte, Short, 
Integer, Long, Float, Double, String>(
+                                               (byte) 1, (short) 1, 1, 1L, 
1.0f, 1.0d, "Hello"));
+
+               ExpressionOperation<JavaBatchTranslator> expressionOperation =
+                               ExpressionUtil.from(input);
+
+               ExpressionOperation<JavaBatchTranslator> result = 
expressionOperation.select(
+                               "f0 + 'b', f1 + 's', f2 + 'i', f3 + 'L', f4 + 
'f', f5 + \"d\"");
+
+               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
+               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+               env.execute();
+
+               expected = "1b,1s,1i,1L,1.0f,1.0d";
+       }
+
+       @Test
+       public void testNumericAutocastInArithmetic() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, 
String>> input =
+                               env.fromElements(new Tuple7<Byte, Short, 
Integer, Long, Float, Double, String>(
+                                               (byte) 1, (short) 1, 1, 1L, 
1.0f, 1.0d, "Hello"));
+
+               ExpressionOperation<JavaBatchTranslator> expressionOperation =
+                               ExpressionUtil.from(input);
+
+               ExpressionOperation<JavaBatchTranslator> result = 
expressionOperation.select("f0 + 1, f1 +" +
+                               " 1, f2 + 1L, f3 + 1.0f, f4 + 1.0d, f5 + 1");
+
+               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
+               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+               env.execute();
+
+               expected = "2,2,2,2.0,2.0,2.0";
+       }
+
+       @Test
+       public void testNumericAutocastInComparison() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, 
String>> input =
+                               env.fromElements(
+                                               new Tuple7<Byte, Short, 
Integer, Long, Float, Double, String>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, 
"Hello"),
+                                               new Tuple7<Byte, Short, 
Integer, Long, Float, Double, String>((byte) 2, (short) 2, 2, 2L, 2.0f, 2.0d, 
"Hello"));
+
+               ExpressionOperation<JavaBatchTranslator> expressionOperation =
+                               ExpressionUtil.from(input, "a,b,c,d,e,f,g");
+
+               ExpressionOperation<JavaBatchTranslator> result = 
expressionOperation
+                               .filter("a > 1 && b > 1 && c > 1L && d > 1.0f 
&& e > 1.0d && f > 1");
+
+               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
+               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+               env.execute();
+
+               expected = "2,2,2,2,2.0,2.0,Hello";
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/ExpressionsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/ExpressionsITCase.java
 
b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/ExpressionsITCase.java
new file mode 100644
index 0000000..5c3a92a
--- /dev/null
+++ 
b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/ExpressionsITCase.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.expressions.test;
+
+import org.apache.flink.api.expressions.ExpressionException;
+import org.apache.flink.api.expressions.ExpressionOperation;
+import org.apache.flink.api.expressions.Row;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.expressions.ExpressionUtil;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.scala.expressions.JavaBatchTranslator;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class ExpressionsITCase extends MultipleProgramsTestBase { // Tests arithmetic, logic, comparison and bitwise string expressions passed to select().
+
+
+       public ExpressionsITCase(TestExecutionMode mode){ // forwards the execution mode to MultipleProgramsTestBase
+               super(mode);
+       }
+
+       private String resultPath; // URI of the temp file each test writes its result to
+       private String expected = ""; // expected file content; each test assigns it and after() checks it
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Before // allocates a fresh temp file for every test
+       public void before() throws Exception{
+               resultPath = tempFolder.newFile().toURI().toString();
+       }
+
+       @After // the actual assertion of every test: expected vs. the file at resultPath
+       public void after() throws Exception{
+               compareResultsByLinesInMemory(expected, resultPath);
+       }
+
+       @Test // Binary -, +, /, *, % and unary minus on an Integer column.
+       public void testArithmetic() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSource<Tuple2<Integer, Integer>> input =
+                               env.fromElements(new Tuple2<Integer, 
Integer>(5, 10));
+
+               ExpressionOperation<JavaBatchTranslator> expressionOperation =
+                               ExpressionUtil.from(input, "a, b");
+
+               ExpressionOperation<JavaBatchTranslator> result = 
expressionOperation.select(
+                               "a - 5, a + 5, a / 2, a * 2, a % 2, -a");
+
+               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
+               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+               env.execute();
+
+               expected = "0,10,2,10,1,-5"; // with a = 5; note a / 2 is expected to be 2, not 2.5
+       }
+
+       @Test // &&, || and ! on a Boolean column combined with boolean literals.
+       public void testLogic() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSource<Tuple2<Integer, Boolean>> input =
+                               env.fromElements(new Tuple2<Integer, 
Boolean>(5, true));
+
+               ExpressionOperation<JavaBatchTranslator> expressionOperation =
+                               ExpressionUtil.from(input, "a, b");
+
+               ExpressionOperation<JavaBatchTranslator> result = 
expressionOperation.select(
+                               "b && true, b && false, b || false, !b");
+
+               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
+               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+               env.execute();
+
+               expected = "true,false,true,false"; // with b = true
+       }
+
+       @Test // >, >=, < plus the isNull / isNotNull suffix operators.
+       public void testComparisons() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSource<Tuple3<Integer, Integer, Integer>> input =
+                               env.fromElements(new Tuple3<Integer, Integer, 
Integer>(5, 5, 4));
+
+               ExpressionOperation<JavaBatchTranslator> expressionOperation =
+                               ExpressionUtil.from(input, "a, b, c");
+
+               ExpressionOperation<JavaBatchTranslator> result = 
expressionOperation.select(
+                               "a > c, a >= b, a < c, a.isNull, a.isNotNull");
+
+               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
+               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+               env.execute();
+
+               expected = "true,true,false,false,true"; // with a = 5, b = 5, c = 4
+       }
+
+       @Test // &, |, ^ and ~ where both operands are Byte columns.
+       public void testBitwiseOperation() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSource<Tuple2<Byte, Byte>> input =
+                               env.fromElements(new Tuple2<Byte, Byte>((byte) 
3, (byte) 5));
+
+               ExpressionOperation<JavaBatchTranslator> expressionOperation =
+                               ExpressionUtil.from(input, "a, b");
+
+               ExpressionOperation<JavaBatchTranslator> result = 
expressionOperation.select(
+                               "a & b, a | b, a ^ b, ~a");
+
+               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
+               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+               env.execute();
+
+               expected = "1,7,6,-4"; // 3 & 5, 3 | 5, 3 ^ 5, ~3
+       }
+
+       @Test // Same bitwise expressions, but mixing an Integer column with a Byte column.
+       public void testBitwiseWithAutocast() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSource<Tuple2<Integer, Byte>> input =
+                               env.fromElements(new Tuple2<Integer, Byte>(3, 
(byte) 5));
+
+               ExpressionOperation<JavaBatchTranslator> expressionOperation =
+                               ExpressionUtil.from(input, "a, b");
+
+               ExpressionOperation<JavaBatchTranslator> result = 
expressionOperation.select(
+                               "a & b, a | b, a ^ b, ~a");
+
+               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
+               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+               env.execute();
+
+               expected = "1,7,6,-4"; // same values as the all-Byte case
+       }
+
+       @Test(expected = ExpressionException.class) // bitwise expressions with a Float operand must be rejected
+       public void testBitwiseWithNonWorkingAutocast() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSource<Tuple2<Float, Byte>> input =
+                               env.fromElements(new Tuple2<Float, Byte>(3.0f, 
(byte) 5));
+
+               ExpressionOperation<JavaBatchTranslator> expressionOperation =
+                               ExpressionUtil.from(input, "a, b");
+
+               ExpressionOperation<JavaBatchTranslator> result =
+                               expressionOperation.select("a & b, a | b, a ^ 
b, ~a");
+
+               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
+               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+               env.execute();
+
+               expected = ""; // same as the field default; after() then compares against the untouched temp file
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/FilterITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/FilterITCase.java
 
b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/FilterITCase.java
new file mode 100644
index 0000000..7da5fa3
--- /dev/null
+++ 
b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/FilterITCase.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.expressions.test;
+
+import org.apache.flink.api.expressions.ExpressionException;
+import org.apache.flink.api.expressions.ExpressionOperation;
+import org.apache.flink.api.expressions.Row;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.expressions.ExpressionUtil;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.scala.expressions.JavaBatchTranslator;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class FilterITCase extends MultipleProgramsTestBase { // Tests filter() with string predicates on CollectionDataSets.get3TupleDataSet.
+
+
+       public FilterITCase(TestExecutionMode mode){ // forwards the execution mode to MultipleProgramsTestBase
+               super(mode);
+       }
+
+       private String resultPath; // URI of the temp file each test writes its result to
+       private String expected = ""; // expected file content; each test assigns it and after() checks it
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Before // allocates a fresh temp file for every test
+       public void before() throws Exception{
+               resultPath = tempFolder.newFile().toURI().toString();
+       }
+
+       @After // the actual assertion of every test: expected vs. the file at resultPath
+       public void after() throws Exception{
+               compareResultsByLinesInMemory(expected, resultPath);
+       }
+
+       @Test // A constant "false" predicate.
+       public void testAllRejectingFilter() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env);
+
+               ExpressionOperation<JavaBatchTranslator> expressionOperation =
+                               ExpressionUtil.from(input, "a, b, c");
+
+               ExpressionOperation<JavaBatchTranslator> result = 
expressionOperation
+                               .filter("false");
+
+               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
+               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+               env.execute();
+
+               expected = "\n"; // no data rows are expected in the output
+       }
+
+       @Test // A constant "true" predicate; all 21 input rows are expected back.
+       public void testAllPassingFilter() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env);
+
+               ExpressionOperation<JavaBatchTranslator> expressionOperation =
+                               ExpressionUtil.from(input, "a, b, c");
+
+               ExpressionOperation<JavaBatchTranslator> result = 
expressionOperation
+                               .filter("true");
+
+               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
+               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+               env.execute();
+
+               expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + 
"4,3,Hello world, " +
+                               "how are you?\n" + "5,3,I am fine.\n" + 
"6,3,Luke Skywalker\n" + "7,4," +
+                               "Comment#1\n" + "8,4,Comment#2\n" + 
"9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
+                               "Comment#5\n" + "12,5,Comment#6\n" + 
"13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
+                               "Comment#9\n" + "16,6,Comment#10\n" + 
"17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
+                               "6,Comment#13\n" + "20,6,Comment#14\n" + 
"21,6,Comment#15\n";
+       }
+
+       @Test // Predicate on the Integer field a, using the === equality operator of the expression syntax.
+       public void testFilterOnIntegerTupleField() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env);
+
+               ExpressionOperation<JavaBatchTranslator> expressionOperation =
+                               ExpressionUtil.from(input, "a, b, c");
+
+               ExpressionOperation<JavaBatchTranslator> result = 
expressionOperation
+                               .filter(" a % 2 === 0 "); // the predicate string has leading and trailing blanks
+
+               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
+               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+               env.execute();
+
+               expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + 
"6,3,Luke Skywalker\n" + "8,4," +
+                               "Comment#2\n" + "10,4,Comment#4\n" + 
"12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
+                               "Comment#10\n" + "18,6,Comment#12\n" + 
"20,6,Comment#14\n";
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/GroupedAggregationsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/GroupedAggregationsITCase.java
 
b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/GroupedAggregationsITCase.java
new file mode 100644
index 0000000..8141dea
--- /dev/null
+++ 
b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/GroupedAggregationsITCase.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.expressions.test;
+
+import org.apache.flink.api.expressions.ExpressionException;
+import org.apache.flink.api.expressions.ExpressionOperation;
+import org.apache.flink.api.expressions.Row;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.expressions.ExpressionUtil;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.scala.expressions.JavaBatchTranslator;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class GroupedAggregationsITCase extends MultipleProgramsTestBase { // Tests groupBy() followed by aggregating select() expressions.
+
+
+       public GroupedAggregationsITCase(TestExecutionMode mode){ // forwards the execution mode to MultipleProgramsTestBase
+               super(mode);
+       }
+
+       private String resultPath; // URI of the temp file each test writes its result to
+       private String expected = ""; // expected file content; each test assigns it and after() checks it
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Before // allocates a fresh temp file for every test
+       public void before() throws Exception{
+               resultPath = tempFolder.newFile().toURI().toString();
+       }
+
+       @After // the actual assertion of every test: expected vs. the file at resultPath
+       public void after() throws Exception{
+               compareResultsByLinesInMemory(expected, resultPath);
+       }
+
+       @Test(expected = ExpressionException.class) // "foo" is not one of the declared field names a, b, c
+       public void testGroupingOnNonExistentField() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env);
+
+               ExpressionOperation<JavaBatchTranslator> expressionOperation =
+                               ExpressionUtil.from(input, "a, b, c");
+
+               ExpressionOperation<JavaBatchTranslator> result = 
expressionOperation
+                               .groupBy("foo").select("a.avg");
+
+               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
+               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+               env.execute();
+
+               expected = ""; // same as the field default; after() then compares against the untouched temp file
+       }
+
+       @Test // Groups by b and selects the key together with the sum of a.
+       public void testGroupedAggregate() throws Exception {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env);
+
+               ExpressionOperation<JavaBatchTranslator> expressionOperation =
+                               ExpressionUtil.from(input, "a, b, c");
+
+               ExpressionOperation<JavaBatchTranslator> result = 
expressionOperation
+                               .groupBy("b").select("b, a.sum");
+
+               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
+               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+               env.execute();
+
+               expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + 
"6,111\n";
+       }
+
+       @Test // Groups by b but selects only a.sum, so the key does not appear in the output.
+       public void testGroupingKeyForwardIfNotUsed() throws Exception {
+
+               // the grouping key needs to be forwarded to the intermediate 
DataSet, even
+               // if we don't want the key in the output
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env);
+
+               ExpressionOperation<JavaBatchTranslator> expressionOperation =
+                               ExpressionUtil.from(input, "a, b, c");
+
+               ExpressionOperation<JavaBatchTranslator> result = 
expressionOperation
+                               .groupBy("b").select("a.sum");
+
+               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
+               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+               env.execute();
+
+               expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n"; // same sums as in testGroupedAggregate, without the key column
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/JoinITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/JoinITCase.java
 
b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/JoinITCase.java
new file mode 100644
index 0000000..3ece3dc
--- /dev/null
+++ 
b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/JoinITCase.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.expressions.test;
+
+import org.apache.flink.api.expressions.ExpressionException;
+import org.apache.flink.api.expressions.ExpressionOperation;
+import org.apache.flink.api.expressions.Row;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.expressions.ExpressionUtil;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.scala.expressions.JavaBatchTranslator;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class JoinITCase extends MultipleProgramsTestBase { // Tests join(...).where(...).select(...) with string expressions, including the error cases.
+
+
+       public JoinITCase(TestExecutionMode mode) { // forwards the execution mode to MultipleProgramsTestBase
+               super(mode);
+       }
+
+       private String resultPath; // URI of the temp file each test writes its result to
+       private String expected = ""; // expected file content; each test assigns it and after() checks it
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Before // allocates a fresh temp file for every test
+       public void before() throws Exception {
+               resultPath = tempFolder.newFile().toURI().toString();
+       }
+
+       @After // the actual assertion of every test: expected vs. the file at resultPath
+       public void after() throws Exception {
+               compareResultsByLinesInMemory(expected, resultPath);
+       }
+
+       @Test // Join on a single equality predicate (b === e).
+       public void testJoin() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Long, String>> ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.get5TupleDataSet(env);
+
+               ExpressionOperation<JavaBatchTranslator> in1 = 
ExpressionUtil.from(ds1, "a, b, c");
+               ExpressionOperation<JavaBatchTranslator> in2 = 
ExpressionUtil.from(ds2, "d, e, f, g, h");
+
+               ExpressionOperation<JavaBatchTranslator> result = 
in1.join(in2).where("b === e").select("c, g");
+
+               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
+               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+               env.execute();
+
+               expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello 
world,Hallo Welt\n";
+       }
+
+       @Test // Equality predicate combined with an extra non-equality condition (b < 2).
+       public void testJoinWithFilter() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Long, String>> ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.get5TupleDataSet(env);
+
+               ExpressionOperation<JavaBatchTranslator> in1 = 
ExpressionUtil.from(ds1, "a, b, c");
+               ExpressionOperation<JavaBatchTranslator> in2 = 
ExpressionUtil.from(ds2, "d, e, f, g, h");
+
+               ExpressionOperation<JavaBatchTranslator> result = 
in1.join(in2).where("b === e && b < 2").select("c, g");
+
+               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
+               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+               env.execute();
+
+               expected = "Hi,Hallo\n";
+       }
+
+       @Test // Two equality predicates joined with &&; uses the full 3-tuple data set rather than the small one.
+       public void testJoinWithMultipleKeys() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Long, String>> ds1 = 
CollectionDataSets.get3TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.get5TupleDataSet(env);
+
+               ExpressionOperation<JavaBatchTranslator> in1 = 
ExpressionUtil.from(ds1, "a, b, c");
+               ExpressionOperation<JavaBatchTranslator> in2 = 
ExpressionUtil.from(ds2, "d, e, f, g, h");
+
+               ExpressionOperation<JavaBatchTranslator> result = 
in1.join(in2).where("a === d && b === h").select("c, g");
+
+               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
+               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+               env.execute();
+
+               expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello 
world,Hallo Welt wie gehts?\n" +
+                               "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I 
am fine.,IJK\n";
+       }
+
+       @Test(expected = ExpressionException.class) // "foo" is not a declared field of either input
+       public void testJoinNonExistingKey() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Long, String>> ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.get5TupleDataSet(env);
+
+               ExpressionOperation<JavaBatchTranslator> in1 = 
ExpressionUtil.from(ds1, "a, b, c");
+               ExpressionOperation<JavaBatchTranslator> in2 = 
ExpressionUtil.from(ds2, "d, e, f, g, h");
+
+               ExpressionOperation<JavaBatchTranslator> result = 
in1.join(in2).where("foo === e").select("c, g");
+
+               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
+               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+               env.execute();
+
+               expected = ""; // same as the field default; after() then compares against the untouched temp file
+       }
+
+       @Test(expected = ExpressionException.class) // a is the Integer field of ds1, g is the String field of ds2
+       public void testJoinWithNonMatchingKeyTypes() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Long, String>> ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.get5TupleDataSet(env);
+
+               ExpressionOperation<JavaBatchTranslator> in1 = 
ExpressionUtil.from(ds1, "a, b, c");
+               ExpressionOperation<JavaBatchTranslator> in2 = 
ExpressionUtil.from(ds2, "d, e, f, g, h");
+
+               ExpressionOperation<JavaBatchTranslator> result = in1
+                               .join(in2).where("a === g").select("c, g");
+
+               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
+               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+               env.execute();
+
+               expected = ""; // same as the field default; after() then compares against the untouched temp file
+       }
+
+       @Test(expected = ExpressionException.class) // both inputs declare a field named c (see in2 below)
+       public void testJoinWithAmbiguousFields() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Long, String>> ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.get5TupleDataSet(env);
+
+               ExpressionOperation<JavaBatchTranslator> in1 = 
ExpressionUtil.from(ds1, "a, b, c");
+               ExpressionOperation<JavaBatchTranslator> in2 = 
ExpressionUtil.from(ds2, "d, e, f, g, c");
+
+               ExpressionOperation<JavaBatchTranslator> result = in1
+                               .join(in2).where("a === d").select("c, g");
+
+               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
+               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+               env.execute();
+
+               expected = ""; // same as the field default; after() then compares against the untouched temp file
+       }
+
+       @Test // Applies an aggregation (g.count) directly to the join result.
+       public void testJoinWithAggregation() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple3<Integer, Long, String>> ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env);
+               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.get5TupleDataSet(env);
+
+               ExpressionOperation<JavaBatchTranslator> in1 = 
ExpressionUtil.from(ds1, "a, b, c");
+               ExpressionOperation<JavaBatchTranslator> in2 = 
ExpressionUtil.from(ds2, "d, e, f, g, h");
+
+               ExpressionOperation<JavaBatchTranslator> result = in1
+                               .join(in2).where("a === d").select("g.count");
+
+               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
+               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+               env.execute();
+
+               expected = "6"; // a single output row holding the count
+       }
+
+}

Reply via email to