http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/SelectionAnalyzer.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/SelectionAnalyzer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/SelectionAnalyzer.scala new file mode 100644 index 0000000..625fdbf --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/SelectionAnalyzer.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.expressions.analysis + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.expressions.Expression +import org.apache.flink.api.table.trees.Analyzer + +/** + * This analyzes selection expressions. 
+ */ +class SelectionAnalyzer(inputFields: Seq[(String, TypeInformation[_])]) + extends Analyzer[Expression] { + + def rules = Seq( + new ResolveFieldReferences(inputFields), + new VerifyNoNestedAggregates, + new InsertAutoCasts, + new TypeCheck) + +}
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/TypeCheck.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/TypeCheck.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/TypeCheck.scala new file mode 100644 index 0000000..b724561 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/TypeCheck.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.expressions.analysis + +import org.apache.flink.api.table.expressions.Expression +import org.apache.flink.api.table.trees.Rule +import org.apache.flink.api.table.{_} + +import scala.collection.mutable + +/** + * Rule that makes sure we call [[Expression.typeInfo]] on each [[Expression]] at least once. + * Expressions are expected to perform type verification in this method. 
+ */ +class TypeCheck extends Rule[Expression] { + + def apply(expr: Expression) = { + val errors = mutable.MutableList[String]() + + val result = expr.transformPre { + case expr: Expression=> { + // simply get the typeInfo from the expression. this will perform type analysis + try { + expr.typeInfo + } catch { + case e: ExpressionException => + errors += e.getMessage + } + expr + } + } + + if (errors.length > 0) { + throw new ExpressionException( + s"""Invalid expression "$expr": ${errors.mkString(" ")}""") + } + + result + + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyBoolean.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyBoolean.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyBoolean.scala new file mode 100644 index 0000000..e75dd20 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyBoolean.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.expressions.analysis + +import org.apache.flink.api.table.expressions.{NopExpression, Expression} +import org.apache.flink.api.table.trees.Rule +import org.apache.flink.api.table.{_} +import org.apache.flink.api.common.typeinfo.BasicTypeInfo + +import scala.collection.mutable + +/** + * [[Rule]] that verifies that the result type of an [[Expression]] is Boolean. This is required + * for filter/join predicates. + */ +class VerifyBoolean extends Rule[Expression] { + + def apply(expr: Expression) = { + if (!expr.isInstanceOf[NopExpression] && expr.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO) { + throw new ExpressionException(s"Expression $expr of type ${expr.typeInfo} is not boolean.") + } + + expr + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoAggregates.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoAggregates.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoAggregates.scala new file mode 100644 index 0000000..09dbf88 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoAggregates.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.expressions.analysis + +import org.apache.flink.api.table.ExpressionException +import org.apache.flink.api.table.expressions.{Aggregation, Expression} + +import scala.collection.mutable + +import org.apache.flink.api.table.trees.Rule + +/** + * Rule that verifies that an expression does not contain aggregate operations. Right now, join + * predicates and filter predicates cannot contain aggregates. + */ +class VerifyNoAggregates extends Rule[Expression] { + + def apply(expr: Expression) = { + val errors = mutable.MutableList[String]() + + val result = expr.transformPre { + case agg: Aggregation=> { + errors += + s"""Aggregations are not allowed in join/filter predicates.""" + agg + } + } + + if (errors.length > 0) { + throw new ExpressionException( + s"""Invalid expression "$expr": ${errors.mkString(" ")}""") + } + + result + + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoNestedAggregates.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoNestedAggregates.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoNestedAggregates.scala new file mode 100644 index 0000000..07acf1e --- /dev/null +++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoNestedAggregates.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.expressions.analysis + +import org.apache.flink.api.table.ExpressionException +import org.apache.flink.api.table.expressions.{Expression, Aggregation} + +import scala.collection.mutable + +import org.apache.flink.api.table.trees.Rule + +/** + * Rule that verifies that an expression does not contain aggregate operations + * as children of aggregate operations. 
+ */ +class VerifyNoNestedAggregates extends Rule[Expression] { + + def apply(expr: Expression) = { + val errors = mutable.MutableList[String]() + + val result = expr.transformPre { + case agg: Aggregation=> { + if (agg.child.exists(_.isInstanceOf[Aggregation])) { + errors += s"""Found nested aggregation inside "$agg".""" + } + agg + } + } + + if (errors.length > 0) { + throw new ExpressionException( + s"""Invalid expression "$expr": ${errors.mkString(" ")}""") + } + + result + + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala new file mode 100644 index 0000000..e866ea0 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.table.expressions + +import org.apache.flink.api.table.ExpressionException +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, IntegerTypeInfo, NumericTypeInfo, TypeInformation} + +abstract class BinaryArithmetic extends BinaryExpression { self: Product => + def typeInfo = { + if (!left.typeInfo.isInstanceOf[NumericTypeInfo[_]]) { + throw new ExpressionException( + s"""Non-numeric operand ${left} of type ${left.typeInfo} in $this""") + } + if (!right.typeInfo.isInstanceOf[NumericTypeInfo[_]]) { + throw new ExpressionException( + s"""Non-numeric operand "${right}" of type ${right.typeInfo} in $this""") + } + if (left.typeInfo != right.typeInfo) { + throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " + + s"${right.typeInfo} in $this") + } + left.typeInfo + } +} + +case class Plus(left: Expression, right: Expression) extends BinaryArithmetic { + override def typeInfo = { + if (!left.typeInfo.isInstanceOf[NumericTypeInfo[_]] && + !(left.typeInfo == BasicTypeInfo.STRING_TYPE_INFO)) { + throw new ExpressionException(s"Non-numeric operand type ${left.typeInfo} in $this") + } + if (!right.typeInfo.isInstanceOf[NumericTypeInfo[_]] && + !(right.typeInfo == BasicTypeInfo.STRING_TYPE_INFO)) { + throw new ExpressionException(s"Non-numeric operand type ${right.typeInfo} in $this") + } + if (left.typeInfo != right.typeInfo) { + throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " + + s"${right.typeInfo} in $this") + } + left.typeInfo + } + + override def toString = s"($left + $right)" +} + +case class UnaryMinus(child: Expression) extends UnaryExpression { + def typeInfo = { + if (!child.typeInfo.isInstanceOf[NumericTypeInfo[_]]) { + throw new ExpressionException( + s"""Non-numeric operand ${child} of type ${child.typeInfo} in $this""") + } + child.typeInfo + } + + override def toString = s"-($child)" +} + +case class Minus(left: Expression, right: Expression) extends 
BinaryArithmetic { + override def toString = s"($left - $right)" +} + +case class Div(left: Expression, right: Expression) extends BinaryArithmetic { + override def toString = s"($left / $right)" +} + +case class Mul(left: Expression, right: Expression) extends BinaryArithmetic { + override def toString = s"($left * $right)" +} + +case class Mod(left: Expression, right: Expression) extends BinaryArithmetic { + override def toString = s"($left % $right)" +} + +case class Abs(child: Expression) extends UnaryExpression { + def typeInfo = child.typeInfo + + override def toString = s"abs($child)" +} + +abstract class BitwiseBinaryArithmetic extends BinaryExpression { self: Product => + def typeInfo: TypeInformation[_] = { + if (!left.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) { + throw new ExpressionException( + s"""Non-integer operand ${left} of type ${left.typeInfo} in $this""") + } + if (!right.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) { + throw new ExpressionException( + s"""Non-integer operand "${right}" of type ${right.typeInfo} in $this""") + } + if (left.typeInfo != right.typeInfo) { + throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " + + s"${right.typeInfo} in $this") + } + if (left.typeInfo == BasicTypeInfo.LONG_TYPE_INFO) { + left.typeInfo + } else { + BasicTypeInfo.INT_TYPE_INFO + } + } +} + +case class BitwiseAnd(left: Expression, right: Expression) extends BitwiseBinaryArithmetic { + override def toString = s"($left & $right)" +} + +case class BitwiseOr(left: Expression, right: Expression) extends BitwiseBinaryArithmetic { + override def toString = s"($left | $right)" +} + + +case class BitwiseXor(left: Expression, right: Expression) extends BitwiseBinaryArithmetic { + override def toString = s"($left ^ $right)" +} + +case class BitwiseNot(child: Expression) extends UnaryExpression { + def typeInfo: TypeInformation[_] = { + if (!child.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) { + throw new ExpressionException( + 
s"""Non-integer operand ${child} of type ${child.typeInfo} in $this""") + } + if (child.typeInfo == BasicTypeInfo.LONG_TYPE_INFO) { + child.typeInfo + } else { + BasicTypeInfo.INT_TYPE_INFO + } + } + + override def toString = s"~($child)" +} + http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala new file mode 100644 index 0000000..9fae862 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.table.expressions + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.table.ExpressionException + +case class Cast(child: Expression, tpe: TypeInformation[_]) extends UnaryExpression { + def typeInfo = tpe match { + case BasicTypeInfo.STRING_TYPE_INFO => tpe + + case b if b.isBasicType && child.typeInfo.isBasicType => tpe + + case _ => throw new ExpressionException( + s"Invalid cast: $this. Casts are only valid betwixt primitive types.") + } + + override def toString = s"$child.cast($tpe)" +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala new file mode 100644 index 0000000..687ea7a --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.expressions + +import org.apache.flink.api.table.ExpressionException +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, NumericTypeInfo} + +abstract class BinaryComparison extends BinaryExpression { self: Product => + def typeInfo = { + if (!left.typeInfo.isInstanceOf[NumericTypeInfo[_]]) { + throw new ExpressionException(s"Non-numeric operand ${left} in $this") + } + if (!right.typeInfo.isInstanceOf[NumericTypeInfo[_]]) { + throw new ExpressionException(s"Non-numeric operand ${right} in $this") + } + if (left.typeInfo != right.typeInfo) { + throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " + + s"${right.typeInfo} in $this") + } + BasicTypeInfo.BOOLEAN_TYPE_INFO + } +} + +case class EqualTo(left: Expression, right: Expression) extends BinaryComparison { + override def typeInfo = { + if (left.typeInfo != right.typeInfo) { + throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " + + s"${right.typeInfo} in $this") + } + BasicTypeInfo.BOOLEAN_TYPE_INFO + } + + override def toString = s"$left === $right" +} + +case class NotEqualTo(left: Expression, right: Expression) extends BinaryComparison { + override def typeInfo = { + if (left.typeInfo != right.typeInfo) { + throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " + + s"${right.typeInfo} in $this") + } + BasicTypeInfo.BOOLEAN_TYPE_INFO + } + + override def toString = s"$left !== $right" +} + +case class GreaterThan(left: Expression, right: Expression) extends BinaryComparison { + override def toString = s"$left > $right" +} + +case class GreaterThanOrEqual(left: Expression, right: Expression) extends BinaryComparison { + override def toString = s"$left >= $right" +} + +case class LessThan(left: Expression, right: Expression) extends BinaryComparison { + override def 
toString = s"$left < $right" +} + +case class LessThanOrEqual(left: Expression, right: Expression) extends BinaryComparison { + override def toString = s"$left <= $right" +} + +case class IsNull(child: Expression) extends UnaryExpression { + def typeInfo = { + BasicTypeInfo.BOOLEAN_TYPE_INFO + } + + override def toString = s"($child).isNull" +} + +case class IsNotNull(child: Expression) extends UnaryExpression { + def typeInfo = { + BasicTypeInfo.BOOLEAN_TYPE_INFO + } + + override def toString = s"($child).isNotNull" +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala new file mode 100644 index 0000000..a649aed --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.table.expressions + +import org.apache.flink.api.table.ExpressionException +import org.apache.flink.api.common.typeinfo.TypeInformation + +case class UnresolvedFieldReference(override val name: String) extends LeafExpression { + def typeInfo = throw new ExpressionException(s"Unresolved field reference: $this") + + override def toString = "\"" + name +} + +case class ResolvedFieldReference( + override val name: String, + tpe: TypeInformation[_]) extends LeafExpression { + def typeInfo = tpe + + override def toString = s"'$name" +} + +case class Naming(child: Expression, override val name: String) extends UnaryExpression { + def typeInfo = child.typeInfo + + override def toString = s"$child as '$name" +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala new file mode 100644 index 0000000..f909cab --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.expressions + +import java.util.Date +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.scala.table.ImplicitExpressionOperations + +object Literal { + def apply(l: Any): Literal = l match { + case i:Int => Literal(i, BasicTypeInfo.INT_TYPE_INFO) + case l:Long => Literal(l, BasicTypeInfo.LONG_TYPE_INFO) + case d: Double => Literal(d, BasicTypeInfo.DOUBLE_TYPE_INFO) + case f: Float => Literal(f, BasicTypeInfo.FLOAT_TYPE_INFO) + case str: String => Literal(str, BasicTypeInfo.STRING_TYPE_INFO) + case bool: Boolean => Literal(bool, BasicTypeInfo.BOOLEAN_TYPE_INFO) + case date: Date => Literal(date, BasicTypeInfo.DATE_TYPE_INFO) + } +} + +case class Literal(value: Any, tpe: TypeInformation[_]) + extends LeafExpression with ImplicitExpressionOperations { + def expr = this + def typeInfo = tpe + + override def toString = s"$value" +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala new file mode 100644 index 0000000..eaf0463 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.expressions + +import org.apache.flink.api.table.ExpressionException +import org.apache.flink.api.common.typeinfo.BasicTypeInfo + +abstract class BinaryPredicate extends BinaryExpression { self: Product => + def typeInfo = { + if (left.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO || + right.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO) { + throw new ExpressionException(s"Non-boolean operand types ${left.typeInfo} and " + + s"${right.typeInfo} in $this") + } + BasicTypeInfo.BOOLEAN_TYPE_INFO + } +} + +case class Not(child: Expression) extends UnaryExpression { + def typeInfo = { + if (child.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO) { + throw new ExpressionException(s"Non-boolean operand type ${child.typeInfo} in $this") + } + BasicTypeInfo.BOOLEAN_TYPE_INFO + } + + override val name = Expression.freshName("not-" + child.name) + + override def toString = s"!($child)" +} + +case class And(left: Expression, right: Expression) extends BinaryPredicate { + override def toString = s"$left && $right" + + override val name = Expression.freshName(left.name + "-and-" + right.name) +} + +case class Or(left: Expression, right: Expression) extends BinaryPredicate { + override def 
toString = s"$left || $right" + + override val name = Expression.freshName(left.name + "-or-" + right.name) + +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala new file mode 100644 index 0000000..c5c8c94 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table + +/** + * This package contains the base class of AST nodes and all the expression language AST classes. + * Expression trees should not be manually constructed by users. They are implicitly constructed + * from the implicit DSL conversions in + * [[org.apache.flink.api.scala.expressions.ImplicitExpressionConversions]] and + * [[org.apache.flink.api.scala.expressions.ImplicitExpressionOperations]]. 
For the Java API, + * expression trees should be generated from a string parser that parses expressions and creates + * AST nodes. + */ +package object expressions http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala new file mode 100644 index 0000000..a39d601 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.table.expressions + +import org.apache.flink.api.table.ExpressionException +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, IntegerTypeInfo} + +case class Substring( + str: Expression, + beginIndex: Expression, + endIndex: Expression) extends Expression { + def typeInfo = { + if (str.typeInfo != BasicTypeInfo.STRING_TYPE_INFO) { + throw new ExpressionException( + s"""Operand must be of type String in $this, is ${str.typeInfo}.""") + } + if (!beginIndex.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) { + throw new ExpressionException( + s"""Begin index must be an integer type in $this, is ${beginIndex.typeInfo}.""") + } + if (!endIndex.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) { + throw new ExpressionException( + s"""End index must be an integer type in $this, is ${endIndex.typeInfo}.""") + } + + BasicTypeInfo.STRING_TYPE_INFO + } + + override def children: Seq[Expression] = Seq(str, beginIndex, endIndex) + override def toString = s"($str).substring($beginIndex, $endIndex)" +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/package.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/package.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/package.scala new file mode 100644 index 0000000..bdcb22c --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/package.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api + +/** + * == Table API == + * + * This package contains the generic part of the Table API. It can be used with Flink Streaming + * and Flink Batch. From Scala as well as from Java. + * + * When using the Table API, as user creates a [[org.apache.flink.api.table.Table]] from + * a DataSet or DataStream. On this relational operations can be performed. A table can also + * be converted back to a DataSet or DataStream. + * + * Packages [[org.apache.flink.api.scala.table]] and [[org.apache.flink.api.java.table]] contain + * the language specific part of the API. Refer to these packages for documentation on how + * the Table API can be used in Java and Scala. + */ +package object table http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala new file mode 100644 index 0000000..2cbd8fa --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.parser + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.table.ExpressionException +import org.apache.flink.api.table.plan.As +import org.apache.flink.api.table.expressions._ + +import scala.util.parsing.combinator.{PackratParsers, JavaTokenParsers} + +/** + * Parser for expressions inside a String. This parses exactly the same expressions that + * would be accepted by the Scala Expression DSL. + * + * See [[org.apache.flink.api.scala.table.ImplicitExpressionConversions]] and + * [[org.apache.flink.api.scala.table.ImplicitExpressionOperations]] for the constructs + * available in the Scala Expression DSL. This parser must be kept in sync with the Scala DSL + * lazy valined in the above files. 
+ */ +object ExpressionParser extends JavaTokenParsers with PackratParsers { + case class Keyword(key: String) + + // Convert the keyword into an case insensitive Parser + implicit def keyword2Parser(kw: Keyword): Parser[String] = { + ("""(?i)\Q""" + kw.key + """\E""").r + } + + // KeyWord + + lazy val AS: Keyword = Keyword("as") + lazy val COUNT: Keyword = Keyword("count") + lazy val AVG: Keyword = Keyword("avg") + lazy val MIN: Keyword = Keyword("min") + lazy val MAX: Keyword = Keyword("max") + lazy val SUM: Keyword = Keyword("sum") + + // Literals + + lazy val numberLiteral: PackratParser[Expression] = + ((wholeNumber <~ ("L" | "l")) | floatingPointNumber | decimalNumber | wholeNumber) ^^ { + str => + if (str.endsWith("L") || str.endsWith("l")) { + Literal(str.toLong) + } else if (str.matches("""-?\d+""")) { + Literal(str.toInt) + } else if (str.endsWith("f") | str.endsWith("F")) { + Literal(str.toFloat) + } else { + Literal(str.toDouble) + } + } + + lazy val singleQuoteStringLiteral: Parser[Expression] = + ("'" + """([^'\p{Cntrl}\\]|\\[\\'"bfnrt]|\\u[a-fA-F0-9]{4})*""" + "'").r ^^ { + str => Literal(str.substring(1, str.length - 1)) + } + + lazy val stringLiteralFlink: PackratParser[Expression] = super.stringLiteral ^^ { + str => Literal(str.substring(1, str.length - 1)) + } + + lazy val boolLiteral: PackratParser[Expression] = ("true" | "false") ^^ { + str => Literal(str.toBoolean) + } + + lazy val literalExpr: PackratParser[Expression] = + numberLiteral | + stringLiteralFlink | singleQuoteStringLiteral | + boolLiteral + + lazy val fieldReference: PackratParser[Expression] = ident ^^ { + case sym => UnresolvedFieldReference(sym) + } + + lazy val atom: PackratParser[Expression] = + ( "(" ~> expression <~ ")" ) | literalExpr | fieldReference + + // suffix ops + lazy val isNull: PackratParser[Expression] = atom <~ ".isNull" ^^ { e => IsNull(e) } + lazy val isNotNull: PackratParser[Expression] = atom <~ ".isNotNull" ^^ { e => IsNotNull(e) } + + lazy val abs: 
PackratParser[Expression] = atom <~ ".abs" ^^ { e => Abs(e) } + + lazy val sum: PackratParser[Expression] = + (atom <~ ".sum" ^^ { e => Sum(e) }) | (SUM ~ "(" ~> atom <~ ")" ^^ { e => Sum(e) }) + lazy val min: PackratParser[Expression] = + (atom <~ ".min" ^^ { e => Min(e) }) | (MIN ~ "(" ~> atom <~ ")" ^^ { e => Min(e) }) + lazy val max: PackratParser[Expression] = + (atom <~ ".max" ^^ { e => Max(e) }) | (MAX ~ "(" ~> atom <~ ")" ^^ { e => Max(e) }) + lazy val count: PackratParser[Expression] = + (atom <~ ".count" ^^ { e => Count(e) }) | (COUNT ~ "(" ~> atom <~ ")" ^^ { e => Count(e) }) + lazy val avg: PackratParser[Expression] = + (atom <~ ".avg" ^^ { e => Avg(e) }) | (AVG ~ "(" ~> atom <~ ")" ^^ { e => Avg(e) }) + + lazy val cast: PackratParser[Expression] = + atom <~ ".cast(BYTE)" ^^ { e => Cast(e, BasicTypeInfo.BYTE_TYPE_INFO) } | + atom <~ ".cast(SHORT)" ^^ { e => Cast(e, BasicTypeInfo.SHORT_TYPE_INFO) } | + atom <~ ".cast(INT)" ^^ { e => Cast(e, BasicTypeInfo.INT_TYPE_INFO) } | + atom <~ ".cast(LONG)" ^^ { e => Cast(e, BasicTypeInfo.LONG_TYPE_INFO) } | + atom <~ ".cast(FLOAT)" ^^ { e => Cast(e, BasicTypeInfo.FLOAT_TYPE_INFO) } | + atom <~ ".cast(DOUBLE)" ^^ { e => Cast(e, BasicTypeInfo.DOUBLE_TYPE_INFO) } | + atom <~ ".cast(BOOL)" ^^ { e => Cast(e, BasicTypeInfo.BOOLEAN_TYPE_INFO) } | + atom <~ ".cast(BOOLEAN)" ^^ { e => Cast(e, BasicTypeInfo.BOOLEAN_TYPE_INFO) } | + atom <~ ".cast(STRING)" ^^ { e => Cast(e, BasicTypeInfo.STRING_TYPE_INFO) } | + atom <~ ".cast(DATE)" ^^ { e => Cast(e, BasicTypeInfo.DATE_TYPE_INFO) } + + lazy val as: PackratParser[Expression] = atom ~ ".as(" ~ fieldReference ~ ")" ^^ { + case e ~ _ ~ as ~ _ => Naming(e, as.name) + } + + lazy val substring: PackratParser[Expression] = + atom ~ ".substring(" ~ expression ~ "," ~ expression ~ ")" ^^ { + case e ~ _ ~ from ~ _ ~ to ~ _ => Substring(e, from, to) + + } + + lazy val substringWithoutEnd: PackratParser[Expression] = + atom ~ ".substring(" ~ expression ~ ")" ^^ { + case e ~ _ ~ from ~ _ 
=> Substring(e, from, Literal(Integer.MAX_VALUE)) + + } + + lazy val suffix = + isNull | isNotNull | + abs | sum | min | max | count | avg | cast | + substring | substringWithoutEnd | atom + + + // unary ops + + lazy val unaryNot: PackratParser[Expression] = "!" ~> suffix ^^ { e => Not(e) } + + lazy val unaryMinus: PackratParser[Expression] = "-" ~> suffix ^^ { e => UnaryMinus(e) } + + lazy val unaryBitwiseNot: PackratParser[Expression] = "~" ~> suffix ^^ { e => BitwiseNot(e) } + + lazy val unary = unaryNot | unaryMinus | unaryBitwiseNot | suffix + + // binary bitwise opts + + lazy val binaryBitwise = unary * ( + "&" ^^^ { (a:Expression, b:Expression) => BitwiseAnd(a,b) } | + "|" ^^^ { (a:Expression, b:Expression) => BitwiseOr(a,b) } | + "^" ^^^ { (a:Expression, b:Expression) => BitwiseXor(a,b) } ) + + // arithmetic + + lazy val product = binaryBitwise * ( + "*" ^^^ { (a:Expression, b:Expression) => Mul(a,b) } | + "/" ^^^ { (a:Expression, b:Expression) => Div(a,b) } | + "%" ^^^ { (a:Expression, b:Expression) => Mod(a,b) } ) + + lazy val term = product * ( + "+" ^^^ { (a:Expression, b:Expression) => Plus(a,b) } | + "-" ^^^ { (a:Expression, b:Expression) => Minus(a,b) } ) + + // Comparison + + lazy val equalTo: PackratParser[Expression] = term ~ ("===" | "=") ~ term ^^ { + case l ~ _ ~ r => EqualTo(l, r) + } + + lazy val notEqualTo: PackratParser[Expression] = term ~ ("!==" | "!=" | "<>") ~ term ^^ { + case l ~ _ ~ r => NotEqualTo(l, r) + } + + lazy val greaterThan: PackratParser[Expression] = term ~ ">" ~ term ^^ { + case l ~ _ ~ r => GreaterThan(l, r) + } + + lazy val greaterThanOrEqual: PackratParser[Expression] = term ~ ">=" ~ term ^^ { + case l ~ _ ~ r => GreaterThanOrEqual(l, r) + } + + lazy val lessThan: PackratParser[Expression] = term ~ "<" ~ term ^^ { + case l ~ _ ~ r => LessThan(l, r) + } + + lazy val lessThanOrEqual: PackratParser[Expression] = term ~ "<=" ~ term ^^ { + case l ~ _ ~ r => LessThanOrEqual(l, r) + } + + lazy val comparison: 
PackratParser[Expression] = + equalTo | notEqualTo | + greaterThan | greaterThanOrEqual | + lessThan | lessThanOrEqual | term + + // logic + + lazy val logic = comparison * ( + "&&" ^^^ { (a:Expression, b:Expression) => And(a,b) } | + "||" ^^^ { (a:Expression, b:Expression) => Or(a,b) } ) + + // alias + + lazy val alias: PackratParser[Expression] = logic ~ AS ~ fieldReference ^^ { + case e ~ _ ~ name => Naming(e, name.name) + } | logic + + lazy val expression: PackratParser[Expression] = alias + + lazy val expressionList: Parser[List[Expression]] = rep1sep(expression, ",") + + def parseExpressionList(expression: String): List[Expression] = { + parseAll(expressionList, expression) match { + case Success(lst, _) => lst + + case Failure(msg, _) => throw new ExpressionException("Could not parse expression: " + msg) + + case Error(msg, _) => throw new ExpressionException("Could not parse expression: " + msg) + } + } + + def parseExpression(exprString: String): Expression = { + parseAll(expression, exprString) match { + case Success(lst, _) => lst + + case fail => + throw new ExpressionException("Could not parse expression: " + fail.toString) + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/ExpandAggregations.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/ExpandAggregations.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/ExpandAggregations.scala new file mode 100644 index 0000000..2e09f39 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/ExpandAggregations.scala @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan + +import org.apache.flink.api.table.expressions.analysis.SelectionAnalyzer +import org.apache.flink.api.table.expressions._ +import org.apache.flink.api.java.aggregation.Aggregations + +import scala.collection.mutable + +/** + * This is used to expand a [[Select]] that contains aggregations. If it is called on a [[Select]] + * without aggregations it is simply returned. + * + * This select: + * {{{ + * in.select('key, 'value.avg) + * }}} + * + * is transformed to this expansion: + * {{{ + * in + * .select('key, 'value, Literal(1) as 'intermediate.1) + * .aggregate('value.sum, 'intermediate.1.sum) + * .select('key, 'value / 'intermediate.1) + * }}} + * + * If the input of the [[Select]] is a [[GroupBy]] this is preserved before the aggregation. 
+ */ +object ExpandAggregations { + def apply(select: Select): PlanNode = select match { + case Select(input, selection) => + + val aggregations = mutable.HashMap[(Expression, Aggregations), String]() + val intermediateFields = mutable.HashSet[Expression]() + val aggregationIntermediates = mutable.HashMap[Aggregation, Seq[Expression]]() + + var intermediateCount = 0 + var resultCount = 0 + selection foreach { f => + f.transformPre { + case agg: Aggregation => + val intermediateReferences = agg.getIntermediateFields.zip(agg.getAggregations) map { + case (expr, basicAgg) => + resultCount += 1 + val resultName = s"result.$resultCount" + aggregations.get((expr, basicAgg)) match { + case Some(intermediateName) => + Naming(ResolvedFieldReference(intermediateName, expr.typeInfo), resultName) + case None => + intermediateCount = intermediateCount + 1 + val intermediateName = s"intermediate.$intermediateCount" + intermediateFields += Naming(expr, intermediateName) + aggregations((expr, basicAgg)) = intermediateName + Naming(ResolvedFieldReference(intermediateName, expr.typeInfo), resultName) + } + } + + aggregationIntermediates(agg) = intermediateReferences + // Return a NOP so that we don't add the children of the aggregation + // to intermediate fields. We already added the necessary fields to the list + // of intermediate fields. 
+ NopExpression() + + case fa: ResolvedFieldReference => + if (!fa.name.startsWith("intermediate")) { + intermediateFields += Naming(fa, fa.name) + } + fa + } + } + + if (aggregations.isEmpty) { + // no aggregations, just return + return select + } + + // also add the grouping keys to the set of intermediate fields, because we use a Set, + // they are only added when not already present + input match { + case GroupBy(_, groupingFields) => + groupingFields foreach { + case fa: ResolvedFieldReference => + intermediateFields += Naming(fa, fa.name) + } + case _ => // Nothing to add + } + + val basicAggregations = aggregations.map { + case ((expr, basicAgg), fieldName) => + (fieldName, basicAgg) + } + + val finalFields = selection.map { f => + f.transformPre { + case agg: Aggregation => + val intermediates = aggregationIntermediates(agg) + agg.getFinalField(intermediates) + } + } + + val intermediateAnalyzer = new SelectionAnalyzer(input.outputFields) + val analyzedIntermediates = intermediateFields.toSeq.map(intermediateAnalyzer.analyze) + + val finalAnalyzer = + new SelectionAnalyzer(analyzedIntermediates.map(e => (e.name, e.typeInfo))) + val analyzedFinals = finalFields.map(finalAnalyzer.analyze) + + val result = input match { + case GroupBy(groupByInput, groupingFields) => + Select( + Aggregate( + GroupBy( + Select(groupByInput, analyzedIntermediates), + groupingFields), + basicAggregations.toSeq), + analyzedFinals) + + case _ => + Select( + Aggregate( + Select(input, analyzedIntermediates), + basicAggregations.toSeq), + analyzedFinals) + + } + + result + + case _ => select + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala new file mode 100644 index 0000000..ba8aba4 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan + +import java.lang.reflect.Modifier + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.table.parser.ExpressionParser +import org.apache.flink.api.table.expressions.{Expression, Naming, ResolvedFieldReference, UnresolvedFieldReference} +import org.apache.flink.api.table.typeinfo.RowTypeInfo +import org.apache.flink.api.table.{ExpressionException, Table} + +import scala.language.reflectiveCalls + +/** + * Base class for translators that transform the logical plan in a [[Table]] to an executable + * Flink plan and also for creating a [[Table]] from a DataSet or DataStream. 
+ */ +abstract class PlanTranslator { + + type Representation[A] <: { def getType(): TypeInformation[A] } + + /** + * Translates the given Table API [[PlanNode]] back to the underlying representation, i.e, + * a DataSet or a DataStream. + */ + def translate[A](op: PlanNode)(implicit tpe: TypeInformation[A]): Representation[A] + + /** + * Creates a [[Table]] from a DataSet or a DataStream (the underlying representation). + */ + def createTable[A]( + repr: Representation[A], + inputType: CompositeType[A], + expressions: Array[Expression], + resultFields: Seq[(String, TypeInformation[_])]): Table + + /** + * Creates a [[Table]] from the given DataSet or DataStream. + */ + def createTable[A](repr: Representation[A]): Table = { + + val fields = repr.getType() match { + case c: CompositeType[A] => c.getFieldNames.map(UnresolvedFieldReference) + + case tpe => Array() // createTable will throw an exception for this later + } + createTable( + repr, + fields.toArray.asInstanceOf[Array[Expression]], + checkDeterministicFields = false) + } + + /** + * Creates a [[Table]] from the given DataSet or DataStream while only taking those + * fields mentioned in the field expression. + */ + def createTable[A](repr: Representation[A], expression: String): Table = { + + val fields = ExpressionParser.parseExpressionList(expression) + + createTable(repr, fields.toArray, checkDeterministicFields = true) + } + + /** + * Creates a [[Table]] from the given DataSet or DataStream while only taking those + * fields mentioned in the fields parameter. + * + * When checkDeterministicFields is true check whether the fields of the underlying + * [[TypeInformation]] have a deterministic ordering. This is only the case for Tuples + * and Case classes. For a POJO, the field order is not obvious, this can lead to problems + * when a user renames fields and assumes a certain ordering. 
+ */ + def createTable[A]( + repr: Representation[A], + fields: Array[Expression], + checkDeterministicFields: Boolean = true): Table = { + + // shortcut for DataSet[Row] or DataStream[Row] + repr.getType() match { + case rowTypeInfo: RowTypeInfo => + val expressions = rowTypeInfo.getFieldNames map { + name => (name, rowTypeInfo.getTypeAt(name)) + } + new Table( + Root(repr, expressions)) + + case c: CompositeType[A] => // us ok + + case tpe => throw new ExpressionException("Only DataSets or DataStreams of composite type" + + "can be transformed to a Table. These would be tuples, case classes and " + + "POJOs. Type is: " + tpe) + + } + + val clazz = repr.getType().getTypeClass + if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) + || clazz.getCanonicalName() == null) { + throw new ExpressionException("Cannot create Table from DataSet or DataStream of type " + + clazz.getName + ". Only top-level classes or static members classes " + + " are supported.") + } + + val inputType = repr.getType().asInstanceOf[CompositeType[A]] + + if (!inputType.hasDeterministicFieldOrder && checkDeterministicFields) { + throw new ExpressionException(s"You cannot rename fields upon Table creation: " + + s"Field order of input type $inputType is not deterministic." 
) + } + + if (fields.length != inputType.getFieldNames.length) { + throw new ExpressionException("Number of selected fields: '" + fields.mkString(",") + + "' and number of fields in input type " + inputType + " do not match.") + } + + val newFieldNames = fields map { + case UnresolvedFieldReference(name) => name + case e => + throw new ExpressionException("Only field references allowed in 'as' operation, " + + " offending expression: " + e) + } + + if (newFieldNames.toSet.size != newFieldNames.size) { + throw new ExpressionException(s"Ambiguous field names in ${fields.mkString(", ")}") + } + + val resultFields: Seq[(String, TypeInformation[_])] = newFieldNames.zipWithIndex map { + case (name, index) => (name, inputType.getTypeAt(index)) + } + + val inputFields = inputType.getFieldNames + val fieldMappings = inputFields.zip(resultFields) + val expressions: Array[Expression] = fieldMappings map { + case (oldName, (newName, tpe)) => Naming(ResolvedFieldReference(oldName, tpe), newName) + } + + createTable(repr, inputType, expressions, resultFields) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/operations.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/operations.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/operations.scala new file mode 100644 index 0000000..7ec34d7 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/operations.scala @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.plan + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.aggregation.Aggregations +import org.apache.flink.api.table.expressions.Expression +import org.apache.flink.api.table.trees.TreeNode + +/** + * Base class for all Table API operations. + */ +sealed abstract class PlanNode extends TreeNode[PlanNode] { self: Product => + def outputFields: Seq[(String, TypeInformation[_])] +} + +/** + * Operation that transforms a [[org.apache.flink.api.scala.DataSet]] or + * [[org.apache.flink.streaming.api.scala.DataStream]] into a [[org.apache.flink.api.table.Table]]. + */ +case class Root[T](input: T, outputFields: Seq[(String, TypeInformation[_])]) extends PlanNode { + val children = Nil + override def toString = s"Root($outputFields)" +} + +/** + * Operation that joins two [[org.apache.flink.api.table.Table]]s. A "filter" and a "select" + * should be applied after a join operation. + */ +case class Join(left: PlanNode, right: PlanNode) extends PlanNode { + + val children = Seq(left, right) + + def outputFields = left.outputFields ++ right.outputFields + + override def toString = s"Join($left, $right)" +} + +/** + * Operation that filters out elements that do not match the predicate expression. 
+ */ +case class Filter(input: PlanNode, predicate: Expression) extends PlanNode { + + val children = Seq(input) + + def outputFields = input.outputFields + + override def toString = s"Filter($input, $predicate)" +} + +/** + * Selection expression. Similar to an SQL SELECT statement. The expressions can select fields + * and perform arithmetic or logic operations. The expressions can also perform aggregates + * on fields. + */ +case class Select(input: PlanNode, selection: Seq[Expression]) extends PlanNode { + + val children = Seq(input) + + def outputFields = selection.toSeq map { e => (e.name, e.typeInfo) } + + override def toString = s"Select($input, ${selection.mkString(",")})" +} + +/** + * Operation that gives new names to fields. Use this to disambiguate fields before a join + * operation. + */ +case class As(input: PlanNode, names: Seq[String]) extends PlanNode { + + val children = Seq(input) + + val outputFields = input.outputFields.zip(names) map { + case ((_, tpe), newName) => (newName, tpe) + } + + override def toString = s"As($input, ${names.mkString(",")})" +} + +/** + * Grouping operation. Keys are specified using field references. A group by operation os only + * useful when performing a select with aggregates afterwards. + * @param input + * @param fields + */ +case class GroupBy(input: PlanNode, fields: Seq[Expression]) extends PlanNode { + + val children = Seq(input) + + def outputFields = input.outputFields + + override def toString = s"GroupBy($input, ${fields.mkString(",")})" +} + +/** + * Internal operation. Selection operations containing aggregates are expanded to an [[Aggregate]] + * and a simple [[Select]]. + */ +case class Aggregate( + input: PlanNode, + aggregations: Seq[(String, Aggregations)]) extends PlanNode { + + val children = Seq(input) + + def outputFields = input.outputFields + + override def toString = s"Aggregate($input, ${aggregations.mkString(",")})" +} + +/** + * UnionAll operation, union all elements from left and right. 
+ */ +case class UnionAll(left: PlanNode, right: PlanNode) extends PlanNode{ + val children = Seq(left, right) + + def outputFields = left.outputFields + + override def toString = s"Union($left, $right)" +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/package.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/package.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/package.scala new file mode 100644 index 0000000..a598483 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/package.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table + +/** + * The operations in this package are created by calling methods on [[Table]] they + * should not be manually created by users of the API. 
+ */ +package object plan http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala new file mode 100644 index 0000000..932f9df --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.table.runtime + +import org.apache.flink.api.table.Row +import org.apache.flink.api.common.functions.RichGroupReduceFunction +import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable +import org.apache.flink.api.java.aggregation.AggregationFunction +import org.apache.flink.configuration.Configuration +import org.apache.flink.util.Collector + +@Combinable +class ExpressionAggregateFunction( + private val fieldPositions: Seq[Int], + private val functions: Seq[AggregationFunction[Any]]) + extends RichGroupReduceFunction[Row, Row] { + + override def open(conf: Configuration): Unit = { + var i = 0 + val len = functions.length + while (i < len) { + functions(i).initializeAggregate() + i += 1 + } + } + + override def reduce(in: java.lang.Iterable[Row], out: Collector[Row]): Unit = { + + val fieldPositions = this.fieldPositions + val functions = this.functions + + var current: Row = null + + val values = in.iterator() + while (values.hasNext) { + current = values.next() + + var i = 0 + val len = functions.length + while (i < len) { + functions(i).aggregate(current.productElement(fieldPositions(i))) + i += 1 + } + } + + var i = 0 + val len = functions.length + while (i < len) { + current.setField(fieldPositions(i), functions(i).getAggregate) + functions(i).initializeAggregate() + i += 1 + } + + out.collect(current) + } + +} + +@Combinable +class NoExpressionAggregateFunction() extends RichGroupReduceFunction[Row, Row] { + + override def reduce(in: java.lang.Iterable[Row], out: Collector[Row]): Unit = { + + var first: Row = null + + val values = in.iterator() + if (values.hasNext) { + first = values.next() + } + + out.collect(first) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala new file mode 100644 index 0000000..4e50272 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.runtime + +import org.apache.flink.api.common.functions.{FilterFunction, RichFilterFunction} +import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.table.TableConfig +import org.apache.flink.api.table.codegen.GenerateFilter +import org.apache.flink.api.table.expressions.Expression +import org.apache.flink.configuration.Configuration + +/** + * Proxy function that takes an expression predicate. This is compiled + * upon runtime and calls to [[filter()]] are forwarded to the compiled code. 
+ */ +class ExpressionFilterFunction[T]( + predicate: Expression, + inputType: CompositeType[T], + config: TableConfig = TableConfig.DEFAULT) extends RichFilterFunction[T] { + + var compiledFilter: FilterFunction[T] = null + + override def open(c: Configuration): Unit = { + if (compiledFilter == null) { + val codegen = new GenerateFilter[T]( + inputType, + predicate, + getRuntimeContext.getUserCodeClassLoader, + config) + compiledFilter = codegen.generate() + } + } + + override def filter(in: T) = compiledFilter.filter(in) +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala new file mode 100644 index 0000000..cf2c90f --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.table.runtime + +import org.apache.flink.api.common.functions.{FlatJoinFunction, RichFlatJoinFunction} +import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.table.TableConfig +import org.apache.flink.api.table.codegen.GenerateJoin +import org.apache.flink.api.table.expressions.Expression +import org.apache.flink.configuration.Configuration +import org.apache.flink.util.Collector + +/** + * Proxy function that takes an expression predicate and output fields. These are compiled + * upon runtime and calls to [[join()]] are forwarded to the compiled code. + */ +class ExpressionJoinFunction[L, R, O]( + predicate: Expression, + leftType: CompositeType[L], + rightType: CompositeType[R], + resultType: CompositeType[O], + outputFields: Seq[Expression], + config: TableConfig = TableConfig.DEFAULT) extends RichFlatJoinFunction[L, R, O] { + + var compiledJoin: FlatJoinFunction[L, R, O] = null + + override def open(c: Configuration): Unit = { + val codegen = new GenerateJoin[L, R, O]( + leftType, + rightType, + resultType, + predicate, + outputFields, + getRuntimeContext.getUserCodeClassLoader, + config) + compiledJoin = codegen.generate() + } + + def join(left: L, right: R, out: Collector[O]) = { + compiledJoin.join(left, right, out) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala new file mode 100644 index 0000000..ab7adb1 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala @@ -0,0 +1,56 @@ +/* + * Licensed 
to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.runtime + +import org.apache.flink.api.table.TableConfig +import org.apache.flink.api.table.expressions.Expression +import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} +import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.table.codegen.GenerateSelect +import org.apache.flink.configuration.Configuration + +/** + * Proxy function that takes expressions. These are compiled + * upon runtime and calls to [[map()]] are forwarded to the compiled code. 
+ */ +class ExpressionSelectFunction[I, O]( + inputType: CompositeType[I], + resultType: CompositeType[O], + outputFields: Seq[Expression], + config: TableConfig = TableConfig.DEFAULT) extends RichMapFunction[I, O] { + + var compiledSelect: MapFunction[I, O] = null + + override def open(c: Configuration): Unit = { + + if (compiledSelect == null) { + val resultCodegen = new GenerateSelect[I, O]( + inputType, + resultType, + outputFields, + getRuntimeContext.getUserCodeClassLoader, + config) + + compiledSelect = resultCodegen.generate() + } + } + + def map(in: I): O = { + compiledSelect.map(in) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala new file mode 100644 index 0000000..a1bc4b7 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table + +/** + * The functions in this package are used for transforming Table API operations to Java API operations. + */ +package object runtime http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/Analyzer.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/Analyzer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/Analyzer.scala new file mode 100644 index 0000000..87051cf --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/Analyzer.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.trees + +/** + * Base class for tree analyzers/transformers. Analyzers must implement method `rules` to + * provide the chain of rules that are invoked one after another. 
The tree resulting + * from one rule is fed into the next rule and the final result is returned from method `analyze`. + */ +abstract class Analyzer[A <: TreeNode[A]] { + + def rules: Seq[Rule[A]] + + final def analyze(expr: A): A = { + var currentTree = expr + for (rule <- rules) { + var running = true + while (running) { + val newTree = rule(currentTree) + if (newTree fastEquals currentTree) { + running = false + } + currentTree = newTree + } + } + currentTree + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/Rule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/Rule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/Rule.scala new file mode 100644 index 0000000..b8a27cb --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/Rule.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.trees + +/** + * Base class for a rule that is part of an [[Analyzer]] rule chain. 
Method `rule` gets a tree + * and must return a tree. The returned tree can also be the input tree. In an [[Analyzer]] + * rule chain the result tree of one [[Rule]] is fed into the next [[Rule]] in the chain. + * + * A [[Rule]] is repeatedly applied to a tree until the tree does not change between + * rule applications. + */ +abstract class Rule[A <: TreeNode[A]] { + def apply(expr: A): A +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala new file mode 100644 index 0000000..84f1d7e --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.trees + +/** + * Generic base class for trees that can be transformed and traversed. 
+ */ +abstract class TreeNode[A <: TreeNode[A]] { self: A with Product => + + /** + * List of child nodes that should be considered when doing transformations. Other values + * in the Product will not be transformed, only handed through. + */ + def children: Seq[A] + + /** + * Tests for equality by first testing for reference equality. + */ + def fastEquals(other: TreeNode[_]): Boolean = this.eq(other) || this == other + + def transformPre(rule: PartialFunction[A, A]): A = { + val afterTransform = rule.applyOrElse(this, identity[A]) + + if (afterTransform fastEquals this) { + this.transformChildrenPre(rule) + } else { + afterTransform.transformChildrenPre(rule) + } + } + + def transformChildrenPre(rule: PartialFunction[A, A]): A = { + var changed = false + val newArgs = productIterator map { + case child: A if children.contains(child) => + val newChild = child.transformPre(rule) + if (newChild fastEquals child) { + child + } else { + changed = true + newChild + } + case other: AnyRef => other + case null => null + } toArray + + if (changed) makeCopy(newArgs) else this + } + + def transformPost(rule: PartialFunction[A, A]): A = { + val afterChildren = transformChildrenPost(rule) + if (afterChildren fastEquals this) { + rule.applyOrElse(this, identity[A]) + } else { + rule.applyOrElse(afterChildren, identity[A]) + } + } + + def transformChildrenPost(rule: PartialFunction[A, A]): A = { + var changed = false + val newArgs = productIterator map { + case child: A if children.contains(child) => + val newChild = child.transformPost(rule) + if (newChild fastEquals child) { + child + } else { + changed = true + newChild + } + case other: AnyRef => other + case null => null + } toArray + // toArray forces evaluation, toSeq does not seem to work here + + if (changed) makeCopy(newArgs) else this + } + + def exists(predicate: A => Boolean): Boolean = { + var exists = false + this.transformPre { + case e: A => if (predicate(e)) { + exists = true + } + e + } + exists + } + + /** + 
* Creates a new copy of this expression with new children. This is used during transformation + * if children change. This must be overridden by tree nodes that don't have the Constructor + * arguments in the same order as the `children`. + */ + def makeCopy(newArgs: Seq[AnyRef]): this.type = { + val defaultCtor = + this.getClass.getConstructors.find { _.getParameterTypes.size > 0}.head + try { + defaultCtor.newInstance(newArgs.toArray: _*).asInstanceOf[this.type] + } catch { + case iae: IllegalArgumentException => + println("IAE " + this) + throw new RuntimeException("Should never happen.") + } + } +} +