http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaStreamingTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaStreamingTranslator.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaStreamingTranslator.scala deleted file mode 100644 index a37c892..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaStreamingTranslator.scala +++ /dev/null @@ -1,241 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.table - -import java.lang.reflect.Modifier -import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.table.plan._ -import org.apache.flink.api.table.runtime.{ExpressionFilterFunction, ExpressionSelectFunction} -import org.apache.flink.api.table.expressions._ -import org.apache.flink.api.table.typeinfo.RowTypeInfo -import org.apache.flink.api.table.{ExpressionException, Row, Table} -import org.apache.flink.streaming.api.datastream.DataStream -import org.apache.flink.streaming.api.operators.StreamMap - -/** - * [[PlanTranslator]] for creating [[Table]]s from Java [[DataStream]]s and - * translating them back to Java [[DataStream]]s. - * - * This is very limited right now. Only select and filter are implemented. Also, the expression - * operations must be extended to allow windowing operations. - */ - -class JavaStreamingTranslator extends PlanTranslator { - - type Representation[A] = DataStream[A] - - override def createTable[A]( - repr: Representation[A], - inputType: CompositeType[A], - expressions: Array[Expression], - resultFields: Seq[(String, TypeInformation[_])]): Table = { - - val rowDataStream = createSelect(expressions, repr, inputType) - - new Table(Root(rowDataStream, resultFields)) - } - - override def translate[A](op: PlanNode)(implicit tpe: TypeInformation[A]): DataStream[A] = { - - if (tpe.getTypeClass == classOf[Row]) { - // shortcut for DataSet[Row] - return translateInternal(op).asInstanceOf[DataStream[A]] - } - - val clazz = tpe.getTypeClass - if (clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) { - throw new ExpressionException("Cannot create DataStream of type " + - clazz.getName + ". 
Only top-level classes or static member classes are supported.") - } - - if (!implicitly[TypeInformation[A]].isInstanceOf[CompositeType[A]]) { - throw new ExpressionException( - "A Table can only be converted to composite types, type is: " + - implicitly[TypeInformation[A]] + - ". Composite types would be tuples, case classes and POJOs.") - - } - - val resultSet = translateInternal(op) - - val resultType = resultSet.getType.asInstanceOf[RowTypeInfo] - - val outputType = implicitly[TypeInformation[A]].asInstanceOf[CompositeType[A]] - - val resultNames = resultType.getFieldNames - val outputNames = outputType.getFieldNames.toSeq - - if (resultNames.toSet != outputNames.toSet) { - throw new ExpressionException(s"Expression result type $resultType does not have the same" + - s"fields as output type $outputType") - } - - for (f <- outputNames) { - val in = resultType.getTypeAt(resultType.getFieldIndex(f)) - val out = outputType.getTypeAt(outputType.getFieldIndex(f)) - if (!in.equals(out)) { - throw new ExpressionException(s"Types for field $f differ on input $resultType and " + - s"output $outputType.") - } - } - - val outputFields = outputNames map { - f => ResolvedFieldReference(f, resultType.getTypeAt(f)) - } - - val function = new ExpressionSelectFunction( - resultSet.getType.asInstanceOf[RowTypeInfo], - outputType, - outputFields) - - val opName = s"select(${outputFields.mkString(",")})" - - resultSet.transform(opName, outputType, new StreamMap[Row, A](function)) - } - - private def translateInternal(op: PlanNode): DataStream[Row] = { - op match { - case Root(dataSet: DataStream[Row], resultFields) => - dataSet - - case Root(_, _) => - throw new ExpressionException("Invalid Root for JavaStreamingTranslator: " + op + ". " + - "Did you try converting a Table based on a DataSet to a DataStream or vice-versa?") - - case GroupBy(_, fields) => - throw new ExpressionException("Dangling GroupBy operation. Did you forget a " + - "SELECT statement?") - - case As(input, newNames) => - throw new ExpressionException("As operation for Streams not yet implemented.") - - case sel@Select(Filter(Join(leftInput, rightInput), predicate), selection) => - - val expandedInput = ExpandAggregations(sel) - - if (expandedInput.eq(sel)) { - val translatedLeftInput = translateInternal(leftInput) - val translatedRightInput = translateInternal(rightInput) - val leftInType = translatedLeftInput.getType.asInstanceOf[CompositeType[Row]] - val rightInType = translatedRightInput.getType.asInstanceOf[CompositeType[Row]] - - createJoin( - predicate, - selection, - translatedLeftInput, - translatedRightInput, - leftInType, - rightInType, - JoinHint.OPTIMIZER_CHOOSES) - } else { - translateInternal(expandedInput) - } - - case Filter(Join(leftInput, rightInput), predicate) => - val translatedLeftInput = translateInternal(leftInput) - val translatedRightInput = translateInternal(rightInput) - val leftInType = translatedLeftInput.getType.asInstanceOf[CompositeType[Row]] - val rightInType = translatedRightInput.getType.asInstanceOf[CompositeType[Row]] - - createJoin( - predicate, - leftInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)) ++ - rightInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)), - translatedLeftInput, - translatedRightInput, - leftInType, - rightInType, - JoinHint.OPTIMIZER_CHOOSES) - - case Join(leftInput, rightInput) => - throw new ExpressionException("Join without filter condition encountered. " + - "Did you forget to add .where(...) 
?") - - case sel@Select(input, selection) => - - val expandedInput = ExpandAggregations(sel) - - if (expandedInput.eq(sel)) { - // no expansions took place - val translatedInput = translateInternal(input) - val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]] - val inputFields = inType.getFieldNames - createSelect( - selection, - translatedInput, - inType) - } else { - translateInternal(expandedInput) - } - - case agg@Aggregate(GroupBy(input, groupExpressions), aggregations) => - throw new ExpressionException("Aggregate operation for Streams not yet implemented.") - - case agg@Aggregate(input, aggregations) => - throw new ExpressionException("Aggregate operation for Streams not yet implemented.") - - case Filter(input, predicate) => - val translatedInput = translateInternal(input) - val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]] - val filter = new ExpressionFilterFunction[Row](predicate, inType) - translatedInput.filter(filter) - - case UnionAll(left, right) => - val translatedLeft = translateInternal(left) - val translatedRight = translateInternal(right) - translatedLeft.union(translatedRight) - } - } - - private def createSelect[I]( - fields: Seq[Expression], - input: DataStream[I], - inputType: CompositeType[I]): DataStream[Row] = { - - fields foreach { - f => - if (f.exists(_.isInstanceOf[Aggregation])) { - throw new ExpressionException("Found aggregate in " + fields.mkString(", ") + ".") - } - - } - - val resultType = new RowTypeInfo(fields) - - val function = new ExpressionSelectFunction(inputType, resultType, fields) - - val opName = s"select(${fields.mkString(",")})" - - input.transform(opName, resultType, new StreamMap[I, Row](function)) - } - - private def createJoin[L, R]( - predicate: Expression, - fields: Seq[Expression], - leftInput: DataStream[L], - rightInput: DataStream[R], - leftType: CompositeType[L], - rightType: CompositeType[R], - joinHint: JoinHint): DataStream[Row] = { - - throw new ExpressionException("Join operation for Streams not yet implemented.") - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala deleted file mode 100644 index 5614031..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.java.table - -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.DataSet -import org.apache.flink.api.java.typeutils.TypeExtractor -import org.apache.flink.api.table.Table -import org.apache.flink.streaming.api.datastream.DataStream - -/** - * Environment for working with the Table API. - * - * This can be used to convert [[DataSet]] or [[DataStream]] to a [[Table]] and back again. You - * can also use the provided methods to create a [[Table]] directly from a data source. - */ -class TableEnvironment { - - /** - * Transforms the given DataSet to a [[org.apache.flink.api.table.Table]]. - * The fields of the DataSet type are renamed to the given set of fields: - * - * Example: - * - * {{{ - * tableEnv.fromDataSet(set, "a, b") - * }}} - * - * This will transform the set containing elements of two fields to a table where the fields - * are named a and b. - */ - def fromDataSet[T](set: DataSet[T], fields: String): Table = { - new JavaBatchTranslator().createTable(set, fields) - } - - /** - * Transforms the given DataSet to a [[org.apache.flink.api.table.Table]]. - * The fields of the DataSet type are used to name the - * [[org.apache.flink.api.table.Table]] fields. - */ - def fromDataSet[T](set: DataSet[T]): Table = { - new JavaBatchTranslator().createTable(set) - } - - /** - * Transforms the given DataStream to a [[org.apache.flink.api.table.Table]]. - * The fields of the DataStream type are renamed to the given set of fields: - * - * Example: - * - * {{{ - * tableEnv.fromDataStream(set, "a, b") - * }}} - * - * This will transform the set containing elements of two fields to a table where the fields - * are named a and b. - */ - def fromDataStream[T](set: DataStream[T], fields: String): Table = { - new JavaStreamingTranslator().createTable(set, fields) - } - - /** - * Transforms the given DataStream to a [[org.apache.flink.api.table.Table]]. - * The fields of the DataStream type are used to name the - * [[org.apache.flink.api.table.Table]] fields. 
- */ - def fromDataStream[T](set: DataStream[T]): Table = { - new JavaStreamingTranslator().createTable(set) - } - - /** - * Converts the given [[org.apache.flink.api.table.Table]] to - * a DataSet. The given type must have exactly the same fields as the - * [[org.apache.flink.api.table.Table]]. That is, the names of the - * fields and the types must match. - */ - @SuppressWarnings(Array("unchecked")) - def toDataSet[T](table: Table, clazz: Class[T]): DataSet[T] = { - new JavaBatchTranslator().translate[T](table.operation)( - TypeExtractor.createTypeInfo(clazz).asInstanceOf[TypeInformation[T]]) - } - - /** - * Converts the given [[org.apache.flink.api.table.Table]] to - * a DataStream. The given type must have exactly the same fields as the - * [[org.apache.flink.api.table.Table]]. That is, the names of the - * fields and the types must match. - */ - @SuppressWarnings(Array("unchecked")) - def toDataStream[T](table: Table, clazz: Class[T]): DataStream[T] = { - new JavaStreamingTranslator().translate[T](table.operation)( - TypeExtractor.createTypeInfo(clazz).asInstanceOf[TypeInformation[T]]) - - } -} - http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala deleted file mode 100644 index 2508a3d..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.scala.table - -import org.apache.flink.api.table._ -import org.apache.flink.api.table.expressions.{UnresolvedFieldReference, Expression} -import org.apache.flink.api.common.typeutils.CompositeType - -import org.apache.flink.api.scala._ - -/** - * Methods for converting a [[DataSet]] to a [[Table]]. A [[DataSet]] is - * wrapped in this by the implicit conversions in [[org.apache.flink.api.scala.table]]. - */ -class DataSetConversions[T](set: DataSet[T], inputType: CompositeType[T]) { - - /** - * Converts the [[DataSet]] to a [[Table]]. The field names can be specified like this: - * - * {{{ - * val in: DataSet[(String, Int)] = ... - * val table = in.as('a, 'b) - * }}} - * - * This results in a [[Table]] that has field `a` of type `String` and field `b` - * of type `Int`. - */ - def as(fields: Expression*): Table = { - new ScalaBatchTranslator().createTable(set, fields.toArray) - } - - /** - * Converts the [[DataSet]] to a [[Table]]. 
The field names will be taken from the field names - * of the input type. - * - * Example: - * - * {{{ - * val in: DataSet[(String, Int)] = ... - * val table = in.toTable - * }}} - * - * Here, the result is a [[Table]] that has field `_1` of type `String` and field `_2` - * of type `Int`. - */ - def toTable: Table = { - val resultFields = inputType.getFieldNames.map(UnresolvedFieldReference) - as(resultFields: _*) - } - -} - http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala deleted file mode 100644 index 47bd100..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataStreamConversions.scala +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.table - -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.table._ -import org.apache.flink.api.table.expressions.{Expression, UnresolvedFieldReference} -import org.apache.flink.streaming.api.scala.DataStream - -class DataStreamConversions[T](stream: DataStream[T], inputType: CompositeType[T]) { - - /** - * Converts the [[DataStream]] to a [[Table]]. The field names can be specified like this: - * - * {{{ - * val in: DataSet[(String, Int)] = ... - * val table = in.as('a, 'b) - * }}} - * - * This results in a [[Table]] that has field `a` of type `String` and field `b` - * of type `Int`. - */ - - def as(fields: Expression*): Table = { - new ScalaStreamingTranslator().createTable( - stream, - fields.toArray, - checkDeterministicFields = true) - } - - /** - * Converts the [[DataStream]] to a [[Table]]. The field names will be taken from the field - * names of the input type. - * - * Example: - * - * {{{ - * val in: DataSet[(String, Int)] = ... - * val table = in.toTable - * }}} - * - * This results in a [[Table]] that has field `_1` of type `String` and field `_2` - * of type `Int`. 
- */ - - def toTable: Table = { - val resultFields = inputType.getFieldNames.map(UnresolvedFieldReference) - as(resultFields: _*) - } - -} - http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala deleted file mode 100644 index cdcf53e..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaBatchTranslator.scala +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.table - - -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.java.table.JavaBatchTranslator -import org.apache.flink.api.table.expressions.Expression -import org.apache.flink.api.scala.wrap -import org.apache.flink.api.table.plan._ -import org.apache.flink.api.table.Table -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.scala.DataSet - -import scala.reflect.ClassTag - - -/** - * [[PlanTranslator]] for creating [[Table]]s from Scala [[DataSet]]s and - * translating them back to Scala [[DataSet]]s. - */ -class ScalaBatchTranslator extends PlanTranslator { - - private val javaTranslator = new JavaBatchTranslator - - type Representation[A] = DataSet[A] - - def createTable[A]( - repr: DataSet[A], - fields: Array[Expression]): Table = { - - val result = javaTranslator.createTable(repr.javaSet, fields) - - new Table(result.operation) - } - - override def translate[O](op: PlanNode)(implicit tpe: TypeInformation[O]): DataSet[O] = { - // fake it till you make it ... 
- wrap(javaTranslator.translate(op))(ClassTag.AnyRef.asInstanceOf[ClassTag[O]]) - } - - override def createTable[A]( - repr: Representation[A], - inputType: CompositeType[A], - expressions: Array[Expression], - resultFields: Seq[(String, TypeInformation[_])]): Table = { - - val result = javaTranslator.createTable(repr.javaSet, inputType, expressions, resultFields) - - Table(result.operation) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala deleted file mode 100644 index 88f1b83..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.table - -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.java.table.JavaStreamingTranslator -import org.apache.flink.api.table.Table -import org.apache.flink.api.table.plan._ -import org.apache.flink.api.table.expressions.Expression -import org.apache.flink.streaming.api.scala.{DataStream, javaToScalaStream} - -/** - * [[PlanTranslator]] for creating [[Table]]s from Scala [[DataStream]]s and - * translating them back to Scala [[DataStream]]s. - * - * This is very limited right now. Only select and filter are implemented. Also, the expression - * operations must be extended to allow windowing operations. - */ -class ScalaStreamingTranslator extends PlanTranslator { - - private val javaTranslator = new JavaStreamingTranslator - - override type Representation[A] = DataStream[A] - - override def translate[O](op: PlanNode)(implicit tpe: TypeInformation[O]): DataStream[O] = { - // fake it till you make it ... 
- javaToScalaStream(javaTranslator.translate(op)) - } - - override def createTable[A]( - repr: Representation[A], - inputType: CompositeType[A], - expressions: Array[Expression], - resultFields: Seq[(String, TypeInformation[_])]): Table = { - - val result = - javaTranslator.createTable(repr.getJavaStream, inputType, expressions, resultFields) - - new Table(result.operation) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala deleted file mode 100644 index 4f2172e..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.table - -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.scala._ -import org.apache.flink.api.table._ - -import org.apache.flink.streaming.api.scala.DataStream - -/** - * Methods for converting a [[Table]] to a [[DataSet]] or [[DataStream]]. A [[Table]] is - * wrapped in this by the implicit conversions in [[org.apache.flink.api.scala.table]]. - */ -class TableConversions(table: Table) { - - /** - * Converts the [[Table]] to a [[DataSet]]. - */ - def toDataSet[T: TypeInformation]: DataSet[T] = { - new ScalaBatchTranslator().translate[T](table.operation) - } - - /** - * Converts the [[Table]] to a [[DataStream]]. - */ - def toDataStream[T: TypeInformation]: DataStream[T] = { - new ScalaStreamingTranslator().translate[T](table.operation) - } -} - http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala deleted file mode 100644 index 0be6be2..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.scala.table - -import org.apache.flink.api.table.expressions._ -import org.apache.flink.api.common.typeinfo.TypeInformation - -import scala.language.implicitConversions - -/** - * These are all the operations that can be used to construct an [[Expression]] AST for expression - * operations. - * - * These operations must be kept in sync with the parser in - * [[org.apache.flink.api.table.parser.ExpressionParser]]. - */ -trait ImplicitExpressionOperations { - def expr: Expression - - def && (other: Expression) = And(expr, other) - def || (other: Expression) = Or(expr, other) - - def > (other: Expression) = GreaterThan(expr, other) - def >= (other: Expression) = GreaterThanOrEqual(expr, other) - def < (other: Expression) = LessThan(expr, other) - def <= (other: Expression) = LessThanOrEqual(expr, other) - - def === (other: Expression) = EqualTo(expr, other) - def !== (other: Expression) = NotEqualTo(expr, other) - - def unary_! = Not(expr) - def unary_- = UnaryMinus(expr) - - def isNull = IsNull(expr) - def isNotNull = IsNotNull(expr) - - def + (other: Expression) = Plus(expr, other) - def - (other: Expression) = Minus(expr, other) - def / (other: Expression) = Div(expr, other) - def * (other: Expression) = Mul(expr, other) - def % (other: Expression) = Mod(expr, other) - - def & (other: Expression) = BitwiseAnd(expr, other) - def | (other: Expression) = BitwiseOr(expr, other) - def ^ (other: Expression) = BitwiseXor(expr, other) - def unary_~ = BitwiseNot(expr) - - def abs = Abs(expr) - - def sum = Sum(expr) - def min = Min(expr) - def max = Max(expr) - def count = Count(expr) - def avg = Avg(expr) - - def substring(beginIndex: Expression, endIndex: Expression = Literal(Int.MaxValue)) = { - Substring(expr, beginIndex, endIndex) - } - - def cast(toType: TypeInformation[_]) = Cast(expr, toType) - - def as(name: Symbol) = Naming(expr, name.name) -} - -/** - * Implicit conversions from Scala Literals to Expression [[Literal]] and from [[Expression]] - * to [[ImplicitExpressionOperations]]. 
- */ -trait ImplicitExpressionConversions { - implicit class WithOperations(e: Expression) extends ImplicitExpressionOperations { - def expr = e - } - - implicit class SymbolExpression(s: Symbol) extends ImplicitExpressionOperations { - def expr = UnresolvedFieldReference(s.name) - } - - implicit class LiteralLongExpression(l: Long) extends ImplicitExpressionOperations { - def expr = Literal(l) - } - - implicit class LiteralIntExpression(i: Int) extends ImplicitExpressionOperations { - def expr = Literal(i) - } - - implicit class LiteralFloatExpression(f: Float) extends ImplicitExpressionOperations { - def expr = Literal(f) - } - - implicit class LiteralDoubleExpression(d: Double) extends ImplicitExpressionOperations { - def expr = Literal(d) - } - - implicit class LiteralStringExpression(str: String) extends ImplicitExpressionOperations { - def expr = Literal(str) - } - - implicit class LiteralBooleanExpression(bool: Boolean) extends ImplicitExpressionOperations { - def expr = Literal(bool) - } - - implicit def symbol2FieldExpression(sym: Symbol): Expression = UnresolvedFieldReference(sym.name) - implicit def int2Literal(i: Int): Expression = Literal(i) - implicit def long2Literal(l: Long): Expression = Literal(l) - implicit def double2Literal(d: Double): Expression = Literal(d) - implicit def float2Literal(d: Float): Expression = Literal(d) - implicit def string2Literal(str: String): Expression = Literal(str) - implicit def boolean2Literal(bool: Boolean): Expression = Literal(bool) -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala deleted file mode 100644 index e74651b..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.scala - -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.table.{Row, Table} -import org.apache.flink.streaming.api.scala.DataStream - -import scala.language.implicitConversions - -/** - * == Table API (Scala) == - * - * Importing this package with: - * - * {{{ - * import org.apache.flink.api.scala.table._ - * }}} - * - * imports implicit conversions for converting a [[DataSet]] or [[DataStream]] to a - * [[Table]]. This can be used to perform SQL-like queries on data. 
Please have - * a look at [[Table]] to see which operations are supported and - * [[org.apache.flink.api.scala.table.ImplicitExpressionOperations]] to see how an - * expression can be specified. - * - * When writing a query you can use Scala Symbols to refer to field names. One would - * refer to field `a` by writing `'a`. Sometimes it is necessary to manually confert a - * Scala literal to an Expression Literal, in those cases use `Literal`, as in `Literal(3)`. - * - * Example: - * - * {{{ - * import org.apache.flink.api.scala._ - * import org.apache.flink.api.scala.table._ - * - * val env = ExecutionEnvironment.getExecutionEnvironment - * val input = env.fromElements(("Hello", 2), ("Hello", 5), ("Ciao", 3)) - * val result = input.as('word, 'count).groupBy('word).select('word, 'count.avg) - * result.print() - * - * env.execute() - * }}} - * - * A [[Table]] can be converted back to the underlying API - * representation using `as`: - * - * {{{ - * case class Word(word: String, count: Int) - * - * val result = in.select(...).as('word, 'count) - * val set = result.as[Word] - * }}} - */ -package object table extends ImplicitExpressionConversions { - - implicit def table2TableConversions(table: Table): TableConversions = { - new TableConversions(table) - } - - implicit def dataSet2DataSetConversions[T](set: DataSet[T]): DataSetConversions[T] = { - new DataSetConversions[T](set, set.getType.asInstanceOf[CompositeType[T]]) - } - - implicit def table2RowDataSet( - table: Table): DataSet[Row] = { - new ScalaBatchTranslator().translate[Row](table.operation) - } - - implicit def rowDataSet2Table( - rowDataSet: DataSet[Row]): Table = { - rowDataSet.toTable - } - - implicit def dataStream2DataSetConversions[T]( - stream: DataStream[T]): DataStreamConversions[T] = { - new DataStreamConversions[T]( - stream, - stream.getJavaStream.getType.asInstanceOf[CompositeType[T]]) - } - - implicit def table2RowDataStream( - table: Table): DataStream[Row] = { - new ScalaStreamingTranslator().translate[Row](table.operation) - } - - implicit def rowDataStream2Table( - rowDataStream: DataStream[Row]): Table = { - rowDataStream.toTable - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/ExpressionException.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/ExpressionException.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/ExpressionException.scala deleted file mode 100644 index 51c0a4d..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/ExpressionException.scala +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table - -/** - * Exception for all errors occurring during expression evaluation. - */ -class ExpressionException(msg: String) extends RuntimeException(msg) http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Row.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Row.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Row.scala deleted file mode 100644 index e3baab3..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Row.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table - -/** - * This is used for executing Table API operations. We use manually generated - * TypeInfo to check the field types and create serializers and comparators. - */ -class Row(arity: Int) extends Product { - - private val fields = new Array[Any](arity) - - def productArity = fields.length - - def productElement(i: Int): Any = fields(i) - - def setField(i: Int, value: Any): Unit = fields(i) = value - - def canEqual(that: Any) = false - - override def toString = fields.mkString(",") - -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala deleted file mode 100644 index 641f2fa..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala +++ /dev/null @@ -1,294 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.api.table - -import org.apache.flink.api.java.io.DiscardingOutputFormat -import org.apache.flink.api.table.explain.PlanJsonParser -import org.apache.flink.api.table.expressions.analysis.{GroupByAnalyzer, PredicateAnalyzer, SelectionAnalyzer} -import org.apache.flink.api.table.expressions.{Expression, ResolvedFieldReference, UnresolvedFieldReference} -import org.apache.flink.api.table.parser.ExpressionParser -import org.apache.flink.api.table.plan._ -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.table._ - -/** - * The abstraction for writing Table API programs. Similar to how the batch and streaming APIs - * have [[org.apache.flink.api.scala.DataSet]] and - * [[org.apache.flink.streaming.api.scala.DataStream]]. - * - * Use the methods of [[Table]] to transform data. Use - * [[org.apache.flink.api.java.table.TableEnvironment]] to convert a [[Table]] back to a DataSet - * or DataStream. - * - * When using Scala a [[Table]] can also be converted using implicit conversions. - * - * Example: - * - * {{{ - * val table = set.toTable('a, 'b) - * ... - * val table2 = ... - * val set = table2.toDataSet[MyType] - * }}} - * - * Operations such as [[join]], [[select]], [[where]] and [[groupBy]] either take arguments - * in a Scala DSL or as an expression String. Please refer to the documentation for the expression - * syntax. - */ -case class Table(private[flink] val operation: PlanNode) { - - /** - * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions - * can contain complex expressions and aggregations. - * - * Example: - * - * {{{ - * in.select('key, 'value.avg + " The average" as 'average, 'other.substring(0, 10)) - * }}} - */ - def select(fields: Expression*): Table = { - val analyzer = new SelectionAnalyzer(operation.outputFields) - val analyzedFields = fields.map(analyzer.analyze) - val fieldNames = analyzedFields map(_.name) - if (fieldNames.toSet.size != fieldNames.size) { - throw new ExpressionException(s"Resulting fields names are not unique in expression" + - s""" "${fields.mkString(", ")}".""") - } - this.copy(operation = Select(operation, analyzedFields)) - } - - /** - * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions - * can contain complex expressions and aggregations. - * - * Example: - * - * {{{ - * in.select("key, value.avg + " The average" as average, other.substring(0, 10)") - * }}} - */ - def select(fields: String): Table = { - val fieldExprs = ExpressionParser.parseExpressionList(fields) - select(fieldExprs: _*) - } - - /** - * Renames the fields of the expression result. Use this to disambiguate fields before - * joining to operations. - * - * Example: - * - * {{{ - * in.as('a, 'b) - * }}} - */ - def as(fields: Expression*): Table = { - fields forall { - f => f.isInstanceOf[UnresolvedFieldReference] - } match { - case true => - case false => throw new ExpressionException("Only field expression allowed in as().") - } - this.copy(operation = As(operation, fields.toArray map { _.name })) - } - - /** - * Renames the fields of the expression result. Use this to disambiguate fields before - * joining to operations. - * - * Example: - * - * {{{ - * in.as("a, b") - * }}} - */ - def as(fields: String): Table = { - val fieldExprs = ExpressionParser.parseExpressionList(fields) - as(fieldExprs: _*) - } - - /** - * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE - * clause. 
- * - * Example: - * - * {{{ - * in.filter('name === "Fred") - * }}} - */ - def filter(predicate: Expression): Table = { - val analyzer = new PredicateAnalyzer(operation.outputFields) - val analyzedPredicate = analyzer.analyze(predicate) - this.copy(operation = Filter(operation, analyzedPredicate)) - } - - /** - * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE - * clause. - * - * Example: - * - * {{{ - * in.filter("name = 'Fred'") - * }}} - */ - def filter(predicate: String): Table = { - val predicateExpr = ExpressionParser.parseExpression(predicate) - filter(predicateExpr) - } - - /** - * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE - * clause. - * - * Example: - * - * {{{ - * in.where('name === "Fred") - * }}} - */ - def where(predicate: Expression): Table = { - filter(predicate) - } - - /** - * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE - * clause. - * - * Example: - * - * {{{ - * in.where("name = 'Fred'") - * }}} - */ - def where(predicate: String): Table = { - filter(predicate) - } - - /** - * Groups the elements on some grouping keys. Use this before a selection with aggregations - * to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement. - * - * Example: - * - * {{{ - * in.groupBy('key).select('key, 'value.avg) - * }}} - */ - def groupBy(fields: Expression*): Table = { - val analyzer = new GroupByAnalyzer(operation.outputFields) - val analyzedFields = fields.map(analyzer.analyze) - - val illegalKeys = analyzedFields filter { - case fe: ResolvedFieldReference => false // OK - case e => true - } - - if (illegalKeys.nonEmpty) { - throw new ExpressionException("Illegal key expressions: " + illegalKeys.mkString(", ")) - } - - this.copy(operation = GroupBy(operation, analyzedFields)) - } - - /** - * Groups the elements on some grouping keys. Use this before a selection with aggregations - * to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement. - * - * Example: - * - * {{{ - * in.groupBy("key").select("key, value.avg") - * }}} - */ - def groupBy(fields: String): Table = { - val fieldsExpr = ExpressionParser.parseExpressionList(fields) - groupBy(fieldsExpr: _*) - } - - /** - * Joins two [[Table]]s. Similar to an SQL join. The fields of the two joined - * operations must not overlap, use [[as]] to rename fields if necessary. You can use - * where and select clauses after a join to further specify the behaviour of the join. - * - * Example: - * - * {{{ - * left.join(right).where('a === 'b && 'c > 3).select('a, 'b, 'd) - * }}} - */ - def join(right: Table): Table = { - val leftInputNames = operation.outputFields.map(_._1).toSet - val rightInputNames = right.operation.outputFields.map(_._1).toSet - if (leftInputNames.intersect(rightInputNames).nonEmpty) { - throw new ExpressionException( - "Overlapping fields names on join input, result would be ambiguous: " + - operation.outputFields.mkString(", ") + - " and " + - right.operation.outputFields.mkString(", ") ) - } - this.copy(operation = Join(operation, right.operation)) - } - - /** - * Union two[[Table]]s. Similar to an SQL UNION ALL. The fields of the two union operations - * must fully overlap. 
- * - * Example: - * - * {{{ - * left.unionAll(right) - * }}} - */ - def unionAll(right: Table): Table = { - val leftInputFields = operation.outputFields - val rightInputFields = right.operation.outputFields - if (!leftInputFields.equals(rightInputFields)) { - throw new ExpressionException( - "The fields names of join inputs should be fully overlapped, left inputs fields:" + - operation.outputFields.mkString(", ") + - " and right inputs fields" + - right.operation.outputFields.mkString(", ") - ) - } - this.copy(operation = UnionAll(operation, right.operation)) - } - - /** - * Get the process of the sql parsing, print AST and physical execution plan.The AST - * show the structure of the supplied statement. The execution plan shows how the table - * referenced by the statement will be scanned. - */ - def explain(extended: Boolean): String = { - val ast = operation - val dataSet = this.toDataSet[Row] - val env = dataSet.getExecutionEnvironment - dataSet.output(new DiscardingOutputFormat[Row]) - val jasonSqlPlan = env.getExecutionPlan() - val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jasonSqlPlan, extended) - val result = "== Abstract Syntax Tree ==\n" + ast + "\n\n" + "== Physical Execution Plan ==" + - "\n" + sqlPlan - return result - } - - def explain(): String = explain(false) - - override def toString: String = s"Expression($operation)" -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/TableConfig.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/TableConfig.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/TableConfig.scala deleted file mode 100644 index ffa2bec..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/TableConfig.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table - -import java.util.TimeZone - -/** - * A config to define the runtime behavior of the Table API. - */ -class TableConfig extends Serializable { - - /** - * Defines the timezone for date/time/timestamp conversions. - */ - private var timeZone: TimeZone = TimeZone.getTimeZone("UTC") - - /** - * Defines if all fields need to be checked for NULL first. - */ - private var nullCheck: Boolean = false - - /** - * Sets the timezone for date/time/timestamp conversions. - */ - def setTimeZone(timeZone: TimeZone) = { - require(timeZone != null, "timeZone must not be null.") - this.timeZone = timeZone - } - - /** - * Returns the timezone for date/time/timestamp conversions. - */ - def getTimeZone = timeZone - - /** - * Returns the NULL check. 
If enabled, all fields need to be checked for NULL first. - */ - def getNullCheck = nullCheck - - /** - * Sets the NULL check. If enabled, all fields need to be checked for NULL first. - */ - def setNullCheck(nullCheck: Boolean) = { - this.nullCheck = nullCheck - } - -} - -object TableConfig { - val DEFAULT = new TableConfig() -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala deleted file mode 100644 index a03ba61..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala +++ /dev/null @@ -1,829 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.codegen - -import java.util.Date -import java.util.concurrent.atomic.AtomicInteger - -import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation} -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo} -import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo -import org.apache.flink.api.table.expressions._ -import org.apache.flink.api.table.typeinfo.{RenamingProxyTypeInfo, RowTypeInfo} -import org.apache.flink.api.table.{ExpressionException, TableConfig, expressions} -import org.codehaus.janino.SimpleCompiler -import org.slf4j.LoggerFactory - -import scala.collection.mutable - -/** Base class for all code generation classes. This provides the functionality for generating - * code from an [[Expression]] tree. Derived classes must embed this in a lambda function - * to form an executable code block. - * - * @param inputs List of input variable names with corresponding [[TypeInformation]]. - * @param cl The ClassLoader that is used to create the Scala reflection ToolBox - * @param config General configuration specifying runtime behaviour. - * @tparam R The type of the generated code block. In most cases a lambda function such - * as "(IN1, IN2) => OUT". 
- */ -abstract class ExpressionCodeGenerator[R]( - inputs: Seq[(String, CompositeType[_])], - cl: ClassLoader, - config: TableConfig) { - protected val log = LoggerFactory.getLogger(classOf[ExpressionCodeGenerator[_]]) - - import scala.reflect.runtime.universe._ - import scala.reflect.runtime.{universe => ru} - - if (cl == null) { - throw new IllegalArgumentException("ClassLoader must not be null.") - } - - val compiler = new SimpleCompiler() - compiler.setParentClassLoader(cl) - - protected val reusableMemberStatements = mutable.Set[String]() - - protected val reusableInitStatements = mutable.Set[String]() - - protected def reuseMemberCode(): String = { - reusableMemberStatements.mkString("", "\n", "\n") - } - - protected def reuseInitCode(): String = { - reusableInitStatements.mkString("", "\n", "\n") - } - - protected def nullCheck: Boolean = config.getNullCheck - - // This is to be implemented by subclasses, we have it like this - // so that we only call it from here with the Scala Reflection Lock. - protected def generateInternal(): R - - final def generate(): R = { - generateInternal() - } - - protected def generateExpression(expr: Expression): GeneratedExpression = { - generateExpressionInternal(expr) - } - - protected def generateExpressionInternal(expr: Expression): GeneratedExpression = { - // protected def generateExpression(expr: Expression): GeneratedExpression = { - val nullTerm = freshName("isNull") - val resultTerm = freshName("result") - - // For binary predicates that must only be evaluated when both operands are non-null. - // This will write to nullTerm and resultTerm, so don't use those term names - // after using this function - def generateIfNonNull(left: Expression, right: Expression, resultType: TypeInformation[_]) - (expr: (String, String) => String): String = { - val leftCode = generateExpression(left) - val rightCode = generateExpression(right) - - val leftTpe = typeTermForTypeInfo(left.typeInfo) - val rightTpe = typeTermForTypeInfo(right.typeInfo) - val resultTpe = typeTermForTypeInfo(resultType) - - if (nullCheck) { - leftCode.code + "\n" + - rightCode.code + "\n" + - s""" - |boolean $nullTerm = ${leftCode.nullTerm} || ${rightCode.nullTerm}; - |$resultTpe $resultTerm; - |if ($nullTerm) { - | $resultTerm = ${defaultPrimitive(resultType)}; - |} else { - | $resultTerm = ${expr(leftCode.resultTerm, rightCode.resultTerm)}; - |} - """.stripMargin - } else { - leftCode.code + "\n" + - rightCode.code + "\n" + - s""" - |$resultTpe $resultTerm = ${expr(leftCode.resultTerm, rightCode.resultTerm)}; - """.stripMargin - } - } - - def cleanedExpr(e: Expression): Expression = { - e match { - case expressions.Naming(namedExpr, _) => cleanedExpr(namedExpr) - case _ => e - } - } - - val cleanedExpression = cleanedExpr(expr) - val resultTpe = typeTermForTypeInfo(cleanedExpression.typeInfo) - - val code: String = cleanedExpression match { - - case expressions.Literal(null, typeInfo) => - if (nullCheck) { - s""" - |boolean $nullTerm = true; - |$resultTpe resultTerm = null; - """.stripMargin - } else { - s""" - |$resultTpe resultTerm = null; - """.stripMargin - } - - case expressions.Literal(intValue: Int, INT_TYPE_INFO) => - if (nullCheck) { - s""" - |boolean $nullTerm = false; - |$resultTpe $resultTerm = $intValue; - """.stripMargin - } else { - s""" - |$resultTpe $resultTerm = $intValue; - """.stripMargin - } - - case expressions.Literal(longValue: Long, LONG_TYPE_INFO) => - if (nullCheck) { - s""" - |boolean $nullTerm = false; - |$resultTpe $resultTerm = ${longValue}L; - 
""".stripMargin - } else { - s""" - |$resultTpe $resultTerm = ${longValue}L; - """.stripMargin - } - - - case expressions.Literal(doubleValue: Double, DOUBLE_TYPE_INFO) => - if (nullCheck) { - s""" - |boolean $nullTerm = false; - |$resultTpe $resultTerm = $doubleValue; - """.stripMargin - } else { - s""" - |$resultTpe $resultTerm = $doubleValue; - """.stripMargin - } - - case expressions.Literal(floatValue: Float, FLOAT_TYPE_INFO) => - if (nullCheck) { - s""" - |boolean $nullTerm = false; - |$resultTpe $resultTerm = ${floatValue}f; - """.stripMargin - } else { - s""" - |$resultTpe $resultTerm = ${floatValue}f; - """.stripMargin - } - - case expressions.Literal(strValue: String, STRING_TYPE_INFO) => - if (nullCheck) { - s""" - |boolean $nullTerm = false; - |$resultTpe $resultTerm = "$strValue"; - """.stripMargin - } else { - s""" - |$resultTpe $resultTerm = "$strValue"; - """.stripMargin - } - - case expressions.Literal(boolValue: Boolean, BOOLEAN_TYPE_INFO) => - if (nullCheck) { - s""" - |boolean $nullTerm = false; - |$resultTpe $resultTerm = $boolValue; - """.stripMargin - } else { - s""" - $resultTpe $resultTerm = $boolValue; - """.stripMargin - } - - case expressions.Literal(dateValue: Date, DATE_TYPE_INFO) => - val dateName = s"""date_${dateValue.getTime}""" - val dateStmt = s"""static java.util.Date $dateName - |= new java.util.Date(${dateValue.getTime});""".stripMargin - reusableMemberStatements.add(dateStmt) - - if (nullCheck) { - s""" - |boolean $nullTerm = false; - |$resultTpe $resultTerm = $dateName; - """.stripMargin - } else { - s""" - |$resultTpe $resultTerm = $dateName; - """.stripMargin - } - - case Substring(str, beginIndex, endIndex) => - val strCode = generateExpression(str) - val beginIndexCode = generateExpression(beginIndex) - val endIndexCode = generateExpression(endIndex) - if (nullCheck) { - strCode.code + - beginIndexCode.code + - endIndexCode.code + - s""" - boolean $nullTerm = - ${strCode.nullTerm} || ${beginIndexCode.nullTerm} || ${endIndexCode.nullTerm}; - $resultTpe $resultTerm; - if ($nullTerm) { - $resultTerm = ${defaultPrimitive(str.typeInfo)}; - } else { - if (${endIndexCode.resultTerm} == Int.MaxValue) { - $resultTerm = (${strCode.resultTerm}).substring(${beginIndexCode.resultTerm}); - } else { - $resultTerm = (${strCode.resultTerm}).substring( - ${beginIndexCode.resultTerm}, - ${endIndexCode.resultTerm}); - } - } - """.stripMargin - } else { - strCode.code + - beginIndexCode.code + - endIndexCode.code + - s""" - $resultTpe $resultTerm; - - if (${endIndexCode.resultTerm} == Integer.MAX_VALUE) { - $resultTerm = (${strCode.resultTerm}).substring(${beginIndexCode.resultTerm}); - } else { - $resultTerm = (${strCode.resultTerm}).substring( - ${beginIndexCode.resultTerm}, - ${endIndexCode.resultTerm}); - } - """ - } - - case expressions.Cast(child: Expression, STRING_TYPE_INFO) - if child.typeInfo == BasicTypeInfo.DATE_TYPE_INFO => - val childGen = generateExpression(child) - - addTimestampFormatter() - - val castCode = if (nullCheck) { - s""" - |boolean $nullTerm = ${childGen.nullTerm}; - |$resultTpe $resultTerm; - |if ($nullTerm) { - | $resultTerm = null; - |} else { - | $resultTerm = timestampFormatter.format(${childGen.resultTerm}); - |} - """.stripMargin - } else { - s""" - |$resultTpe $resultTerm = timestampFormatter.format(${childGen.resultTerm}); - """.stripMargin - } - childGen.code + castCode - - case expressions.Cast(child: Expression, STRING_TYPE_INFO) => - val childGen = generateExpression(child) - val castCode = if (nullCheck) { - s""" - |boolean 
$nullTerm = ${childGen.nullTerm}; - |$resultTpe $resultTerm; - |if ($nullTerm) { - | $resultTerm = null; - |} else { - | $resultTerm = "" + ${childGen.resultTerm}; - |} - """.stripMargin - } else { - s""" - |$resultTpe $resultTerm = "" + ${childGen.resultTerm}; - """.stripMargin - } - childGen.code + castCode - - case expressions.Cast(child: Expression, DATE_TYPE_INFO) - if child.typeInfo == BasicTypeInfo.LONG_TYPE_INFO => - val childGen = generateExpression(child) - val castCode = if (nullCheck) { - s""" - |boolean $nullTerm = ${childGen.nullTerm}; - |$resultTpe $resultTerm; - |if ($nullTerm) { - | $resultTerm = null; - |} else { - | $resultTerm = new java.util.Date(${childGen.resultTerm}); - |} - """.stripMargin - } else { - s""" - |$resultTpe $resultTerm = new java.util.Date(${childGen.resultTerm}); - """.stripMargin - } - childGen.code + castCode - - case expressions.Cast(child: Expression, DATE_TYPE_INFO) - if child.typeInfo == BasicTypeInfo.STRING_TYPE_INFO => - val childGen = generateExpression(child) - - addDateFormatter() - addTimeFormatter() - addTimestampFormatter() - - // tries to parse - // "2011-05-03 15:51:36.234" - // then "2011-05-03" - // then "15:51:36" - // then "1446473775" - val parsedName = freshName("parsed") - val parsingCode = - s""" - |java.util.Date $parsedName = null; - |try { - | $parsedName = timestampFormatter.parse(${childGen.resultTerm}); - |} catch (java.text.ParseException e1) { - | try { - | $parsedName = dateFormatter.parse(${childGen.resultTerm}); - | } catch (java.text.ParseException e2) { - | try { - | $parsedName = timeFormatter.parse(${childGen.resultTerm}); - | } catch (java.text.ParseException e3) { - | $parsedName = new java.util.Date(Long.valueOf(${childGen.resultTerm})); - | } - | } - |} - """.stripMargin - - val castCode = if (nullCheck) { - s""" - |boolean $nullTerm = ${childGen.nullTerm}; - |$resultTpe $resultTerm; - |if ($nullTerm) { - | $resultTerm = null; - |} else { - | $parsingCode - | $resultTerm = $parsedName; - |} - """.stripMargin - } else { - s""" - |$parsingCode - |$resultTpe $resultTerm = $parsedName; - """.stripMargin - } - childGen.code + castCode - - case expressions.Cast(child: Expression, DATE_TYPE_INFO) => - throw new ExpressionException("Only Long and String can be casted to Date.") - - case expressions.Cast(child: Expression, LONG_TYPE_INFO) - if child.typeInfo == BasicTypeInfo.DATE_TYPE_INFO => - val childGen = generateExpression(child) - val castCode = if (nullCheck) { - s""" - |boolean $nullTerm = ${childGen.nullTerm}; - |$resultTpe $resultTerm; - |if ($nullTerm) { - | $resultTerm = null; - |} else { - | $resultTerm = ${childGen.resultTerm}.getTime(); - |} - """.stripMargin - } else { - s""" - |$resultTpe $resultTerm = ${childGen.resultTerm}.getTime(); - """.stripMargin - } - childGen.code + castCode - - case expressions.Cast(child: Expression, tpe: BasicTypeInfo[_]) - if child.typeInfo == BasicTypeInfo.DATE_TYPE_INFO => - throw new ExpressionException("Date can only be casted to Long or String.") - - case expressions.Cast(child: Expression, tpe: BasicTypeInfo[_]) - if child.typeInfo == BasicTypeInfo.STRING_TYPE_INFO => - val childGen = generateExpression(child) - val fromTpe = typeTermForTypeInfoForCast(child.typeInfo) - val toTpe = typeTermForTypeInfoForCast(tpe) - - val castCode = if (nullCheck) { - s""" - |boolean $nullTerm = ${childGen.nullTerm}; - |$resultTpe $resultTerm = - | ${tpe.getTypeClass.getCanonicalName}.valueOf(${childGen.resultTerm}); - """.stripMargin - } else { - s""" - |$resultTpe $resultTerm = - 
| ${tpe.getTypeClass.getCanonicalName}.valueOf(${childGen.resultTerm}); - """.stripMargin - } - - childGen.code + castCode - - case expressions.Cast(child: Expression, tpe: BasicTypeInfo[_]) - if child.typeInfo.isBasicType => - val childGen = generateExpression(child) - val fromTpe = typeTermForTypeInfoForCast(child.typeInfo) - val toTpe = typeTermForTypeInfoForCast(tpe) - val castCode = if (nullCheck) { - s""" - |boolean $nullTerm = ${childGen.nullTerm}; - |$resultTpe $resultTerm; - |if ($nullTerm) { - | $resultTerm = null; - |} else { - | $resultTerm = ($toTpe)($fromTpe) ${childGen.resultTerm}; - |} - """.stripMargin - } else { - s"$resultTpe $resultTerm = ($toTpe)($fromTpe) ${childGen.resultTerm};\n" - } - childGen.code + castCode - - case ResolvedFieldReference(fieldName, fieldTpe: TypeInformation[_]) => - inputs find { i => i._2.hasField(fieldName)} match { - case Some((inputName, inputTpe)) => - val fieldCode = getField(newTermName(inputName), inputTpe, fieldName, fieldTpe) - if (nullCheck) { - s""" - |$resultTpe $resultTerm = $fieldCode; - |boolean $nullTerm = $resultTerm == null; - """.stripMargin - } else { - s"""$resultTpe $resultTerm = $fieldCode;""" - } - - case None => throw new ExpressionException("Could not get accessor for " + fieldName - + " in inputs " + inputs.mkString(", ") + ".") - } - - case GreaterThan(left, right) => - generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) { - (leftTerm, rightTerm) => s"$leftTerm > $rightTerm" - } - - case GreaterThanOrEqual(left, right) => - generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) { - (leftTerm, rightTerm) => s"$leftTerm >= $rightTerm" - } - - case LessThan(left, right) => - generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) { - (leftTerm, rightTerm) => s"$leftTerm < $rightTerm" - } - - case LessThanOrEqual(left, right) => - generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) { - (leftTerm, rightTerm) => s"$leftTerm <= $rightTerm" - } - - case EqualTo(left, right) => - generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) { - (leftTerm, rightTerm) => s"$leftTerm.equals($rightTerm)" - } - - case NotEqualTo(left, right) => - generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) { - (leftTerm, rightTerm) => s"!($leftTerm.equals($rightTerm))" - } - - case And(left, right) => - generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) { - (leftTerm, rightTerm) => s"$leftTerm && $rightTerm" - } - - case Or(left, right) => - generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) { - (leftTerm, rightTerm) => s"$leftTerm || $rightTerm" - } - - case Plus(left, right) => - generateIfNonNull(left, right, expr.typeInfo) { - (leftTerm, rightTerm) => s"$leftTerm + $rightTerm" - } - - case Minus(left, right) => - generateIfNonNull(left, right, expr.typeInfo) { - (leftTerm, rightTerm) => s"$leftTerm - $rightTerm" - } - - case Div(left, right) => - generateIfNonNull(left, right, expr.typeInfo) { - (leftTerm, rightTerm) => s"$leftTerm / $rightTerm" - } - - case Mul(left, right) => - generateIfNonNull(left, right, expr.typeInfo) { - (leftTerm, rightTerm) => s"$leftTerm * $rightTerm" - } - - case Mod(left, right) => - generateIfNonNull(left, right, expr.typeInfo) { - (leftTerm, rightTerm) => s"$leftTerm % $rightTerm" - } - - case UnaryMinus(child) => - val childCode = generateExpression(child) - if (nullCheck) { - childCode.code + - s""" - |boolean $nullTerm = ${childCode.nullTerm}; - |$resultTpe $resultTerm; - |if ($nullTerm) { - | $resultTerm = ${defaultPrimitive(child.typeInfo)}; - |} else { - | $resultTerm = -(${childCode.resultTerm}); - |} - """.stripMargin - } else 
{ - childCode.code + - s""" - |$resultTpe $resultTerm = -(${childCode.resultTerm}); - """.stripMargin - } - - case BitwiseAnd(left, right) => - generateIfNonNull(left, right, expr.typeInfo) { - (leftTerm, rightTerm) => s"(int) $leftTerm & (int) $rightTerm" - } - - case BitwiseOr(left, right) => - generateIfNonNull(left, right, expr.typeInfo) { - (leftTerm, rightTerm) => s"(int) $leftTerm | (int) $rightTerm" - } - - case BitwiseXor(left, right) => - generateIfNonNull(left, right, expr.typeInfo) { - (leftTerm, rightTerm) => s"(int) $leftTerm ^ (int) $rightTerm" - } - - case BitwiseNot(child) => - val childCode = generateExpression(child) - if (nullCheck) { - childCode.code + - s""" - |boolean $nullTerm = ${childCode.nullTerm}; - |$resultTpe $resultTerm; - |if ($nullTerm) { - | $resultTerm = ${defaultPrimitive(child.typeInfo)}; - |} else { - | $resultTerm = ~((int) ${childCode.resultTerm}); - |} - """.stripMargin - } else { - childCode.code + - s""" - |$resultTpe $resultTerm = ~((int) ${childCode.resultTerm}); - """.stripMargin - } - - case Not(child) => - val childCode = generateExpression(child) - if (nullCheck) { - childCode.code + - s""" - |boolean $nullTerm = ${childCode.nullTerm}; - |$resultTpe $resultTerm; - |if ($nullTerm) { - | $resultTerm = ${defaultPrimitive(child.typeInfo)}; - |} else { - | $resultTerm = !(${childCode.resultTerm}); - |} - """.stripMargin - } else { - childCode.code + - s""" - |$resultTpe $resultTerm = !(${childCode.resultTerm}); - """.stripMargin - } - - case IsNull(child) => - val childCode = generateExpression(child) - if (nullCheck) { - childCode.code + - s""" - |$resultTpe $resultTerm = ${childCode.nullTerm}; - """.stripMargin - } else { - childCode.code + - s""" - |$resultTpe $resultTerm = (${childCode.resultTerm}) == null; - """.stripMargin - } - - case IsNotNull(child) => - val childCode = generateExpression(child) - if (nullCheck) { - childCode.code + - s""" - |$resultTpe $resultTerm = !${childCode.nullTerm}; - """.stripMargin - } else { - childCode.code + - s""" - |$resultTpe $resultTerm = (${childCode.resultTerm}) != null; - """.stripMargin - } - - case Abs(child) => - val childCode = generateExpression(child) - if (nullCheck) { - childCode.code + - s""" - |boolean $nullTerm = ${childCode.nullTerm}; - |$resultTpe $resultTerm; - |if ($nullTerm) { - | $resultTerm = ${defaultPrimitive(child.typeInfo)}; - |} else { - | $resultTerm = Math.abs(${childCode.resultTerm}); - |} - """.stripMargin - } else { - childCode.code + - s""" - |$resultTpe $resultTerm = Math.abs(${childCode.resultTerm}); - """.stripMargin - } - - case _ => throw new ExpressionException("Could not generate code for expression " + expr) - } - - GeneratedExpression(code, resultTerm, nullTerm) - } - - case class GeneratedExpression(code: String, resultTerm: String, nullTerm: String) - - def freshName(name: String): String = { - s"$name$$${freshNameCounter.getAndIncrement}" - } - - val freshNameCounter = new AtomicInteger - - protected def getField( - inputTerm: TermName, - inputType: CompositeType[_], - fieldName: String, - fieldType: TypeInformation[_]): String = { - val accessor = fieldAccessorFor(inputType, fieldName) - val fieldTpe = typeTermForTypeInfo(fieldType) - - accessor match { - case ObjectFieldAccessor(fieldName) => - val fieldTerm = newTermName(fieldName) - s"($fieldTpe) $inputTerm.$fieldTerm" - - case ObjectMethodAccessor(methodName) => - val methodTerm = newTermName(methodName) - s"($fieldTpe) $inputTerm.$methodTerm()" - - case ProductAccessor(i) => - s"($fieldTpe) 
$inputTerm.productElement($i)" - - } - } - - sealed abstract class FieldAccessor - - case class ObjectFieldAccessor(fieldName: String) extends FieldAccessor - - case class ObjectMethodAccessor(methodName: String) extends FieldAccessor - - case class ProductAccessor(i: Int) extends FieldAccessor - - def fieldAccessorFor(elementType: CompositeType[_], fieldName: String): FieldAccessor = { - elementType match { - case ri: RowTypeInfo => - ProductAccessor(elementType.getFieldIndex(fieldName)) - - case cc: CaseClassTypeInfo[_] => - ObjectMethodAccessor(fieldName) - - case javaTup: TupleTypeInfo[_] => - ObjectFieldAccessor(fieldName) - - case pj: PojoTypeInfo[_] => - ObjectFieldAccessor(fieldName) - - case proxy: RenamingProxyTypeInfo[_] => - val underlying = proxy.getUnderlyingType - val fieldIndex = proxy.getFieldIndex(fieldName) - fieldAccessorFor(underlying, underlying.getFieldNames()(fieldIndex)) - } - } - - protected def defaultPrimitive(tpe: TypeInformation[_]): String = tpe match { - case BasicTypeInfo.INT_TYPE_INFO => "-1" - case BasicTypeInfo.LONG_TYPE_INFO => "-1" - case BasicTypeInfo.SHORT_TYPE_INFO => "-1" - case BasicTypeInfo.BYTE_TYPE_INFO => "-1" - case BasicTypeInfo.FLOAT_TYPE_INFO => "-1.0f" - case BasicTypeInfo.DOUBLE_TYPE_INFO => "-1.0d" - case BasicTypeInfo.BOOLEAN_TYPE_INFO => "false" - case BasicTypeInfo.STRING_TYPE_INFO => "\"<empty>\"" - case BasicTypeInfo.CHAR_TYPE_INFO => "'\\0'" - case _ => "null" - } - - protected def typeTermForTypeInfo(tpe: TypeInformation[_]): String = tpe match { - - // From PrimitiveArrayTypeInfo we would get class "int[]", scala reflections - // does not seem to like this, so we manually give the correct type here. - case PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO => "int[]" - case PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO => "long[]" - case PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO => "short[]" - case PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO => "byte[]" - case PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO => "float[]" - case PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO => "double[]" - case PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO => "boolean[]" - case PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO => "char[]" - - case _ => - tpe.getTypeClass.getCanonicalName - - } - - // when casting we first need to unbox Primitives, for example, - // float a = 1.0f; - // byte b = (byte) a; - // works, but for boxed types we need this: - // Float a = 1.0f; - // Byte b = (byte)(float) a; - protected def typeTermForTypeInfoForCast(tpe: TypeInformation[_]): String = tpe match { - - case BasicTypeInfo.INT_TYPE_INFO => "int" - case BasicTypeInfo.LONG_TYPE_INFO => "long" - case BasicTypeInfo.SHORT_TYPE_INFO => "short" - case BasicTypeInfo.BYTE_TYPE_INFO => "byte" - case BasicTypeInfo.FLOAT_TYPE_INFO => "float" - case BasicTypeInfo.DOUBLE_TYPE_INFO => "double" - case BasicTypeInfo.BOOLEAN_TYPE_INFO => "boolean" - case BasicTypeInfo.CHAR_TYPE_INFO => "char" - - // From PrimitiveArrayTypeInfo we would get class "int[]", scala reflections - // does not seem to like this, so we manually give the correct type here. 
- case PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO => "int[]" - case PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO => "long[]" - case PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO => "short[]" - case PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO => "byte[]" - case PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO => "float[]" - case PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO => "double[]" - case PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO => "boolean[]" - case PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO => "char[]" - - case _ => - tpe.getTypeClass.getCanonicalName - - } - - def addDateFormatter(): Unit = { - reusableMemberStatements.add(s""" - |java.text.SimpleDateFormat dateFormatter = - | new java.text.SimpleDateFormat("yyyy-MM-dd"); - |""".stripMargin) - - reusableInitStatements.add(s""" - |dateFormatter.setTimeZone(config.getTimeZone()); - |""".stripMargin) - } - - def addTimeFormatter(): Unit = { - reusableMemberStatements.add(s""" - |java.text.SimpleDateFormat timeFormatter = - | new java.text.SimpleDateFormat("HH:mm:ss"); - |""".stripMargin) - - reusableInitStatements.add(s""" - |timeFormatter.setTimeZone(config.getTimeZone()); - |""".stripMargin) - } - - def addTimestampFormatter(): Unit = { - reusableMemberStatements.add(s""" - |java.text.SimpleDateFormat timestampFormatter = - | new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); - |""".stripMargin) - - reusableInitStatements.add(s""" - |timestampFormatter.setTimeZone(config.getTimeZone()); - |""".stripMargin) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateFilter.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateFilter.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateFilter.scala deleted file mode 100644 index 50b8c69..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateFilter.scala +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.codegen - -import java.io.StringReader - -import org.apache.flink.api.common.functions.FilterFunction -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.table.TableConfig -import org.apache.flink.api.table.codegen.Indenter._ -import org.apache.flink.api.table.expressions.Expression -import org.slf4j.LoggerFactory - -/** - * Code generator for a unary predicate, i.e. a Filter. 
- */ -class GenerateFilter[T]( - inputType: CompositeType[T], - predicate: Expression, - cl: ClassLoader, - config: TableConfig) extends ExpressionCodeGenerator[FilterFunction[T]]( - Seq(("in0", inputType)), - cl = cl, - config) { - - val LOG = LoggerFactory.getLogger(this.getClass) - - override protected def generateInternal(): FilterFunction[T] = { - val pred = generateExpression(predicate) - - val tpe = typeTermForTypeInfo(inputType) - - val generatedName = freshName("GeneratedFilter") - - // Janino does not support generics, so we need to cast by hand - val code = if (nullCheck) { - j""" - public class $generatedName - implements org.apache.flink.api.common.functions.FilterFunction<$tpe> { - - org.apache.flink.api.table.TableConfig config = null; - - public $generatedName(org.apache.flink.api.table.TableConfig config) { - this.config = config; - } - - public boolean filter(Object _in0) { - $tpe in0 = ($tpe) _in0; - ${pred.code} - if (${pred.nullTerm}) { - return false; - } else { - return ${pred.resultTerm}; - } - } - } - """ - } else { - j""" - public class $generatedName - implements org.apache.flink.api.common.functions.FilterFunction<$tpe> { - - org.apache.flink.api.table.TableConfig config = null; - - public $generatedName(org.apache.flink.api.table.TableConfig config) { - this.config = config; - } - - public boolean filter(Object _in0) { - $tpe in0 = ($tpe) _in0; - ${pred.code} - return ${pred.resultTerm}; - } - } - """ - } - - LOG.debug(s"""Generated unary predicate "$predicate":\n$code""") - compiler.cook(new StringReader(code)) - val clazz = compiler.getClassLoader().loadClass(generatedName) - val constructor = clazz.getConstructor(classOf[TableConfig]) - constructor.newInstance(config).asInstanceOf[FilterFunction[T]] - } -}
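[Editor's illustration, not part of the commit] The deleted ExpressionCodeGenerator/GenerateFilter classes emit Java source as strings and compile it at runtime with Janino's SimpleCompiler, then load and instantiate the resulting class reflectively. The sketch below shows that compile-and-load step in isolation; it is a minimal, hypothetical example (made-up class name, hard-coded predicate, no TableConfig, no null-check handling), not the removed implementation.

import java.io.StringReader

import org.apache.flink.api.common.functions.FilterFunction
import org.codehaus.janino.SimpleCompiler

/** Minimal sketch: compile a hand-written filter class with Janino, the same
  * mechanism the deleted GenerateFilter used for its generated source.
  */
object JaninoFilterSketch {
  def main(args: Array[String]): Unit = {
    val generatedName = "SketchFilter"

    // Janino ignores generics, so the raw interface is implemented and the
    // input arrives as Object and is cast by hand, as in the removed template.
    val code =
      s"""
         |public class $generatedName
         |    implements org.apache.flink.api.common.functions.FilterFunction {
         |  public boolean filter(Object _in0) {
         |    int in0 = ((Integer) _in0).intValue();
         |    return in0 > 10;
         |  }
         |}
       """.stripMargin

    // Compile the generated source and load the class from Janino's loader.
    val compiler = new SimpleCompiler()
    compiler.setParentClassLoader(getClass.getClassLoader)
    compiler.cook(new StringReader(code))

    val clazz = compiler.getClassLoader.loadClass(generatedName)
    val filter = clazz
      .getDeclaredConstructor()
      .newInstance()
      .asInstanceOf[FilterFunction[Integer]]

    println(filter.filter(Integer.valueOf(42))) // true
    println(filter.filter(Integer.valueOf(3)))  // false
  }
}

The removed GenerateFilter differed mainly in that the predicate body was produced by ExpressionCodeGenerator from an Expression tree, and the generated class took a TableConfig constructor argument so null-check behaviour and time-zone settings could be applied at runtime.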