http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/ExpressionCodeGenerator.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/ExpressionCodeGenerator.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/ExpressionCodeGenerator.scala
deleted file mode 100644
index cafba57..0000000
--- 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/ExpressionCodeGenerator.scala
+++ /dev/null
@@ -1,635 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.codegen
-
-import java.util.concurrent.atomic.AtomicInteger
-import org.apache.flink.api.expressions.tree._
-import org.apache.flink.api.expressions.typeinfo.{RenamingProxyTypeInfo, 
RowTypeInfo}
-import org.apache.flink.api.expressions.{ExpressionException, tree}
-import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo, 
BasicTypeInfo, TypeInformation}
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.typeutils.{TupleTypeInfo, PojoTypeInfo}
-import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.slf4j.LoggerFactory
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-/** Base class for all code generation classes. This provides the 
functionality for generating
-  * code from an [[Expression]] tree. Derived classes must embed this in a 
lambda function
-  * to form an executable code block.
-  *
-  * @param inputs List of input variable names with corresponding 
[[TypeInformation]].
-  * @param nullCheck Whether the generated code should include checks for NULL 
values.
-  * @param cl The ClassLoader that is used to create the Scala reflection 
ToolBox
-  * @tparam R The type of the generated code block. In most cases a lambda 
function such
-  *           as "(IN1, IN2) => OUT".
-  */
-abstract class ExpressionCodeGenerator[R](
-                                           inputs: Seq[(String, 
CompositeType[_])],
-                                           val nullCheck: Boolean = false,
-                                           cl: ClassLoader) {
-  protected val log = 
LoggerFactory.getLogger(classOf[ExpressionCodeGenerator[_]])
-
-  import scala.reflect.runtime.{universe => ru}
-  import scala.reflect.runtime.universe._
-
-  if (cl == null) {
-    throw new IllegalArgumentException("ClassLoader must not be null.")
-  }
-
-  import scala.tools.reflect.ToolBox
-
-  protected val (mirror, toolBox) = ReflectionLock.synchronized {
-    val mirror = runtimeMirror(cl)
-    (mirror, mirror.mkToolBox())
-  }
-
-  // This is to be implemented by subclasses, we have it like this
-  // so that we only call it from here with the Scala Reflection Lock.
-  protected def generateInternal(): R
-
-  final def generate(): R = {
-    ReflectionLock.synchronized {
-      generateInternal()
-    }
-  }
-
-  val cache = mutable.HashMap[Expression, GeneratedExpression]()
-
-  protected def generateExpression(expr: Expression): GeneratedExpression = {
-    // doesn't work yet, because we insert the same code twice and reuse 
variable names
-    //    cache.getOrElseUpdate(expr, generateExpressionInternal(expr))
-    generateExpressionInternal(expr)
-  }
-
-  protected def generateExpressionInternal(expr: Expression): 
GeneratedExpression = {
-    //  protected def generateExpression(expr: Expression): 
GeneratedExpression = {
-    val nullTerm = freshTermName("isNull")
-    val resultTerm = freshTermName("result")
-
-    // For binary predicates that must only be evaluated when both operands 
are non-null.
-    // This will write to nullTerm and resultTerm, so don't use those term 
names
-    // after using this function
-    def generateIfNonNull(left: Expression, right: Expression, resultType: 
TypeInformation[_])
-                         (expr: (TermName, TermName) => Tree): Seq[Tree] = {
-      val leftCode = generateExpression(left)
-      val rightCode = generateExpression(right)
-
-
-      if (nullCheck) {
-        leftCode.code ++ rightCode.code ++ q"""
-        val $nullTerm = ${leftCode.nullTerm}|| ${rightCode.nullTerm}
-        val $resultTerm = if ($nullTerm) {
-          ${defaultPrimitive(resultType)}
-        } else {
-          ${expr(leftCode.resultTerm, rightCode.resultTerm)}
-        }
-        """.children
-      } else {
-        leftCode.code ++ rightCode.code :+ q"""
-        val $resultTerm = ${expr(leftCode.resultTerm, rightCode.resultTerm)}
-        """
-      }
-    }
-
-    val cleanedExpr = expr match {
-      case tree.Naming(namedExpr, _) => namedExpr
-      case _ => expr
-    }
-
-    val code: Seq[Tree] = cleanedExpr match {
-
-      case tree.Literal(null, typeInfo) =>
-        if (nullCheck) {
-          q"""
-            val $nullTerm = true
-            val resultTerm = null
-          """.children
-        } else {
-          Seq( q"""
-            val resultTerm = null
-          """)
-        }
-
-      case tree.Literal(intValue: Int, INT_TYPE_INFO) =>
-        if (nullCheck) {
-          q"""
-            val $nullTerm = false
-            val $resultTerm = $intValue
-          """.children
-        } else {
-          Seq( q"""
-            val $resultTerm = $intValue
-          """)
-        }
-
-      case tree.Literal(longValue: Long, LONG_TYPE_INFO) =>
-        if (nullCheck) {
-          q"""
-            val $nullTerm = false
-            val $resultTerm = $longValue
-          """.children
-        } else {
-          Seq( q"""
-            val $resultTerm = $longValue
-          """)
-        }
-
-
-      case tree.Literal(doubleValue: Double, DOUBLE_TYPE_INFO) =>
-        if (nullCheck) {
-          q"""
-            val $nullTerm = false
-            val $resultTerm = $doubleValue
-          """.children
-        } else {
-          Seq( q"""
-              val $resultTerm = $doubleValue
-          """)
-        }
-
-      case tree.Literal(floatValue: Float, FLOAT_TYPE_INFO) =>
-        if (nullCheck) {
-          q"""
-            val $nullTerm = false
-            val $resultTerm = $floatValue
-          """.children
-        } else {
-          Seq( q"""
-              val $resultTerm = $floatValue
-          """)
-        }
-
-      case tree.Literal(strValue: String, STRING_TYPE_INFO) =>
-        if (nullCheck) {
-          q"""
-            val $nullTerm = false
-            val $resultTerm = $strValue
-          """.children
-        } else {
-          Seq( q"""
-              val $resultTerm = $strValue
-          """)
-        }
-
-      case tree.Literal(boolValue: Boolean, BOOLEAN_TYPE_INFO) =>
-        if (nullCheck) {
-          q"""
-            val $nullTerm = false
-            val $resultTerm = $boolValue
-          """.children
-        } else {
-          Seq( q"""
-              val $resultTerm = $boolValue
-          """)
-        }
-
-      case Substring(str, beginIndex, endIndex) =>
-        val strCode = generateExpression(str)
-        val beginIndexCode = generateExpression(beginIndex)
-        val endIndexCode = generateExpression(endIndex)
-        if (nullCheck) {
-          strCode.code ++ beginIndexCode.code ++ endIndexCode.code ++ q"""
-            val $nullTerm =
-              ${strCode.nullTerm}|| ${beginIndexCode.nullTerm}|| 
${endIndexCode.nullTerm}
-            if ($nullTerm) {
-              ${defaultPrimitive(str.typeInfo)}
-            } else {
-              val $resultTerm = if (${endIndexCode.resultTerm} == 
Int.MaxValue) {
-                 
(${strCode.resultTerm}).substring(${beginIndexCode.resultTerm})
-              } else {
-                (${strCode.resultTerm}).substring(
-                  ${beginIndexCode.resultTerm},
-                  ${endIndexCode.resultTerm})
-              }
-            }
-          """.children
-        } else {
-          strCode.code ++ beginIndexCode.code ++ endIndexCode.code :+ q"""
-            val $resultTerm = if (${endIndexCode.resultTerm} == Int.MaxValue) {
-              (${strCode.resultTerm}).substring(${beginIndexCode.resultTerm})
-            } else {
-              (${strCode.resultTerm}).substring(
-                ${beginIndexCode.resultTerm},
-                ${endIndexCode.resultTerm})
-            }
-          """
-        }
-
-      case tree.Cast(child: Expression, STRING_TYPE_INFO) =>
-        val childGen = generateExpression(child)
-        val castCode = if (nullCheck) {
-          q"""
-            val $nullTerm = ${childGen.nullTerm}
-            val $resultTerm = if ($nullTerm == null) {
-              null
-            } else {
-              ${childGen.resultTerm}.toString
-            }
-          """.children
-        } else {
-          Seq( q"""
-            val $resultTerm = ${childGen.resultTerm}.toString
-          """)
-        }
-        childGen.code ++ castCode
-
-      case tree.Cast(child: Expression, INT_TYPE_INFO) =>
-        val childGen = generateExpression(child)
-        val castCode = if (nullCheck) {
-          q"""
-            val $nullTerm = ${childGen.nullTerm}
-            val $resultTerm = ${childGen.resultTerm}.toInt
-          """.children
-        } else {
-          Seq( q"""
-            val $resultTerm = ${childGen.resultTerm}.toInt
-          """)
-        }
-        childGen.code ++ castCode
-
-      case tree.Cast(child: Expression, LONG_TYPE_INFO) =>
-        val childGen = generateExpression(child)
-        val castCode = if (nullCheck) {
-          q"""
-            val $nullTerm = ${childGen.nullTerm}
-            val $resultTerm = ${childGen.resultTerm}.toLong
-          """.children
-        } else {
-          Seq( q"""
-            val $resultTerm = ${childGen.resultTerm}.toLong
-          """)
-        }
-        childGen.code ++ castCode
-
-      case tree.Cast(child: Expression, FLOAT_TYPE_INFO) =>
-        val childGen = generateExpression(child)
-        val castCode = if (nullCheck) {
-          q"""
-            val $nullTerm = ${childGen.nullTerm}
-            val $resultTerm = ${childGen.resultTerm}.toFloat
-          """.children
-        } else {
-          Seq( q"""
-            val $resultTerm = ${childGen.resultTerm}.toFloat
-          """)
-        }
-        childGen.code ++ castCode
-
-      case tree.Cast(child: Expression, DOUBLE_TYPE_INFO) =>
-        val childGen = generateExpression(child)
-        val castCode = if (nullCheck) {
-          q"""
-            val $nullTerm = ${childGen.nullTerm}
-            val $resultTerm = ${childGen.resultTerm}.toDouble
-          """.children
-        } else {
-          Seq( q"""
-            val $resultTerm = ${childGen.resultTerm}.toDouble
-          """)
-        }
-        childGen.code ++ castCode
-
-      case ResolvedFieldReference(fieldName, fieldTpe: TypeInformation[_]) =>
-        inputs find { i => i._2.hasField(fieldName)} match {
-          case Some((inputName, inputTpe)) =>
-            val fieldCode = getField(newTermName(inputName), inputTpe, 
fieldName, fieldTpe)
-            if (nullCheck) {
-              q"""
-                val $resultTerm = $fieldCode
-                val $nullTerm = $resultTerm == null
-              """.children
-            } else {
-              Seq( q"""
-                val $resultTerm = $fieldCode
-              """)
-            }
-
-          case None => throw new ExpressionException("Could not get accessor 
for " + fieldName
-            + " in inputs " + inputs.mkString(", ") + ".")
-        }
-
-      case GreaterThan(left, right) =>
-        generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) {
-          (leftTerm, rightTerm) => q"$leftTerm > $rightTerm"
-        }
-
-      case GreaterThanOrEqual(left, right) =>
-        generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) {
-          (leftTerm, rightTerm) => q"$leftTerm >= $rightTerm"
-        }
-
-      case LessThan(left, right) =>
-        generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) {
-          (leftTerm, rightTerm) => q"$leftTerm < $rightTerm"
-        }
-
-      case LessThanOrEqual(left, right) =>
-        generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) {
-          (leftTerm, rightTerm) => q"$leftTerm <= $rightTerm"
-        }
-
-      case EqualTo(left, right) =>
-        generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) {
-          (leftTerm, rightTerm) => q"$leftTerm == $rightTerm"
-        }
-
-      case NotEqualTo(left, right) =>
-        generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) {
-          (leftTerm, rightTerm) => q"$leftTerm != $rightTerm"
-        }
-
-      case And(left, right) =>
-        generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) {
-          (leftTerm, rightTerm) => q"$leftTerm && $rightTerm"
-        }
-
-      case Or(left, right) =>
-        generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) {
-          (leftTerm, rightTerm) => q"$leftTerm || $rightTerm"
-        }
-
-      case Plus(left, right) =>
-        generateIfNonNull(left, right, expr.typeInfo) {
-          (leftTerm, rightTerm) => q"$leftTerm + $rightTerm"
-        }
-
-      case Minus(left, right) =>
-        generateIfNonNull(left, right, expr.typeInfo) {
-          (leftTerm, rightTerm) => q"$leftTerm - $rightTerm"
-        }
-
-      case Div(left, right) =>
-        generateIfNonNull(left, right, expr.typeInfo) {
-          (leftTerm, rightTerm) => q"$leftTerm / $rightTerm"
-        }
-
-      case Mul(left, right) =>
-        generateIfNonNull(left, right, expr.typeInfo) {
-          (leftTerm, rightTerm) => q"$leftTerm * $rightTerm"
-        }
-
-      case Mod(left, right) =>
-        generateIfNonNull(left, right, expr.typeInfo) {
-          (leftTerm, rightTerm) => q"$leftTerm % $rightTerm"
-        }
-
-      case UnaryMinus(child) =>
-        val childCode = generateExpression(child)
-        if (nullCheck) {
-          childCode.code ++ q"""
-            val $nullTerm = ${childCode.nullTerm}
-            if ($nullTerm) {
-              ${defaultPrimitive(child.typeInfo)}
-            } else {
-              val $resultTerm = -(${childCode.resultTerm})
-            }
-          """.children
-        } else {
-          childCode.code :+ q"""
-              val $resultTerm = -(${childCode.resultTerm})
-          """
-        }
-
-      case BitwiseAnd(left, right) =>
-        generateIfNonNull(left, right, expr.typeInfo) {
-          (leftTerm, rightTerm) => q"$leftTerm & $rightTerm"
-        }
-
-      case BitwiseOr(left, right) =>
-        generateIfNonNull(left, right, expr.typeInfo) {
-          (leftTerm, rightTerm) => q"$leftTerm | $rightTerm"
-        }
-
-      case BitwiseXor(left, right) =>
-        generateIfNonNull(left, right, expr.typeInfo) {
-          (leftTerm, rightTerm) => q"$leftTerm ^ $rightTerm"
-        }
-
-      case BitwiseNot(child) =>
-        val childCode = generateExpression(child)
-        if (nullCheck) {
-          childCode.code ++ q"""
-            val $nullTerm = ${childCode.nullTerm}
-            if ($nullTerm) {
-              ${defaultPrimitive(child.typeInfo)}
-            } else {
-              val $resultTerm = ~(${childCode.resultTerm})
-            }
-          """.children
-        } else {
-          childCode.code :+ q"""
-              val $resultTerm = ~(${childCode.resultTerm})
-          """
-        }
-
-      case Not(child) =>
-        val childCode = generateExpression(child)
-        if (nullCheck) {
-          childCode.code ++ q"""
-            val $nullTerm = ${childCode.nullTerm}
-            if ($nullTerm) {
-              ${defaultPrimitive(child.typeInfo)}
-            } else {
-              val $resultTerm = !(${childCode.resultTerm})
-            }
-          """.children
-        } else {
-          childCode.code :+ q"""
-              val $resultTerm = !(${childCode.resultTerm})
-          """
-        }
-
-      case IsNull(child) =>
-        val childCode = generateExpression(child)
-        if (nullCheck) {
-          childCode.code ++ q"""
-            val $nullTerm = ${childCode.nullTerm}
-            if ($nullTerm) {
-              ${defaultPrimitive(child.typeInfo)}
-            } else {
-              val $resultTerm = (${childCode.resultTerm}) == null
-            }
-          """.children
-        } else {
-          childCode.code :+ q"""
-              val $resultTerm = (${childCode.resultTerm}) == null
-          """
-        }
-
-      case IsNotNull(child) =>
-        val childCode = generateExpression(child)
-        if (nullCheck) {
-          childCode.code ++ q"""
-            val $nullTerm = ${childCode.nullTerm}
-            if ($nullTerm) {
-              ${defaultPrimitive(child.typeInfo)}
-            } else {
-              val $resultTerm = (${childCode.resultTerm}) != null
-            }
-          """.children
-        } else {
-          childCode.code :+ q"""
-              val $resultTerm = (${childCode.resultTerm}) != null
-          """
-        }
-
-      case Abs(child) =>
-        val childCode = generateExpression(child)
-        if (nullCheck) {
-          childCode.code ++ q"""
-            val $nullTerm = ${childCode.nullTerm}
-            if ($nullTerm) {
-              ${defaultPrimitive(child.typeInfo)}
-            } else {
-              val $resultTerm = Math.abs(${childCode.resultTerm})
-            }
-          """.children
-        } else {
-          childCode.code :+ q"""
-              val $resultTerm = Math.abs(${childCode.resultTerm})
-          """
-        }
-
-      case _ => throw new ExpressionException("Could not generate code for 
expression " + expr)
-    }
-
-    GeneratedExpression(code, resultTerm, nullTerm)
-  }
-
-  case class GeneratedExpression(code: Seq[Tree], resultTerm: TermName, 
nullTerm: TermName)
-
-  // We don't have c.freshName
-  // According to http://docs.scala-lang.org/overviews/quasiquotes/hygiene.html
-  // it's coming for 2.11. We can't wait that long...
-  def freshTermName(name: String): TermName = {
-    newTermName(s"$name$$${freshNameCounter.getAndIncrement}")
-  }
-
-  val freshNameCounter = new AtomicInteger
-
-  protected def getField(
-                          inputTerm: TermName,
-                          inputType: CompositeType[_],
-                          fieldName: String,
-                          fieldType: TypeInformation[_]): Tree = {
-    val accessor = fieldAccessorFor(inputType, fieldName)
-    accessor match {
-      case ObjectFieldAccessor(fieldName) =>
-        val fieldTerm = newTermName(fieldName)
-        
q"$inputTerm.$fieldTerm.asInstanceOf[${typeTermForTypeInfo(fieldType)}]"
-
-      case ObjectMethodAccessor(methodName) =>
-        val methodTerm = newTermName(methodName)
-        
q"$inputTerm.$methodTerm().asInstanceOf[${typeTermForTypeInfo(fieldType)}]"
-
-      case ProductAccessor(i) =>
-        
q"$inputTerm.productElement($i).asInstanceOf[${typeTermForTypeInfo(fieldType)}]"
-
-    }
-  }
-
-  sealed abstract class FieldAccessor
-
-  case class ObjectFieldAccessor(fieldName: String) extends FieldAccessor
-
-  case class ObjectMethodAccessor(methodName: String) extends FieldAccessor
-
-  case class ProductAccessor(i: Int) extends FieldAccessor
-
-  def fieldAccessorFor(elementType: CompositeType[_], fieldName: String): 
FieldAccessor = {
-    elementType match {
-      case ri: RowTypeInfo =>
-        ProductAccessor(elementType.getFieldIndex(fieldName))
-
-      case cc: CaseClassTypeInfo[_] =>
-        ObjectFieldAccessor(fieldName)
-
-      case javaTup: TupleTypeInfo[_] =>
-        ObjectFieldAccessor(fieldName)
-
-      case pj: PojoTypeInfo[_] =>
-        ObjectFieldAccessor(fieldName)
-
-      case proxy: RenamingProxyTypeInfo[_] =>
-        val underlying = proxy.getUnderlyingType
-        val fieldIndex = proxy.getFieldIndex(fieldName)
-        fieldAccessorFor(underlying, underlying.getFieldNames()(fieldIndex))
-    }
-  }
-
-  protected def defaultPrimitive(tpe: TypeInformation[_]) = tpe match {
-    case BasicTypeInfo.INT_TYPE_INFO => ru.Literal(Constant(-1))
-    case BasicTypeInfo.LONG_TYPE_INFO => ru.Literal(Constant(1L))
-    case BasicTypeInfo.SHORT_TYPE_INFO => ru.Literal(Constant(-1.toShort))
-    case BasicTypeInfo.BYTE_TYPE_INFO => ru.Literal(Constant(-1.toByte))
-    case BasicTypeInfo.FLOAT_TYPE_INFO => ru.Literal(Constant(-1.0.toFloat))
-    case BasicTypeInfo.DOUBLE_TYPE_INFO => ru.Literal(Constant(-1.toDouble))
-    case BasicTypeInfo.BOOLEAN_TYPE_INFO => ru.Literal(Constant(false))
-    case BasicTypeInfo.STRING_TYPE_INFO => ru.Literal(Constant("<empty>"))
-    case BasicTypeInfo.CHAR_TYPE_INFO => ru.Literal(Constant('\0'))
-    case _ => ru.Literal(Constant(null))
-  }
-
-  protected def typeTermForTypeInfo(typeInfo: TypeInformation[_]): Tree = {
-    val tpe = typeForTypeInfo(typeInfo)
-    tq"$tpe"
-  }
-
-  // We need two separate methods here because typeForTypeInfo is recursive 
when generating
-  // the type for a type with generic parameters.
-  protected def typeForTypeInfo(tpe: TypeInformation[_]): Type = tpe match {
-
-    // From PrimitiveArrayTypeInfo we would get class "int[]", scala 
reflections
-    // does not seem to like this, so we manually give the correct type here.
-    case PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO => 
typeOf[Array[Int]]
-    case PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO => 
typeOf[Array[Long]]
-    case PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO => 
typeOf[Array[Short]]
-    case PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO => 
typeOf[Array[Byte]]
-    case PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO => 
typeOf[Array[Float]]
-    case PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO => 
typeOf[Array[Double]]
-    case PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO => 
typeOf[Array[Boolean]]
-    case PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO => 
typeOf[Array[Char]]
-
-    case _ =>
-      val clazz = mirror.staticClass(tpe.getTypeClass.getCanonicalName)
-
-      clazz.selfType.erasure match {
-        case ExistentialType(_, underlying) => underlying
-
-        case tpe@TypeRef(prefix, sym, Nil) =>
-          // Non-generic type, just return the type
-          tpe
-
-        case TypeRef(prefix, sym, emptyParams) =>
-          val genericTypeInfos = tpe.getGenericParameters.asScala
-          if (emptyParams.length != genericTypeInfos.length) {
-            throw new RuntimeException("Number of type parameters does not 
match.")
-          }
-          val typeParams = genericTypeInfos.map(typeForTypeInfo)
-          // TODO: remove, added only for migration of the line below, as 
suggested by the compiler
-          import compat._
-          TypeRef(prefix, sym, typeParams.toList)
-      }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateBinaryPredicate.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateBinaryPredicate.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateBinaryPredicate.scala
deleted file mode 100644
index ce80469..0000000
--- 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateBinaryPredicate.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.codegen
-
-import org.apache.flink.api.expressions.tree.Expression
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.slf4j.LoggerFactory
-
-/**
- * Code generator for binary predicates, i.e. a Join or CoGroup Predicate.
- */
-class GenerateBinaryPredicate[L, R](
-    leftType: CompositeType[L],
-    rightType: CompositeType[R],
-    predicate: Expression,
-    cl: ClassLoader)
-  extends ExpressionCodeGenerator[(L, R) => Boolean](
-    Seq(("input0", leftType), ("input1", rightType)),
-    cl = cl) {
-
-  val LOG = LoggerFactory.getLogger(this.getClass)
-
-  import scala.reflect.runtime.{universe => ru}
-  import scala.reflect.runtime.universe._
-
-  override protected def generateInternal(): ((L, R) => Boolean) = {
-    val pred = generateExpression(predicate)
-
-    val in0 = newTermName("input0")
-    val in1 = newTermName("input1")
-
-    val leftTpe = typeTermForTypeInfo(leftType)
-    val rightTpe = typeTermForTypeInfo(rightType)
-
-    val code = if (nullCheck) {
-      q"""
-        ($in0: $leftTpe, $in1: $rightTpe) => {
-          ..${pred.code}
-          if (${pred.nullTerm}) {
-            false
-          } else {
-            ${pred.resultTerm}
-          }
-        }
-      """
-    } else {
-      q"""
-        ($in0: $leftTpe, $in1: $rightTpe) => {
-          ..${pred.code}
-          ${pred.resultTerm}
-        }
-      """
-    }
-
-    LOG.debug(s"""Generated binary predicate "$predicate":\n$code""")
-    toolBox.eval(code).asInstanceOf[(L, R) => Boolean]
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateBinaryResultAssembler.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateBinaryResultAssembler.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateBinaryResultAssembler.scala
deleted file mode 100644
index 4066831..0000000
--- 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateBinaryResultAssembler.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.codegen
-
-import org.apache.flink.api.expressions.tree.Expression
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.slf4j.LoggerFactory
-
-/**
- * Code generator for assembling the result of a binary operation.
- */
-class GenerateBinaryResultAssembler[L, R, O](
-    leftTypeInfo: CompositeType[L],
-    rightTypeInfo: CompositeType[R],
-    resultTypeInfo: CompositeType[O],
-    outputFields: Seq[Expression],
-    cl: ClassLoader)
-  extends GenerateResultAssembler[(L, R, O) => O](
-    Seq(("input0", leftTypeInfo), ("input1", rightTypeInfo)),
-    cl = cl) {
-
-  val LOG = LoggerFactory.getLogger(this.getClass)
-
-  import scala.reflect.runtime.universe._
-
-
-  override protected def generateInternal(): ((L, R, O) => O) = {
-
-    val leftType = typeTermForTypeInfo(leftTypeInfo)
-    val rightType = typeTermForTypeInfo(rightTypeInfo)
-    val resultType = typeTermForTypeInfo(resultTypeInfo)
-
-    val resultCode = createResult(resultTypeInfo, outputFields)
-
-    val code: Tree =
-      q"""
-        (input0: $leftType, input1: $rightType, out: $resultType) => {
-          ..$resultCode
-        }
-      """
-
-    LOG.debug(s"Generated binary result-assembler:\n$code")
-    toolBox.eval(code).asInstanceOf[(L, R, O) => O]
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateResultAssembler.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateResultAssembler.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateResultAssembler.scala
deleted file mode 100644
index 977126d..0000000
--- 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateResultAssembler.scala
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.codegen
-
-import org.apache.flink.api.expressions.tree.Expression
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.expressions.typeinfo.RowTypeInfo
-import org.apache.flink.api.java.typeutils.{TupleTypeInfo, PojoTypeInfo}
-import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-
-/**
- * Base class for unary and binary result assembler code generators.
- */
-abstract class GenerateResultAssembler[R](
-    inputs: Seq[(String, CompositeType[_])],
-    cl: ClassLoader)
-  extends ExpressionCodeGenerator[R](inputs, cl = cl) {
-  import scala.reflect.runtime.{universe => ru}
-  import scala.reflect.runtime.universe._
-
-  def createResult[T](
-      resultTypeInfo: CompositeType[T],
-      outputFields: Seq[Expression]): Tree = {
-
-    val resultType = typeTermForTypeInfo(resultTypeInfo)
-
-    val fieldsCode = outputFields.map(generateExpression)
-
-    val block = resultTypeInfo match {
-      case ri: RowTypeInfo =>
-        val resultSetters: Seq[Tree] = fieldsCode.zipWithIndex map {
-          case (fieldCode, i) =>
-            q"""
-              out.setField($i, { ..${fieldCode.code}; ${fieldCode.resultTerm} 
})
-            """
-        }
-
-        q"""
-          ..$resultSetters
-          out
-        """
-
-      case pj: PojoTypeInfo[_] =>
-        val resultSetters: Seq[Tree] = fieldsCode.zip(outputFields) map {
-        case (fieldCode, expr) =>
-          val fieldName = newTermName(expr.name)
-          q"""
-              out.$fieldName = { ..${fieldCode.code}; ${fieldCode.resultTerm} }
-            """
-        }
-
-        q"""
-          ..$resultSetters
-          out
-        """
-
-      case tup: TupleTypeInfo[_] =>
-        val resultSetters: Seq[Tree] = fieldsCode.zip(outputFields) map {
-          case (fieldCode, expr) =>
-            val fieldName = newTermName(expr.name)
-            q"""
-              out.$fieldName = { ..${fieldCode.code}; ${fieldCode.resultTerm} }
-            """
-        }
-
-        q"""
-          ..$resultSetters
-          out
-        """
-
-      case cc: CaseClassTypeInfo[_] =>
-        val resultFields: Seq[Tree] = fieldsCode map {
-          fieldCode =>
-            q"{ ..${fieldCode.code}; ${fieldCode.resultTerm}}"
-        }
-        q"""
-          new $resultType(..$resultFields)
-        """
-    }
-
-    block
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateUnaryPredicate.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateUnaryPredicate.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateUnaryPredicate.scala
deleted file mode 100644
index cd8edc4..0000000
--- 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateUnaryPredicate.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.codegen
-
-import org.apache.flink.api.expressions.tree.Expression
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.slf4j.LoggerFactory
-
-/**
- * Code generator for a unary predicate, i.e. a Filter.
- */
-class GenerateUnaryPredicate[T](
-    inputType: CompositeType[T],
-    predicate: Expression,
-    cl: ClassLoader) extends ExpressionCodeGenerator[T => Boolean](
-      Seq(("input0", inputType)),
-      cl = cl) {
-
-  val LOG = LoggerFactory.getLogger(this.getClass)
-
-  import scala.reflect.runtime.{universe => ru}
-  import scala.reflect.runtime.universe._
-
-  override protected def generateInternal(): (T => Boolean) = {
-    val pred = generateExpression(predicate)
-
-    val tpe = typeTermForTypeInfo(inputType)
-
-    val code = if (nullCheck) {
-      q"""
-        (input0: $tpe) => {
-          ..${pred.code}
-          if (${pred.nullTerm}) {
-            false
-          } else {
-            ${pred.resultTerm}
-          }
-        }
-      """
-    } else {
-      q"""
-        (input0: $tpe) => {
-          ..${pred.code}
-          ${pred.resultTerm}
-        }
-      """
-    }
-
-    LOG.debug(s"""Generated unary predicate "$predicate":\n$code""")
-    toolBox.eval(code).asInstanceOf[(T) => Boolean]
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateUnaryResultAssembler.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateUnaryResultAssembler.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateUnaryResultAssembler.scala
deleted file mode 100644
index 49970c1..0000000
--- 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateUnaryResultAssembler.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.codegen
-
-import org.apache.flink.api.expressions.tree.Expression
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.slf4j.LoggerFactory
-
-/**
- * Code generator for assembling the result of a unary operation.
- */
-class GenerateUnaryResultAssembler[I, O](
-    inputTypeInfo: CompositeType[I],
-    resultTypeInfo: CompositeType[O],
-    outputFields: Seq[Expression],
-    cl: ClassLoader)
-  extends GenerateResultAssembler[(I, O) => O](
-    Seq(("input0", inputTypeInfo)),
-    cl = cl) {
-
-  val LOG = LoggerFactory.getLogger(this.getClass)
-
-  import scala.reflect.runtime.universe._
-
-  override protected def generateInternal(): ((I, O) => O) = {
-
-    val inputType = typeTermForTypeInfo(inputTypeInfo)
-    val resultType = typeTermForTypeInfo(resultTypeInfo)
-
-    val resultCode = createResult(resultTypeInfo, outputFields)
-
-    val code: Tree =
-      q"""
-        (input0: $inputType, out: $resultType) => {
-          ..$resultCode
-        }
-      """
-
-    LOG.debug(s"Generated unary result-assembler:\n${show(code)}")
-    toolBox.eval(code).asInstanceOf[(I, O) => O]
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/package.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/package.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/package.scala
deleted file mode 100644
index 0b7c95d..0000000
--- 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/package.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions
-
-package object codegen {
-  // Used in ExpressionCodeGenerator because Scala 2.10 reflection is not 
thread safe. We might
-  // have several parallel expression operators in one TaskManager, therefore 
we need to guard
-  // these operations.
-  object ReflectionLock
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/ExpandAggregations.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/ExpandAggregations.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/ExpandAggregations.scala
deleted file mode 100644
index 723483c..0000000
--- 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/ExpandAggregations.scala
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.operations
-
-import org.apache.flink.api.expressions.analysis.SelectionAnalyzer
-import org.apache.flink.api.expressions.tree._
-import org.apache.flink.api.java.aggregation.Aggregations
-
-import scala.collection.mutable
-
-/**
- * This is used to expand a [[Select]] that contains aggregations. If it is 
called on a [[Select]]
- * without aggregations it is simply returned.
- *
- * This select:
- * {{{
- *   in.select('key, 'value.avg)
- * }}}
- *
- * is transformed to this expansion:
- * {{{
- *   in
- *     .select('key, 'value, Literal(1) as 'intermediate.1)
- *     .aggregate('value.sum, 'intermediate.1.sum)
- *     .select('key, 'value / 'intermediate.1)
- * }}}
- *
- * If the input of the [[Select]] is a [[GroupBy]] this is preserved before 
the aggregation.
- */
-object ExpandAggregations {
-  def apply(select: Select): Operation = select match {
-    case Select(input, selection) =>
-
-      val aggregations = mutable.HashMap[(Expression, Aggregations), String]()
-      val intermediateFields = mutable.HashSet[Expression]()
-      val aggregationIntermediates = mutable.HashMap[Aggregation, 
Seq[Expression]]()
-
-      var intermediateCount = 0
-      selection foreach {  f =>
-        f.transformPre {
-          case agg: Aggregation =>
-            val intermediateReferences = 
agg.getIntermediateFields.zip(agg.getAggregations) map {
-              case (expr, basicAgg) =>
-                aggregations.get((expr, basicAgg)) match {
-                  case Some(intermediateName) =>
-                    ResolvedFieldReference(intermediateName, expr.typeInfo)
-                  case None =>
-                    intermediateCount = intermediateCount + 1
-                    val intermediateName = s"intermediate.$intermediateCount"
-                    intermediateFields += Naming(expr, intermediateName)
-                    aggregations((expr, basicAgg)) = intermediateName
-                    ResolvedFieldReference(intermediateName, expr.typeInfo)
-                }
-            }
-
-            aggregationIntermediates(agg) = intermediateReferences
-            // Return a NOP so that we don't add the children of the 
aggregation
-            // to intermediate fields. We already added the necessary fields 
to the list
-            // of intermediate fields.
-            NopExpression()
-
-          case fa: ResolvedFieldReference =>
-            if (!fa.name.startsWith("intermediate")) {
-              intermediateFields += Naming(fa, fa.name)
-            }
-            fa
-        }
-      }
-
-      if (aggregations.isEmpty) {
-        // no aggregations, just return
-        return select
-      }
-
-      // also add the grouping keys to the set of intermediate fields, because 
we use a Set,
-      // they are only added when not already present
-      input match {
-        case GroupBy(_, groupingFields) =>
-          groupingFields foreach {
-            case fa: ResolvedFieldReference =>
-              intermediateFields += Naming(fa, fa.name)
-          }
-        case _ => // Nothing to add
-      }
-
-      val basicAggregations = aggregations.map {
-        case ((expr, basicAgg), fieldName) =>
-          (fieldName, basicAgg)
-      }
-
-      val finalFields = selection.map {  f =>
-        f.transformPre {
-          case agg: Aggregation =>
-            val intermediates = aggregationIntermediates(agg)
-            agg.getFinalField(intermediates)
-        }
-      }
-
-      val intermediateAnalyzer = new SelectionAnalyzer(input.outputFields)
-      val analyzedIntermediates = 
intermediateFields.toSeq.map(intermediateAnalyzer.analyze)
-
-      val finalAnalyzer =
-        new SelectionAnalyzer(analyzedIntermediates.map(e => (e.name, 
e.typeInfo)))
-      val analyzedFinals = finalFields.map(finalAnalyzer.analyze)
-
-      val result = input match {
-        case GroupBy(groupByInput, groupingFields) =>
-          Select(
-            Aggregate(
-              GroupBy(
-                Select(groupByInput, analyzedIntermediates),
-                groupingFields),
-              basicAggregations.toSeq),
-            analyzedFinals)
-
-        case _ =>
-          Select(
-            Aggregate(
-              Select(input, analyzedIntermediates),
-              basicAggregations.toSeq),
-            analyzedFinals)
-
-      }
-
-      result
-
-    case _ => select
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/OperationTranslator.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/OperationTranslator.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/OperationTranslator.scala
deleted file mode 100644
index 12e4793..0000000
--- 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/OperationTranslator.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.operations
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-/**
- * When an [[org.apache.flink.api.expressions.ExpressionOperation]] is created 
an
- * [[OperationTranslator]] corresponding to the underlying representation 
(either
- * [[org.apache.flink.api.scala.DataSet]] or 
[[org.apache.flink.streaming.api.scala.DataStream]] is
- * created. This way, the expression API can be completely agnostic while 
translation back to the
- * correct API is handled by the API specific translator.
- */
-abstract class OperationTranslator {
-
-  type Representation[A]
-
-  def translate[A](op: Operation)(implicit tpe: TypeInformation[A]): 
Representation[A]
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/operations.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/operations.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/operations.scala
deleted file mode 100644
index d036631..0000000
--- 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/operations.scala
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.operations
-
-import org.apache.flink.api.expressions.tree.Expression
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.aggregation.Aggregations
-
-/**
- * Base class for all expression operations.
- */
-sealed abstract class Operation {
-  def outputFields: Seq[(String, TypeInformation[_])]
-}
-
-/**
- * Operation that transforms a [[org.apache.flink.api.scala.DataSet]] or
- * [[org.apache.flink.streaming.api.scala.DataStream]] into an expression 
operation.
- */
-case class Root[T](input: T, outputFields: Seq[(String, TypeInformation[_])]) 
extends Operation
-
-/**
- * Operation that joins two expression operations. A "filter" and a "select" 
should be applied
- * after a join operation.
- */
-case class Join(left: Operation, right: Operation) extends Operation {
-  def outputFields = left.outputFields ++ right.outputFields
-
-  override def toString = s"Join($left, $right)"
-}
-
-/**
- * Operation that filters out elements that do not match the predicate 
expression.
- */
-case class Filter(input: Operation, predicate: Expression) extends Operation {
-  def outputFields = input.outputFields
-
-  override def toString = s"Filter($input, $predicate)"
-}
-
-/**
- * Selection expression. Similar to an SQL SELECT statement. The expressions 
can select fields
- * and perform arithmetic or logic operations. The expressions can also 
perform aggregates
- * on fields.
- */
-case class Select(input: Operation, selection: Seq[Expression]) extends 
Operation {
-  def outputFields = selection.toSeq map { e => (e.name, e.typeInfo) }
-
-  override def toString = s"Select($input, ${selection.mkString(",")})"
-}
-
-/**
- * Operation that gives new names to fields. Use this to disambiguate fields 
before a join
- * operation.
- */
-case class As(input: Operation, names: Seq[String]) extends Operation {
-  val outputFields = input.outputFields.zip(names) map {
-    case ((_, tpe), newName) => (newName, tpe)
-  }
-
-  override def toString = s"As($input, ${names.mkString(",")})"
-}
-
-/**
- * Grouping operation. Keys are specified using field references. A group by 
operation os only
- * useful when performing a select with aggregates afterwards.
- * @param input
- * @param fields
- */
-case class GroupBy(input: Operation, fields: Seq[Expression]) extends 
Operation {
-  def outputFields = input.outputFields
-
-  override def toString = s"GroupBy($input, ${fields.mkString(",")})"
-}
-
-/**
- * Internal operation. Selection operations containing aggregates are expanded 
to an [[Aggregate]]
- * and a simple [[Select]].
- */
-case class Aggregate(
-    input: Operation,
-    aggregations: Seq[(String, Aggregations)]) extends Operation {
-  def outputFields = input.outputFields
-
-  override def toString = s"Aggregate($input, ${aggregations.mkString(",")})"
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/package.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/package.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/package.scala
deleted file mode 100644
index 2aa80b3..0000000
--- 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/package.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions
-
-/**
- * The operations in this package are created by calling methods on 
[[ExpressionOperation]] they
- * should not be manually created by users of the API.
- */
-package object operations

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/parser/ExpressionParser.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/parser/ExpressionParser.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/parser/ExpressionParser.scala
deleted file mode 100644
index da53ded..0000000
--- 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/parser/ExpressionParser.scala
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.parser
-
-import org.apache.flink.api.expressions.ExpressionException
-import org.apache.flink.api.expressions.operations.As
-import org.apache.flink.api.expressions.tree._
-
-import scala.util.parsing.combinator.{PackratParsers, JavaTokenParsers}
-
-/**
- * Parser for expressions inside a String. This parses exactly the same 
expressions that
- * would be accepted by the Scala Expression DSL.
- *
- * See 
[[org.apache.flink.api.scala.expressions.ImplicitExpressionConversions]] and
- * [[org.apache.flink.api.scala.expressions.ImplicitExpressionOperations]] for 
the constructs
- * available in the Scala Expression DSL. This parser must be kept in sync 
with the Scala DSL
- * lazy valined in the above files.
- */
-object ExpressionParser extends JavaTokenParsers with PackratParsers {
-
-  // Literals
-
-  lazy val numberLiteral: PackratParser[Expression] =
-    ((wholeNumber <~ ("L" | "l")) | floatingPointNumber | decimalNumber | 
wholeNumber) ^^ {
-      str =>
-        if (str.endsWith("L") || str.endsWith("l")) {
-          Literal(str.toLong)
-        } else if (str.matches("""-?\d+""")) {
-          Literal(str.toInt)
-        } else if (str.endsWith("f") | str.endsWith("F")) {
-          Literal(str.toFloat)
-        } else {
-          Literal(str.toDouble)
-        }
-    }
-
-  lazy val singleQuoteStringLiteral: Parser[Expression] =
-    ("'" + """([^'\p{Cntrl}\\]|\\[\\'"bfnrt]|\\u[a-fA-F0-9]{4})*""" + "'").r 
^^ {
-      str => Literal(str.substring(1, str.length - 1))
-    }
-
-  lazy val stringLiteralFlink: PackratParser[Expression] = super.stringLiteral 
^^ {
-    str => Literal(str.substring(1, str.length - 1))
-  }
-
-  lazy val boolLiteral: PackratParser[Expression] = ("true" | "false") ^^ {
-    str => Literal(str.toBoolean)
-  }
-
-  lazy val literalExpr: PackratParser[Expression] =
-    numberLiteral |
-      stringLiteralFlink | singleQuoteStringLiteral |
-      boolLiteral
-
-  lazy val fieldReference: PackratParser[Expression] = ident ^^ {
-    case sym => UnresolvedFieldReference(sym)
-  }
-
-  lazy val atom: PackratParser[Expression] =
-    ( "(" ~> expression <~ ")" ) | literalExpr | fieldReference
-
-  // suffix ops
-  lazy val isNull: PackratParser[Expression] = atom <~ ".isNull" ^^ { e => 
IsNull(e) }
-  lazy val isNotNull: PackratParser[Expression] = atom <~ ".isNotNull" ^^ { e 
=> IsNotNull(e) }
-
-  lazy val abs: PackratParser[Expression] = atom <~ ".abs" ^^ { e => Abs(e) }
-
-  lazy val sum: PackratParser[Expression] = atom <~ ".sum" ^^ { e => Sum(e) }
-  lazy val min: PackratParser[Expression] = atom <~ ".min" ^^ { e => Min(e) }
-  lazy val max: PackratParser[Expression] = atom <~ ".max" ^^ { e => Max(e) }
-  lazy val count: PackratParser[Expression] = atom <~ ".count" ^^ { e => 
Count(e) }
-  lazy val avg: PackratParser[Expression] = atom <~ ".avg" ^^ { e => Avg(e) }
-
-  lazy val as: PackratParser[Expression] = atom ~ ".as(" ~ fieldReference ~ 
")" ^^ {
-    case e ~ _ ~ as ~ _ => Naming(e, as.name)
-  }
-
-  lazy val substring: PackratParser[Expression] =
-    atom ~ ".substring(" ~ expression ~ "," ~ expression ~ ")" ^^ {
-      case e ~ _ ~ from ~ _ ~ to ~ _ => Substring(e, from, to)
-
-    }
-
-  lazy val substringWithoutEnd: PackratParser[Expression] =
-    atom ~ ".substring(" ~ expression ~ ")" ^^ {
-      case e ~ _ ~ from ~ _ => Substring(e, from, Literal(Integer.MAX_VALUE))
-
-    }
-
-  lazy val suffix =
-    isNull | isNotNull |
-      abs | sum | min | max | count | avg |
-      substring | substringWithoutEnd | atom
-
-
-  // unary ops
-
-  lazy val unaryNot: PackratParser[Expression] = "!" ~> suffix ^^ { e => 
Not(e) }
-
-  lazy val unaryMinus: PackratParser[Expression] = "-" ~> suffix ^^ { e => 
UnaryMinus(e) }
-
-  lazy val unaryBitwiseNot: PackratParser[Expression] = "~" ~> suffix ^^ { e 
=> BitwiseNot(e) }
-
-  lazy val unary = unaryNot | unaryMinus | unaryBitwiseNot | suffix
-
-  // binary bitwise opts
-
-  lazy val binaryBitwise = unary * (
-    "&" ^^^ { (a:Expression, b:Expression) => BitwiseAnd(a,b) } |
-      "|" ^^^ { (a:Expression, b:Expression) => BitwiseOr(a,b) } |
-      "^" ^^^ { (a:Expression, b:Expression) => BitwiseXor(a,b) } )
-
-  // arithmetic
-
-  lazy val product = binaryBitwise * (
-    "*" ^^^ { (a:Expression, b:Expression) => Mul(a,b) } |
-      "/" ^^^ { (a:Expression, b:Expression) => Div(a,b) } |
-      "%" ^^^ { (a:Expression, b:Expression) => Mod(a,b) } )
-
-  lazy val term = product * (
-    "+" ^^^ { (a:Expression, b:Expression) => Plus(a,b) } |
-     "-" ^^^ { (a:Expression, b:Expression) => Minus(a,b) } )
-
-  // Comparison
-
-  lazy val equalTo: PackratParser[Expression] = term ~ "===" ~ term ^^ {
-    case l ~ _ ~ r => EqualTo(l, r)
-  }
-
-  lazy val equalToAlt: PackratParser[Expression] = term ~ "=" ~ term ^^ {
-    case l ~ _ ~ r => EqualTo(l, r)
-  }
-
-  lazy val notEqualTo: PackratParser[Expression] = term ~ "!==" ~ term ^^ {
-    case l ~ _ ~ r => NotEqualTo(l, r)
-  }
-
-  lazy val greaterThan: PackratParser[Expression] = term ~ ">" ~ term ^^ {
-    case l ~ _ ~ r => GreaterThan(l, r)
-  }
-
-  lazy val greaterThanOrEqual: PackratParser[Expression] = term ~ ">=" ~ term 
^^ {
-    case l ~ _ ~ r => GreaterThanOrEqual(l, r)
-  }
-
-  lazy val lessThan: PackratParser[Expression] = term ~ "<" ~ term ^^ {
-    case l ~ _ ~ r => LessThan(l, r)
-  }
-
-  lazy val lessThanOrEqual: PackratParser[Expression] = term ~ "<=" ~ term ^^ {
-    case l ~ _ ~ r => LessThanOrEqual(l, r)
-  }
-
-  lazy val comparison: PackratParser[Expression] =
-      equalTo | equalToAlt | notEqualTo |
-      greaterThan | greaterThanOrEqual |
-      lessThan | lessThanOrEqual | term
-
-  // logic
-
-  lazy val logic = comparison * (
-    "&&" ^^^ { (a:Expression, b:Expression) => And(a,b) } |
-      "||" ^^^ { (a:Expression, b:Expression) => Or(a,b) } )
-
-  // alias
-
-  lazy val alias: PackratParser[Expression] = logic ~ "as" ~ fieldReference ^^ 
{
-    case e ~ _ ~ name => Naming(e, name.name)
-  } | logic
-
-  lazy val expression: PackratParser[Expression] = alias
-
-  lazy val expressionList: Parser[List[Expression]] = rep1sep(expression, ",")
-
-  def parseExpressionList(expression: String): List[Expression] = {
-    parseAll(expressionList, expression) match {
-      case Success(lst, _) => lst
-
-      case Failure(msg, _) => throw new ExpressionException("Could not parse 
expression: " + msg)
-
-      case Error(msg, _) => throw new ExpressionException("Could not parse 
expression: " + msg)
-    }
-  }
-
-  def parseExpression(exprString: String): Expression = {
-    parseAll(expression, exprString) match {
-      case Success(lst, _) => lst
-
-      case fail =>
-        throw new ExpressionException("Could not parse expression: " + 
fail.toString)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionAggregateFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionAggregateFunction.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionAggregateFunction.scala
deleted file mode 100644
index 5a2b87d..0000000
--- 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionAggregateFunction.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.runtime
-
-import org.apache.flink.api.expressions.Row
-import org.apache.flink.api.common.functions.RichGroupReduceFunction
-import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable
-import org.apache.flink.api.java.aggregation.AggregationFunction
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.util.Collector
-
-@Combinable
-class ExpressionAggregateFunction(
-    private val fieldPositions: Seq[Int],
-    private val functions: Seq[AggregationFunction[Any]])
-  extends RichGroupReduceFunction[Row, Row] {
-
-  override def open(conf: Configuration): Unit = {
-    var i = 0
-    val len = functions.length
-    while (i < len) {
-      functions(i).initializeAggregate()
-      i += 1
-    }
-  }
-
-  override def reduce(in: java.lang.Iterable[Row], out: Collector[Row]): Unit 
= {
-
-    val fieldPositions = this.fieldPositions
-    val functions = this.functions
-
-    var current: Row = null
-
-    val values = in.iterator()
-    while (values.hasNext) {
-      current = values.next()
-
-      var i = 0
-      val len = functions.length
-      while (i < len) {
-        functions(i).aggregate(current.productElement(fieldPositions(i)))
-        i += 1
-      }
-    }
-
-    var i = 0
-    val len = functions.length
-    while (i < len) {
-      current.setField(fieldPositions(i), functions(i).getAggregate)
-      functions(i).initializeAggregate()
-      i += 1
-    }
-
-    out.collect(current)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionFilterFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionFilterFunction.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionFilterFunction.scala
deleted file mode 100644
index d766486..0000000
--- 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionFilterFunction.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.runtime
-
-import org.apache.flink.api.expressions.codegen.GenerateUnaryPredicate
-import org.apache.flink.api.expressions.tree.{NopExpression, Expression}
-import org.apache.flink.api.common.functions.RichFilterFunction
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.configuration.Configuration
-
-class ExpressionFilterFunction[T](
-    predicate: Expression,
-    inputType: CompositeType[T]) extends RichFilterFunction[T] {
-
-  var compiledPredicate: (T) => Boolean = null
-
-  override def open(config: Configuration): Unit = {
-    if (compiledPredicate == null) {
-      compiledPredicate = predicate match {
-        case n: NopExpression => null
-        case _ =>
-          val codegen = new GenerateUnaryPredicate[T](
-            inputType,
-            predicate,
-            getRuntimeContext.getUserCodeClassLoader)
-          codegen.generate()
-      }
-    }
-  }
-
-  override def filter(in: T) = compiledPredicate(in)
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionJoinFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionJoinFunction.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionJoinFunction.scala
deleted file mode 100644
index 46715cf..0000000
--- 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionJoinFunction.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.runtime
-
-import org.apache.flink.api.expressions.tree.{NopExpression, Expression}
-import org.apache.flink.api.common.functions.RichFlatJoinFunction
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.expressions.codegen.{GenerateBinaryResultAssembler,
-GenerateBinaryPredicate}
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.util.Collector
-
-class ExpressionJoinFunction[L, R, O](
-    predicate: Expression,
-    leftType: CompositeType[L],
-    rightType: CompositeType[R],
-    resultType: CompositeType[O],
-    outputFields: Seq[Expression]) extends RichFlatJoinFunction[L, R, O] {
-
-  var compiledPredicate: (L, R) => Boolean = null
-  var resultAssembler: (L, R, O) => O = null
-  var result: O = null.asInstanceOf[O]
-
-  override def open(config: Configuration): Unit = {
-    result = 
resultType.createSerializer(getRuntimeContext.getExecutionConfig).createInstance()
-    if (compiledPredicate == null) {
-      compiledPredicate = predicate match {
-        case n: NopExpression => null
-        case _ =>
-          val codegen = new GenerateBinaryPredicate[L, R](
-            leftType,
-            rightType,
-            predicate,
-            getRuntimeContext.getUserCodeClassLoader)
-          codegen.generate()
-      }
-    }
-
-    if (resultAssembler == null) {
-      val resultCodegen = new GenerateBinaryResultAssembler[L, R, O](
-        leftType,
-        rightType,
-        resultType,
-        outputFields,
-        getRuntimeContext.getUserCodeClassLoader)
-
-      resultAssembler = resultCodegen.generate()
-    }
-  }
-
-  def join(left: L, right: R, out: Collector[O]) = {
-    if (compiledPredicate == null) {
-      result = resultAssembler(left, right, result)
-      out.collect(result)
-    } else {
-      if (compiledPredicate(left, right)) {
-        result = resultAssembler(left, right, result)
-        out.collect(result)      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionSelectFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionSelectFunction.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionSelectFunction.scala
deleted file mode 100644
index 5cdd8b2..0000000
--- 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionSelectFunction.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.runtime
-
-import org.apache.flink.api.expressions.tree.Expression
-import org.apache.flink.api.common.functions.RichMapFunction
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.expressions.codegen.GenerateUnaryResultAssembler
-import org.apache.flink.configuration.Configuration
-
-class ExpressionSelectFunction[I, O](
-     inputType: CompositeType[I],
-     resultType: CompositeType[O],
-     outputFields: Seq[Expression]) extends RichMapFunction[I, O] {
-
-  var resultAssembler: (I, O) => O = null
-  var result: O = null.asInstanceOf[O]
-
-  override def open(config: Configuration): Unit = {
-    result = 
resultType.createSerializer(getRuntimeContext.getExecutionConfig).createInstance()
-
-    if (resultAssembler == null) {
-      val resultCodegen = new GenerateUnaryResultAssembler[I, O](
-        inputType,
-        resultType,
-        outputFields,
-        getRuntimeContext.getUserCodeClassLoader)
-
-      resultAssembler = resultCodegen.generate()
-    }
-  }
-
-  def map(in: I): O = {
-    resultAssembler(in, result)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/package.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/package.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/package.scala
deleted file mode 100644
index 0a3d683..0000000
--- 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/package.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions
-
-/**
- * The functions in this package are used to translate expression operations 
to Java API operations.
- */
-package object runtime

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/Expression.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/Expression.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/Expression.scala
deleted file mode 100644
index 8264705..0000000
--- 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/Expression.scala
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.tree
-
-import java.util.concurrent.atomic.AtomicInteger
-
-import org.apache.flink.api.common.typeinfo.{NothingTypeInfo, TypeInformation}
-
-import scala.language.postfixOps
-
-
-abstract class Expression extends Product {
-  def children: Seq[Expression]
-  def name: String = Expression.freshName("expression")
-  def typeInfo: TypeInformation[_]
-
-  /**
-   * Tests for equality by first testing for reference equality.
-   */
-  def fastEquals(other: Expression): Boolean = this.eq(other) || this == other
-
-  def transformPre(rule: PartialFunction[Expression, Expression]): Expression 
= {
-    val afterTransform = rule.applyOrElse(this, identity[Expression])
-
-    if (afterTransform fastEquals this) {
-      this.transformChildrenPre(rule)
-    } else {
-      afterTransform.transformChildrenPre(rule)
-    }
-  }
-
-  def transformChildrenPre(rule: PartialFunction[Expression, Expression]): 
Expression = {
-    var changed = false
-    val newArgs = productIterator map {
-      case child: Expression if children.contains(child) =>
-        val newChild = child.transformPre(rule)
-        if (newChild fastEquals child) {
-          child
-        } else {
-          changed = true
-          newChild
-        }
-      case other: AnyRef => other
-      case null => null
-    } toArray
-
-    if (changed) makeCopy(newArgs) else this
-  }
-
-  def transformPost(rule: PartialFunction[Expression, Expression]): Expression 
= {
-    val afterChildren = transformChildrenPost(rule)
-    if (afterChildren fastEquals this) {
-      rule.applyOrElse(this, identity[Expression])
-    } else {
-      rule.applyOrElse(afterChildren, identity[Expression])
-    }
-  }
-
-  def transformChildrenPost(rule: PartialFunction[Expression, Expression]): 
Expression = {
-    var changed = false
-    val newArgs = productIterator map {
-      case child: Expression if children.contains(child) =>
-        val newChild = child.transformPost(rule)
-        if (newChild fastEquals child) {
-          child
-        } else {
-          changed = true
-          newChild
-        }
-      case other: AnyRef => other
-      case null => null
-    } toArray
-    // toArray forces evaluation, toSeq does not seem to work here
-
-    if (changed) makeCopy(newArgs) else this
-  }
-
-  def exists(predicate: Expression => Boolean): Boolean = {
-    var exists = false
-    this.transformPre {
-      case e: Expression => if (predicate(e)) {
-        exists = true
-      }
-        e
-    }
-    exists
-  }
-
-  /**
-   * Creates a new copy of this expression with new children. This is used 
during transformation
-   * if children change. This must be overridden by Expressions that don't 
have the Constructor
-   * arguments in the same order as the `children`.
-   */
-  def makeCopy(newArgs: Seq[AnyRef]): this.type = {
-    val defaultCtor =
-      this.getClass.getConstructors.find { _.getParameterTypes.size > 0}.head
-    try {
-      defaultCtor.newInstance(newArgs.toArray: _*).asInstanceOf[this.type]
-    } catch {
-      case iae: IllegalArgumentException =>
-        println("IAE " + this)
-        throw new RuntimeException("Should never happen.")
-    }
-  }
-}
-
-abstract class BinaryExpression() extends Expression {
-  def left: Expression
-  def right: Expression
-  def children = Seq(left, right)
-}
-
-abstract class UnaryExpression() extends Expression {
-  def child: Expression
-  def children = Seq(child)
-}
-
-abstract class LeafExpression() extends Expression {
-  val children = Nil
-}
-
-case class NopExpression() extends LeafExpression {
-  val typeInfo = new NothingTypeInfo()
-  override val name = Expression.freshName("nop")
-
-}
-
-object Expression {
-  def freshName(prefix: String): String = {
-    s"$prefix-${freshNameCounter.getAndIncrement}"
-  }
-
-  val freshNameCounter = new AtomicInteger
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/aggregations.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/aggregations.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/aggregations.scala
deleted file mode 100644
index d3b19fa..0000000
--- 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/aggregations.scala
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.expressions.tree
-
-import org.apache.flink.api.expressions.ExpressionException
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.api.java.aggregation.Aggregations
-
-
-abstract sealed class Aggregation extends UnaryExpression {
-  def typeInfo = {
-    child.typeInfo match {
-      case BasicTypeInfo.LONG_TYPE_INFO => // ok
-      case BasicTypeInfo.INT_TYPE_INFO =>
-      case BasicTypeInfo.DOUBLE_TYPE_INFO =>
-      case BasicTypeInfo.FLOAT_TYPE_INFO =>
-      case BasicTypeInfo.BYTE_TYPE_INFO =>
-      case BasicTypeInfo.SHORT_TYPE_INFO =>
-      case _ =>
-      throw new ExpressionException(s"Unsupported type ${child.typeInfo} for " 
+
-        s"aggregation $this. Only numeric data types supported.")
-    }
-    child.typeInfo
-  }
-
-  override def toString = s"Aggregate($child)"
-
-  def getIntermediateFields: Seq[Expression]
-  def getFinalField(inputs: Seq[Expression]): Expression
-  def getAggregations: Seq[Aggregations]
-}
-
-case class Sum(child: Expression) extends Aggregation {
-  override def toString = s"($child).sum"
-
-  override def getIntermediateFields: Seq[Expression] = Seq(child)
-  override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0)
-  override def getAggregations = Seq(Aggregations.SUM)
-}
-
-case class Min(child: Expression) extends Aggregation {
-  override def toString = s"($child).min"
-
-  override def getIntermediateFields: Seq[Expression] = Seq(child)
-  override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0)
-  override def getAggregations = Seq(Aggregations.MIN)
-
-}
-
-case class Max(child: Expression) extends Aggregation {
-  override def toString = s"($child).max"
-
-  override def getIntermediateFields: Seq[Expression] = Seq(child)
-  override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0)
-  override def getAggregations = Seq(Aggregations.MAX)
-}
-
-case class Count(child: Expression) extends Aggregation {
-  override def typeInfo = {
-    child.typeInfo match {
-      case _ => // we can count anything... :D
-    }
-    BasicTypeInfo.INT_TYPE_INFO
-  }
-
-  override def toString = s"($child).count"
-
-  override def getIntermediateFields: Seq[Expression] = 
Seq(Literal(Integer.valueOf(1)))
-  override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0)
-  override def getAggregations = Seq(Aggregations.SUM)
-
-}
-
-case class Avg(child: Expression) extends Aggregation {
-  override def toString = s"($child).avg"
-
-  override def getIntermediateFields: Seq[Expression] = Seq(child, Literal(1))
-  // This is just sweet. Use our own AST representation and let the code 
generator do
-  // our dirty work.
-  override def getFinalField(inputs: Seq[Expression]): Expression =
-    Div(inputs(0), inputs(1))
-  override def getAggregations = Seq(Aggregations.SUM, Aggregations.SUM)
-
-}

Reply via email to