http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/ExpressionCodeGenerator.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/ExpressionCodeGenerator.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/ExpressionCodeGenerator.scala deleted file mode 100644 index cafba57..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/ExpressionCodeGenerator.scala +++ /dev/null @@ -1,635 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.expressions.codegen - -import java.util.concurrent.atomic.AtomicInteger -import org.apache.flink.api.expressions.tree._ -import org.apache.flink.api.expressions.typeinfo.{RenamingProxyTypeInfo, RowTypeInfo} -import org.apache.flink.api.expressions.{ExpressionException, tree} -import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo, BasicTypeInfo, TypeInformation} - -import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.java.typeutils.{TupleTypeInfo, PojoTypeInfo} -import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo -import org.slf4j.LoggerFactory - -import scala.collection.JavaConverters._ -import scala.collection.mutable - -/** Base class for all code generation classes. This provides the functionality for generating - * code from an [[Expression]] tree. Derived classes must embed this in a lambda function - * to form an executable code block. - * - * @param inputs List of input variable names with corresponding [[TypeInformation]]. - * @param nullCheck Whether the generated code should include checks for NULL values. - * @param cl The ClassLoader that is used to create the Scala reflection ToolBox - * @tparam R The type of the generated code block. In most cases a lambda function such - * as "(IN1, IN2) => OUT". - */ -abstract class ExpressionCodeGenerator[R]( - inputs: Seq[(String, CompositeType[_])], - val nullCheck: Boolean = false, - cl: ClassLoader) { - protected val log = LoggerFactory.getLogger(classOf[ExpressionCodeGenerator[_]]) - - import scala.reflect.runtime.{universe => ru} - import scala.reflect.runtime.universe._ - - if (cl == null) { - throw new IllegalArgumentException("ClassLoader must not be null.") - } - - import scala.tools.reflect.ToolBox - - protected val (mirror, toolBox) = ReflectionLock.synchronized { - val mirror = runtimeMirror(cl) - (mirror, mirror.mkToolBox()) - } - - // This is to be implemented by subclasses, we have it like this - // so that we only call it from here with the Scala Reflection Lock. - protected def generateInternal(): R - - final def generate(): R = { - ReflectionLock.synchronized { - generateInternal() - } - } - - val cache = mutable.HashMap[Expression, GeneratedExpression]() - - protected def generateExpression(expr: Expression): GeneratedExpression = { - // doesn't work yet, because we insert the same code twice and reuse variable names - // cache.getOrElseUpdate(expr, generateExpressionInternal(expr)) - generateExpressionInternal(expr) - } - - protected def generateExpressionInternal(expr: Expression): GeneratedExpression = { - // protected def generateExpression(expr: Expression): GeneratedExpression = { - val nullTerm = freshTermName("isNull") - val resultTerm = freshTermName("result") - - // For binary predicates that must only be evaluated when both operands are non-null. - // This will write to nullTerm and resultTerm, so don't use those term names - // after using this function - def generateIfNonNull(left: Expression, right: Expression, resultType: TypeInformation[_]) - (expr: (TermName, TermName) => Tree): Seq[Tree] = { - val leftCode = generateExpression(left) - val rightCode = generateExpression(right) - - - if (nullCheck) { - leftCode.code ++ rightCode.code ++ q""" - val $nullTerm = ${leftCode.nullTerm}|| ${rightCode.nullTerm} - val $resultTerm = if ($nullTerm) { - ${defaultPrimitive(resultType)} - } else { - ${expr(leftCode.resultTerm, rightCode.resultTerm)} - } - """.children - } else { - leftCode.code ++ rightCode.code :+ q""" - val $resultTerm = ${expr(leftCode.resultTerm, rightCode.resultTerm)} - """ - } - } - - val cleanedExpr = expr match { - case tree.Naming(namedExpr, _) => namedExpr - case _ => expr - } - - val code: Seq[Tree] = cleanedExpr match { - - case tree.Literal(null, typeInfo) => - if (nullCheck) { - q""" - val $nullTerm = true - val resultTerm = null - """.children - } else { - Seq( q""" - val resultTerm = null - """) - } - - case tree.Literal(intValue: Int, INT_TYPE_INFO) => - if (nullCheck) { - q""" - val $nullTerm = false - val $resultTerm = $intValue - """.children - } else { - Seq( q""" - val $resultTerm = $intValue - """) - } - - case tree.Literal(longValue: Long, LONG_TYPE_INFO) => - if (nullCheck) { - q""" - val $nullTerm = false - val $resultTerm = $longValue - """.children - } else { - Seq( q""" - val $resultTerm = $longValue - """) - } - - - case tree.Literal(doubleValue: Double, DOUBLE_TYPE_INFO) => - if (nullCheck) { - q""" - val $nullTerm = false - val $resultTerm = $doubleValue - """.children - } else { - Seq( q""" - val $resultTerm = $doubleValue - """) - } - - case tree.Literal(floatValue: Float, FLOAT_TYPE_INFO) => - if (nullCheck) { - q""" - val $nullTerm = false - val $resultTerm = $floatValue - """.children - } else { - Seq( q""" - val $resultTerm = $floatValue - """) - } - - case tree.Literal(strValue: String, STRING_TYPE_INFO) => - if (nullCheck) { - q""" - val $nullTerm = false - val $resultTerm = $strValue - """.children - } else { - Seq( q""" - val $resultTerm = $strValue - """) - } - - case tree.Literal(boolValue: Boolean, BOOLEAN_TYPE_INFO) => - if (nullCheck) { - q""" - val $nullTerm = false - val $resultTerm = $boolValue - """.children - } else { - Seq( q""" - val $resultTerm = $boolValue - """) - } - - case Substring(str, beginIndex, endIndex) => - val strCode = generateExpression(str) - val beginIndexCode = generateExpression(beginIndex) - val endIndexCode = generateExpression(endIndex) - if (nullCheck) { - strCode.code ++ beginIndexCode.code ++ endIndexCode.code ++ q""" - val $nullTerm = - ${strCode.nullTerm}|| ${beginIndexCode.nullTerm}|| ${endIndexCode.nullTerm} - if ($nullTerm) { - ${defaultPrimitive(str.typeInfo)} - } else { - val $resultTerm = if (${endIndexCode.resultTerm} == Int.MaxValue) { - (${strCode.resultTerm}).substring(${beginIndexCode.resultTerm}) - } else { - (${strCode.resultTerm}).substring( - ${beginIndexCode.resultTerm}, - ${endIndexCode.resultTerm}) - } - } - """.children - } else { - strCode.code ++ beginIndexCode.code ++ endIndexCode.code :+ q""" - val $resultTerm = if (${endIndexCode.resultTerm} == Int.MaxValue) { - (${strCode.resultTerm}).substring(${beginIndexCode.resultTerm}) - } else { - (${strCode.resultTerm}).substring( - ${beginIndexCode.resultTerm}, - ${endIndexCode.resultTerm}) - } - """ - } - - case tree.Cast(child: Expression, STRING_TYPE_INFO) => - val childGen = generateExpression(child) - val castCode = if (nullCheck) { - q""" - val $nullTerm = ${childGen.nullTerm} - val $resultTerm = if ($nullTerm == null) { - null - } else { - ${childGen.resultTerm}.toString - } - """.children - } else { - Seq( q""" - val $resultTerm = ${childGen.resultTerm}.toString - """) - } - childGen.code ++ castCode - - case tree.Cast(child: Expression, INT_TYPE_INFO) => - val childGen = generateExpression(child) - val castCode = if (nullCheck) { - q""" - val $nullTerm = ${childGen.nullTerm} - val $resultTerm = ${childGen.resultTerm}.toInt - """.children - } else { - Seq( q""" - val $resultTerm = ${childGen.resultTerm}.toInt - """) - } - childGen.code ++ castCode - - case tree.Cast(child: Expression, LONG_TYPE_INFO) => - val childGen = generateExpression(child) - val castCode = if (nullCheck) { - q""" - val $nullTerm = ${childGen.nullTerm} - val $resultTerm = ${childGen.resultTerm}.toLong - """.children - } else { - Seq( q""" - val $resultTerm = ${childGen.resultTerm}.toLong - """) - } - childGen.code ++ castCode - - case tree.Cast(child: Expression, FLOAT_TYPE_INFO) => - val childGen = generateExpression(child) - val castCode = if (nullCheck) { - q""" - val $nullTerm = ${childGen.nullTerm} - val $resultTerm = ${childGen.resultTerm}.toFloat - """.children - } else { - Seq( q""" - val $resultTerm = ${childGen.resultTerm}.toFloat - """) - } - childGen.code ++ castCode - - case tree.Cast(child: Expression, DOUBLE_TYPE_INFO) => - val childGen = generateExpression(child) - val castCode = if (nullCheck) { - q""" - val $nullTerm = ${childGen.nullTerm} - val $resultTerm = ${childGen.resultTerm}.toDouble - """.children - } else { - Seq( q""" - val $resultTerm = ${childGen.resultTerm}.toDouble - """) - } - childGen.code ++ castCode - - case ResolvedFieldReference(fieldName, fieldTpe: TypeInformation[_]) => - inputs find { i => i._2.hasField(fieldName)} match { - case Some((inputName, inputTpe)) => - val fieldCode = getField(newTermName(inputName), inputTpe, fieldName, fieldTpe) - if (nullCheck) { - q""" - val $resultTerm = $fieldCode - val $nullTerm = $resultTerm == null - """.children - } else { - Seq( q""" - val $resultTerm = $fieldCode - """) - } - - case None => throw new ExpressionException("Could not get accessor for " + fieldName - + " in inputs " + inputs.mkString(", ") + ".") - } - - case GreaterThan(left, right) => - generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) { - (leftTerm, rightTerm) => q"$leftTerm > $rightTerm" - } - - case GreaterThanOrEqual(left, right) => - generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) { - (leftTerm, rightTerm) => q"$leftTerm >= $rightTerm" - } - - case LessThan(left, right) => - generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) { - (leftTerm, rightTerm) => q"$leftTerm < $rightTerm" - } - - case LessThanOrEqual(left, right) => - generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) { - (leftTerm, rightTerm) => q"$leftTerm <= $rightTerm" - } - - case EqualTo(left, right) => - generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) { - (leftTerm, rightTerm) => q"$leftTerm == $rightTerm" - } - - case NotEqualTo(left, right) => - generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) { - (leftTerm, rightTerm) => q"$leftTerm != $rightTerm" - } - - case And(left, right) => - generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) { - (leftTerm, rightTerm) => q"$leftTerm && $rightTerm" - } - - case Or(left, right) => - generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) { - (leftTerm, rightTerm) => q"$leftTerm || $rightTerm" - } - - case Plus(left, right) => - generateIfNonNull(left, right, expr.typeInfo) { - (leftTerm, rightTerm) => q"$leftTerm + $rightTerm" - } - - case Minus(left, right) => - generateIfNonNull(left, right, expr.typeInfo) { - (leftTerm, rightTerm) => q"$leftTerm - $rightTerm" - } - - case Div(left, right) => - generateIfNonNull(left, right, expr.typeInfo) { - (leftTerm, rightTerm) => q"$leftTerm / $rightTerm" - } - - case Mul(left, right) => - generateIfNonNull(left, right, expr.typeInfo) { - (leftTerm, rightTerm) => q"$leftTerm * $rightTerm" - } - - case Mod(left, right) => - generateIfNonNull(left, right, expr.typeInfo) { - (leftTerm, rightTerm) => q"$leftTerm % $rightTerm" - } - - case UnaryMinus(child) => - val childCode = generateExpression(child) - if (nullCheck) { - childCode.code ++ q""" - val $nullTerm = ${childCode.nullTerm} - if ($nullTerm) { - ${defaultPrimitive(child.typeInfo)} - } else { - val $resultTerm = -(${childCode.resultTerm}) - } - """.children - } else { - childCode.code :+ q""" - val $resultTerm = -(${childCode.resultTerm}) - """ - } - - case BitwiseAnd(left, right) => - generateIfNonNull(left, right, expr.typeInfo) { - (leftTerm, rightTerm) => q"$leftTerm & $rightTerm" - } - - case BitwiseOr(left, right) => - generateIfNonNull(left, right, expr.typeInfo) { - (leftTerm, rightTerm) => q"$leftTerm | $rightTerm" - } - - case BitwiseXor(left, right) => - generateIfNonNull(left, right, expr.typeInfo) { - (leftTerm, rightTerm) => q"$leftTerm ^ $rightTerm" - } - - case BitwiseNot(child) => - val childCode = generateExpression(child) - if (nullCheck) { - childCode.code ++ q""" - val $nullTerm = ${childCode.nullTerm} - if ($nullTerm) { - ${defaultPrimitive(child.typeInfo)} - } else { - val $resultTerm = ~(${childCode.resultTerm}) - } - """.children - } else { - childCode.code :+ q""" - val $resultTerm = ~(${childCode.resultTerm}) - """ - } - - case Not(child) => - val childCode = generateExpression(child) - if (nullCheck) { - childCode.code ++ q""" - val $nullTerm = ${childCode.nullTerm} - if ($nullTerm) { - ${defaultPrimitive(child.typeInfo)} - } else { - val $resultTerm = !(${childCode.resultTerm}) - } - """.children - } else { - childCode.code :+ q""" - val $resultTerm = !(${childCode.resultTerm}) - """ - } - - case IsNull(child) => - val childCode = generateExpression(child) - if (nullCheck) { - childCode.code ++ q""" - val $nullTerm = ${childCode.nullTerm} - if ($nullTerm) { - ${defaultPrimitive(child.typeInfo)} - } else { - val $resultTerm = (${childCode.resultTerm}) == null - } - """.children - } else { - childCode.code :+ q""" - val $resultTerm = (${childCode.resultTerm}) == null - """ - } - - case IsNotNull(child) => - val childCode = generateExpression(child) - if (nullCheck) { - childCode.code ++ q""" - val $nullTerm = ${childCode.nullTerm} - if ($nullTerm) { - ${defaultPrimitive(child.typeInfo)} - } else { - val $resultTerm = (${childCode.resultTerm}) != null - } - """.children - } else { - childCode.code :+ q""" - val $resultTerm = (${childCode.resultTerm}) != null - """ - } - - case Abs(child) => - val childCode = generateExpression(child) - if (nullCheck) { - childCode.code ++ q""" - val $nullTerm = ${childCode.nullTerm} - if ($nullTerm) { - ${defaultPrimitive(child.typeInfo)} - } else { - val $resultTerm = Math.abs(${childCode.resultTerm}) - } - """.children - } else { - childCode.code :+ q""" - val $resultTerm = Math.abs(${childCode.resultTerm}) - """ - } - - case _ => throw new ExpressionException("Could not generate code for expression " + expr) - } - - GeneratedExpression(code, resultTerm, nullTerm) - } - - case class GeneratedExpression(code: Seq[Tree], resultTerm: TermName, nullTerm: TermName) - - // We don't have c.freshName - // According to http://docs.scala-lang.org/overviews/quasiquotes/hygiene.html - // it's coming for 2.11. We can't wait that long... - def freshTermName(name: String): TermName = { - newTermName(s"$name$$${freshNameCounter.getAndIncrement}") - } - - val freshNameCounter = new AtomicInteger - - protected def getField( - inputTerm: TermName, - inputType: CompositeType[_], - fieldName: String, - fieldType: TypeInformation[_]): Tree = { - val accessor = fieldAccessorFor(inputType, fieldName) - accessor match { - case ObjectFieldAccessor(fieldName) => - val fieldTerm = newTermName(fieldName) - q"$inputTerm.$fieldTerm.asInstanceOf[${typeTermForTypeInfo(fieldType)}]" - - case ObjectMethodAccessor(methodName) => - val methodTerm = newTermName(methodName) - q"$inputTerm.$methodTerm().asInstanceOf[${typeTermForTypeInfo(fieldType)}]" - - case ProductAccessor(i) => - q"$inputTerm.productElement($i).asInstanceOf[${typeTermForTypeInfo(fieldType)}]" - - } - } - - sealed abstract class FieldAccessor - - case class ObjectFieldAccessor(fieldName: String) extends FieldAccessor - - case class ObjectMethodAccessor(methodName: String) extends FieldAccessor - - case class ProductAccessor(i: Int) extends FieldAccessor - - def fieldAccessorFor(elementType: CompositeType[_], fieldName: String): FieldAccessor = { - elementType match { - case ri: RowTypeInfo => - ProductAccessor(elementType.getFieldIndex(fieldName)) - - case cc: CaseClassTypeInfo[_] => - ObjectFieldAccessor(fieldName) - - case javaTup: TupleTypeInfo[_] => - ObjectFieldAccessor(fieldName) - - case pj: PojoTypeInfo[_] => - ObjectFieldAccessor(fieldName) - - case proxy: RenamingProxyTypeInfo[_] => - val underlying = proxy.getUnderlyingType - val fieldIndex = proxy.getFieldIndex(fieldName) - fieldAccessorFor(underlying, underlying.getFieldNames()(fieldIndex)) - } - } - - protected def defaultPrimitive(tpe: TypeInformation[_]) = tpe match { - case BasicTypeInfo.INT_TYPE_INFO => ru.Literal(Constant(-1)) - case BasicTypeInfo.LONG_TYPE_INFO => ru.Literal(Constant(1L)) - case BasicTypeInfo.SHORT_TYPE_INFO => ru.Literal(Constant(-1.toShort)) - case BasicTypeInfo.BYTE_TYPE_INFO => ru.Literal(Constant(-1.toByte)) - case BasicTypeInfo.FLOAT_TYPE_INFO => ru.Literal(Constant(-1.0.toFloat)) - case BasicTypeInfo.DOUBLE_TYPE_INFO => ru.Literal(Constant(-1.toDouble)) - case BasicTypeInfo.BOOLEAN_TYPE_INFO => ru.Literal(Constant(false)) - case BasicTypeInfo.STRING_TYPE_INFO => ru.Literal(Constant("<empty>")) - case BasicTypeInfo.CHAR_TYPE_INFO => ru.Literal(Constant('\0')) - case _ => ru.Literal(Constant(null)) - } - - protected def typeTermForTypeInfo(typeInfo: TypeInformation[_]): Tree = { - val tpe = typeForTypeInfo(typeInfo) - tq"$tpe" - } - - // We need two separate methods here because typeForTypeInfo is recursive when generating - // the type for a type with generic parameters. - protected def typeForTypeInfo(tpe: TypeInformation[_]): Type = tpe match { - - // From PrimitiveArrayTypeInfo we would get class "int[]", scala reflections - // does not seem to like this, so we manually give the correct type here. - case PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO => typeOf[Array[Int]] - case PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO => typeOf[Array[Long]] - case PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO => typeOf[Array[Short]] - case PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO => typeOf[Array[Byte]] - case PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO => typeOf[Array[Float]] - case PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO => typeOf[Array[Double]] - case PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO => typeOf[Array[Boolean]] - case PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO => typeOf[Array[Char]] - - case _ => - val clazz = mirror.staticClass(tpe.getTypeClass.getCanonicalName) - - clazz.selfType.erasure match { - case ExistentialType(_, underlying) => underlying - - case tpe@TypeRef(prefix, sym, Nil) => - // Non-generic type, just return the type - tpe - - case TypeRef(prefix, sym, emptyParams) => - val genericTypeInfos = tpe.getGenericParameters.asScala - if (emptyParams.length != genericTypeInfos.length) { - throw new RuntimeException("Number of type parameters does not match.") - } - val typeParams = genericTypeInfos.map(typeForTypeInfo) - // TODO: remove, added only for migration of the line below, as suggested by the compiler - import compat._ - TypeRef(prefix, sym, typeParams.toList) - } - - } - -}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateBinaryPredicate.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateBinaryPredicate.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateBinaryPredicate.scala deleted file mode 100644 index ce80469..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateBinaryPredicate.scala +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.expressions.codegen - -import org.apache.flink.api.expressions.tree.Expression -import org.apache.flink.api.common.typeutils.CompositeType -import org.slf4j.LoggerFactory - -/** - * Code generator for binary predicates, i.e. a Join or CoGroup Predicate. - */ -class GenerateBinaryPredicate[L, R]( - leftType: CompositeType[L], - rightType: CompositeType[R], - predicate: Expression, - cl: ClassLoader) - extends ExpressionCodeGenerator[(L, R) => Boolean]( - Seq(("input0", leftType), ("input1", rightType)), - cl = cl) { - - val LOG = LoggerFactory.getLogger(this.getClass) - - import scala.reflect.runtime.{universe => ru} - import scala.reflect.runtime.universe._ - - override protected def generateInternal(): ((L, R) => Boolean) = { - val pred = generateExpression(predicate) - - val in0 = newTermName("input0") - val in1 = newTermName("input1") - - val leftTpe = typeTermForTypeInfo(leftType) - val rightTpe = typeTermForTypeInfo(rightType) - - val code = if (nullCheck) { - q""" - ($in0: $leftTpe, $in1: $rightTpe) => { - ..${pred.code} - if (${pred.nullTerm}) { - false - } else { - ${pred.resultTerm} - } - } - """ - } else { - q""" - ($in0: $leftTpe, $in1: $rightTpe) => { - ..${pred.code} - ${pred.resultTerm} - } - """ - } - - LOG.debug(s"""Generated binary predicate "$predicate":\n$code""") - toolBox.eval(code).asInstanceOf[(L, R) => Boolean] - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateBinaryResultAssembler.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateBinaryResultAssembler.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateBinaryResultAssembler.scala deleted file mode 100644 index 4066831..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateBinaryResultAssembler.scala +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.expressions.codegen - -import org.apache.flink.api.expressions.tree.Expression -import org.apache.flink.api.common.typeutils.CompositeType -import org.slf4j.LoggerFactory - -/** - * Code generator for assembling the result of a binary operation. - */ -class GenerateBinaryResultAssembler[L, R, O]( - leftTypeInfo: CompositeType[L], - rightTypeInfo: CompositeType[R], - resultTypeInfo: CompositeType[O], - outputFields: Seq[Expression], - cl: ClassLoader) - extends GenerateResultAssembler[(L, R, O) => O]( - Seq(("input0", leftTypeInfo), ("input1", rightTypeInfo)), - cl = cl) { - - val LOG = LoggerFactory.getLogger(this.getClass) - - import scala.reflect.runtime.universe._ - - - override protected def generateInternal(): ((L, R, O) => O) = { - - val leftType = typeTermForTypeInfo(leftTypeInfo) - val rightType = typeTermForTypeInfo(rightTypeInfo) - val resultType = typeTermForTypeInfo(resultTypeInfo) - - val resultCode = createResult(resultTypeInfo, outputFields) - - val code: Tree = - q""" - (input0: $leftType, input1: $rightType, out: $resultType) => { - ..$resultCode - } - """ - - LOG.debug(s"Generated binary result-assembler:\n$code") - toolBox.eval(code).asInstanceOf[(L, R, O) => O] - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateResultAssembler.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateResultAssembler.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateResultAssembler.scala deleted file mode 100644 index 977126d..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateResultAssembler.scala +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.expressions.codegen - -import org.apache.flink.api.expressions.tree.Expression -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.expressions.typeinfo.RowTypeInfo -import org.apache.flink.api.java.typeutils.{TupleTypeInfo, PojoTypeInfo} -import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo - -/** - * Base class for unary and binary result assembler code generators. - */ -abstract class GenerateResultAssembler[R]( - inputs: Seq[(String, CompositeType[_])], - cl: ClassLoader) - extends ExpressionCodeGenerator[R](inputs, cl = cl) { - import scala.reflect.runtime.{universe => ru} - import scala.reflect.runtime.universe._ - - def createResult[T]( - resultTypeInfo: CompositeType[T], - outputFields: Seq[Expression]): Tree = { - - val resultType = typeTermForTypeInfo(resultTypeInfo) - - val fieldsCode = outputFields.map(generateExpression) - - val block = resultTypeInfo match { - case ri: RowTypeInfo => - val resultSetters: Seq[Tree] = fieldsCode.zipWithIndex map { - case (fieldCode, i) => - q""" - out.setField($i, { ..${fieldCode.code}; ${fieldCode.resultTerm} }) - """ - } - - q""" - ..$resultSetters - out - """ - - case pj: PojoTypeInfo[_] => - val resultSetters: Seq[Tree] = fieldsCode.zip(outputFields) map { - case (fieldCode, expr) => - val fieldName = newTermName(expr.name) - q""" - out.$fieldName = { ..${fieldCode.code}; ${fieldCode.resultTerm} } - """ - } - - q""" - ..$resultSetters - out - """ - - case tup: TupleTypeInfo[_] => - val resultSetters: Seq[Tree] = fieldsCode.zip(outputFields) map { - case (fieldCode, expr) => - val fieldName = newTermName(expr.name) - q""" - out.$fieldName = { ..${fieldCode.code}; ${fieldCode.resultTerm} } - """ - } - - q""" - ..$resultSetters - out - """ - - case cc: CaseClassTypeInfo[_] => - val resultFields: Seq[Tree] = fieldsCode map { - fieldCode => - q"{ ..${fieldCode.code}; ${fieldCode.resultTerm}}" - } - q""" - new $resultType(..$resultFields) - """ - } - - block - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateUnaryPredicate.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateUnaryPredicate.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateUnaryPredicate.scala deleted file mode 100644 index cd8edc4..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateUnaryPredicate.scala +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.expressions.codegen - -import org.apache.flink.api.expressions.tree.Expression -import org.apache.flink.api.common.typeutils.CompositeType -import org.slf4j.LoggerFactory - -/** - * Code generator for a unary predicate, i.e. a Filter. - */ -class GenerateUnaryPredicate[T]( - inputType: CompositeType[T], - predicate: Expression, - cl: ClassLoader) extends ExpressionCodeGenerator[T => Boolean]( - Seq(("input0", inputType)), - cl = cl) { - - val LOG = LoggerFactory.getLogger(this.getClass) - - import scala.reflect.runtime.{universe => ru} - import scala.reflect.runtime.universe._ - - override protected def generateInternal(): (T => Boolean) = { - val pred = generateExpression(predicate) - - val tpe = typeTermForTypeInfo(inputType) - - val code = if (nullCheck) { - q""" - (input0: $tpe) => { - ..${pred.code} - if (${pred.nullTerm}) { - false - } else { - ${pred.resultTerm} - } - } - """ - } else { - q""" - (input0: $tpe) => { - ..${pred.code} - ${pred.resultTerm} - } - """ - } - - LOG.debug(s"""Generated unary predicate "$predicate":\n$code""") - toolBox.eval(code).asInstanceOf[(T) => Boolean] - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateUnaryResultAssembler.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateUnaryResultAssembler.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateUnaryResultAssembler.scala deleted file mode 100644 index 49970c1..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/GenerateUnaryResultAssembler.scala +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.expressions.codegen - -import org.apache.flink.api.expressions.tree.Expression -import org.apache.flink.api.common.typeutils.CompositeType -import org.slf4j.LoggerFactory - -/** - * Code generator for assembling the result of a unary operation. - */ -class GenerateUnaryResultAssembler[I, O]( - inputTypeInfo: CompositeType[I], - resultTypeInfo: CompositeType[O], - outputFields: Seq[Expression], - cl: ClassLoader) - extends GenerateResultAssembler[(I, O) => O]( - Seq(("input0", inputTypeInfo)), - cl = cl) { - - val LOG = LoggerFactory.getLogger(this.getClass) - - import scala.reflect.runtime.universe._ - - override protected def generateInternal(): ((I, O) => O) = { - - val inputType = typeTermForTypeInfo(inputTypeInfo) - val resultType = typeTermForTypeInfo(resultTypeInfo) - - val resultCode = createResult(resultTypeInfo, outputFields) - - val code: Tree = - q""" - (input0: $inputType, out: $resultType) => { - ..$resultCode - } - """ - - LOG.debug(s"Generated unary result-assembler:\n${show(code)}") - toolBox.eval(code).asInstanceOf[(I, O) => O] - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/package.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/package.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/package.scala deleted file mode 100644 index 0b7c95d..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/codegen/package.scala +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.expressions - -package object codegen { - // Used in ExpressionCodeGenerator because Scala 2.10 reflection is not thread safe. We might - // have several parallel expression operators in one TaskManager, therefore we need to guard - // these operations. - object ReflectionLock -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/ExpandAggregations.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/ExpandAggregations.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/ExpandAggregations.scala deleted file mode 100644 index 723483c..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/ExpandAggregations.scala +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.expressions.operations - -import org.apache.flink.api.expressions.analysis.SelectionAnalyzer -import org.apache.flink.api.expressions.tree._ -import org.apache.flink.api.java.aggregation.Aggregations - -import scala.collection.mutable - -/** - * This is used to expand a [[Select]] that contains aggregations. If it is called on a [[Select]] - * without aggregations it is simply returned. - * - * This select: - * {{{ - * in.select('key, 'value.avg) - * }}} - * - * is transformed to this expansion: - * {{{ - * in - * .select('key, 'value, Literal(1) as 'intermediate.1) - * .aggregate('value.sum, 'intermediate.1.sum) - * .select('key, 'value / 'intermediate.1) - * }}} - * - * If the input of the [[Select]] is a [[GroupBy]] this is preserved before the aggregation. - */ -object ExpandAggregations { - def apply(select: Select): Operation = select match { - case Select(input, selection) => - - val aggregations = mutable.HashMap[(Expression, Aggregations), String]() - val intermediateFields = mutable.HashSet[Expression]() - val aggregationIntermediates = mutable.HashMap[Aggregation, Seq[Expression]]() - - var intermediateCount = 0 - selection foreach { f => - f.transformPre { - case agg: Aggregation => - val intermediateReferences = agg.getIntermediateFields.zip(agg.getAggregations) map { - case (expr, basicAgg) => - aggregations.get((expr, basicAgg)) match { - case Some(intermediateName) => - ResolvedFieldReference(intermediateName, expr.typeInfo) - case None => - intermediateCount = intermediateCount + 1 - val intermediateName = s"intermediate.$intermediateCount" - intermediateFields += Naming(expr, intermediateName) - aggregations((expr, basicAgg)) = intermediateName - ResolvedFieldReference(intermediateName, expr.typeInfo) - } - } - - aggregationIntermediates(agg) = intermediateReferences - // Return a NOP so that we don't add the children of the aggregation - // to intermediate fields. We already added the necessary fields to the list - // of intermediate fields. - NopExpression() - - case fa: ResolvedFieldReference => - if (!fa.name.startsWith("intermediate")) { - intermediateFields += Naming(fa, fa.name) - } - fa - } - } - - if (aggregations.isEmpty) { - // no aggregations, just return - return select - } - - // also add the grouping keys to the set of intermediate fields, because we use a Set, - // they are only added when not already present - input match { - case GroupBy(_, groupingFields) => - groupingFields foreach { - case fa: ResolvedFieldReference => - intermediateFields += Naming(fa, fa.name) - } - case _ => // Nothing to add - } - - val basicAggregations = aggregations.map { - case ((expr, basicAgg), fieldName) => - (fieldName, basicAgg) - } - - val finalFields = selection.map { f => - f.transformPre { - case agg: Aggregation => - val intermediates = aggregationIntermediates(agg) - agg.getFinalField(intermediates) - } - } - - val intermediateAnalyzer = new SelectionAnalyzer(input.outputFields) - val analyzedIntermediates = intermediateFields.toSeq.map(intermediateAnalyzer.analyze) - - val finalAnalyzer = - new SelectionAnalyzer(analyzedIntermediates.map(e => (e.name, e.typeInfo))) - val analyzedFinals = finalFields.map(finalAnalyzer.analyze) - - val result = input match { - case GroupBy(groupByInput, groupingFields) => - Select( - Aggregate( - GroupBy( - Select(groupByInput, analyzedIntermediates), - groupingFields), - basicAggregations.toSeq), - analyzedFinals) - - case _ => - Select( - Aggregate( - Select(input, analyzedIntermediates), - basicAggregations.toSeq), - analyzedFinals) - - } - - result - - case _ => select - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/OperationTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/OperationTranslator.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/OperationTranslator.scala deleted file mode 100644 index 12e4793..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/OperationTranslator.scala +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.expressions.operations - -import org.apache.flink.api.common.typeinfo.TypeInformation - -/** - * When an [[org.apache.flink.api.expressions.ExpressionOperation]] is created an - * [[OperationTranslator]] corresponding to the underlying representation (either - * [[org.apache.flink.api.scala.DataSet]] or [[org.apache.flink.streaming.api.scala.DataStream]] is - * created. This way, the expression API can be completely agnostic while translation back to the - * correct API is handled by the API specific translator. - */ -abstract class OperationTranslator { - - type Representation[A] - - def translate[A](op: Operation)(implicit tpe: TypeInformation[A]): Representation[A] - -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/operations.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/operations.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/operations.scala deleted file mode 100644 index d036631..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/operations.scala +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.expressions.operations - -import org.apache.flink.api.expressions.tree.Expression -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.aggregation.Aggregations - -/** - * Base class for all expression operations. - */ -sealed abstract class Operation { - def outputFields: Seq[(String, TypeInformation[_])] -} - -/** - * Operation that transforms a [[org.apache.flink.api.scala.DataSet]] or - * [[org.apache.flink.streaming.api.scala.DataStream]] into an expression operation. - */ -case class Root[T](input: T, outputFields: Seq[(String, TypeInformation[_])]) extends Operation - -/** - * Operation that joins two expression operations. A "filter" and a "select" should be applied - * after a join operation. - */ -case class Join(left: Operation, right: Operation) extends Operation { - def outputFields = left.outputFields ++ right.outputFields - - override def toString = s"Join($left, $right)" -} - -/** - * Operation that filters out elements that do not match the predicate expression. - */ -case class Filter(input: Operation, predicate: Expression) extends Operation { - def outputFields = input.outputFields - - override def toString = s"Filter($input, $predicate)" -} - -/** - * Selection expression. Similar to an SQL SELECT statement. The expressions can select fields - * and perform arithmetic or logic operations. The expressions can also perform aggregates - * on fields. - */ -case class Select(input: Operation, selection: Seq[Expression]) extends Operation { - def outputFields = selection.toSeq map { e => (e.name, e.typeInfo) } - - override def toString = s"Select($input, ${selection.mkString(",")})" -} - -/** - * Operation that gives new names to fields. Use this to disambiguate fields before a join - * operation. - */ -case class As(input: Operation, names: Seq[String]) extends Operation { - val outputFields = input.outputFields.zip(names) map { - case ((_, tpe), newName) => (newName, tpe) - } - - override def toString = s"As($input, ${names.mkString(",")})" -} - -/** - * Grouping operation. Keys are specified using field references. A group by operation os only - * useful when performing a select with aggregates afterwards. - * @param input - * @param fields - */ -case class GroupBy(input: Operation, fields: Seq[Expression]) extends Operation { - def outputFields = input.outputFields - - override def toString = s"GroupBy($input, ${fields.mkString(",")})" -} - -/** - * Internal operation. Selection operations containing aggregates are expanded to an [[Aggregate]] - * and a simple [[Select]]. - */ -case class Aggregate( - input: Operation, - aggregations: Seq[(String, Aggregations)]) extends Operation { - def outputFields = input.outputFields - - override def toString = s"Aggregate($input, ${aggregations.mkString(",")})" -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/package.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/package.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/package.scala deleted file mode 100644 index 2aa80b3..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/operations/package.scala +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.expressions - -/** - * The operations in this package are created by calling methods on [[ExpressionOperation]] they - * should not be manually created by users of the API. - */ -package object operations http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/parser/ExpressionParser.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/parser/ExpressionParser.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/parser/ExpressionParser.scala deleted file mode 100644 index da53ded..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/parser/ExpressionParser.scala +++ /dev/null @@ -1,209 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.expressions.parser - -import org.apache.flink.api.expressions.ExpressionException -import org.apache.flink.api.expressions.operations.As -import org.apache.flink.api.expressions.tree._ - -import scala.util.parsing.combinator.{PackratParsers, JavaTokenParsers} - -/** - * Parser for expressions inside a String. This parses exactly the same expressions that - * would be accepted by the Scala Expression DSL. - * - * See [[org.apache.flink.api.scala.expressions.ImplicitExpressionConversions]] and - * [[org.apache.flink.api.scala.expressions.ImplicitExpressionOperations]] for the constructs - * available in the Scala Expression DSL. This parser must be kept in sync with the Scala DSL - * lazy valined in the above files. - */ -object ExpressionParser extends JavaTokenParsers with PackratParsers { - - // Literals - - lazy val numberLiteral: PackratParser[Expression] = - ((wholeNumber <~ ("L" | "l")) | floatingPointNumber | decimalNumber | wholeNumber) ^^ { - str => - if (str.endsWith("L") || str.endsWith("l")) { - Literal(str.toLong) - } else if (str.matches("""-?\d+""")) { - Literal(str.toInt) - } else if (str.endsWith("f") | str.endsWith("F")) { - Literal(str.toFloat) - } else { - Literal(str.toDouble) - } - } - - lazy val singleQuoteStringLiteral: Parser[Expression] = - ("'" + """([^'\p{Cntrl}\\]|\\[\\'"bfnrt]|\\u[a-fA-F0-9]{4})*""" + "'").r ^^ { - str => Literal(str.substring(1, str.length - 1)) - } - - lazy val stringLiteralFlink: PackratParser[Expression] = super.stringLiteral ^^ { - str => Literal(str.substring(1, str.length - 1)) - } - - lazy val boolLiteral: PackratParser[Expression] = ("true" | "false") ^^ { - str => Literal(str.toBoolean) - } - - lazy val literalExpr: PackratParser[Expression] = - numberLiteral | - stringLiteralFlink | singleQuoteStringLiteral | - boolLiteral - - lazy val fieldReference: PackratParser[Expression] = ident ^^ { - case sym => UnresolvedFieldReference(sym) - } - - lazy val atom: PackratParser[Expression] = - ( "(" ~> expression <~ ")" ) | literalExpr | fieldReference - - // suffix ops - lazy val isNull: PackratParser[Expression] = atom <~ ".isNull" ^^ { e => IsNull(e) } - lazy val isNotNull: PackratParser[Expression] = atom <~ ".isNotNull" ^^ { e => IsNotNull(e) } - - lazy val abs: PackratParser[Expression] = atom <~ ".abs" ^^ { e => Abs(e) } - - lazy val sum: PackratParser[Expression] = atom <~ ".sum" ^^ { e => Sum(e) } - lazy val min: PackratParser[Expression] = atom <~ ".min" ^^ { e => Min(e) } - lazy val max: PackratParser[Expression] = atom <~ ".max" ^^ { e => Max(e) } - lazy val count: PackratParser[Expression] = atom <~ ".count" ^^ { e => Count(e) } - lazy val avg: PackratParser[Expression] = atom <~ ".avg" ^^ { e => Avg(e) } - - lazy val as: PackratParser[Expression] = atom ~ ".as(" ~ fieldReference ~ ")" ^^ { - case e ~ _ ~ as ~ _ => Naming(e, as.name) - } - - lazy val substring: PackratParser[Expression] = - atom ~ ".substring(" ~ expression ~ "," ~ expression ~ ")" ^^ { - case e ~ _ ~ from ~ _ ~ to ~ _ => Substring(e, from, to) - - } - - lazy val substringWithoutEnd: PackratParser[Expression] = - atom ~ ".substring(" ~ expression ~ ")" ^^ { - case e ~ _ ~ from ~ _ => Substring(e, from, Literal(Integer.MAX_VALUE)) - - } - - lazy val suffix = - isNull | isNotNull | - abs | sum | min | max | count | avg | - substring | substringWithoutEnd | atom - - - // unary ops - - lazy val unaryNot: PackratParser[Expression] = "!" ~> suffix ^^ { e => Not(e) } - - lazy val unaryMinus: PackratParser[Expression] = "-" ~> suffix ^^ { e => UnaryMinus(e) } - - lazy val unaryBitwiseNot: PackratParser[Expression] = "~" ~> suffix ^^ { e => BitwiseNot(e) } - - lazy val unary = unaryNot | unaryMinus | unaryBitwiseNot | suffix - - // binary bitwise opts - - lazy val binaryBitwise = unary * ( - "&" ^^^ { (a:Expression, b:Expression) => BitwiseAnd(a,b) } | - "|" ^^^ { (a:Expression, b:Expression) => BitwiseOr(a,b) } | - "^" ^^^ { (a:Expression, b:Expression) => BitwiseXor(a,b) } ) - - // arithmetic - - lazy val product = binaryBitwise * ( - "*" ^^^ { (a:Expression, b:Expression) => Mul(a,b) } | - "/" ^^^ { (a:Expression, b:Expression) => Div(a,b) } | - "%" ^^^ { (a:Expression, b:Expression) => Mod(a,b) } ) - - lazy val term = product * ( - "+" ^^^ { (a:Expression, b:Expression) => Plus(a,b) } | - "-" ^^^ { (a:Expression, b:Expression) => Minus(a,b) } ) - - // Comparison - - lazy val equalTo: PackratParser[Expression] = term ~ "===" ~ term ^^ { - case l ~ _ ~ r => EqualTo(l, r) - } - - lazy val equalToAlt: PackratParser[Expression] = term ~ "=" ~ term ^^ { - case l ~ _ ~ r => EqualTo(l, r) - } - - lazy val notEqualTo: PackratParser[Expression] = term ~ "!==" ~ term ^^ { - case l ~ _ ~ r => NotEqualTo(l, r) - } - - lazy val greaterThan: PackratParser[Expression] = term ~ ">" ~ term ^^ { - case l ~ _ ~ r => GreaterThan(l, r) - } - - lazy val greaterThanOrEqual: PackratParser[Expression] = term ~ ">=" ~ term ^^ { - case l ~ _ ~ r => GreaterThanOrEqual(l, r) - } - - lazy val lessThan: PackratParser[Expression] = term ~ "<" ~ term ^^ { - case l ~ _ ~ r => LessThan(l, r) - } - - lazy val lessThanOrEqual: PackratParser[Expression] = term ~ "<=" ~ term ^^ { - case l ~ _ ~ r => LessThanOrEqual(l, r) - } - - lazy val comparison: PackratParser[Expression] = - equalTo | equalToAlt | notEqualTo | - greaterThan | greaterThanOrEqual | - lessThan | lessThanOrEqual | term - - // logic - - lazy val logic = comparison * ( - "&&" ^^^ { (a:Expression, b:Expression) => And(a,b) } | - "||" ^^^ { (a:Expression, b:Expression) => Or(a,b) } ) - - // alias - - lazy val alias: PackratParser[Expression] = logic ~ "as" ~ fieldReference ^^ { - case e ~ _ ~ name => Naming(e, name.name) - } | logic - - lazy val expression: PackratParser[Expression] = alias - - lazy val expressionList: Parser[List[Expression]] = rep1sep(expression, ",") - - def parseExpressionList(expression: String): List[Expression] = { - parseAll(expressionList, expression) match { - case Success(lst, _) => lst - - case Failure(msg, _) => throw new ExpressionException("Could not parse expression: " + msg) - - case Error(msg, _) => throw new ExpressionException("Could not parse expression: " + msg) - } - } - - def parseExpression(exprString: String): Expression = { - parseAll(expression, exprString) match { - case Success(lst, _) => lst - - case fail => - throw new ExpressionException("Could not parse expression: " + fail.toString) - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionAggregateFunction.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionAggregateFunction.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionAggregateFunction.scala deleted file mode 100644 index 5a2b87d..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionAggregateFunction.scala +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.expressions.runtime - -import org.apache.flink.api.expressions.Row -import org.apache.flink.api.common.functions.RichGroupReduceFunction -import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable -import org.apache.flink.api.java.aggregation.AggregationFunction -import org.apache.flink.configuration.Configuration -import org.apache.flink.util.Collector - -@Combinable -class ExpressionAggregateFunction( - private val fieldPositions: Seq[Int], - private val functions: Seq[AggregationFunction[Any]]) - extends RichGroupReduceFunction[Row, Row] { - - override def open(conf: Configuration): Unit = { - var i = 0 - val len = functions.length - while (i < len) { - functions(i).initializeAggregate() - i += 1 - } - } - - override def reduce(in: java.lang.Iterable[Row], out: Collector[Row]): Unit = { - - val fieldPositions = this.fieldPositions - val functions = this.functions - - var current: Row = null - - val values = in.iterator() - while (values.hasNext) { - current = values.next() - - var i = 0 - val len = functions.length - while (i < len) { - functions(i).aggregate(current.productElement(fieldPositions(i))) - i += 1 - } - } - - var i = 0 - val len = functions.length - while (i < len) { - current.setField(fieldPositions(i), functions(i).getAggregate) - functions(i).initializeAggregate() - i += 1 - } - - out.collect(current) - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionFilterFunction.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionFilterFunction.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionFilterFunction.scala deleted file mode 100644 index d766486..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionFilterFunction.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.expressions.runtime - -import org.apache.flink.api.expressions.codegen.GenerateUnaryPredicate -import org.apache.flink.api.expressions.tree.{NopExpression, Expression} -import org.apache.flink.api.common.functions.RichFilterFunction -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.configuration.Configuration - -class ExpressionFilterFunction[T]( - predicate: Expression, - inputType: CompositeType[T]) extends RichFilterFunction[T] { - - var compiledPredicate: (T) => Boolean = null - - override def open(config: Configuration): Unit = { - if (compiledPredicate == null) { - compiledPredicate = predicate match { - case n: NopExpression => null - case _ => - val codegen = new GenerateUnaryPredicate[T]( - inputType, - predicate, - getRuntimeContext.getUserCodeClassLoader) - codegen.generate() - } - } - } - - override def filter(in: T) = compiledPredicate(in) -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionJoinFunction.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionJoinFunction.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionJoinFunction.scala deleted file mode 100644 index 46715cf..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionJoinFunction.scala +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.expressions.runtime - -import org.apache.flink.api.expressions.tree.{NopExpression, Expression} -import org.apache.flink.api.common.functions.RichFlatJoinFunction -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.expressions.codegen.{GenerateBinaryResultAssembler, -GenerateBinaryPredicate} -import org.apache.flink.configuration.Configuration -import org.apache.flink.util.Collector - -class ExpressionJoinFunction[L, R, O]( - predicate: Expression, - leftType: CompositeType[L], - rightType: CompositeType[R], - resultType: CompositeType[O], - outputFields: Seq[Expression]) extends RichFlatJoinFunction[L, R, O] { - - var compiledPredicate: (L, R) => Boolean = null - var resultAssembler: (L, R, O) => O = null - var result: O = null.asInstanceOf[O] - - override def open(config: Configuration): Unit = { - result = resultType.createSerializer(getRuntimeContext.getExecutionConfig).createInstance() - if (compiledPredicate == null) { - compiledPredicate = predicate match { - case n: NopExpression => null - case _ => - val codegen = new GenerateBinaryPredicate[L, R]( - leftType, - rightType, - predicate, - getRuntimeContext.getUserCodeClassLoader) - codegen.generate() - } - } - - if (resultAssembler == null) { - val resultCodegen = new GenerateBinaryResultAssembler[L, R, O]( - leftType, - rightType, - resultType, - outputFields, - getRuntimeContext.getUserCodeClassLoader) - - resultAssembler = resultCodegen.generate() - } - } - - def join(left: L, right: R, out: Collector[O]) = { - if (compiledPredicate == null) { - result = resultAssembler(left, right, result) - out.collect(result) - } else { - if (compiledPredicate(left, right)) { - result = resultAssembler(left, right, result) - out.collect(result) } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionSelectFunction.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionSelectFunction.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionSelectFunction.scala deleted file mode 100644 index 5cdd8b2..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/ExpressionSelectFunction.scala +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.expressions.runtime - -import org.apache.flink.api.expressions.tree.Expression -import org.apache.flink.api.common.functions.RichMapFunction -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.expressions.codegen.GenerateUnaryResultAssembler -import org.apache.flink.configuration.Configuration - -class ExpressionSelectFunction[I, O]( - inputType: CompositeType[I], - resultType: CompositeType[O], - outputFields: Seq[Expression]) extends RichMapFunction[I, O] { - - var resultAssembler: (I, O) => O = null - var result: O = null.asInstanceOf[O] - - override def open(config: Configuration): Unit = { - result = resultType.createSerializer(getRuntimeContext.getExecutionConfig).createInstance() - - if (resultAssembler == null) { - val resultCodegen = new GenerateUnaryResultAssembler[I, O]( - inputType, - resultType, - outputFields, - getRuntimeContext.getUserCodeClassLoader) - - resultAssembler = resultCodegen.generate() - } - } - - def map(in: I): O = { - resultAssembler(in, result) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/package.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/package.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/package.scala deleted file mode 100644 index 0a3d683..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/runtime/package.scala +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.expressions - -/** - * The functions in this package are used to translate expression operations to Java API operations. - */ -package object runtime http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/Expression.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/Expression.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/Expression.scala deleted file mode 100644 index 8264705..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/Expression.scala +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.expressions.tree - -import java.util.concurrent.atomic.AtomicInteger - -import org.apache.flink.api.common.typeinfo.{NothingTypeInfo, TypeInformation} - -import scala.language.postfixOps - - -abstract class Expression extends Product { - def children: Seq[Expression] - def name: String = Expression.freshName("expression") - def typeInfo: TypeInformation[_] - - /** - * Tests for equality by first testing for reference equality. - */ - def fastEquals(other: Expression): Boolean = this.eq(other) || this == other - - def transformPre(rule: PartialFunction[Expression, Expression]): Expression = { - val afterTransform = rule.applyOrElse(this, identity[Expression]) - - if (afterTransform fastEquals this) { - this.transformChildrenPre(rule) - } else { - afterTransform.transformChildrenPre(rule) - } - } - - def transformChildrenPre(rule: PartialFunction[Expression, Expression]): Expression = { - var changed = false - val newArgs = productIterator map { - case child: Expression if children.contains(child) => - val newChild = child.transformPre(rule) - if (newChild fastEquals child) { - child - } else { - changed = true - newChild - } - case other: AnyRef => other - case null => null - } toArray - - if (changed) makeCopy(newArgs) else this - } - - def transformPost(rule: PartialFunction[Expression, Expression]): Expression = { - val afterChildren = transformChildrenPost(rule) - if (afterChildren fastEquals this) { - rule.applyOrElse(this, identity[Expression]) - } else { - rule.applyOrElse(afterChildren, identity[Expression]) - } - } - - def transformChildrenPost(rule: PartialFunction[Expression, Expression]): Expression = { - var changed = false - val newArgs = productIterator map { - case child: Expression if children.contains(child) => - val newChild = child.transformPost(rule) - if (newChild fastEquals child) { - child - } else { - changed = true - newChild - } - case other: AnyRef => other - case null => null - } toArray - // toArray forces evaluation, toSeq does not seem to work here - - if (changed) makeCopy(newArgs) else this - } - - def exists(predicate: Expression => Boolean): Boolean = { - var exists = false - this.transformPre { - case e: Expression => if (predicate(e)) { - exists = true - } - e - } - exists - } - - /** - * Creates a new copy of this expression with new children. This is used during transformation - * if children change. This must be overridden by Expressions that don't have the Constructor - * arguments in the same order as the `children`. - */ - def makeCopy(newArgs: Seq[AnyRef]): this.type = { - val defaultCtor = - this.getClass.getConstructors.find { _.getParameterTypes.size > 0}.head - try { - defaultCtor.newInstance(newArgs.toArray: _*).asInstanceOf[this.type] - } catch { - case iae: IllegalArgumentException => - println("IAE " + this) - throw new RuntimeException("Should never happen.") - } - } -} - -abstract class BinaryExpression() extends Expression { - def left: Expression - def right: Expression - def children = Seq(left, right) -} - -abstract class UnaryExpression() extends Expression { - def child: Expression - def children = Seq(child) -} - -abstract class LeafExpression() extends Expression { - val children = Nil -} - -case class NopExpression() extends LeafExpression { - val typeInfo = new NothingTypeInfo() - override val name = Expression.freshName("nop") - -} - -object Expression { - def freshName(prefix: String): String = { - s"$prefix-${freshNameCounter.getAndIncrement}" - } - - val freshNameCounter = new AtomicInteger -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/aggregations.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/aggregations.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/aggregations.scala deleted file mode 100644 index d3b19fa..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/aggregations.scala +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.expressions.tree - -import org.apache.flink.api.expressions.ExpressionException -import org.apache.flink.api.common.typeinfo.BasicTypeInfo -import org.apache.flink.api.java.aggregation.Aggregations - - -abstract sealed class Aggregation extends UnaryExpression { - def typeInfo = { - child.typeInfo match { - case BasicTypeInfo.LONG_TYPE_INFO => // ok - case BasicTypeInfo.INT_TYPE_INFO => - case BasicTypeInfo.DOUBLE_TYPE_INFO => - case BasicTypeInfo.FLOAT_TYPE_INFO => - case BasicTypeInfo.BYTE_TYPE_INFO => - case BasicTypeInfo.SHORT_TYPE_INFO => - case _ => - throw new ExpressionException(s"Unsupported type ${child.typeInfo} for " + - s"aggregation $this. Only numeric data types supported.") - } - child.typeInfo - } - - override def toString = s"Aggregate($child)" - - def getIntermediateFields: Seq[Expression] - def getFinalField(inputs: Seq[Expression]): Expression - def getAggregations: Seq[Aggregations] -} - -case class Sum(child: Expression) extends Aggregation { - override def toString = s"($child).sum" - - override def getIntermediateFields: Seq[Expression] = Seq(child) - override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0) - override def getAggregations = Seq(Aggregations.SUM) -} - -case class Min(child: Expression) extends Aggregation { - override def toString = s"($child).min" - - override def getIntermediateFields: Seq[Expression] = Seq(child) - override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0) - override def getAggregations = Seq(Aggregations.MIN) - -} - -case class Max(child: Expression) extends Aggregation { - override def toString = s"($child).max" - - override def getIntermediateFields: Seq[Expression] = Seq(child) - override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0) - override def getAggregations = Seq(Aggregations.MAX) -} - -case class Count(child: Expression) extends Aggregation { - override def typeInfo = { - child.typeInfo match { - case _ => // we can count anything... :D - } - BasicTypeInfo.INT_TYPE_INFO - } - - override def toString = s"($child).count" - - override def getIntermediateFields: Seq[Expression] = Seq(Literal(Integer.valueOf(1))) - override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0) - override def getAggregations = Seq(Aggregations.SUM) - -} - -case class Avg(child: Expression) extends Aggregation { - override def toString = s"($child).avg" - - override def getIntermediateFields: Seq[Expression] = Seq(child, Literal(1)) - // This is just sweet. Use our own AST representation and let the code generator do - // our dirty work. - override def getFinalField(inputs: Seq[Expression]): Expression = - Div(inputs(0), inputs(1)) - override def getAggregations = Seq(Aggregations.SUM, Aggregations.SUM) - -}