http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala new file mode 100644 index 0000000..8b72cb6 --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.scala.table + +import org.apache.flink.api.table._ +import org.apache.flink.api.table.tree.{UnresolvedFieldReference, Expression} +import org.apache.flink.api.common.typeutils.CompositeType + +import org.apache.flink.api.scala._ + +/** + * Methods for converting a [[DataSet]] to a [[Table]]. A [[DataSet]] is + * wrapped in this by the implicit conversions in [[org.apache.flink.api.scala.table]]. 
+ */
+class DataSetConversions[T](set: DataSet[T], inputType: CompositeType[T]) {
+
+  /**
+   * Builds a [[Table]] on top of the wrapped [[DataSet]], using the given field expressions
+   * as field names:
+   *
+   * {{{
+   *   val in: DataSet[(String, Int)] = ...
+   *   val table = in.as('a, 'b)
+   * }}}
+   *
+   * The resulting [[Table]] has a field `a` of type `String` and a field `b` of type `Int`.
+   *
+   * @param fields the field expressions handed to the translator
+   * @return the [[Table]] created by a fresh [[ScalaBatchTranslator]]
+   */
+  def as(fields: Expression*): Table[ScalaBatchTranslator] = {
+    val translator = new ScalaBatchTranslator()
+    translator.createTable(set, fields.toArray)
+  }
+
+  /**
+   * Builds a [[Table]] on top of the wrapped [[DataSet]], keeping the field names reported
+   * by the input type.
+   *
+   * Example:
+   *
+   * {{{
+   *   val in: DataSet[(String, Int)] = ...
+   *   val table = in.toTable
+   * }}}
+   *
+   * Here the resulting [[Table]] has a field `_1` of type `String` and a field `_2`
+   * of type `Int`.
+   */
+  def toTable: Table[ScalaBatchTranslator] = {
+    // One unresolved field reference per field name of the input type, then reuse `as`.
+    val fieldReferences = inputType.getFieldNames.map(name => UnresolvedFieldReference(name))
+    as(fieldReferences: _*)
+  }
+
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala
new file mode 100644
index 0000000..5da02c4
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table
+
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table._
+import org.apache.flink.api.table.tree.{Expression, UnresolvedFieldReference}
+import org.apache.flink.streaming.api.scala.DataStream
+
+/**
+ * Methods for converting a [[DataStream]] to a [[Table]]. A [[DataStream]] is
+ * wrapped in this by the implicit conversions in [[org.apache.flink.api.scala.table]].
+ */
+class DataStreamConversions[T](stream: DataStream[T], inputType: CompositeType[T]) {
+
+  /**
+   * Builds a [[Table]] on top of the wrapped [[DataStream]], using the given field
+   * expressions as field names:
+   *
+   * {{{
+   *   val in: DataStream[(String, Int)] = ...
+   *   val table = in.as('a, 'b)
+   * }}}
+   *
+   * The resulting [[Table]] has a field `a` of type `String` and a field `b` of type `Int`.
+   *
+   * NOTE(review): `checkDeterministicFields = true` is passed through to the translator;
+   * its exact meaning is defined there and is not visible in this file.
+   */
+  def as(fields: Expression*): Table[ScalaStreamingTranslator] = {
+    val translator = new ScalaStreamingTranslator()
+    val table = translator.createTable(
+      stream,
+      fields.toArray,
+      checkDeterministicFields = true)
+    table.asInstanceOf[Table[ScalaStreamingTranslator]]
+  }
+
+  /**
+   * Builds a [[Table]] on top of the wrapped [[DataStream]], keeping the field names
+   * reported by the input type.
+   *
+   * Example:
+   *
+   * {{{
+   *   val in: DataStream[(String, Int)] = ...
+   *   val table = in.toTable
+   * }}}
+   *
+   * The resulting [[Table]] has a field `_1` of type `String` and a field `_2`
+   * of type `Int`.
+   */
+  def toTable: Table[ScalaStreamingTranslator] = {
+    // One unresolved field reference per field name of the input type, then reuse `as`.
+    val fieldReferences = inputType.getFieldNames.map(name => UnresolvedFieldReference(name))
+    as(fieldReferences: _*)
+  }
+
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala
new file mode 100644
index 0000000..4b25a50
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table
+
+
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.table.JavaBatchTranslator
+import org.apache.flink.api.table.tree.Expression
+import org.apache.flink.api.scala.wrap
+import org.apache.flink.api.table.operations._
+import org.apache.flink.api.table.Table
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.DataSet
+
+import scala.reflect.ClassTag
+
+
+/**
+ * [[TableTranslator]] for creating [[Table]]s from Scala [[DataSet]]s and
+ * translating them back to Scala [[DataSet]]s.
+ *
+ * All work is delegated to a [[JavaBatchTranslator]]; this class only unwraps the Scala
+ * [[DataSet]] (`javaSet`) on the way in and wraps the Java result on the way out.
+ */
+class ScalaBatchTranslator extends TableTranslator {
+
+  // Java-API translator that every method below delegates to.
+  private val javaTranslator = new JavaBatchTranslator
+
+  type Representation[A] = DataSet[A]
+
+  /**
+   * Creates a [[Table]] from a Scala [[DataSet]] and the given field expressions.
+   *
+   * The Java translator builds the table from `repr.javaSet`; its operation is then
+   * re-wrapped in a [[Table]] that is bound to this translator.
+   */
+  def createTable[A](
+      repr: DataSet[A],
+      fields: Array[Expression]): Table[ScalaBatchTranslator] = {
+
+    val result = javaTranslator.createTable(repr.javaSet, fields)
+
+    new Table[ScalaBatchTranslator](result.operation, this)
+  }
+
+  /**
+   * Translates `op` with the Java translator and wraps the result in a Scala [[DataSet]].
+   */
+  override def translate[O](op: Operation)(implicit tpe: TypeInformation[O]): DataSet[O] = {
+    // No ClassTag for O is available here, so the AnyRef tag is cast to ClassTag[O];
+    // the tag handed to `wrap` therefore does not describe the real runtime class of O.
+    wrap(javaTranslator.translate(op))(ClassTag.AnyRef.asInstanceOf[ClassTag[O]])
+  }
+
+  /**
+   * Creates a [[Table]] from `repr.javaSet` by forwarding `inputType`, `expressions` and
+   * `resultFields` unchanged to the Java translator, then re-wrapping the resulting
+   * operation in a [[Table]] bound to this translator.
+   */
+  override def createTable[A](
+      repr: Representation[A],
+      inputType: CompositeType[A],
+      expressions: Array[Expression],
+      resultFields: Seq[(String, TypeInformation[_])]): Table[this.type] = {
+
+    val result = javaTranslator.createTable(repr.javaSet, inputType, expressions, resultFields)
+
+    Table(result.operation, this)
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala
new file mode 100644
index 0000000..f17f273
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.table.JavaStreamingTranslator
+import org.apache.flink.api.table.Table
+import org.apache.flink.api.table.operations._
+import org.apache.flink.api.table.tree.Expression
+import org.apache.flink.streaming.api.scala.{DataStream, javaToScalaStream}
+
+/**
+ * [[TableTranslator]] for creating [[Table]]s from Scala [[DataStream]]s and
+ * translating them back to Scala [[DataStream]]s.
+ *
+ * This is very limited right now. Only select and filter are implemented. Also, the expression
+ * operations must be extended to allow windowing operations.
+ *
+ * All work is delegated to a [[JavaStreamingTranslator]]; this class only converts between
+ * the Scala and Java stream representations.
+ */
+class ScalaStreamingTranslator extends TableTranslator {
+
+  // Java-API translator that every method below delegates to.
+  private val javaTranslator = new JavaStreamingTranslator
+
+  override type Representation[A] = DataStream[A]
+
+  /**
+   * Translates `op` with the Java translator and converts the result to a Scala
+   * [[DataStream]].
+   */
+  override def translate[O](op: Operation)(implicit tpe: TypeInformation[O]): DataStream[O] = {
+    // The Java stream returned by the Java translator is converted with javaToScalaStream.
+    javaToScalaStream(javaTranslator.translate(op))
+  }
+
+  /**
+   * Creates a [[Table]] from `repr.getJavaStream` by forwarding `inputType`, `expressions`
+   * and `resultFields` unchanged to the Java translator, then re-wrapping the resulting
+   * operation in a [[Table]] bound to this translator.
+   */
+  override def createTable[A](
+      repr: Representation[A],
+      inputType: CompositeType[A],
+      expressions: Array[Expression],
+      resultFields: Seq[(String, TypeInformation[_])]): Table[this.type] = {
+
+    val result =
+      javaTranslator.createTable(repr.getJavaStream, inputType, expressions, resultFields)
+
+    new Table(result.operation, this)
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
new file mode 100644
index 0000000..e08ceb3
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.table
+
+import org.apache.flink.api.table.tree._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+import scala.language.implicitConversions
+
+/**
+ * These are all the operations that can be used to construct an [[Expression]] AST for expression
+ * operations.
+ *
+ * These operations must be kept in sync with the parser in
+ * [[org.apache.flink.api.table.parser.ExpressionParser]].
+ *
+ * Every member only builds a tree node around `expr` (and `other`, where present); nothing
+ * is evaluated here.
+ */
+trait ImplicitExpressionOperations {
+  /** The expression that all operations below use as their (left-hand) operand. */
+  def expr: Expression
+
+  // Boolean connectives.
+  def && (other: Expression) = And(expr, other)
+  def || (other: Expression) = Or(expr, other)
+
+  // Ordering comparisons.
+  def > (other: Expression) = GreaterThan(expr, other)
+  def >= (other: Expression) = GreaterThanOrEqual(expr, other)
+  def < (other: Expression) = LessThan(expr, other)
+  def <= (other: Expression) = LessThanOrEqual(expr, other)
+
+  // Equality tests, spelled `===` / `!==`
+  // (presumably because `==` and `!=` already exist on every Scala value).
+  def === (other: Expression) = EqualTo(expr, other)
+  def !== (other: Expression) = NotEqualTo(expr, other)
+
+  // Prefix operators: `!expr` and `-expr`.
+  def unary_! = Not(expr)
+  def unary_- = UnaryMinus(expr)
+
+  // Null checks.
+  def isNull = IsNull(expr)
+  def isNotNull = IsNotNull(expr)
+
+  // Arithmetic.
+  def + (other: Expression) = Plus(expr, other)
+  def - (other: Expression) = Minus(expr, other)
+  def / (other: Expression) = Div(expr, other)
+  def * (other: Expression) = Mul(expr, other)
+  def % (other: Expression) = Mod(expr, other)
+
+  // Bitwise operations; `~expr` is the prefix form.
+  def & (other: Expression) = BitwiseAnd(expr, other)
+  def | (other: Expression) = BitwiseOr(expr, other)
+  def ^ (other: Expression) = BitwiseXor(expr, other)
+  def unary_~ = BitwiseNot(expr)
+
+  def abs = Abs(expr)
+
+  // Aggregations.
+  def sum = Sum(expr)
+  def min = Min(expr)
+  def max = Max(expr)
+  def count = Count(expr)
+  def avg = Avg(expr)
+
+  // When `endIndex` is omitted it defaults to the literal Int.MaxValue.
+  def substring(beginIndex: Expression, endIndex: Expression = Literal(Int.MaxValue)) = {
+    Substring(expr, beginIndex, endIndex)
+  }
+
+  def cast(toType: TypeInformation[_]) = Cast(expr, toType)
+
+  // Names the expression after the given Scala Symbol.
+  def as(name: Symbol) = Naming(expr, name.name)
+}
+
+/**
+ * Implicit conversions from Scala Literals to Expression [[Literal]] and from
[[Expression]]
+ * to [[ImplicitExpressionOperations]].
+ */
+trait ImplicitExpressionConversions {
+  // Implicit wrappers: they give Expressions, Symbols and Scala literals the operator
+  // syntax defined in ImplicitExpressionOperations.
+  implicit class WithOperations(e: Expression) extends ImplicitExpressionOperations {
+    def expr = e
+  }
+
+  // A Symbol such as 'a becomes an unresolved reference to the field named "a".
+  implicit class SymbolExpression(s: Symbol) extends ImplicitExpressionOperations {
+    def expr = UnresolvedFieldReference(s.name)
+  }
+
+  implicit class LiteralLongExpression(l: Long) extends ImplicitExpressionOperations {
+    def expr = Literal(l)
+  }
+
+  implicit class LiteralIntExpression(i: Int) extends ImplicitExpressionOperations {
+    def expr = Literal(i)
+  }
+
+  implicit class LiteralFloatExpression(f: Float) extends ImplicitExpressionOperations {
+    def expr = Literal(f)
+  }
+
+  implicit class LiteralDoubleExpression(d: Double) extends ImplicitExpressionOperations {
+    def expr = Literal(d)
+  }
+
+  implicit class LiteralStringExpression(str: String) extends ImplicitExpressionOperations {
+    def expr = Literal(str)
+  }
+
+  implicit class LiteralBooleanExpression(bool: Boolean) extends ImplicitExpressionOperations {
+    def expr = Literal(bool)
+  }
+
+  // Plain conversions: they let a Symbol or Scala literal be passed wherever a parameter of
+  // type Expression is declared.
+  implicit def symbol2FieldExpression(sym: Symbol): Expression = UnresolvedFieldReference(sym.name)
+  implicit def int2Literal(i: Int): Expression = Literal(i)
+  implicit def long2Literal(l: Long): Expression = Literal(l)
+  implicit def double2Literal(d: Double): Expression = Literal(d)
+  implicit def float2Literal(d: Float): Expression = Literal(d)
+  implicit def string2Literal(str: String): Expression = Literal(str)
+  implicit def boolean2Literal(bool: Boolean): Expression = Literal(bool)
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
new file mode 100644
index 0000000..10084de
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.{Row, Table}
+import org.apache.flink.streaming.api.scala.DataStream
+
+import scala.language.implicitConversions
+
+/**
+ * == Table API (Scala) ==
+ *
+ * Importing this package with:
+ *
+ * {{{
+ *   import org.apache.flink.api.scala.table._
+ * }}}
+ *
+ * imports implicit conversions for converting a [[DataSet]] or [[DataStream]] to a
+ * [[Table]]. This can be used to perform SQL-like queries on data. Please have
+ * a look at [[Table]] to see which operations are supported and
+ * [[org.apache.flink.api.scala.table.ImplicitExpressionOperations]] to see how an
+ * expression can be specified.
+ *
+ * When writing a query you can use Scala Symbols to refer to field names. One would
+ * refer to field `a` by writing `'a`. Sometimes it is necessary to manually convert a
+ * Scala literal to an Expression Literal; in those cases use `Literal`, as in `Literal(3)`.
+ *
+ * Example:
+ *
+ * {{{
+ *   import org.apache.flink.api.scala._
+ *   import org.apache.flink.api.scala.table._
+ *
+ *   val env = ExecutionEnvironment.getExecutionEnvironment
+ *   val input = env.fromElements(("Hello", 2), ("Hello", 5), ("Ciao", 3))
+ *   val result = input.as('word, 'count).groupBy('word).select('word, 'count.avg)
+ *   result.print()
+ *
+ *   env.execute()
+ * }}}
+ *
+ * A [[Table]] can be converted back to the underlying API
+ * representation using `as`:
+ *
+ * {{{
+ *   case class Word(word: String, count: Int)
+ *
+ *   val result = in.select(...).as('word, 'count)
+ *   val set = result.as[Word]
+ * }}}
+ */
+package object table extends ImplicitExpressionConversions {
+
+  /**
+   * Wraps a [[DataSet]] so that the [[DataSetConversions]] methods become available on it.
+   * The type information of the set is cast to [[CompositeType]].
+   */
+  implicit def dataSet2DataSetConversions[T](set: DataSet[T]): DataSetConversions[T] = {
+    val compositeType = set.getType.asInstanceOf[CompositeType[T]]
+    new DataSetConversions[T](set, compositeType)
+  }
+
+  /** Lets a batch [[Table]] be used where a `DataSet[Row]` is expected. */
+  implicit def table2RowDataSet(
+      table: Table[ScalaBatchTranslator]): DataSet[Row] =
+    table.as[Row]
+
+  /** Lets a `DataSet[Row]` be used where a batch [[Table]] is expected. */
+  implicit def rowDataSet2Table(
+      rowDataSet: DataSet[Row]): Table[ScalaBatchTranslator] =
+    rowDataSet.toTable
+
+  /**
+   * Wraps a [[DataStream]] so that the [[DataStreamConversions]] methods become available
+   * on it. The type information of the underlying Java stream is cast to [[CompositeType]].
+   *
+   * NOTE(review): the method name says "DataSetConversions" although it produces
+   * [[DataStreamConversions]]; the name is kept unchanged so existing references still work.
+   */
+  implicit def dataStream2DataSetConversions[T](
+      stream: DataStream[T]): DataStreamConversions[T] = {
+    val compositeType = stream.getJavaStream.getType.asInstanceOf[CompositeType[T]]
+    new DataStreamConversions[T](stream, compositeType)
+  }
+
+  /** Lets a streaming [[Table]] be used where a `DataStream[Row]` is expected. */
+  implicit def table2RowDataStream(
+      table: Table[ScalaStreamingTranslator]): DataStream[Row] =
+    table.as[Row]
+
+  /** Lets a `DataStream[Row]` be used where a streaming [[Table]] is expected. */
+  implicit def rowDataStream2Table(
+      rowDataStream: DataStream[Row]): Table[ScalaStreamingTranslator] =
+    rowDataStream.toTable
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/ExpressionException.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/ExpressionException.scala
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/ExpressionException.scala
new file mode 100644
index 0000000..51c0a4d
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/ExpressionException.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table
+
+/**
+ * Unchecked exception for errors in Table API expressions.
+ *
+ * [[Table]] throws it when an operation is given invalid expressions, for example
+ * non-unique result field names in `select`, anything other than plain field references
+ * in `as`, illegal key expressions in `groupBy`, or overlapping field names in `join`.
+ *
+ * @param msg human-readable description of the problem
+ */
+class ExpressionException(msg: String) extends RuntimeException(msg)
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Row.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Row.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Row.scala
new file mode 100644
index 0000000..e3baab3
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Row.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table
+
+/**
+ * This is used for executing Table API operations. We use manually generated
+ * TypeInfo to check the field types and create serializers and comparators.
+ *
+ * A [[Row]] is a mutable, fixed-size sequence of untyped fields. `equals` and `hashCode`
+ * are not overridden, so two rows are only equal if they are the same instance.
+ *
+ * @param arity the number of fields of this row
+ */
+class Row(arity: Int) extends Product {
+
+  // Backing storage; every slot is null until it is assigned through setField.
+  private val fields = new Array[Any](arity)
+
+  /** The number of fields, as fixed at construction time. */
+  def productArity: Int = fields.length
+
+  /** Returns the value stored at position `i` (zero-based). */
+  def productElement(i: Int): Any = fields(i)
+
+  /** Stores `value` at position `i` (zero-based), replacing the previous value. */
+  def setField(i: Int, value: Any): Unit = fields(i) = value
+
+  /**
+   * Reports whether `that` is a [[Row]], as the [[scala.Equals]] contract asks of
+   * `canEqual`. The previous implementation always answered `false`, even for the row
+   * itself, which contradicts `equals`.
+   */
+  def canEqual(that: Any): Boolean = that.isInstanceOf[Row]
+
+  /** The field values separated by commas, without surrounding brackets. */
+  override def toString: String = fields.mkString(",")
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala
new file mode 100644
index 0000000..38a1760
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.analysis.{GroupByAnalyzer, SelectionAnalyzer, PredicateAnalyzer}
+import org.apache.flink.api.table.operations._
+import org.apache.flink.api.table.parser.ExpressionParser
+import org.apache.flink.api.table.tree.{ResolvedFieldReference, UnresolvedFieldReference, Expression}
+
+/**
+ * The abstraction for writing Table API programs. Similar to how the batch and streaming APIs
+ * have [[org.apache.flink.api.scala.DataSet]] and
+ * [[org.apache.flink.streaming.api.scala.DataStream]].
+ *
+ * Use the methods of [[Table]] to transform data or to revert back to the underlying
+ * batch or streaming representation. Every transformation returns a copy of this [[Table]]
+ * with a new operation; the receiver itself is never modified.
+ */
+case class Table[A <: TableTranslator](
+    private[flink] val operation: Operation,
+    private[flink] val operationTranslator: A) {
+
+
+  /**
+   * Converts the result of this operation back to a [[org.apache.flink.api.scala.DataSet]] or
+   * [[org.apache.flink.streaming.api.scala.DataStream]], by handing the operation to the
+   * translator this [[Table]] was created with.
+   */
+  def as[O](implicit tpe: TypeInformation[O]): operationTranslator.Representation[O] = {
+    operationTranslator.translate(operation)
+  }
+
+  /**
+   * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions
+   * can contain complex expressions and aggregations. The names of the resulting fields
+   * must be unique, otherwise an [[ExpressionException]] is thrown.
+   *
+   * Example:
+   *
+   * {{{
+   *   in.select('key, 'value.avg + " The average" as 'average, 'other.substring(0, 10))
+   * }}}
+   */
+  def select(fields: Expression*): Table[A] = {
+    val selectionAnalyzer = new SelectionAnalyzer(operation.outputFields)
+    val selected = fields.map(selectionAnalyzer.analyze)
+    val names = selected.map(_.name)
+    // A shorter list after de-duplication means at least one name occurs twice.
+    if (names.distinct.size != names.size) {
+      throw new ExpressionException(s"Resulting fields names are not unique in expression" +
+        s""" "${fields.mkString(", ")}".""")
+    }
+    copy(operation = Select(operation, selected))
+  }
+
+  /**
+   * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions
+   * can contain complex expressions and aggregations. The string is parsed into a list of
+   * expressions which is then passed to the expression-based `select`.
+   *
+   * Example:
+   *
+   * {{{
+   *   in.select("key, value.avg + ' The average' as average, other.substring(0, 10)")
+   * }}}
+   */
+  def select(fields: String): Table[A] =
+    select(ExpressionParser.parseExpressionList(fields): _*)
+
+  /**
+   * Renames the fields of the expression result. Use this to disambiguate fields before
+   * joining two tables. Only plain field references are accepted; any other expression
+   * results in an [[ExpressionException]].
+   *
+   * Example:
+   *
+   * {{{
+   *   in.as('a, 'b)
+   * }}}
+   */
+  def as(fields: Expression*): Table[A] = {
+    if (fields.exists(field => !field.isInstanceOf[UnresolvedFieldReference])) {
+      throw new ExpressionException("Only field expression allowed in as().")
+    }
+    val newNames = fields.toArray.map(_.name)
+    copy(operation = As(operation, newNames))
+  }
+
+  /**
+   * Renames the fields of the expression result. Use this to disambiguate fields before
+   * joining two tables. The string is parsed into a list of expressions which is then
+   * passed to the expression-based `as`.
+   *
+   * Example:
+   *
+   * {{{
+   *   in.as("a, b")
+   * }}}
+   */
+  def as(fields: String): Table[A] =
+    as(ExpressionParser.parseExpressionList(fields): _*)
+
+  /**
+   * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
+   * clause.
+   *
+   * Example:
+   *
+   * {{{
+   *   in.filter('name === "Fred")
+   * }}}
+   */
+  def filter(predicate: Expression): Table[A] = {
+    val analyzedPredicate = new PredicateAnalyzer(operation.outputFields).analyze(predicate)
+    copy(operation = Filter(operation, analyzedPredicate))
+  }
+
+  /**
+   * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
+   * clause. The string is parsed into an expression which is then passed to the
+   * expression-based `filter`.
+   *
+   * Example:
+   *
+   * {{{
+   *   in.filter("name === 'Fred'")
+   * }}}
+   */
+  def filter(predicate: String): Table[A] =
+    filter(ExpressionParser.parseExpression(predicate))
+
+  /**
+   * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
+   * clause. This simply forwards to `filter`.
+   *
+   * Example:
+   *
+   * {{{
+   *   in.where('name === "Fred")
+   * }}}
+   */
+  def where(predicate: Expression): Table[A] = filter(predicate)
+
+  /**
+   * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
+   * clause. This simply forwards to `filter`.
+   *
+   * Example:
+   *
+   * {{{
+   *   in.where("name === 'Fred'")
+   * }}}
+   */
+  def where(predicate: String): Table[A] = filter(predicate)
+
+  /**
+   * Groups the elements on some grouping keys. Use this before a selection with aggregations
+   * to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement.
+   * Every key must analyze to a resolved field reference, otherwise an
+   * [[ExpressionException]] listing the offending keys is thrown.
+   *
+   * Example:
+   *
+   * {{{
+   *   in.groupBy('key).select('key, 'value.avg)
+   * }}}
+   */
+  def groupBy(fields: Expression*): Table[A] = {
+    val groupByAnalyzer = new GroupByAnalyzer(operation.outputFields)
+    val keys = fields.map(groupByAnalyzer.analyze)
+
+    // Anything that is not a resolved field reference cannot be used as a key.
+    val illegalKeys = keys.filterNot {
+      case _: ResolvedFieldReference => true
+      case _ => false
+    }
+
+    if (illegalKeys.nonEmpty) {
+      throw new ExpressionException("Illegal key expressions: " + illegalKeys.mkString(", "))
+    }
+
+    copy(operation = GroupBy(operation, keys))
+  }
+
+  /**
+   * Groups the elements on some grouping keys. Use this before a selection with aggregations
+   * to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement.
+   * The string is parsed into a list of expressions which is then passed to the
+   * expression-based `groupBy`.
+   *
+   * Example:
+   *
+   * {{{
+   *   in.groupBy("key").select("key, value.avg")
+   * }}}
+   */
+  def groupBy(fields: String): Table[A] =
+    groupBy(ExpressionParser.parseExpressionList(fields): _*)
+
+  /**
+   * Joins two [[Table]]s. Similar to an SQL join. The fields of the two joined
+   * operations must not overlap, use [[as]] to rename fields if necessary. You can use
+   * where and select clauses after a join to further specify the behaviour of the join.
+   * Overlapping field names result in an [[ExpressionException]].
+   *
+   * Example:
+   *
+   * {{{
+   *   left.join(right).where('a === 'b && 'c > 3).select('a, 'b, 'd)
+   * }}}
+   */
+  def join(right: Table[A]): Table[A] = {
+    val leftNames = operation.outputFields.map(_._1).toSet
+    val rightNames = right.operation.outputFields.map(_._1).toSet
+    val sharedNames = leftNames & rightNames
+    if (sharedNames.nonEmpty) {
+      throw new ExpressionException(
+        "Overlapping fields names on join input, result would be ambiguous: " +
+          operation.outputFields.mkString(", ") +
+          " and " +
+          right.operation.outputFields.mkString(", ") )
+    }
+    copy(operation = Join(operation, right.operation))
+  }
+
+  // NOTE(review): the prefix is "Expression" although the class is named Table; kept
+  // as-is because callers may depend on the exact text. Confirm whether it is intended.
+  override def toString: String = s"Expression($operation)"
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/Analyzer.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/Analyzer.scala
new file mode 100644
index 0000000..5db17d5
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/Analyzer.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.analysis + +import org.apache.flink.api.table.tree.Expression + +/** + * Base class for expression analyzers/transformers. Analyzers must implement method `rules` to + * provide the chain of rules that are invoked one after another. The expression resulting + * from one rule is fed into the next rule and the final result is returned from method `analyze`. 
+ */ +abstract class Analyzer { + + /** The rules of this analyzer, in the order in which they are applied. */ + def rules: Seq[Rule] + + /** + * Threads `expr` through every rule in `rules`, handing the output of each rule to the + * next one, and returns the output of the last rule. + */ + final def analyze(expr: Expression): Expression = { + rules.foldLeft(expr) { (tree, rule) => rule(tree) } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/ExtractEquiJoinFields.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/ExtractEquiJoinFields.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/ExtractEquiJoinFields.scala new file mode 100644 index 0000000..6381a9b --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/ExtractEquiJoinFields.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.analysis + +import org.apache.flink.api.table.tree._ +import org.apache.flink.api.common.typeutils.CompositeType + +import scala.collection.mutable + +/** + * Equi-join field extractor for Join Predicates and CoGroup predicates. 
The result is a modified + * expression without the equi-join predicates together with indices of the join fields + * from both the left and right input. Only a `===` between one field of the left input and + * one field of the right input is extracted as a join key; a `===` that does not have this + * shape is not recorded as an equi-join predicate. + */ +object ExtractEquiJoinFields { + def apply(leftType: CompositeType[_], rightType: CompositeType[_], predicate: Expression) = { + + val joinFieldsLeft = mutable.MutableList[Int]() + val joinFieldsRight = mutable.MutableList[Int]() + + val equiJoinExprs = mutable.MutableList[EqualTo]() + // First get all `===` expressions that are not below an `Or`. The tree returned by this + // pass is discarded, it only serves to collect the join fields. + predicate.transformPre { + case or@Or(_, _) => NopExpression() + case eq@EqualTo(le: ResolvedFieldReference, re: ResolvedFieldReference) => + if (leftType.hasField(le.name) && rightType.hasField(re.name)) { + joinFieldsLeft += leftType.getFieldIndex(le.name) + joinFieldsRight += rightType.getFieldIndex(re.name) + equiJoinExprs += eq + } else if (leftType.hasField(re.name) && rightType.hasField(le.name)) { + joinFieldsLeft += leftType.getFieldIndex(re.name) + joinFieldsRight += rightType.getFieldIndex(le.name) + equiJoinExprs += eq + } else { + // Not an equi-join predicate, e.g. both fields belong to the same input. It must not + // be recorded: no join key enforces it, so removing it below would silently drop + // the condition. + } + eq + } + + // then remove the equi join expressions from the predicate + val resultExpr = predicate.transformPost { + // For OR, we can eliminate the OR since the equi join + // predicate is evaluated before the expression is evaluated + case or@Or(NopExpression(), _) => NopExpression() + case or@Or(_, NopExpression()) => NopExpression() + // For AND we replace it with the other expression, since the + // equi join predicate will always be true + case and@And(NopExpression(), other) => other + case and@And(other, NopExpression()) => other + case eq : EqualTo if equiJoinExprs.contains(eq) => + NopExpression() + } + + (resultExpr, joinFieldsLeft.toArray, joinFieldsRight.toArray) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/GroupByAnalyzer.scala ---------------------------------------------------------------------- 
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/GroupByAnalyzer.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/GroupByAnalyzer.scala new file mode 100644 index 0000000..4106a4c --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/GroupByAnalyzer.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.analysis + +import org.apache.flink.api.table._ +import org.apache.flink.api.table.tree.{ResolvedFieldReference, Expression} +import org.apache.flink.api.common.typeinfo.TypeInformation + +import scala.collection.mutable + + +/** + * Analyzer for grouping expressions. Only field expressions are allowed as grouping expressions. 
+ */ +class GroupByAnalyzer(inputFields: Seq[(String, TypeInformation[_])]) extends Analyzer { + + // Resolve the field references first, then verify the shape of the resolved expression + def rules = Seq(new ResolveFieldReferences(inputFields), CheckGroupExpression) + + /** + * [[Rule]] that throws an [[ExpressionException]] for every grouping expression that is not + * a [[ResolvedFieldReference]] and returns valid expressions unchanged. + */ + object CheckGroupExpression extends Rule { + + def apply(expr: Expression) = { + expr match { + case _: ResolvedFieldReference => // this is OK + case _ => + throw new ExpressionException( + s"""Invalid grouping expression "$expr". Only field references are allowed.""") + } + expr + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/InsertAutoCasts.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/InsertAutoCasts.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/InsertAutoCasts.scala new file mode 100644 index 0000000..29e76cd --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/InsertAutoCasts.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.analysis + +import org.apache.flink.api.table.tree._ +import org.apache.flink.api.common.typeinfo.{IntegerTypeInfo, BasicTypeInfo} + +/** + * [[Rule]] that inserts [[Cast]]s into binary operations whose two operands have different + * types. Arithmetic, comparison and bitwise operations are handled. An operation for which + * neither operand can be cast to the type of the other one is returned unchanged. + */ +class InsertAutoCasts extends Rule { + + def apply(expr: Expression) = { + val result = expr.transformPost { + + case plus@Plus(o1, o2) => + // Plus is special case since we can cast anything to String for String concat + if (o1.typeInfo != o2.typeInfo && o1.typeInfo.isBasicType && o2.typeInfo.isBasicType) { + if (o1.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo( + o2.typeInfo.asInstanceOf[BasicTypeInfo[_]])) { + Plus(Cast(o1, o2.typeInfo), o2) + } else if (o2.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo( + o1.typeInfo.asInstanceOf[BasicTypeInfo[_]])) { + Plus(o1, Cast(o2, o1.typeInfo)) + } else if (o1.typeInfo == BasicTypeInfo.STRING_TYPE_INFO) { + Plus(o1, Cast(o2, BasicTypeInfo.STRING_TYPE_INFO)) + } else if (o2.typeInfo == BasicTypeInfo.STRING_TYPE_INFO) { + Plus(Cast(o1, BasicTypeInfo.STRING_TYPE_INFO), o2) + } else { + plus + } + } else { + plus + } + + // Arithmetic and comparison operations on basic types: cast the left operand if it can + // be cast to the type of the right operand, otherwise try the right operand + case ba: BinaryExpression if ba.isInstanceOf[BinaryArithmetic] || + ba.isInstanceOf[BinaryComparison] => + val o1 = ba.left + val o2 = ba.right + if (o1.typeInfo != o2.typeInfo && o1.typeInfo.isBasicType && o2.typeInfo.isBasicType) { + if (o1.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo( + o2.typeInfo.asInstanceOf[BasicTypeInfo[_]])) { + ba.makeCopy(Seq(Cast(o1, o2.typeInfo), o2)) + } else if (o2.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo( + o1.typeInfo.asInstanceOf[BasicTypeInfo[_]])) { + ba.makeCopy(Seq(o1, Cast(o2, o1.typeInfo))) + } else { + ba + } + } else { + ba + } + + // Bitwise operations: same procedure, but only if both operands have an integer type + case ba: BinaryExpression if ba.isInstanceOf[BitwiseBinaryArithmetic] => + val o1 = ba.left + val o2 = ba.right + if (o1.typeInfo != o2.typeInfo && o1.typeInfo.isInstanceOf[IntegerTypeInfo[_]] && + o2.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) { + if 
(o1.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo( + o2.typeInfo.asInstanceOf[BasicTypeInfo[_]])) { + ba.makeCopy(Seq(Cast(o1, o2.typeInfo), o2)) + } else if (o2.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo( + o1.typeInfo.asInstanceOf[BasicTypeInfo[_]])) { + ba.makeCopy(Seq(o1, Cast(o2, o1.typeInfo))) + } else { + ba + } + } else { + ba + } + } + + result + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/PredicateAnalyzer.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/PredicateAnalyzer.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/PredicateAnalyzer.scala new file mode 100644 index 0000000..0c74695 --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/PredicateAnalyzer.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.analysis + +import org.apache.flink.api.common.typeinfo.TypeInformation + +/** + * Analyzer for predicates, i.e. 
filter operations and where clauses of joins. + * The rule chain resolves field references, inserts automatic casts, type-checks the + * expression, rejects aggregations and finally verifies that the predicate is Boolean. + */ +class PredicateAnalyzer(inputFields: Seq[(String, TypeInformation[_])]) extends Analyzer { + // The order matters: each rule receives the expression returned by the previous one + def rules = Seq( + new ResolveFieldReferences(inputFields), + new InsertAutoCasts, + new TypeCheck, + new VerifyNoAggregates, + new VerifyBoolean) +} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/ResolveFieldReferences.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/ResolveFieldReferences.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/ResolveFieldReferences.scala new file mode 100644 index 0000000..3f7cbd2 --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/ResolveFieldReferences.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.table.analysis + +import org.apache.flink.api.table.tree.{ResolvedFieldReference, +UnresolvedFieldReference, Expression} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table._ + +import scala.collection.mutable + +/** + * Rule that resolves field references. Every [[UnresolvedFieldReference]] whose name matches + * one of the input fields is replaced by a [[ResolvedFieldReference]] that carries the field + * name together with the [[TypeInformation]] of that field. References to unknown fields are + * gathered and reported together in a single [[ExpressionException]]. + */ +class ResolveFieldReferences(inputFields: Seq[(String, TypeInformation[_])]) extends Rule { + + def apply(expr: Expression) = { + // One message per reference that does not name an input field + val unknownFields = mutable.MutableList[String]() + + val resolved = expr.transformPost { + case reference@UnresolvedFieldReference(fieldName) => + inputFields.find(field => field._1 == fieldName) match { + case None => + unknownFields += + s"Field '$fieldName' is not valid for input fields ${inputFields.mkString(",")}" + // keep the node as it is, the exception below reports the problem + reference + + case Some((_, fieldType)) => ResolvedFieldReference(fieldName, fieldType) + } + } + + if (unknownFields.nonEmpty) { + throw new ExpressionException( + s"""Invalid expression "$expr": ${unknownFields.mkString(" ")}""") + } + + resolved + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/Rule.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/Rule.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/Rule.scala new file mode 100644 index 0000000..34c5750 --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/Rule.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.analysis + +import org.apache.flink.api.table.tree.Expression + +/** + * Base class for a rule that is part of an [[Analyzer]] rule chain. Method `apply` gets an + * [[Expression]] and must return an expression. The returned [[Expression]] can also be + * the input [[Expression]]. In an [[Analyzer]] rule chain the result [[Expression]] of one + * [[Rule]] is fed into the next [[Rule]] in the chain. + */ +abstract class Rule { + def apply(expr: Expression): Expression +} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/SelectionAnalyzer.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/SelectionAnalyzer.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/SelectionAnalyzer.scala new file mode 100644 index 0000000..70aadb8 --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/SelectionAnalyzer.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.analysis + +import org.apache.flink.api.common.typeinfo.TypeInformation + +/** + * This analyzes selection expressions. The rule chain resolves field references, rejects + * aggregations nested inside other aggregations, inserts automatic casts and finally + * type-checks the expression. + */ +class SelectionAnalyzer(inputFields: Seq[(String, TypeInformation[_])]) extends Analyzer { + + // The order matters: each rule receives the expression returned by the previous one + def rules = Seq( + new ResolveFieldReferences(inputFields), + new VerifyNoNestedAggregates, + new InsertAutoCasts, + new TypeCheck) + +} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/TypeCheck.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/TypeCheck.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/TypeCheck.scala new file mode 100644 index 0000000..3304726 --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/TypeCheck.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.analysis + +import org.apache.flink.api.table.tree.Expression +import org.apache.flink.api.table.{_} + +import scala.collection.mutable + +/** + * Rule that requests [[Expression.typeInfo]] from every node of the expression tree. + * Expressions are expected to verify their types in that method; the messages of all + * [[ExpressionException]]s raised on the way are reported together in one exception. + */ +class TypeCheck extends Rule { + + def apply(expr: Expression) = { + val typeErrors = mutable.MutableList[String]() + + val checked = expr.transformPre { + case node: Expression => 
+ // Only the side effect of typeInfo matters here, its value is not used + try { + node.typeInfo + } catch { + case failure: ExpressionException => + typeErrors += failure.getMessage + } + node + } + + if (typeErrors.nonEmpty) { + throw new ExpressionException( + s"""Invalid expression "$expr": ${typeErrors.mkString(" ")}""") + } + + checked + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/VerifyBoolean.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/VerifyBoolean.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/VerifyBoolean.scala new file mode 100644 index 0000000..d554c44 --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/VerifyBoolean.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.analysis + +import org.apache.flink.api.table.tree.{NopExpression, Expression} +import org.apache.flink.api.table.{_} +import org.apache.flink.api.common.typeinfo.BasicTypeInfo + +import scala.collection.mutable + +/** + * [[Rule]] that accepts an [[Expression]] only if it is a [[NopExpression]] or if its result + * type is Boolean, and throws an [[ExpressionException]] otherwise. This is required for + * filter/join predicates. + */ +class VerifyBoolean extends Rule { + + def apply(expr: Expression) = { + expr match { + case _: NopExpression => // accepted without looking at its type + case other if other.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO => + throw new ExpressionException(s"Expression $expr of type ${expr.typeInfo} is not boolean.") + case _ => // Boolean result type, nothing to report + } + + expr + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/VerifyNoAggregates.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/VerifyNoAggregates.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/VerifyNoAggregates.scala new file mode 100644 index 0000000..6f5201c --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/VerifyNoAggregates.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.analysis + +import org.apache.flink.api.table.ExpressionException +import org.apache.flink.api.table.tree.{Aggregation, Expression} + +import scala.collection.mutable + +/** + * Rule that throws an [[ExpressionException]] if an expression contains an [[Aggregation]]. + * Right now, join predicates and filter predicates cannot contain aggregates. + */ +class VerifyNoAggregates extends Rule { + + def apply(expr: Expression) = { + // One entry per aggregation found in the tree + val violations = mutable.MutableList[String]() + + val visited = expr.transformPre { + case aggregation: Aggregation => + violations += "Aggregations are not allowed in join/filter predicates." + aggregation + } + + if (violations.nonEmpty) { + throw new ExpressionException( + s"""Invalid expression "$expr": ${violations.mkString(" ")}""") + } + + visited + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/VerifyNoNestedAggregates.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/VerifyNoNestedAggregates.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/VerifyNoNestedAggregates.scala new file mode 100644 index 0000000..9055355 --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/analysis/VerifyNoNestedAggregates.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.analysis + +import org.apache.flink.api.table.ExpressionException +import org.apache.flink.api.table.tree.{Expression, Aggregation} + +import scala.collection.mutable + +/** + * Rule that verifies that an expression does not contain aggregate operations + * as children of aggregate operations. 
+ */ +class VerifyNoNestedAggregates extends Rule { + + def apply(expr: Expression) = { + // One entry per aggregation that has another aggregation below it + val violations = mutable.MutableList[String]() + + val visited = expr.transformPre { + case aggregation: Aggregation => + val hasNestedAggregation = aggregation.child.exists(node => node.isInstanceOf[Aggregation]) + if (hasNestedAggregation) { + violations += s"""Found nested aggregation inside "$aggregation".""" + } + aggregation + } + + if (violations.nonEmpty) { + throw new ExpressionException( + s"""Invalid expression "$expr": ${violations.mkString(" ")}""") + } + + visited + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala new file mode 100644 index 0000000..5ec8579 --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala @@ -0,0 +1,635 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.table.codegen + +import java.util.concurrent.atomic.AtomicInteger +import org.apache.flink.api.table.tree._ +import org.apache.flink.api.table.typeinfo.{RenamingProxyTypeInfo, RowTypeInfo} +import org.apache.flink.api.table.{ExpressionException, tree} +import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo, BasicTypeInfo, TypeInformation} + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ +import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.java.typeutils.{TupleTypeInfo, PojoTypeInfo} +import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo +import org.slf4j.LoggerFactory + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +/** Base class for all code generation classes. This provides the functionality for generating + * code from an [[Expression]] tree. Derived classes must embed this in a lambda function + * to form an executable code block. + * + * @param inputs List of input variable names with corresponding [[TypeInformation]]. + * @param nullCheck Whether the generated code should include checks for NULL values. + * @param cl The ClassLoader that is used to create the Scala reflection ToolBox + * @tparam R The type of the generated code block. In most cases a lambda function such + * as "(IN1, IN2) => OUT". 
+ */ +abstract class ExpressionCodeGenerator[R]( + inputs: Seq[(String, CompositeType[_])], + val nullCheck: Boolean = false, + cl: ClassLoader) { + protected val log = LoggerFactory.getLogger(classOf[ExpressionCodeGenerator[_]]) + + import scala.reflect.runtime.{universe => ru} + import scala.reflect.runtime.universe._ + + if (cl == null) { + throw new IllegalArgumentException("ClassLoader must not be null.") + } + + import scala.tools.reflect.ToolBox + + protected val (mirror, toolBox) = ReflectionLock.synchronized { + val mirror = runtimeMirror(cl) + (mirror, mirror.mkToolBox()) + } + + // This is to be implemented by subclasses, we have it like this + // so that we only call it from here with the Scala Reflection Lock. + protected def generateInternal(): R + + final def generate(): R = { + ReflectionLock.synchronized { + generateInternal() + } + } + + val cache = mutable.HashMap[Expression, GeneratedExpression]() + + protected def generateExpression(expr: Expression): GeneratedExpression = { + // doesn't work yet, because we insert the same code twice and reuse variable names + // cache.getOrElseUpdate(expr, generateExpressionInternal(expr)) + generateExpressionInternal(expr) + } + + protected def generateExpressionInternal(expr: Expression): GeneratedExpression = { + // protected def generateExpression(expr: Expression): GeneratedExpression = { + val nullTerm = freshTermName("isNull") + val resultTerm = freshTermName("result") + + // For binary predicates that must only be evaluated when both operands are non-null. 
+ // This will write to nullTerm and resultTerm, so don't use those term names + // after using this function + def generateIfNonNull(left: Expression, right: Expression, resultType: TypeInformation[_]) + (expr: (TermName, TermName) => Tree): Seq[Tree] = { + val leftCode = generateExpression(left) + val rightCode = generateExpression(right) + + + if (nullCheck) { + leftCode.code ++ rightCode.code ++ q""" + val $nullTerm = ${leftCode.nullTerm}|| ${rightCode.nullTerm} + val $resultTerm = if ($nullTerm) { + ${defaultPrimitive(resultType)} + } else { + ${expr(leftCode.resultTerm, rightCode.resultTerm)} + } + """.children + } else { + leftCode.code ++ rightCode.code :+ q""" + val $resultTerm = ${expr(leftCode.resultTerm, rightCode.resultTerm)} + """ + } + } + + val cleanedExpr = expr match { + case tree.Naming(namedExpr, _) => namedExpr + case _ => expr + } + + val code: Seq[Tree] = cleanedExpr match { + + case tree.Literal(null, typeInfo) => + if (nullCheck) { + q""" + val $nullTerm = true + val resultTerm = null + """.children + } else { + Seq( q""" + val resultTerm = null + """) + } + + case tree.Literal(intValue: Int, INT_TYPE_INFO) => + if (nullCheck) { + q""" + val $nullTerm = false + val $resultTerm = $intValue + """.children + } else { + Seq( q""" + val $resultTerm = $intValue + """) + } + + case tree.Literal(longValue: Long, LONG_TYPE_INFO) => + if (nullCheck) { + q""" + val $nullTerm = false + val $resultTerm = $longValue + """.children + } else { + Seq( q""" + val $resultTerm = $longValue + """) + } + + + case tree.Literal(doubleValue: Double, DOUBLE_TYPE_INFO) => + if (nullCheck) { + q""" + val $nullTerm = false + val $resultTerm = $doubleValue + """.children + } else { + Seq( q""" + val $resultTerm = $doubleValue + """) + } + + case tree.Literal(floatValue: Float, FLOAT_TYPE_INFO) => + if (nullCheck) { + q""" + val $nullTerm = false + val $resultTerm = $floatValue + """.children + } else { + Seq( q""" + val $resultTerm = $floatValue + """) + } + + case 
tree.Literal(strValue: String, STRING_TYPE_INFO) => + if (nullCheck) { + q""" + val $nullTerm = false + val $resultTerm = $strValue + """.children + } else { + Seq( q""" + val $resultTerm = $strValue + """) + } + + case tree.Literal(boolValue: Boolean, BOOLEAN_TYPE_INFO) => + if (nullCheck) { + q""" + val $nullTerm = false + val $resultTerm = $boolValue + """.children + } else { + Seq( q""" + val $resultTerm = $boolValue + """) + } + + case Substring(str, beginIndex, endIndex) => + val strCode = generateExpression(str) + val beginIndexCode = generateExpression(beginIndex) + val endIndexCode = generateExpression(endIndex) + if (nullCheck) { + strCode.code ++ beginIndexCode.code ++ endIndexCode.code ++ q""" + val $nullTerm = + ${strCode.nullTerm}|| ${beginIndexCode.nullTerm}|| ${endIndexCode.nullTerm} + if ($nullTerm) { + ${defaultPrimitive(str.typeInfo)} + } else { + val $resultTerm = if (${endIndexCode.resultTerm} == Int.MaxValue) { + (${strCode.resultTerm}).substring(${beginIndexCode.resultTerm}) + } else { + (${strCode.resultTerm}).substring( + ${beginIndexCode.resultTerm}, + ${endIndexCode.resultTerm}) + } + } + """.children + } else { + strCode.code ++ beginIndexCode.code ++ endIndexCode.code :+ q""" + val $resultTerm = if (${endIndexCode.resultTerm} == Int.MaxValue) { + (${strCode.resultTerm}).substring(${beginIndexCode.resultTerm}) + } else { + (${strCode.resultTerm}).substring( + ${beginIndexCode.resultTerm}, + ${endIndexCode.resultTerm}) + } + """ + } + + case tree.Cast(child: Expression, STRING_TYPE_INFO) => + val childGen = generateExpression(child) + val castCode = if (nullCheck) { + q""" + val $nullTerm = ${childGen.nullTerm} + val $resultTerm = if ($nullTerm == null) { + null + } else { + ${childGen.resultTerm}.toString + } + """.children + } else { + Seq( q""" + val $resultTerm = ${childGen.resultTerm}.toString + """) + } + childGen.code ++ castCode + + case tree.Cast(child: Expression, INT_TYPE_INFO) => + val childGen = generateExpression(child) + 
val castCode = if (nullCheck) { + q""" + val $nullTerm = ${childGen.nullTerm} + val $resultTerm = ${childGen.resultTerm}.toInt + """.children + } else { + Seq( q""" + val $resultTerm = ${childGen.resultTerm}.toInt + """) + } + childGen.code ++ castCode + + case tree.Cast(child: Expression, LONG_TYPE_INFO) => + val childGen = generateExpression(child) + val castCode = if (nullCheck) { + q""" + val $nullTerm = ${childGen.nullTerm} + val $resultTerm = ${childGen.resultTerm}.toLong + """.children + } else { + Seq( q""" + val $resultTerm = ${childGen.resultTerm}.toLong + """) + } + childGen.code ++ castCode + + case tree.Cast(child: Expression, FLOAT_TYPE_INFO) => + val childGen = generateExpression(child) + val castCode = if (nullCheck) { + q""" + val $nullTerm = ${childGen.nullTerm} + val $resultTerm = ${childGen.resultTerm}.toFloat + """.children + } else { + Seq( q""" + val $resultTerm = ${childGen.resultTerm}.toFloat + """) + } + childGen.code ++ castCode + + case tree.Cast(child: Expression, DOUBLE_TYPE_INFO) => + val childGen = generateExpression(child) + val castCode = if (nullCheck) { + q""" + val $nullTerm = ${childGen.nullTerm} + val $resultTerm = ${childGen.resultTerm}.toDouble + """.children + } else { + Seq( q""" + val $resultTerm = ${childGen.resultTerm}.toDouble + """) + } + childGen.code ++ castCode + + case ResolvedFieldReference(fieldName, fieldTpe: TypeInformation[_]) => + inputs find { i => i._2.hasField(fieldName)} match { + case Some((inputName, inputTpe)) => + val fieldCode = getField(newTermName(inputName), inputTpe, fieldName, fieldTpe) + if (nullCheck) { + q""" + val $resultTerm = $fieldCode + val $nullTerm = $resultTerm == null + """.children + } else { + Seq( q""" + val $resultTerm = $fieldCode + """) + } + + case None => throw new ExpressionException("Could not get accessor for " + fieldName + + " in inputs " + inputs.mkString(", ") + ".") + } + + case GreaterThan(left, right) => + generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) { + 
(leftTerm, rightTerm) => q"$leftTerm > $rightTerm" + } + + case GreaterThanOrEqual(left, right) => + generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) { + (leftTerm, rightTerm) => q"$leftTerm >= $rightTerm" + } + + case LessThan(left, right) => + generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) { + (leftTerm, rightTerm) => q"$leftTerm < $rightTerm" + } + + case LessThanOrEqual(left, right) => + generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) { + (leftTerm, rightTerm) => q"$leftTerm <= $rightTerm" + } + + case EqualTo(left, right) => + generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) { + (leftTerm, rightTerm) => q"$leftTerm == $rightTerm" + } + + case NotEqualTo(left, right) => + generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) { + (leftTerm, rightTerm) => q"$leftTerm != $rightTerm" + } + + case And(left, right) => + generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) { + (leftTerm, rightTerm) => q"$leftTerm && $rightTerm" + } + + case Or(left, right) => + generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) { + (leftTerm, rightTerm) => q"$leftTerm || $rightTerm" + } + + case Plus(left, right) => + generateIfNonNull(left, right, expr.typeInfo) { + (leftTerm, rightTerm) => q"$leftTerm + $rightTerm" + } + + case Minus(left, right) => + generateIfNonNull(left, right, expr.typeInfo) { + (leftTerm, rightTerm) => q"$leftTerm - $rightTerm" + } + + case Div(left, right) => + generateIfNonNull(left, right, expr.typeInfo) { + (leftTerm, rightTerm) => q"$leftTerm / $rightTerm" + } + + case Mul(left, right) => + generateIfNonNull(left, right, expr.typeInfo) { + (leftTerm, rightTerm) => q"$leftTerm * $rightTerm" + } + + case Mod(left, right) => + generateIfNonNull(left, right, expr.typeInfo) { + (leftTerm, rightTerm) => q"$leftTerm % $rightTerm" + } + + case UnaryMinus(child) => + val childCode = generateExpression(child) + if (nullCheck) { + childCode.code ++ q""" + val $nullTerm = ${childCode.nullTerm} + if ($nullTerm) { + ${defaultPrimitive(child.typeInfo)} + } else { + val 
$resultTerm = -(${childCode.resultTerm}) + } + """.children + } else { + childCode.code :+ q""" + val $resultTerm = -(${childCode.resultTerm}) + """ + } + + case BitwiseAnd(left, right) => + generateIfNonNull(left, right, expr.typeInfo) { + (leftTerm, rightTerm) => q"$leftTerm & $rightTerm" + } + + case BitwiseOr(left, right) => + generateIfNonNull(left, right, expr.typeInfo) { + (leftTerm, rightTerm) => q"$leftTerm | $rightTerm" + } + + case BitwiseXor(left, right) => + generateIfNonNull(left, right, expr.typeInfo) { + (leftTerm, rightTerm) => q"$leftTerm ^ $rightTerm" + } + + case BitwiseNot(child) => + val childCode = generateExpression(child) + if (nullCheck) { + childCode.code ++ q""" + val $nullTerm = ${childCode.nullTerm} + if ($nullTerm) { + ${defaultPrimitive(child.typeInfo)} + } else { + val $resultTerm = ~(${childCode.resultTerm}) + } + """.children + } else { + childCode.code :+ q""" + val $resultTerm = ~(${childCode.resultTerm}) + """ + } + + case Not(child) => + val childCode = generateExpression(child) + if (nullCheck) { + childCode.code ++ q""" + val $nullTerm = ${childCode.nullTerm} + if ($nullTerm) { + ${defaultPrimitive(child.typeInfo)} + } else { + val $resultTerm = !(${childCode.resultTerm}) + } + """.children + } else { + childCode.code :+ q""" + val $resultTerm = !(${childCode.resultTerm}) + """ + } + + case IsNull(child) => + val childCode = generateExpression(child) + if (nullCheck) { + childCode.code ++ q""" + val $nullTerm = ${childCode.nullTerm} + if ($nullTerm) { + ${defaultPrimitive(child.typeInfo)} + } else { + val $resultTerm = (${childCode.resultTerm}) == null + } + """.children + } else { + childCode.code :+ q""" + val $resultTerm = (${childCode.resultTerm}) == null + """ + } + + case IsNotNull(child) => + val childCode = generateExpression(child) + if (nullCheck) { + childCode.code ++ q""" + val $nullTerm = ${childCode.nullTerm} + if ($nullTerm) { + ${defaultPrimitive(child.typeInfo)} + } else { + val $resultTerm = 
(${childCode.resultTerm}) != null + } + """.children + } else { + childCode.code :+ q""" + val $resultTerm = (${childCode.resultTerm}) != null + """ + } + + case Abs(child) => + val childCode = generateExpression(child) + if (nullCheck) { + childCode.code ++ q""" + val $nullTerm = ${childCode.nullTerm} + if ($nullTerm) { + ${defaultPrimitive(child.typeInfo)} + } else { + val $resultTerm = Math.abs(${childCode.resultTerm}) + } + """.children + } else { + childCode.code :+ q""" + val $resultTerm = Math.abs(${childCode.resultTerm}) + """ + } + + case _ => throw new ExpressionException("Could not generate code for expression " + expr) + } + + GeneratedExpression(code, resultTerm, nullTerm) + } + + case class GeneratedExpression(code: Seq[Tree], resultTerm: TermName, nullTerm: TermName) + + // We don't have c.freshName + // According to http://docs.scala-lang.org/overviews/quasiquotes/hygiene.html + // it's coming for 2.11. We can't wait that long... + def freshTermName(name: String): TermName = { + newTermName(s"$name$$${freshNameCounter.getAndIncrement}") + } + + val freshNameCounter = new AtomicInteger + + protected def getField( + inputTerm: TermName, + inputType: CompositeType[_], + fieldName: String, + fieldType: TypeInformation[_]): Tree = { + val accessor = fieldAccessorFor(inputType, fieldName) + accessor match { + case ObjectFieldAccessor(fieldName) => + val fieldTerm = newTermName(fieldName) + q"$inputTerm.$fieldTerm.asInstanceOf[${typeTermForTypeInfo(fieldType)}]" + + case ObjectMethodAccessor(methodName) => + val methodTerm = newTermName(methodName) + q"$inputTerm.$methodTerm().asInstanceOf[${typeTermForTypeInfo(fieldType)}]" + + case ProductAccessor(i) => + q"$inputTerm.productElement($i).asInstanceOf[${typeTermForTypeInfo(fieldType)}]" + + } + } + + sealed abstract class FieldAccessor + + case class ObjectFieldAccessor(fieldName: String) extends FieldAccessor + + case class ObjectMethodAccessor(methodName: String) extends FieldAccessor + + case class 
ProductAccessor(i: Int) extends FieldAccessor + + def fieldAccessorFor(elementType: CompositeType[_], fieldName: String): FieldAccessor = { + elementType match { + case ri: RowTypeInfo => + ProductAccessor(elementType.getFieldIndex(fieldName)) + + case cc: CaseClassTypeInfo[_] => + ObjectFieldAccessor(fieldName) + + case javaTup: TupleTypeInfo[_] => + ObjectFieldAccessor(fieldName) + + case pj: PojoTypeInfo[_] => + ObjectFieldAccessor(fieldName) + + case proxy: RenamingProxyTypeInfo[_] => + val underlying = proxy.getUnderlyingType + val fieldIndex = proxy.getFieldIndex(fieldName) + fieldAccessorFor(underlying, underlying.getFieldNames()(fieldIndex)) + } + } + + protected def defaultPrimitive(tpe: TypeInformation[_]) = tpe match { + case BasicTypeInfo.INT_TYPE_INFO => ru.Literal(Constant(-1)) + case BasicTypeInfo.LONG_TYPE_INFO => ru.Literal(Constant(1L)) + case BasicTypeInfo.SHORT_TYPE_INFO => ru.Literal(Constant(-1.toShort)) + case BasicTypeInfo.BYTE_TYPE_INFO => ru.Literal(Constant(-1.toByte)) + case BasicTypeInfo.FLOAT_TYPE_INFO => ru.Literal(Constant(-1.0.toFloat)) + case BasicTypeInfo.DOUBLE_TYPE_INFO => ru.Literal(Constant(-1.toDouble)) + case BasicTypeInfo.BOOLEAN_TYPE_INFO => ru.Literal(Constant(false)) + case BasicTypeInfo.STRING_TYPE_INFO => ru.Literal(Constant("<empty>")) + case BasicTypeInfo.CHAR_TYPE_INFO => ru.Literal(Constant('\0')) + case _ => ru.Literal(Constant(null)) + } + + protected def typeTermForTypeInfo(typeInfo: TypeInformation[_]): Tree = { + val tpe = typeForTypeInfo(typeInfo) + tq"$tpe" + } + + // We need two separate methods here because typeForTypeInfo is recursive when generating + // the type for a type with generic parameters. + protected def typeForTypeInfo(tpe: TypeInformation[_]): Type = tpe match { + + // From PrimitiveArrayTypeInfo we would get class "int[]", scala reflections + // does not seem to like this, so we manually give the correct type here. 
+ case PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO => typeOf[Array[Int]] + case PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO => typeOf[Array[Long]] + case PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO => typeOf[Array[Short]] + case PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO => typeOf[Array[Byte]] + case PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO => typeOf[Array[Float]] + case PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO => typeOf[Array[Double]] + case PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO => typeOf[Array[Boolean]] + case PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO => typeOf[Array[Char]] + + case _ => + val clazz = mirror.staticClass(tpe.getTypeClass.getCanonicalName) + + clazz.selfType.erasure match { + case ExistentialType(_, underlying) => underlying + + case tpe@TypeRef(prefix, sym, Nil) => + // Non-generic type, just return the type + tpe + + case TypeRef(prefix, sym, emptyParams) => + val genericTypeInfos = tpe.getGenericParameters.asScala + if (emptyParams.length != genericTypeInfos.length) { + throw new RuntimeException("Number of type parameters does not match.") + } + val typeParams = genericTypeInfos.map(typeForTypeInfo) + // TODO: remove, added only for migration of the line below, as suggested by the compiler + import compat._ + TypeRef(prefix, sym, typeParams.toList) + } + + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateBinaryPredicate.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateBinaryPredicate.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateBinaryPredicate.scala new file mode 100644 index 0000000..9c9faa4 --- /dev/null +++ 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateBinaryPredicate.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.codegen + +import org.apache.flink.api.table.tree.Expression +import org.apache.flink.api.common.typeutils.CompositeType +import org.slf4j.LoggerFactory + +/** + * Code generator for binary predicates, i.e. a Join or CoGroup Predicate. 
+ */ +class GenerateBinaryPredicate[L, R]( + leftType: CompositeType[L], + rightType: CompositeType[R], + predicate: Expression, + cl: ClassLoader) + extends ExpressionCodeGenerator[(L, R) => Boolean]( + Seq(("input0", leftType), ("input1", rightType)), + cl = cl) { + + val LOG = LoggerFactory.getLogger(this.getClass) + + import scala.reflect.runtime.{universe => ru} + import scala.reflect.runtime.universe._ + + override protected def generateInternal(): ((L, R) => Boolean) = { + val pred = generateExpression(predicate) + + val in0 = newTermName("input0") + val in1 = newTermName("input1") + + val leftTpe = typeTermForTypeInfo(leftType) + val rightTpe = typeTermForTypeInfo(rightType) + + val code = if (nullCheck) { + q""" + ($in0: $leftTpe, $in1: $rightTpe) => { + ..${pred.code} + if (${pred.nullTerm}) { + false + } else { + ${pred.resultTerm} + } + } + """ + } else { + q""" + ($in0: $leftTpe, $in1: $rightTpe) => { + ..${pred.code} + ${pred.resultTerm} + } + """ + } + + LOG.debug(s"""Generated binary predicate "$predicate":\n$code""") + toolBox.eval(code).asInstanceOf[(L, R) => Boolean] + } +}