http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenameOperator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenameOperator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenameOperator.scala
new file mode 100644
index 0000000..3b5459b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenameOperator.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.typeinfo
+
+import org.apache.flink.api.common.operators.Operator
+import org.apache.flink.api.java.operators.SingleInputOperator
+import org.apache.flink.api.java.{DataSet => JavaDataSet}
+
+/**
+ * This is a logical operator that can hold a [[RenamingProxyTypeInfo]] for renaming some
+ * fields of a [[org.apache.flink.api.common.typeutils.CompositeType]]. At runtime this
+ * disappears since the translation method simply returns the input.
+ *
+ * @param input the data set whose fields are renamed
+ * @param renamingTypeInformation the type information carrying the new field names; it is
+ *                                handed to the [[SingleInputOperator]] constructor
+ *                                (presumably as this operator's result type — confirm)
+ * @tparam T the element type, identical for input and output
+ */
+class RenameOperator[T](
+    input: JavaDataSet[T],
+    renamingTypeInformation: RenamingProxyTypeInfo[T])
+  extends SingleInputOperator[T, T, RenameOperator[T]](input, renamingTypeInformation) {
+
+  // No runtime operator is produced: the translated input operator is returned unchanged,
+  // so only the API-level type information differs from the input.
+  override protected def translateToDataFlow(
+      input: Operator[T]): Operator[T] = input
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfo.scala
new file mode 100644
index 0000000..dd598ab
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfo.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.typeinfo
+
+import java.util
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType.{TypeComparatorBuilder,
+FlatFieldDescriptor}
+import org.apache.flink.api.common.typeutils.{CompositeType, TypeSerializer}
+
+/**
+ * A TypeInformation that is used to rename fields of an underlying CompositeType. This
+ * allows the system to translate "as" Table API operations to a [[RenameOperator]]
+ * that does not get translated to a runtime operator.
+ *
+ * Everything except the field names is delegated to the underlying type. Field expressions
+ * that refer to fields by their new names are mapped back to the underlying names before
+ * being delegated, because the underlying type only knows its own names.
+ *
+ * @param tpe the composite type whose fields are renamed
+ * @param fieldNames the new field names, one per field of `tpe`, all distinct
+ * @throws IllegalArgumentException if the number of names differs from the arity of `tpe`
+ *                                  or if the names are not unique
+ */
+class RenamingProxyTypeInfo[T](
+    val tpe: CompositeType[T],
+    val fieldNames: Array[String])
+  extends CompositeType[T](tpe.getTypeClass) {
+
+  def getUnderlyingType: CompositeType[T] = tpe
+
+  if (tpe.getArity != fieldNames.length) {
+    throw new IllegalArgumentException(s"Number of field names '${fieldNames.mkString(",")}' and " +
+      s"number of fields in underlying type $tpe do not match.")
+  }
+
+  if (fieldNames.toSet.size != fieldNames.length) {
+    throw new IllegalArgumentException(s"New field names must be unique. " +
+      s"Names: ${fieldNames.mkString(",")}.")
+  }
+
+  /**
+   * Maps a field expression written with the new field names onto the names of the
+   * underlying type. Only the first component (up to the first '.') is renamed; nested
+   * components belong to the nested types, which are not renamed. Expressions whose first
+   * component is not one of the new names (e.g. wildcards) are returned unchanged.
+   */
+  private def toUnderlyingExpression(fieldExpression: String): String = {
+    val separator = fieldExpression.indexOf('.')
+    val (head, tail) =
+      if (separator < 0) (fieldExpression, "") else fieldExpression.splitAt(separator)
+    val pos = fieldNames.indexOf(head)
+    if (pos < 0) {
+      fieldExpression
+    } else {
+      val underlyingNames = tpe.getFieldNames
+      underlyingNames(pos) + tail
+    }
+  }
+
+  // The constructor enforces unique names, so the first match is the only one;
+  // -1 is returned for unknown names.
+  override def getFieldIndex(fieldName: String): Int = fieldNames.indexOf(fieldName)
+
+  override def getFieldNames: Array[String] = fieldNames
+
+  override def isBasicType: Boolean = tpe.isBasicType
+
+  override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T] =
+    tpe.createSerializer(executionConfig)
+
+  override def getArity: Int = tpe.getArity
+
+  override def isKeyType: Boolean = tpe.isKeyType
+
+  override def getTypeClass: Class[T] = tpe.getTypeClass
+
+  override def getGenericParameters: java.util.List[TypeInformation[_]] = tpe.getGenericParameters
+
+  override def isTupleType: Boolean = tpe.isTupleType
+
+  override def toString = {
+    s"RenamingType(type: ${tpe.getTypeClass.getSimpleName}; " +
+      s"fields: ${fieldNames.mkString(", ")})"
+  }
+
+  override def getTypeAt[X](pos: Int): TypeInformation[X] = tpe.getTypeAt(pos)
+
+  override def getTotalFields: Int = tpe.getTotalFields
+
+  override def createComparator(
+        logicalKeyFields: Array[Int],
+        orders: Array[Boolean],
+        logicalFieldOffset: Int,
+        executionConfig: ExecutionConfig) =
+    tpe.createComparator(logicalKeyFields, orders, logicalFieldOffset, executionConfig)
+
+  // Field expressions use the new names; translate them before delegating.
+  override def getFlatFields(
+      fieldExpression: String,
+      offset: Int,
+      result: util.List[FlatFieldDescriptor]): Unit = {
+    tpe.getFlatFields(toUnderlyingExpression(fieldExpression), offset, result)
+  }
+
+  // Field expressions use the new names; translate them before delegating.
+  override def getTypeAt[X](fieldExpression: String): TypeInformation[X] = {
+    tpe.getTypeAt(toUnderlyingExpression(fieldExpression))
+  }
+
+  override protected def createTypeComparatorBuilder(): TypeComparatorBuilder[T] = {
+    throw new RuntimeException("This method should never be called because createComparator is " +
+      "overwritten.")
+  }
+
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case renamingProxy: RenamingProxyTypeInfo[_] =>
+        renamingProxy.canEqual(this) &&
+        tpe.equals(renamingProxy.tpe) &&
+        fieldNames.sameElements(renamingProxy.fieldNames)
+      case _ => false
+    }
+  }
+
+  override def hashCode(): Int = {
+    31 * tpe.hashCode() + util.Arrays.hashCode(fieldNames.asInstanceOf[Array[AnyRef]])
+  }
+
+  override def canEqual(obj: Any): Boolean = {
+    obj.isInstanceOf[RenamingProxyTypeInfo[_]]
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala
new file mode 100644
index 0000000..5e9613d
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.typeinfo
+
+import org.apache.flink.api.table.Row
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.core.memory.{DataOutputView, DataInputView}
+
+/**
+ * Serializer for [[Row]].
+ *
+ * A row is written as the plain concatenation of its fields; field `i` is handled by
+ * `fieldSerializers(i)`. No row-level arity information is written, so every row passed
+ * in must have exactly one field per field serializer.
+ *
+ * @param fieldSerializers one serializer per field; its length is the expected row arity
+ */
+class RowSerializer(val fieldSerializers: Array[TypeSerializer[Any]])
+  extends TypeSerializer[Row] {
+
+  override def isImmutableType: Boolean = false
+
+  override def getLength: Int = -1
+
+  /**
+   * Returns a serializer that can be used independently of this one. Field serializers may
+   * carry state, so each of them is duplicated. `this` is only reused when every field
+   * serializer returned itself from `duplicate()` (by convention, the stateless ones).
+   */
+  override def duplicate: RowSerializer = {
+    val duplicatedFieldSerializers = fieldSerializers.map(_.duplicate())
+    val hasStatefulField = duplicatedFieldSerializers.indices.exists { i =>
+      duplicatedFieldSerializers(i) ne fieldSerializers(i)
+    }
+    if (hasStatefulField) new RowSerializer(duplicatedFieldSerializers) else this
+  }
+
+  override def createInstance: Row = {
+    new Row(fieldSerializers.length)
+  }
+
+  override def copy(from: Row, reuse: Row): Row = {
+    val len = fieldSerializers.length
+
+    // Fields are read from `from` and written into `reuse` by index, so both rows must
+    // have exactly one field per field serializer.
+    if (from.productArity != len || reuse.productArity != len) {
+      throw new RuntimeException("Row arity of reuse and from do not match.")
+    }
+    var i = 0
+    while (i < len) {
+      val reuseField = reuse.productElement(i)
+      val fromField = from.productElement(i).asInstanceOf[AnyRef]
+      val copy = fieldSerializers(i).copy(fromField, reuseField)
+      reuse.setField(i, copy)
+      i += 1
+    }
+    reuse
+  }
+
+  override def copy(from: Row): Row = {
+    val len = fieldSerializers.length
+
+    if (from.productArity != len) {
+      throw new RuntimeException("Row arity of reuse and from do not match.")
+    }
+    val result = new Row(len)
+    var i = 0
+    while (i < len) {
+      val fromField = from.productElement(i).asInstanceOf[AnyRef]
+      val copy = fieldSerializers(i).copy(fromField)
+      result.setField(i, copy)
+      i += 1
+    }
+    result
+  }
+
+  override def serialize(value: Row, target: DataOutputView): Unit = {
+    val len = fieldSerializers.length
+
+    // Without this check a wider row would silently lose fields and a narrower one would
+    // fail with an index error half-way through writing.
+    if (value.productArity != len) {
+      throw new RuntimeException("Row arity of value and fields do not match.")
+    }
+    var i = 0
+    while (i < len) {
+      val serializer = fieldSerializers(i)
+      serializer.serialize(value.productElement(i), target)
+      i += 1
+    }
+  }
+
+  override def deserialize(reuse: Row, source: DataInputView): Row = {
+    val len = fieldSerializers.length
+
+    if (reuse.productArity != len) {
+      throw new RuntimeException("Row arity of reuse and fields do not match.")
+    }
+
+    var i = 0
+    while (i < len) {
+      val field = reuse.productElement(i).asInstanceOf[AnyRef]
+      reuse.setField(i, fieldSerializers(i).deserialize(field, source))
+      i += 1
+    }
+    reuse
+  }
+
+  override def deserialize(source: DataInputView): Row = {
+    val len = fieldSerializers.length
+
+    val result = new Row(len)
+    var i = 0
+    while (i < len) {
+      result.setField(i, fieldSerializers(i).deserialize(source))
+      i += 1
+    }
+    result
+  }
+
+  override def copy(source: DataInputView, target: DataOutputView): Unit = {
+    val len = fieldSerializers.length
+    var i = 0
+    while (i < len) {
+      fieldSerializers(i).copy(source, target)
+      i += 1
+    }
+  }
+
+  override def equals(any: scala.Any): Boolean = {
+    any match {
+      case otherRS: RowSerializer =>
+        otherRS.canEqual(this) &&
+          fieldSerializers.sameElements(otherRS.fieldSerializers)
+      case _ => false
+    }
+  }
+
+  override def canEqual(obj: scala.Any): Boolean = {
+    obj.isInstanceOf[RowSerializer]
+  }
+
+  override def hashCode(): Int = {
+    java.util.Arrays.hashCode(fieldSerializers.asInstanceOf[Array[AnyRef]])
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala
new file mode 100644
index 0000000..db3c881
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala
@@ -0,0
+1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.typeinfo
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.table.Row
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.table.expressions.Expression
+import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo}
+
+/**
+ * TypeInformation for [[Row]].
+ *
+ * @param fieldTypes type information of every field, in field order
+ * @param fieldNames the field names, in the same order as `fieldTypes`; must be distinct
+ * @throws IllegalArgumentException if a field name occurs more than once
+ */
+class RowTypeInfo(
+    fieldTypes: Seq[TypeInformation[_]],
+    fieldNames: Seq[String])
+  extends CaseClassTypeInfo[Row](classOf[Row], Array(), fieldTypes, fieldNames) {
+
+  /** Derives field types and names from the given expressions, in order. */
+  def this(fields: Seq[Expression]) = this(fields.map(_.typeInfo), fields.map(_.name))
+
+  // Reject duplicate names as soon as the type is created.
+  if (fieldNames.distinct.size != fieldNames.size) {
+    throw new IllegalArgumentException("Field names must be unique.")
+  }
+
+  /** Creates a [[RowSerializer]] backed by one serializer per field type. */
+  override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[Row] = {
+    val perFieldSerializers = Array.tabulate[TypeSerializer[Any]](getArity) { idx =>
+      types(idx).createSerializer(executionConfig).asInstanceOf[TypeSerializer[Any]]
+    }
+    new RowSerializer(perFieldSerializers)
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala
new file mode 100644
index 0000000..dda6265
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.scala
+
+import org.apache.flink.api.table.expressions.Literal
+import org.apache.flink.api.common.functions.GroupReduceFunction
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.examples.java.graph.util.PageRankData
+import org.apache.flink.util.Collector
+
+import _root_.scala.collection.JavaConverters._
+
+/**
+* A basic implementation of the Page Rank algorithm using a bulk iteration.
+*
+* This implementation requires a set of pages and a set of directed links as input and works as
+* follows.
+*
+* In each iteration, the rank of every page is evenly distributed to all pages it points to. Each
+* page collects the partial ranks of all pages that point to it, sums them up, and applies a
+* dampening factor to the sum. The result is the new rank of the page. A new iteration is started
+* with the new ranks of all pages. The iteration stops as soon as no page's rank changes by more
+* than a small epsilon, or after the configured maximum number of iterations. This is the
+* Wikipedia entry for the
+* [[http://en.wikipedia.org/wiki/Page_rank Page Rank algorithm]]
+*
+* Input files are plain text files and must be formatted as follows:
+*
+* - Pages are represented as a (long) ID, separated by new-line characters.
+* For example `"1\n2\n12\n42\n63"` gives five pages with IDs 1, 2, 12, 42, and 63.
+* - Links are represented as pairs of page IDs which are separated by space characters. Links
+* are separated by new-line characters.
+* For example `"1 2\n2 12\n1 12\n42 63"` gives four (directed) links (1)->(2), (2)->(12),
+* (1)->(12), and (42)->(63). For this simple implementation it is required that each page has
+* at least one incoming and one outgoing link (a page can point to itself).
+*
+* Usage:
+* {{{
+* PageRankBasic <pages path> <links path> <output path> <num pages> <num iterations>
+* }}}
+*
+* If no parameters are provided, the program is run with default data from
+* [[org.apache.flink.examples.java.graph.util.PageRankData]] and 10 iterations.
+*
+* This example shows how to use:
+*
+* - Bulk Iterations
+* - Table API expressions
+*/
+object PageRankTable {
+
+  private final val DAMPENING_FACTOR: Double = 0.85
+  private final val EPSILON: Double = 0.0001
+
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    // set up execution environment
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    // read input data
+    val pagesWithRanks = getPagesDataSet(env).map { p => (p, 1.0 / numPages) }
+      .as('pageId, 'rank)
+
+    val links = getLinksDataSet(env)
+
+    // build adjacency list from link input
+    val adjacencyLists = links
+      .groupBy("sourceId").reduceGroup( new GroupReduceFunction[Link, AdjacencyList] {
+
+        override def reduce(
+            values: _root_.java.lang.Iterable[Link],
+            out: Collector[AdjacencyList]): Unit = {
+          var outputId = -1L
+          val outputList = values.asScala map { t => outputId = t.sourceId; t.targetId }
+          out.collect(new AdjacencyList(outputId, outputList.toArray))
+        }
+
+      }).as('sourceId, 'targetIds)
+
+    // start iteration
+    val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
+      currentRanks =>
+        val newRanks = currentRanks.toTable
+          // distribute ranks to target pages
+          .join(adjacencyLists).where('pageId === 'sourceId)
+          .select('rank, 'targetIds).toDataSet[RankOutput]
+          .flatMap {
+            (in, out: Collector[(Long, Double)]) =>
+              val targets = in.targetIds
+              val len = targets.length
+              targets foreach { t => out.collect((t, in.rank / len )) }
+          }
+          .as('pageId, 'rank)
+          // collect ranks and sum them up
+          .groupBy('pageId).select('pageId, 'rank.sum as 'rank)
+          // apply dampening factor
+          .select(
+            'pageId,
+            ('rank * DAMPENING_FACTOR) + (Literal(1) - DAMPENING_FACTOR) / numPages as 'rank)
+
+
+        // pages whose rank still moved by more than EPSILON; the iteration ends once
+        // this data set is empty
+        val termination = currentRanks.toTable
+          .as('curId, 'curRank).join(newRanks.as('newId, 'newRank))
+          .where('curId === 'newId && ('curRank - 'newRank).abs > EPSILON)
+
+        (newRanks, termination)
+    }
+
+    val result = finalRanks
+
+    // emit result
+    if (fileOutput) {
+      result.writeAsCsv(outputPath, "\n", " ")
+      // execute program
+      env.execute("Expression PageRank Example")
+    } else {
+      // execute program and print result
+      result.print()
+    }
+  }
+
+  // *************************************************************************
+  //     USER TYPES
+  // *************************************************************************
+
+  case class Link(sourceId: Long, targetId: Long)
+
+  case class Page(pageId: Long, rank: Double)
+
+  case class AdjacencyList(sourceId: Long, targetIds: Array[Long])
+
+  case class RankOutput(rank: Double, targetIds: Array[Long])
+
+  // *************************************************************************
+  //     UTIL METHODS
+  // *************************************************************************
+
+  /**
+   * Parses the command line into the configuration fields of this object.
+   *
+   * With no arguments the built-in default data is used. With exactly five arguments the
+   * input/output paths, the number of pages and the maximum number of iterations are read.
+   *
+   * @return `false` if arguments were given but not exactly five (the usage is printed and
+   *         the program must stop), `true` otherwise
+   */
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length > 0) {
+      fileOutput = true
+      if (args.length == 5) {
+        pagesInputPath = args(0)
+        linksInputPath = args(1)
+        outputPath = args(2)
+        numPages = args(3).toLong
+        maxIterations = args(4).toInt
+        true
+      } else {
+        System.err.println("Usage: PageRankBasic <pages path> <links path> <output path> <num " +
+          "pages> <num iterations>")
+        // previously this value was discarded and the program ran with null paths
+        false
+      }
+    } else {
+      System.out.println("Executing PageRank Basic example with default parameters and built-in " +
+        "default data.")
+      System.out.println("  Provide parameters to read input data from files.")
+      System.out.println("  See the documentation for the correct format of input files.")
+      System.out.println("  Usage: PageRankBasic <pages path> <links path> <output path> <num " +
+        "pages> <num iterations>")
+
+      numPages = PageRankData.getNumberOfPages
+      true
+    }
+  }
+
+  /** Reads page ids from `pagesInputPath` in file mode, otherwise generates the ids 1 to 15. */
+  private def getPagesDataSet(env: ExecutionEnvironment): DataSet[Long] = {
+    if (fileOutput) {
+      env.readCsvFile[Tuple1[Long]](pagesInputPath, fieldDelimiter = " ", lineDelimiter = "\n")
+        .map(x => x._1)
+    } else {
+      env.generateSequence(1, 15)
+    }
+  }
+
+  /**
+   * Reads links (first two space-separated columns) from `linksInputPath` in file mode,
+   * otherwise builds them from `PageRankData.EDGES`.
+   */
+  private def getLinksDataSet(env: ExecutionEnvironment): DataSet[Link] = {
+    if (fileOutput) {
+      env.readCsvFile[Link](linksInputPath, fieldDelimiter = " ",
+        includedFields = Array(0, 1))
+    } else {
+      val edges = PageRankData.EDGES.map { case Array(v1, v2) => Link(v1.asInstanceOf[Long],
+        v2.asInstanceOf[Long])}
+      env.fromCollection(edges)
+    }
+  }
+
+  // configuration, filled in by parseParameters
+  private var fileOutput: Boolean = false
+  private var pagesInputPath: String = null
+  private var linksInputPath: String = null
+  private var outputPath: String = null
+  private var numPages: Double = 0
+  private var maxIterations: Int = 10
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala
new file mode 100644
index 0000000..63dddc9
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.scala
+
+import org.apache.flink.streaming.api.scala._
+
+import org.apache.flink.api.scala.table._
+
+import scala.Stream._
+import scala.math._
+import scala.language.postfixOps
+import scala.util.Random
+
+/**
+ * Simple example for demonstrating the use of the Table API with Flink Streaming.
+ *
+ * A stream of random car events is converted to a table, the events of car 0 are selected,
+ * their distance is shifted by 1000 and their time reduced modulo 5, and the result is printed.
+ */
+object StreamingTableFilter {
+
+  case class CarEvent(carId: Int, speed: Int, distance: Double, time: Long) extends Serializable
+
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    val cars = genCarStream().toTable
+      .filter('carId === 0)
+      .select('carId, 'speed, 'distance + 1000 as 'distance, 'time % 5 as 'time)
+      .toDataStream[CarEvent]
+
+    cars.print()
+
+    // the job name was previously copy-pasted from the TopSpeedWindowing example
+    StreamExecutionEnvironment.getExecutionEnvironment.execute("StreamingTableFilter")
+
+  }
+
+  /**
+   * Generates `numOfCars` cars starting at speed 50 and distance 0. Every following round
+   * (one per second, see the `Thread.sleep`) changes each car's speed randomly by +/-5 within
+   * [0, 100] and advances its distance accordingly.
+   *
+   * NOTE(review): the body yields a `scala.Stream` while the declared type is `DataStream`;
+   * this relies on a conversion that is not visible in this file — confirm.
+   */
+  def genCarStream(): DataStream[CarEvent] = {
+
+    def nextSpeed(carEvent : CarEvent) : CarEvent = {
+      val next =
+        if (Random.nextBoolean()) min(100, carEvent.speed + 5) else max(0, carEvent.speed - 5)
+      CarEvent(carEvent.carId, next, carEvent.distance + next/3.6d,System.currentTimeMillis)
+    }
+    def carStream(speeds : Stream[CarEvent]) : Stream[CarEvent] = {
+      Thread.sleep(1000)
+      speeds.append(carStream(speeds.map(nextSpeed)))
+    }
+    carStream(range(0, numOfCars).map(CarEvent(_,50,0,System.currentTimeMillis())))
+  }
+
+  /**
+   * Reads `numOfCars`, `evictionSec` and `triggerMeters` from exactly three arguments.
+   * Note that only `numOfCars` is used by this example.
+   *
+   * @return `false` (after printing the usage) if arguments were given but not exactly three
+   */
+  def parseParameters(args: Array[String]): Boolean = {
+    if (args.length > 0) {
+      if (args.length == 3) {
+        numOfCars = args(0).toInt
+        evictionSec = args(1).toInt
+        triggerMeters = args(2).toDouble
+        true
+      } else {
+        System.err.println("Usage: StreamingTableFilter <numCars> <evictSec> <triggerMeters>")
+        false
+      }
+    } else {
+      true
+    }
+  }
+
+  var numOfCars = 2
+  var evictionSec = 10
+  var triggerMeters = 50d
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala
new file mode 100644
index 0000000..f527a3c
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.scala
+
+import org.apache.flink.api.table.expressions.Literal
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+
+/**
+ * This program implements a modified version of the TPC-H query 3. The
+ * example demonstrates how to assign names to fields with the Table API `as` operation.
+ * The original query can be found at
+ * [[http://www.tpc.org/tpch/spec/tpch2.16.0.pdf]]
+ * (page 29).
+ *
+ * This program implements the following SQL equivalent:
+ *
+ * {{{
+ * SELECT
+ *   l_orderkey,
+ *   SUM(l_extendedprice*(1-l_discount)) AS revenue,
+ *   o_orderdate,
+ *   o_shippriority
+ * FROM customer,
+ *   orders,
+ *   lineitem
+ * WHERE
+ *   c_mktsegment = '[SEGMENT]'
+ *   AND c_custkey = o_custkey
+ *   AND l_orderkey = o_orderkey
+ *   AND o_orderdate < date '[DATE]'
+ *   AND l_shipdate > date '[DATE]'
+ * GROUP BY
+ *   l_orderkey,
+ *   o_orderdate,
+ *   o_shippriority;
+ * }}}
+ *
+ * Compared to the original TPC-H query this version does not sort the result by revenue
+ * and orderdate.
+ *
+ * Input files are plain text CSV files using the pipe character ('|') as field separator
+ * as generated by the TPC-H data generator which is available at
+ * [[http://www.tpc.org/tpch/]].
+ *
+ * Usage:
+ * {{{
+ * TPCHQuery3Table <lineitem-csv path> <customer-csv path> <orders-csv path> <result path>
+ * }}}
+ *
+ * This example shows how to use:
+ *  - Table API expressions
+ *
+ */
+object TPCHQuery3Table {
+
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    // set filter date
+    val dateFormat = new _root_.java.text.SimpleDateFormat("yyyy-MM-dd")
+    val date = dateFormat.parse("1995-03-12")
+
+    // get execution environment
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val lineitems = getLineitemDataSet(env)
+      .filter( l => dateFormat.parse(l.shipDate).after(date) )
+      .as('id, 'extdPrice, 'discount, 'shipDate)
+
+    val customers = getCustomerDataSet(env)
+      .as('id, 'mktSegment)
+      .filter( 'mktSegment === "AUTOMOBILE" )
+
+    val orders = getOrdersDataSet(env)
+      .filter( o => dateFormat.parse(o.orderDate).before(date) )
+      .as('orderId, 'custId, 'orderDate, 'shipPrio)
+
+    val items =
+      orders.join(customers)
+        .where('custId === 'id)
+        .select('orderId, 'orderDate, 'shipPrio)
+      .join(lineitems)
+        .where('orderId === 'id)
+        .select(
+          'orderId,
+          'extdPrice * (Literal(1.0f) - 'discount) as 'revenue,
+          'orderDate,
+          'shipPrio)
+
+    val result = items
+      .groupBy('orderId, 'orderDate, 'shipPrio)
+      .select('orderId, 'revenue.sum, 'orderDate, 'shipPrio)
+
+    // emit result
+    result.writeAsCsv(outputPath, "\n", "|")
+
+    // execute program
+    env.execute("Scala TPCH Query 3 (Expression) Example")
+  }
+
+  // *************************************************************************
+  //     USER DATA TYPES
+  // *************************************************************************
+
+  case class Lineitem(id: Long, extdPrice: Double, discount: Double, shipDate: String)
+  case class Customer(id: Long, mktSegment: String)
+  case class Order(orderId: Long, custId: Long, orderDate: String, shipPrio: Long)
+
+  // *************************************************************************
+  //     UTIL METHODS
+  // *************************************************************************
+
+  private var lineitemPath: String = null
+  private var customerPath: String = null
+  private var ordersPath: String = null
+  private var outputPath: String = null
+
+  /**
+   * Reads the four input/output paths.
+   *
+   * @return `false` (after printing the usage) unless exactly four arguments are given
+   */
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length == 4) {
+      lineitemPath = args(0)
+      customerPath = args(1)
+      ordersPath = args(2)
+      outputPath = args(3)
+      true
+    } else {
+      // the two usage fragments previously ran together without a separating space
+      System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
+        " Due to legal restrictions, we can not ship generated data.\n" +
+        " You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +
+        " Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path> " +
+        "<orders-csv path> <result path>")
+      false
+    }
+  }
+
+  /** Reads columns 0, 5, 6 and 10 of the '|'-separated lineitem file. */
+  private def getLineitemDataSet(env: ExecutionEnvironment): DataSet[Lineitem] = {
+    env.readCsvFile[Lineitem](
+        lineitemPath,
+        fieldDelimiter = "|",
+        includedFields = Array(0, 5, 6, 10) )
+  }
+
+  /** Reads columns 0 and 6 of the '|'-separated customer file. */
+  private def getCustomerDataSet(env: ExecutionEnvironment): DataSet[Customer] = {
+    env.readCsvFile[Customer](
+        customerPath,
+        fieldDelimiter = "|",
+        includedFields = Array(0, 6) )
+  }
+
+  /** Reads columns 0, 1, 4 and 7 of the '|'-separated orders file. */
+  private def getOrdersDataSet(env: ExecutionEnvironment): DataSet[Order] = {
+    env.readCsvFile[Order](
+        ordersPath,
+        fieldDelimiter = "|",
+        includedFields = Array(0, 1, 4, 7) )
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala
new file mode 100644
index 0000000..cac9590
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala
@@ -0,0 +1,45 @@
+/*
+
* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.examples.scala + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.table._ + +/** + * Simple example for demonstrating the use of the Table API for a Word Count. 
+ */ +object WordCountTable { + + case class WC(word: String, count: Int) + + def main(args: Array[String]): Unit = { + + // set up execution environment + val env = ExecutionEnvironment.getExecutionEnvironment + + val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1)) + val expr = input.toTable + val result = expr + .groupBy('word) + .select('word, 'count.sum as 'count) + .toDataSet[WC] + + result.print() + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/explain/Node.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/explain/Node.java b/flink-libraries/flink-table/src/main/scala/org/apache/flink/explain/Node.java new file mode 100644 index 0000000..9152260 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/explain/Node.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.explain; + +import java.util.List; + +public class Node { + private int id; + private String type; + private String pact; + private String contents; + private int parallelism; + private String driver_strategy; + private List<Predecessors> predecessors; + private List<Global_properties> global_properties; + private List<LocalProperty> local_properties; + private List<Estimates> estimates; + private List<Costs> costs; + private List<Compiler_hints> compiler_hints; + + public int getId() { + return id; + } + public String getType() { + return type; + } + public String getPact() { + return pact; + } + public String getContents() { + return contents; + } + public int getParallelism() { + return parallelism; + } + public String getDriver_strategy() { + return driver_strategy; + } + public List<Predecessors> getPredecessors() { + return predecessors; + } + public List<Global_properties> getGlobal_properties() { + return global_properties; + } + public List<LocalProperty> getLocal_properties() { + return local_properties; + } + public List<Estimates> getEstimates() { + return estimates; + } + public List<Costs> getCosts() { + return costs; + } + public List<Compiler_hints> getCompiler_hints() { + return compiler_hints; + } +} + +class Predecessors { + private String ship_strategy; + private String exchange_mode; + + public String getShip_strategy() { + return ship_strategy; + } + public String getExchange_mode() { + return exchange_mode; + } +} + +class Global_properties { + private String name; + private String value; + + public String getValue() { + return value; + } + public String getName() { + return name; + } +} + +class LocalProperty { + private String name; + private String value; + + public String getValue() { + return value; + } + public String getName() { + return name; + } +} + +class Estimates { + private String name; + private String value; + + public String getValue() { + return value; + } + public String getName() 
{ + return name; + } +} + +class Costs { + private String name; + private String value; + + public String getValue() { + return value; + } + public String getName() { + return name; + } +} + +class Compiler_hints { + private String name; + private String value; + + public String getValue() { + return value; + } + public String getName() { + return name; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/explain/PlanJsonParser.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/explain/PlanJsonParser.java b/flink-libraries/flink-table/src/main/scala/org/apache/flink/explain/PlanJsonParser.java new file mode 100644 index 0000000..31a7cd68 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/explain/PlanJsonParser.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.explain; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.DeserializationFeature; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.LinkedHashMap; +import java.util.List; + +public class PlanJsonParser { + + public static String getSqlExecutionPlan(String t, Boolean extended) throws Exception { + ObjectMapper objectMapper = new ObjectMapper(); + + //not every node is same, ignore the unknown field + objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + + PlanTree tree = objectMapper.readValue(t, PlanTree.class); + LinkedHashMap<String, Integer> map = new LinkedHashMap<>(); + StringWriter sw = new StringWriter(); + PrintWriter pw = new PrintWriter(sw); + int tabCount = 0; + + for (int index = 0; index < tree.getNodes().size(); index++) { + Node tempNode = tree.getNodes().get(index); + + //input with operation such as join or union is coordinate, keep the same indent + if ((tempNode.getPact().equals("Data Source")) && (map.containsKey(tempNode.getPact()))) { + tabCount = map.get(tempNode.getPact()); + } + else { + map.put(tempNode.getPact(), tabCount); + } + + printTab(tabCount, pw); + pw.print("Stage " + tempNode.getId() + " : " + tempNode.getPact() + "\n"); + + printTab(tabCount + 1, pw); + String content = tempNode.getContents(); + + //drop the hashcode of object instance + int dele = tempNode.getContents().indexOf("@"); + if (dele > -1) content = tempNode.getContents().substring(0, dele); + + //replace with certain content if node is dataSource to pass + //unit tests, because java and scala use different api to + //get input element + if (tempNode.getPact().equals("Data Source")) + content = "collect elements with CollectionInputFormat"; + pw.print("content : " + content + "\n"); + + List<Predecessors> predecessors = tempNode.getPredecessors(); + if (predecessors != null) { + printTab(tabCount + 1, pw); + 
pw.print("ship_strategy : " + predecessors.get(0).getShip_strategy() + "\n"); + + printTab(tabCount + 1, pw); + pw.print("exchange_mode : " + predecessors.get(0).getExchange_mode() + "\n"); + } + + if (tempNode.getDriver_strategy() != null) { + printTab(tabCount + 1, pw); + pw.print("driver_strategy : " + tempNode.getDriver_strategy() + "\n"); + } + + printTab(tabCount + 1, pw); + pw.print(tempNode.getGlobal_properties().get(0).getName() + " : " + + tempNode.getGlobal_properties().get(0).getValue() + "\n"); + + if (extended) { + List<Global_properties> globalProperties = tempNode.getGlobal_properties(); + for (int i = 1; i < globalProperties.size(); i++) { + printTab(tabCount + 1, pw); + pw.print(globalProperties.get(i).getName() + " : " + + globalProperties.get(i).getValue() + "\n"); + } + + List<LocalProperty> localProperties = tempNode.getLocal_properties(); + for (int i = 0; i < localProperties.size(); i++) { + printTab(tabCount + 1, pw); + pw.print(localProperties.get(i).getName() + " : " + + localProperties.get(i).getValue() + "\n"); + } + + List<Estimates> estimates = tempNode.getEstimates(); + for (int i = 0; i < estimates.size(); i++) { + printTab(tabCount + 1, pw); + pw.print(estimates.get(i).getName() + " : " + + estimates.get(i).getValue() + "\n"); + } + + List<Costs> costs = tempNode.getCosts(); + for (int i = 0; i < costs.size(); i++) { + printTab(tabCount + 1, pw); + pw.print(costs.get(i).getName() + " : " + + costs.get(i).getValue() + "\n"); + } + + List<Compiler_hints> compilerHintses = tempNode.getCompiler_hints(); + for (int i = 0; i < compilerHintses.size(); i++) { + printTab(tabCount + 1, pw); + pw.print(compilerHintses.get(i).getName() + " : " + + compilerHintses.get(i).getValue() + "\n"); + } + } + tabCount++; + pw.print("\n"); + } + pw.close(); + return sw.toString(); + } + + private static void printTab(int tabCount, PrintWriter pw) { + for (int i = 0; i < tabCount; i++) + pw.print("\t"); + } +} + +class PlanTree { + private List<Node> 
nodes; + + public List<Node> getNodes() { + return nodes; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java new file mode 100644 index 0000000..bdebfb1 --- /dev/null +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.java.table.test; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.flink.api.table.ExpressionException; +import org.apache.flink.api.table.Table; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.table.TableEnvironment; +import org.apache.flink.api.java.operators.DataSource; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple7; +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.List; + +@RunWith(Parameterized.class) +public class AggregationsITCase extends MultipleProgramsTestBase { + + + public AggregationsITCase(TestExecutionMode mode){ + super(mode); + } + + @Test + public void testAggregationTypes() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + Table table = tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env)); + + Table result = table.select("f0.sum, f0.min, f0.max, f0.count, f0.avg"); + + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "231,1,21,21,11"; + compareResultAsText(results, expected); + } + + @Test(expected = ExpressionException.class) + public void testAggregationOnNonExistingField() throws Exception { + ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + Table table = + tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env)); + + Table result = + table.select("foo.avg"); + + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = ""; + compareResultAsText(results, expected); + } + + @Test + public void testWorkingAggregationDataTypes() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, String>> input = + env.fromElements( + new Tuple7<>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello"), + new Tuple7<>((byte) 2, (short) 2, 2, 2L, 2.0f, 2.0d, "Ciao")); + + Table table = + tableEnv.fromDataSet(input); + + Table result = + table.select("f0.avg, f1.avg, f2.avg, f3.avg, f4.avg, f5.avg, f6.count"); + + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "1,1,1,1,1.5,1.5,2"; + compareResultAsText(results, expected); + } + + @Test + public void testAggregationWithArithmetic() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSource<Tuple2<Float, String>> input = + env.fromElements( + new Tuple2<>(1f, "Hello"), + new Tuple2<>(2f, "Ciao")); + + Table table = + tableEnv.fromDataSet(input); + + Table result = + table.select("(f0 + 2).avg + 2, f1.count + \" THE COUNT\""); + + + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "5.5,2 THE COUNT"; + compareResultAsText(results, expected); + } + + @Test + public void testAggregationWithTwoCount() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new 
TableEnvironment(); + + DataSource<Tuple2<Float, String>> input = + env.fromElements( + new Tuple2<>(1f, "Hello"), + new Tuple2<>(2f, "Ciao")); + + Table table = + tableEnv.fromDataSet(input); + + Table result = + table.select("f0.count, f1.count"); + + + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "2,2"; + compareResultAsText(results, expected); + } + + @Test(expected = ExpressionException.class) + public void testNonWorkingDataTypes() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSource<Tuple2<Float, String>> input = env.fromElements(new Tuple2<>(1f, "Hello")); + + Table table = + tableEnv.fromDataSet(input); + + Table result = + table.select("f1.sum"); + + + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = ""; + compareResultAsText(results, expected); + } + + @Test(expected = ExpressionException.class) + public void testNoNestedAggregation() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSource<Tuple2<Float, String>> input = env.fromElements(new Tuple2<>(1f, "Hello")); + + Table table = + tableEnv.fromDataSet(input); + + Table result = + table.select("f0.sum.sum"); + + + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = ""; + compareResultAsText(results, expected); + } + +} + http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java new file mode 100644 index 0000000..f6ab54e --- /dev/null +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.table.test; + +import org.apache.flink.api.table.ExpressionException; +import org.apache.flink.api.table.Table; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.table.TableEnvironment; +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.List; + +@RunWith(Parameterized.class) +public class AsITCase extends MultipleProgramsTestBase { + + + public AsITCase(TestExecutionMode mode){ + super(mode); + } + + @Test + public void testAs() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + Table table = + tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c"); + + DataSet<Row> ds = tableEnv.toDataSet(table, Row.class); + List<Row> results = ds.collect(); + String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + + "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + + "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + + "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + + "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + + "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"; + compareResultAsText(results, expected); + } + + @Test(expected = ExpressionException.class) + public void testAsWithToFewFields() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + Table table = + 
tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b"); + + DataSet<Row> ds = tableEnv.toDataSet(table, Row.class); + List<Row> results = ds.collect(); + String expected = ""; + compareResultAsText(results, expected); + } + + @Test(expected = ExpressionException.class) + public void testAsWithToManyFields() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + Table table = + tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c, d"); + + DataSet<Row> ds = tableEnv.toDataSet(table, Row.class); + List<Row> results = ds.collect(); + String expected = ""; + compareResultAsText(results, expected); + } + + @Test(expected = ExpressionException.class) + public void testAsWithAmbiguousFields() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + Table table = + tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, b"); + + DataSet<Row> ds = tableEnv.toDataSet(table, Row.class); + List<Row> results = ds.collect(); + String expected = ""; + compareResultAsText(results, expected); + } + + @Test(expected = ExpressionException.class) + public void testAsWithNonFieldReference1() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + Table table = + tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a + 1, b, c"); + + DataSet<Row> ds = tableEnv.toDataSet(table, Row.class); + List<Row> results = ds.collect(); + String expected = ""; + compareResultAsText(results, expected); + } + + @Test(expected = ExpressionException.class) + public void testAsWithNonFieldReference2() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + 
Table table = + tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a as foo, b," + + " c"); + + DataSet<Row> ds = tableEnv.toDataSet(table, Row.class); + List<Row> results = ds.collect(); + String expected = ""; + compareResultAsText(results, expected); + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java new file mode 100644 index 0000000..7e9e3dc --- /dev/null +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.table.test; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.api.table.Table; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.table.TableEnvironment; +import org.apache.flink.api.java.operators.DataSource; +import org.apache.flink.api.java.tuple.Tuple7; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.List; + +@RunWith(Parameterized.class) +public class CastingITCase extends MultipleProgramsTestBase { + + + public CastingITCase(TestExecutionMode mode){ + super(mode); + } + + @Test + public void testAutoCastToString() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, String>> input = + env.fromElements(new Tuple7<>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello")); + + Table table = + tableEnv.fromDataSet(input); + + Table result = table.select( + "f0 + 'b', f1 + 's', f2 + 'i', f3 + 'L', f4 + 'f', f5 + \"d\""); + + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "1b,1s,1i,1L,1.0f,1.0d"; + compareResultAsText(results, expected); + } + + @Test + public void testNumericAutocastInArithmetic() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, String>> input = + env.fromElements(new Tuple7<>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello")); + + Table table = + 
tableEnv.fromDataSet(input); + + Table result = table.select("f0 + 1, f1 +" + + " 1, f2 + 1L, f3 + 1.0f, f4 + 1.0d, f5 + 1"); + + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "2,2,2,2.0,2.0,2.0"; + compareResultAsText(results, expected); + } + + @Test + public void testNumericAutocastInComparison() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, String>> input = + env.fromElements( + new Tuple7<>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello"), + new Tuple7<>((byte) 2, (short) 2, 2, 2L, 2.0f, 2.0d, "Hello")); + + Table table = + tableEnv.fromDataSet(input, "a,b,c,d,e,f,g"); + + Table result = table + .filter("a > 1 && b > 1 && c > 1L && d > 1.0f && e > 1.0d && f > 1"); + + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "2,2,2,2,2.0,2.0,Hello"; + compareResultAsText(results, expected); + } + + @Test + public void testCastFromString() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSource<Tuple3<String, String, String>> input = + env.fromElements(new Tuple3<>("1", "true", "2.0")); + + Table table = + tableEnv.fromDataSet(input); + + Table result = table.select( + "f0.cast(BYTE), f0.cast(SHORT), f0.cast(INT), f0.cast(LONG), f2.cast(DOUBLE), f2.cast(FLOAT), f1.cast(BOOL)"); + + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "1,1,1,1,2.0,2.0,true\n"; + compareResultAsText(results, expected); + } + + @Test + public void testCastDateFromString() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + 
DataSource<Tuple4<String, String, String, String>> input = + env.fromElements(new Tuple4<>("2011-05-03", "15:51:36", "2011-05-03 15:51:36.000", "1446473775")); + + Table table = + tableEnv.fromDataSet(input); + + Table result = table + .select("f0.cast(DATE) AS f0, f1.cast(DATE) AS f1, f2.cast(DATE) AS f2, f3.cast(DATE) AS f3") + .select("f0.cast(STRING), f1.cast(STRING), f2.cast(STRING), f3.cast(STRING)"); + + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "2011-05-03 00:00:00.000,1970-01-01 15:51:36.000,2011-05-03 15:51:36.000," + + "1970-01-17 17:47:53.775\n"; + compareResultAsText(results, expected); + } + + @Test + public void testCastDateToStringAndLong() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSource<Tuple2<String, String>> input = + env.fromElements(new Tuple2<>("2011-05-03 15:51:36.000", "1304437896000")); + + Table table = + tableEnv.fromDataSet(input); + + Table result = table + .select("f0.cast(DATE) AS f0, f1.cast(DATE) AS f1") + .select("f0.cast(STRING), f0.cast(LONG), f1.cast(STRING), f1.cast(LONG)"); + + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "2011-05-03 15:51:36.000,1304437896000,2011-05-03 15:51:36.000,1304437896000\n"; + compareResultAsText(results, expected); + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java new file mode 100644 index 0000000..c9bba62 --- /dev/null +++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.table.test; + +import org.apache.flink.api.table.ExpressionException; +import org.apache.flink.api.table.Table; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.table.TableEnvironment; +import org.apache.flink.api.java.operators.DataSource; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.List; + +/** Integration tests for scalar expressions (arithmetic, logical, comparison and bitwise operators) written as Table API select() strings. Each test turns a one-row DataSet into a Table, evaluates one select(), converts the result back with toDataSet(result, Row.class) and compares the collected Rows as comma-separated text via compareResultAsText. */ @RunWith(Parameterized.class) +public class ExpressionsITCase extends MultipleProgramsTestBase { + + + /** @param mode execution mode forwarded to MultipleProgramsTestBase; presumably supplied once per mode by the Parameterized runner (the parameter list is not visible here). */ public ExpressionsITCase(TestExecutionMode mode){ + super(mode); + } + + /** Checks subtraction, addition, integer division, multiplication, modulo and unary minus on the Integer row (a, b) = (5, 10). */ @Test + public void testArithmetic() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); +
DataSource<Tuple2<Integer, Integer>> input = + env.fromElements(new Tuple2<>(5, 10)); + + Table table = + tableEnv.fromDataSet(input, "a, b"); + + Table result = table.select( + "a - 5, a + 5, a / 2, a * 2, a % 2, -a"); + + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "0,10,2,10,1,-5"; /* a = 5: 5 - 5, 5 + 5, 5 / 2 (integer division), 5 * 2, 5 % 2, -5 */ + compareResultAsText(results, expected); + } + + /** Checks {@code &&}, {@code ||} and {@code !} on the Boolean field b = true. */ @Test + public void testLogic() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSource<Tuple2<Integer, Boolean>> input = + env.fromElements(new Tuple2<>(5, true)); + + Table table = + tableEnv.fromDataSet(input, "a, b"); + + Table result = table.select( + "b && true, b && false, b || false, !b"); + + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "true,false,true,false"; /* b = true: true && true, true && false, true || false, !true */ + compareResultAsText(results, expected); + } + + /** Checks {@code >}, {@code >=}, {@code <}, {@code isNull} and {@code isNotNull} on the non-null Integer row (a, b, c) = (5, 5, 4). */ @Test + public void testComparisons() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSource<Tuple3<Integer, Integer, Integer>> input = + env.fromElements(new Tuple3<>(5, 5, 4)); + + Table table = + tableEnv.fromDataSet(input, "a, b, c"); + + Table result = table.select( + "a > c, a >= b, a < c, a.isNull, a.isNotNull"); + + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "true,true,false,false,true"; /* 5 > 4, 5 >= 5, 5 < 4, a is not null (isNull false, isNotNull true) */ + compareResultAsText(results, expected); + } + + /** Checks {@code &}, {@code |}, {@code ^} and {@code ~} when both operands are Byte fields (a, b) = (3, 5). */ @Test + public void testBitwiseOperation() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSource<Tuple2<Byte, Byte>> input = + env.fromElements(new Tuple2<>((byte) 3, (byte) 5)); + + Table table = + tableEnv.fromDataSet(input, "a, b"); + + Table result =
table.select( + "a & b, a | b, a ^ b, ~a"); + + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "1,7,6,-4"; /* 3 & 5 = 1, 3 | 5 = 7, 3 ^ 5 = 6, ~3 = -4 */ + compareResultAsText(results, expected); + } + + /** Same bitwise expressions as testBitwiseOperation, but a is an Integer field and b a Byte field; the mixed operand types are expected to be accepted and to give the same results. */ @Test + public void testBitwiseWithAutocast() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSource<Tuple2<Integer, Byte>> input = + env.fromElements(new Tuple2<>(3, (byte) 5)); + + Table table = + tableEnv.fromDataSet(input, "a, b"); + + Table result = table.select( + "a & b, a | b, a ^ b, ~a"); + + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "1,7,6,-4"; + compareResultAsText(results, expected); + } + + /** Bitwise operators on a Float field must be rejected: the test passes only if an ExpressionException is thrown. NOTE(review): it is not visible here which call (select, toDataSet or collect) throws it; the statements after that call never run. */ @Test(expected = ExpressionException.class) + public void testBitwiseWithNonWorkingAutocast() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSource<Tuple2<Float, Byte>> input = + env.fromElements(new Tuple2<>(3.0f, (byte) 5)); + + Table table = + tableEnv.fromDataSet(input, "a, b"); + + Table result = + table.select("a & b, a | b, a ^ b, ~a"); + + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = ""; + compareResultAsText(results, expected); + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java new file mode 100644 index 0000000..44e0def --- /dev/null +++
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.table.test; + +import org.apache.flink.api.table.Table; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.table.TableEnvironment; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.List; + +/** Integration tests for Table.filter() with constant and field-based predicates. Most tests read CollectionDataSets.get3TupleDataSet(env), registered as fields (a, b, c); judging by testAllPassingFilter it holds 21 rows with a = 1..21. */ @RunWith(Parameterized.class) +public class FilterITCase extends MultipleProgramsTestBase { + + + /** @param mode execution mode forwarded to MultipleProgramsTestBase; presumably supplied once per mode by the Parameterized runner (the parameter list is not visible here). */ public FilterITCase(TestExecutionMode mode){ + super(mode); + } + + /** A constant {@code false} predicate must drop every row. */ @Test + public void testAllRejectingFilter() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env); + + Table
table = + tableEnv.fromDataSet(input, "a, b, c"); + + Table result = table + .filter("false"); + + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "\n"; /* no rows expected */ + compareResultAsText(results, expected); + } + + /** A constant {@code true} predicate must keep every input row unchanged. */ @Test + public void testAllPassingFilter() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env); + + Table table = + tableEnv.fromDataSet(input, "a, b, c"); + + Table result = table + .filter("true"); + + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + + "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + + "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + + "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + + "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + + "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"; + compareResultAsText(results, expected); + } + + /** Keeps only rows whose Integer field a is even ({@code a % 2 = 0}). */ @Test + public void testFilterOnIntegerTupleField() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env); + + Table table = + tableEnv.fromDataSet(input, "a, b, c"); + + Table result = table + .filter(" a % 2 = 0 "); + + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," + + "Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" +
"16,6," + + "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"; + compareResultAsText(results, expected); + } + + /** Negated not-equal predicate {@code !( a % 2 <> 0 )}; it must select the same even-a rows as testFilterOnIntegerTupleField. */ @Test + public void testNotEquals() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env); + + Table table = + tableEnv.fromDataSet(input, "a, b, c"); + + Table result = table + .filter("!( a % 2 <> 0 ) "); + + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," + + "Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," + + "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"; + compareResultAsText(results, expected); + } + + /** Filters a single-row input on an Integer field equal to the literal 300. NOTE(review): 300 lies outside the -128..127 range cached by Integer.valueOf, so this presumably guards against comparing boxed Integers by reference; confirm against the original issue. */ @Test + public void testIntegerBiggerThan128() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> input = env.fromElements(new Tuple3<>(300, 1L, "Hello")); + + Table table = tableEnv.fromDataSet(input, "a, b, c"); + + Table result = table.filter("a = 300 "); + + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "300,1,Hello\n"; + compareResultAsText(results, expected); + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java new file mode
100644 index 0000000..f5c9185 --- /dev/null +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.table.test; + +import org.apache.flink.api.table.ExpressionException; +import org.apache.flink.api.table.Table; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.table.TableEnvironment; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.List; + +/** Integration tests for Table.groupBy() followed by select() with aggregations ({@code a.sum}, {@code a.avg}) in the Java Table API. Every test reads CollectionDataSets.get3TupleDataSet(env), registered as fields (a, b, c). */ @RunWith(Parameterized.class) +public class GroupedAggregationsITCase extends MultipleProgramsTestBase { + + + /** @param mode execution mode forwarded to MultipleProgramsTestBase; presumably supplied once per mode by the Parameterized runner (the parameter list is not visible here). */ public GroupedAggregationsITCase(TestExecutionMode mode){ + super(mode); + } + + /** Grouping on {@code foo}, which is not one of the fields (a, b, c), must raise ExpressionException; the statements after the failing call never run. */ @Test(expected = ExpressionException.class) + public void testGroupingOnNonExistentField() throws Exception { + ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env); + + Table table = + tableEnv.fromDataSet(input, "a, b, c"); + + Table result = table + .groupBy("foo").select("a.avg"); + + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = ""; + compareResultAsText(results, expected); + } + + /** Sums a per distinct value of b and returns (b, sum of a). */ @Test + public void testGroupedAggregate() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env); + + Table table = + tableEnv.fromDataSet(input, "a, b, c"); + + Table result = table + .groupBy("b").select("b, a.sum"); + + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n"; /* one (b, sum of a) row per b = 1..6 */ + compareResultAsText(results, expected); + } + + /** Groups by b but selects only {@code a.sum}; the expected sums match those of testGroupedAggregate. */ @Test + public void testGroupingKeyForwardIfNotUsed() throws Exception { + + // the grouping key needs to be forwarded to the intermediate DataSet, even + // if we don't want the key in the output + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env); + + Table table = + tableEnv.fromDataSet(input, "a, b, c"); + + Table result = table + .groupBy("b").select("a.sum"); + + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n"; + compareResultAsText(results, expected); + } + + /** Chains two groupings; the second groups on (b, d) and selects only b, with no aggregate. The first select yields exactly one (d, b) row per b, so one row per b = 1..6 is expected. */ @Test + public void testGroupNoAggregation() throws Exception { + + ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env); + + Table table = + tableEnv.fromDataSet(input, "a, b, c"); + + Table result = table + .groupBy("b").select("a.sum as d, b").groupBy("b, d").select("b"); + + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + + String expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n"; /* one row per distinct b */ + List<Row> results = ds.collect(); + compareResultAsText(results, expected); + } +} +