Repository: flink Updated Branches: refs/heads/master a997dd615 -> 31a2de86d
[FLINK-2961] [table] Add support for basic type Date in Table API Fix nullCheck enabled Fix test TableConfig introduced Improvements and bug fixing This closes #1322. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/31a2de86 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/31a2de86 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/31a2de86 Branch: refs/heads/master Commit: 31a2de86d9034def9c58ef7f3d3dcae3d4dafd6f Parents: a997dd6 Author: twalthr <twal...@apache.org> Authored: Tue Nov 3 15:18:19 2015 +0100 Committer: twalthr <twal...@apache.org> Committed: Sun Nov 29 16:48:27 2015 +0100 ---------------------------------------------------------------------- docs/libs/table.md | 6 +- .../apache/flink/api/table/TableConfig.scala | 66 +++++ .../table/codegen/ExpressionCodeGenerator.scala | 246 ++++++++++++++++--- .../api/table/codegen/GenerateFilter.scala | 27 +- .../flink/api/table/codegen/GenerateJoin.scala | 37 ++- .../table/codegen/GenerateResultAssembler.scala | 13 +- .../api/table/codegen/GenerateSelect.scala | 20 +- .../flink/api/table/expressions/literals.scala | 2 + .../api/table/parser/ExpressionParser.scala | 3 +- .../runtime/ExpressionFilterFunction.scala | 9 +- .../table/runtime/ExpressionJoinFunction.scala | 9 +- .../runtime/ExpressionSelectFunction.scala | 9 +- .../api/java/table/test/CastingITCase.java | 23 ++ .../api/scala/table/test/CastingITCase.scala | 22 +- .../scala/table/test/ExpressionsITCase.scala | 18 ++ 15 files changed, 440 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/31a2de86/docs/libs/table.md ---------------------------------------------------------------------- diff --git a/docs/libs/table.md b/docs/libs/table.md index be0b45e..d1d42cc 100644 --- a/docs/libs/table.md +++ b/docs/libs/table.md @@ -351,11 +351,11 @@ unary = [ "!" 
| "-" | "~" ] , suffix ; suffix = atom | aggregation | cast | as | substring ; -aggregation = atom , [ ".sum" | ".min" | ".max" | ".count" | "avg" ] ; +aggregation = atom , [ ".sum" | ".min" | ".max" | ".count" | ".avg" ] ; cast = atom , ".cast(" , data type , ")" ; -data type = "BYTE" | "SHORT" | "INT" | "LONG" | "FLOAT" | "DOUBLE" | "BOOL" | "BOOLEAN" | "STRING" ; +data type = "BYTE" | "SHORT" | "INT" | "LONG" | "FLOAT" | "DOUBLE" | "BOOL" | "BOOLEAN" | "STRING" | "DATE" ; as = atom , ".as(" , field reference , ")" ; @@ -372,3 +372,5 @@ atom = ( "(" , single expression , ")" ) | literal | field reference ; Here, `literal` is a valid Java literal and `field reference` specifies a column in the data. The column names follow Java identifier syntax. +Only the types `LONG` and `STRING` can be cast to `DATE`, and a `DATE` can only be cast to `LONG` or `STRING`. A `LONG` cast to `DATE` must be a timestamp in milliseconds. A `STRING` cast to `DATE` must have the format "`yyyy-MM-dd HH:mm:ss.SSS`", "`yyyy-MM-dd`", or "`HH:mm:ss`", or be a timestamp in milliseconds. Millisecond timestamps count from January 1, 1970, 00:00:00 UTC, and date strings are interpreted in the UTC time zone by default. + http://git-wip-us.apache.org/repos/asf/flink/blob/31a2de86/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/TableConfig.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/TableConfig.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/TableConfig.scala new file mode 100644 index 0000000..ffa2bec --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/TableConfig.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table + +import java.util.TimeZone + +/** + * A config to define the runtime behavior of the Table API. + */ +class TableConfig extends Serializable { + + /** + * Defines the timezone for date/time/timestamp conversions. + */ + private var timeZone: TimeZone = TimeZone.getTimeZone("UTC") + + /** + * Defines if all fields need to be checked for NULL first. + */ + private var nullCheck: Boolean = false + + /** + * Sets the timezone for date/time/timestamp conversions. + */ + def setTimeZone(timeZone: TimeZone) = { + require(timeZone != null, "timeZone must not be null.") + this.timeZone = timeZone + } + + /** + * Returns the timezone for date/time/timestamp conversions. + */ + def getTimeZone = timeZone + + /** + * Returns the NULL check. If enabled, all fields need to be checked for NULL first. + */ + def getNullCheck = nullCheck + + /** + * Sets the NULL check. If enabled, all fields need to be checked for NULL first. 
+ */ + def setNullCheck(nullCheck: Boolean) = { + this.nullCheck = nullCheck + } + +} + +object TableConfig { + val DEFAULT = new TableConfig() +} http://git-wip-us.apache.org/repos/asf/flink/blob/31a2de86/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala index c9220e9..42dec0f 100644 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala @@ -17,11 +17,9 @@ */ package org.apache.flink.api.table.codegen +import java.util.Date import java.util.concurrent.atomic.AtomicInteger -import org.codehaus.janino.SimpleCompiler -import org.slf4j.LoggerFactory - import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation} import org.apache.flink.api.common.typeutils.CompositeType @@ -29,22 +27,26 @@ import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo} import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo import org.apache.flink.api.table.expressions._ import org.apache.flink.api.table.typeinfo.{RenamingProxyTypeInfo, RowTypeInfo} -import org.apache.flink.api.table.{ExpressionException, expressions} +import org.apache.flink.api.table.{ExpressionException, TableConfig, expressions} +import org.codehaus.janino.SimpleCompiler +import org.slf4j.LoggerFactory + +import scala.collection.mutable /** Base class for all code generation classes. This provides the functionality for generating * code from an [[Expression]] tree. 
Derived classes must embed this in a lambda function * to form an executable code block. * * @param inputs List of input variable names with corresponding [[TypeInformation]]. - * @param nullCheck Whether the generated code should include checks for NULL values. * @param cl The ClassLoader that is used to create the Scala reflection ToolBox + * @param config General configuration specifying runtime behaviour. * @tparam R The type of the generated code block. In most cases a lambda function such * as "(IN1, IN2) => OUT". */ abstract class ExpressionCodeGenerator[R]( inputs: Seq[(String, CompositeType[_])], - val nullCheck: Boolean = false, - cl: ClassLoader) { + cl: ClassLoader, + config: TableConfig) { protected val log = LoggerFactory.getLogger(classOf[ExpressionCodeGenerator[_]]) import scala.reflect.runtime.universe._ @@ -57,6 +59,19 @@ abstract class ExpressionCodeGenerator[R]( val compiler = new SimpleCompiler() compiler.setParentClassLoader(cl) + protected val reusableMemberStatements = mutable.Set[String]() + + protected val reusableInitStatements = mutable.Set[String]() + + protected def reuseMemberCode(): String = { + reusableMemberStatements.mkString("", "\n", "\n") + } + + protected def reuseInitCode(): String = { + reusableInitStatements.mkString("", "\n", "\n") + } + + protected def nullCheck: Boolean = config.getNullCheck // This is to be implemented by subclasses, we have it like this // so that we only call it from here with the Scala Reflection Lock. 
@@ -94,9 +109,9 @@ abstract class ExpressionCodeGenerator[R]( |boolean $nullTerm = ${leftCode.nullTerm} || ${rightCode.nullTerm}; |$resultTpe $resultTerm; |if ($nullTerm) { - | $resultTerm = ${defaultPrimitive(resultType)} + | $resultTerm = ${defaultPrimitive(resultType)}; |} else { - | $resultTerm = ${expr(leftCode.resultTerm, rightCode.resultTerm)} + | $resultTerm = ${expr(leftCode.resultTerm, rightCode.resultTerm)}; |} """.stripMargin } else { @@ -157,7 +172,7 @@ abstract class ExpressionCodeGenerator[R]( case expressions.Literal(doubleValue: Double, DOUBLE_TYPE_INFO) => if (nullCheck) { s""" - |val $nullTerm = false + |boolean $nullTerm = false; |$resultTpe $resultTerm = $doubleValue; """.stripMargin } else { @@ -169,7 +184,7 @@ abstract class ExpressionCodeGenerator[R]( case expressions.Literal(floatValue: Float, FLOAT_TYPE_INFO) => if (nullCheck) { s""" - |val $nullTerm = false + |boolean $nullTerm = false; |$resultTpe $resultTerm = ${floatValue}f; """.stripMargin } else { @@ -181,7 +196,7 @@ abstract class ExpressionCodeGenerator[R]( case expressions.Literal(strValue: String, STRING_TYPE_INFO) => if (nullCheck) { s""" - |val $nullTerm = false + |boolean $nullTerm = false; |$resultTpe $resultTerm = "$strValue"; """.stripMargin } else { @@ -193,7 +208,7 @@ abstract class ExpressionCodeGenerator[R]( case expressions.Literal(boolValue: Boolean, BOOLEAN_TYPE_INFO) => if (nullCheck) { s""" - |val $nullTerm = false + |boolean $nullTerm = false; |$resultTpe $resultTerm = $boolValue; """.stripMargin } else { @@ -202,6 +217,23 @@ abstract class ExpressionCodeGenerator[R]( """.stripMargin } + case expressions.Literal(dateValue: Date, DATE_TYPE_INFO) => + val dateName = s"""date_${dateValue.getTime}""" + val dateStmt = s"""static java.util.Date $dateName + |= new java.util.Date(${dateValue.getTime});""".stripMargin + reusableMemberStatements.add(dateStmt) + + if (nullCheck) { + s""" + |boolean $nullTerm = false; + |$resultTpe $resultTerm = $dateName; + """.stripMargin + 
} else { + s""" + |$resultTpe $resultTerm = $dateName; + """.stripMargin + } + case Substring(str, beginIndex, endIndex) => val strCode = generateExpression(str) val beginIndexCode = generateExpression(beginIndex) @@ -243,6 +275,29 @@ abstract class ExpressionCodeGenerator[R]( """ } + case expressions.Cast(child: Expression, STRING_TYPE_INFO) + if child.typeInfo == BasicTypeInfo.DATE_TYPE_INFO => + val childGen = generateExpression(child) + + addTimestampFormatter() + + val castCode = if (nullCheck) { + s""" + |boolean $nullTerm = ${childGen.nullTerm}; + |$resultTpe $resultTerm; + |if ($nullTerm) { + | $resultTerm = null; + |} else { + | $resultTerm = timestampFormatter.format(${childGen.resultTerm}); + |} + """.stripMargin + } else { + s""" + |$resultTpe $resultTerm = timestampFormatter.format(${childGen.resultTerm}); + """.stripMargin + } + childGen.code + castCode + case expressions.Cast(child: Expression, STRING_TYPE_INFO) => val childGen = generateExpression(child) val castCode = if (nullCheck) { @@ -262,6 +317,104 @@ abstract class ExpressionCodeGenerator[R]( } childGen.code + castCode + case expressions.Cast(child: Expression, DATE_TYPE_INFO) + if child.typeInfo == BasicTypeInfo.LONG_TYPE_INFO => + val childGen = generateExpression(child) + val castCode = if (nullCheck) { + s""" + |boolean $nullTerm = ${childGen.nullTerm}; + |$resultTpe $resultTerm; + |if ($nullTerm) { + | $resultTerm = null; + |} else { + | $resultTerm = new java.util.Date(${childGen.resultTerm}); + |} + """.stripMargin + } else { + s""" + |$resultTpe $resultTerm = new java.util.Date(${childGen.resultTerm}); + """.stripMargin + } + childGen.code + castCode + + case expressions.Cast(child: Expression, DATE_TYPE_INFO) + if child.typeInfo == BasicTypeInfo.STRING_TYPE_INFO => + val childGen = generateExpression(child) + + addDateFormatter() + addTimeFormatter() + addTimestampFormatter() + + // tries to parse + // "2011-05-03 15:51:36.234" + // then "2011-05-03" + // then "15:51:36" + // then 
"1446473775" + val parsedName = freshName("parsed") + val parsingCode = + s""" + |java.util.Date $parsedName = null; + |try { + | $parsedName = timestampFormatter.parse(${childGen.resultTerm}); + |} catch (java.text.ParseException e1) { + | try { + | $parsedName = dateFormatter.parse(${childGen.resultTerm}); + | } catch (java.text.ParseException e2) { + | try { + | $parsedName = timeFormatter.parse(${childGen.resultTerm}); + | } catch (java.text.ParseException e3) { + | $parsedName = new java.util.Date(Long.valueOf(${childGen.resultTerm})); + | } + | } + |} + """.stripMargin + + val castCode = if (nullCheck) { + s""" + |boolean $nullTerm = ${childGen.nullTerm}; + |$resultTpe $resultTerm; + |if ($nullTerm) { + | $resultTerm = null; + |} else { + | $parsingCode + | $resultTerm = $parsedName; + |} + """.stripMargin + } else { + s""" + |$parsingCode + |$resultTpe $resultTerm = $parsedName; + """.stripMargin + } + childGen.code + castCode + + case expressions.Cast(child: Expression, DATE_TYPE_INFO) => + throw new ExpressionException("Only Long and String can be casted to Date.") + + case expressions.Cast(child: Expression, LONG_TYPE_INFO) + if child.typeInfo == BasicTypeInfo.DATE_TYPE_INFO => + val childGen = generateExpression(child) + val castCode = if (nullCheck) { + s""" + |boolean $nullTerm = ${childGen.nullTerm}; + |$resultTpe $resultTerm; + |if ($nullTerm) { + | $resultTerm = null; + |} else { + | $resultTerm = ${childGen.resultTerm}.getTime(); + |} + """.stripMargin + } else { + s""" + |$resultTerm = ${childGen.resultTerm}.getTime(); + """.stripMargin + } + childGen.code + castCode + + case expressions.Cast(child: Expression, tpe: BasicTypeInfo[_]) + if child.typeInfo == BasicTypeInfo.DATE_TYPE_INFO => + throw new ExpressionException("Date can only be casted to Long or String.") + case expressions.Cast(child: Expression, tpe: BasicTypeInfo[_]) if child.typeInfo == BasicTypeInfo.STRING_TYPE_INFO => val childGen = generateExpression(child) @@ -391,10 +544,11 @@ 
abstract class ExpressionCodeGenerator[R]( childCode.code + s""" |boolean $nullTerm = ${childCode.nullTerm}; + |$resultTpe $resultTerm; |if ($nullTerm) { - | ${defaultPrimitive(child.typeInfo)}; + | $resultTerm = ${defaultPrimitive(child.typeInfo)}; |} else { - | $resultTpe $resultTerm = -(${childCode.resultTerm}); + | $resultTerm = -(${childCode.resultTerm}); |} """.stripMargin } else { @@ -425,10 +579,11 @@ abstract class ExpressionCodeGenerator[R]( childCode.code + s""" |boolean $nullTerm = ${childCode.nullTerm}; + |$resultTpe $resultTerm; |if ($nullTerm) { - | ${defaultPrimitive(child.typeInfo)}; + | $resultTerm = ${defaultPrimitive(child.typeInfo)}; |} else { - | $resultTpe $resultTerm = ~((int) ${childCode.resultTerm}); + | $resultTerm = ~((int) ${childCode.resultTerm}); |} """.stripMargin } else { @@ -444,10 +599,11 @@ abstract class ExpressionCodeGenerator[R]( childCode.code + s""" |boolean $nullTerm = ${childCode.nullTerm}; + |$resultTpe $resultTerm; |if ($nullTerm) { - | ${defaultPrimitive(child.typeInfo)}; + | $resultTerm = ${defaultPrimitive(child.typeInfo)}; |} else { - | $resultTpe $resultTerm = !(${childCode.resultTerm}); + | $resultTerm = !(${childCode.resultTerm}); |} """.stripMargin } else { @@ -462,12 +618,7 @@ abstract class ExpressionCodeGenerator[R]( if (nullCheck) { childCode.code + s""" - |boolean $nullTerm = ${childCode.nullTerm}; - |if ($nullTerm) { - | ${defaultPrimitive(child.typeInfo)}; - |} else { - | $resultTpe $resultTerm = (${childCode.resultTerm}) == null; - |} + |$resultTpe $resultTerm = ${childCode.nullTerm}; """.stripMargin } else { childCode.code + @@ -481,12 +632,7 @@ abstract class ExpressionCodeGenerator[R]( if (nullCheck) { childCode.code + s""" - |boolean $nullTerm = ${childCode.nullTerm}; - |if ($nullTerm) { - | ${defaultPrimitive(child.typeInfo)}; - |} else { - | $resultTpe $resultTerm = (${childCode.resultTerm}) != null; - |} + |$resultTpe $resultTerm = !${childCode.nullTerm}; """.stripMargin } else { childCode.code + 
@@ -501,10 +647,11 @@ abstract class ExpressionCodeGenerator[R]( childCode.code + s""" |boolean $nullTerm = ${childCode.nullTerm}; + |$resultTpe $resultTerm; |if ($nullTerm) { - | ${defaultPrimitive(child.typeInfo)}; + | $resultTerm = ${defaultPrimitive(child.typeInfo)}; |} else { - | $resultTpe $resultTerm = Math.abs(${childCode.resultTerm}); + | $resultTerm = Math.abs(${childCode.resultTerm}); |} """.stripMargin } else { @@ -643,4 +790,37 @@ abstract class ExpressionCodeGenerator[R]( tpe.getTypeClass.getCanonicalName } + + def addDateFormatter(): Unit = { + reusableMemberStatements.add(s""" + |java.text.SimpleDateFormat dateFormatter = + | new java.text.SimpleDateFormat("yyyy-MM-dd"); + |""".stripMargin) + + reusableInitStatements.add(s""" + |dateFormatter.setTimeZone(config.getTimeZone()); + |""".stripMargin) + } + + def addTimeFormatter(): Unit = { + reusableMemberStatements.add(s""" + |java.text.SimpleDateFormat timeFormatter = + | new java.text.SimpleDateFormat("HH:mm:ss"); + |""".stripMargin) + + reusableInitStatements.add(s""" + |timeFormatter.setTimeZone(config.getTimeZone()); + |""".stripMargin) + } + + def addTimestampFormatter(): Unit = { + reusableMemberStatements.add(s""" + |java.text.SimpleDateFormat timestampFormatter = + | new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); + |""".stripMargin) + + reusableInitStatements.add(s""" + |timestampFormatter.setTimeZone(config.getTimeZone()); + |""".stripMargin) + } } http://git-wip-us.apache.org/repos/asf/flink/blob/31a2de86/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateFilter.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateFilter.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateFilter.scala index 06a7076..50b8c69 100644 --- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateFilter.scala +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateFilter.scala @@ -19,12 +19,12 @@ package org.apache.flink.api.table.codegen import java.io.StringReader -import org.slf4j.LoggerFactory - import org.apache.flink.api.common.functions.FilterFunction import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.table.TableConfig import org.apache.flink.api.table.codegen.Indenter._ import org.apache.flink.api.table.expressions.Expression +import org.slf4j.LoggerFactory /** * Code generator for a unary predicate, i.e. a Filter. @@ -32,9 +32,11 @@ import org.apache.flink.api.table.expressions.Expression class GenerateFilter[T]( inputType: CompositeType[T], predicate: Expression, - cl: ClassLoader) extends ExpressionCodeGenerator[FilterFunction[T]]( + cl: ClassLoader, + config: TableConfig) extends ExpressionCodeGenerator[FilterFunction[T]]( Seq(("in0", inputType)), - cl = cl) { + cl = cl, + config) { val LOG = LoggerFactory.getLogger(this.getClass) @@ -50,6 +52,13 @@ class GenerateFilter[T]( j""" public class $generatedName implements org.apache.flink.api.common.functions.FilterFunction<$tpe> { + + org.apache.flink.api.table.TableConfig config = null; + + public $generatedName(org.apache.flink.api.table.TableConfig config) { + this.config = config; + } + public boolean filter(Object _in0) { $tpe in0 = ($tpe) _in0; ${pred.code} @@ -65,6 +74,13 @@ class GenerateFilter[T]( j""" public class $generatedName implements org.apache.flink.api.common.functions.FilterFunction<$tpe> { + + org.apache.flink.api.table.TableConfig config = null; + + public $generatedName(org.apache.flink.api.table.TableConfig config) { + this.config = config; + } + public boolean filter(Object _in0) { $tpe in0 = ($tpe) _in0; ${pred.code} @@ -77,6 +93,7 @@ class GenerateFilter[T]( LOG.debug(s"""Generated unary predicate 
"$predicate":\n$code""") compiler.cook(new StringReader(code)) val clazz = compiler.getClassLoader().loadClass(generatedName) - clazz.newInstance().asInstanceOf[FilterFunction[T]] + val constructor = clazz.getConstructor(classOf[TableConfig]) + constructor.newInstance(config).asInstanceOf[FilterFunction[T]] } } http://git-wip-us.apache.org/repos/asf/flink/blob/31a2de86/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateJoin.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateJoin.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateJoin.scala index 8c0cec3..b706e6d 100644 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateJoin.scala +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateJoin.scala @@ -19,12 +19,12 @@ package org.apache.flink.api.table.codegen import java.io.StringReader -import org.slf4j.LoggerFactory - import org.apache.flink.api.common.functions.FlatJoinFunction import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.table.TableConfig import org.apache.flink.api.table.codegen.Indenter._ import org.apache.flink.api.table.expressions.{Expression, NopExpression} +import org.slf4j.LoggerFactory /** * Code generator for assembling the result of a binary operation. 
@@ -35,10 +35,12 @@ class GenerateJoin[L, R, O]( resultTypeInfo: CompositeType[O], predicate: Expression, outputFields: Seq[Expression], - cl: ClassLoader) + cl: ClassLoader, + config: TableConfig) extends GenerateResultAssembler[FlatJoinFunction[L, R, O]]( Seq(("in0", leftTypeInfo), ("in1", rightTypeInfo)), - cl = cl) { + cl = cl, + config) { val LOG = LoggerFactory.getLogger(this.getClass) @@ -66,6 +68,11 @@ class GenerateJoin[L, R, O]( ${reuseCode(resultTypeInfo)} + public org.apache.flink.api.table.TableConfig config = null; + public $generatedName(org.apache.flink.api.table.TableConfig config) { + this.config = config; + } + public void join(Object _in0, Object _in1, org.apache.flink.util.Collector coll) { $leftTpe in0 = ($leftTpe) _in0; $rightTpe in1 = ($rightTpe) _in1; @@ -81,6 +88,11 @@ class GenerateJoin[L, R, O]( ${reuseCode(resultTypeInfo)} + public org.apache.flink.api.table.TableConfig config = null; + public $generatedName(org.apache.flink.api.table.TableConfig config) { + this.config = config; + } + public void join(Object _in0, Object _in1, org.apache.flink.util.Collector coll) { $leftTpe in0 = ($leftTpe) _in0; $rightTpe in1 = ($rightTpe) _in1; @@ -102,6 +114,13 @@ class GenerateJoin[L, R, O]( ${reuseCode(resultTypeInfo)} + org.apache.flink.api.table.TableConfig config = null; + + public $generatedName(org.apache.flink.api.table.TableConfig config) { + this.config = config; + ${reuseInitCode()} + } + public void join(Object _in0, Object _in1, org.apache.flink.util.Collector coll) { $leftTpe in0 = ($leftTpe) _in0; $rightTpe in1 = ($rightTpe) _in1; @@ -121,6 +140,13 @@ class GenerateJoin[L, R, O]( ${reuseCode(resultTypeInfo)} + org.apache.flink.api.table.TableConfig config = null; + + public $generatedName(org.apache.flink.api.table.TableConfig config) { + this.config = config; + ${reuseInitCode()} + } + public void join(Object _in0, Object _in1, org.apache.flink.util.Collector coll) { $leftTpe in0 = ($leftTpe) _in0; $rightTpe in1 = ($rightTpe) _in1; 
@@ -139,6 +165,7 @@ class GenerateJoin[L, R, O]( LOG.debug(s"""Generated join:\n$code""") compiler.cook(new StringReader(code)) val clazz = compiler.getClassLoader().loadClass(generatedName) - clazz.newInstance().asInstanceOf[FlatJoinFunction[L, R, O]] + val constructor = clazz.getConstructor(classOf[TableConfig]) + constructor.newInstance(config).asInstanceOf[FlatJoinFunction[L, R, O]] } } http://git-wip-us.apache.org/repos/asf/flink/blob/31a2de86/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateResultAssembler.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateResultAssembler.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateResultAssembler.scala index 5172eab..3916410 100644 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateResultAssembler.scala +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateResultAssembler.scala @@ -20,6 +20,7 @@ package org.apache.flink.api.table.codegen import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo} import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo +import org.apache.flink.api.table.TableConfig import org.apache.flink.api.table.expressions.Expression import org.apache.flink.api.table.typeinfo.RowTypeInfo @@ -28,16 +29,20 @@ import org.apache.flink.api.table.typeinfo.RowTypeInfo */ abstract class GenerateResultAssembler[R]( inputs: Seq[(String, CompositeType[_])], - cl: ClassLoader) - extends ExpressionCodeGenerator[R](inputs, cl = cl) { + cl: ClassLoader, + config: TableConfig) + extends ExpressionCodeGenerator[R](inputs, cl = cl, config) { def reuseCode[A](resultTypeInfo: CompositeType[A]) = { val resultTpe = typeTermForTypeInfo(resultTypeInfo) resultTypeInfo match { - 
case pj: PojoTypeInfo[_] => s"$resultTpe out = new ${pj.getTypeClass.getCanonicalName}();" + case pj: PojoTypeInfo[_] => + super.reuseMemberCode() + + s"$resultTpe out = new ${pj.getTypeClass.getCanonicalName}();" case row: RowTypeInfo => - s"org.apache.flink.api.table.Row out =" + + super.reuseMemberCode() + + s"org.apache.flink.api.table.Row out =" + s" new org.apache.flink.api.table.Row(${row.getArity});" case _ => "" http://git-wip-us.apache.org/repos/asf/flink/blob/31a2de86/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateSelect.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateSelect.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateSelect.scala index 5941662..a75d15b 100644 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateSelect.scala +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateSelect.scala @@ -19,12 +19,12 @@ package org.apache.flink.api.table.codegen import java.io.StringReader -import org.slf4j.LoggerFactory - import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.table.TableConfig import org.apache.flink.api.table.codegen.Indenter._ import org.apache.flink.api.table.expressions.Expression +import org.slf4j.LoggerFactory /** * Code generator for assembling the result of a select operation. 
@@ -33,10 +33,12 @@ class GenerateSelect[I, O]( inputTypeInfo: CompositeType[I], resultTypeInfo: CompositeType[O], outputFields: Seq[Expression], - cl: ClassLoader) + cl: ClassLoader, + config: TableConfig) extends GenerateResultAssembler[MapFunction[I, O]]( Seq(("in0", inputTypeInfo)), - cl = cl) { + cl = cl, + config) { val LOG = LoggerFactory.getLogger(this.getClass) @@ -58,6 +60,13 @@ class GenerateSelect[I, O]( ${reuseCode(resultTypeInfo)} + org.apache.flink.api.table.TableConfig config = null; + + public $generatedName(org.apache.flink.api.table.TableConfig config) { + this.config = config; + ${reuseInitCode()} + } + @Override public Object map(Object _in0) { $inputTpe in0 = ($inputTpe) _in0; @@ -69,6 +78,7 @@ class GenerateSelect[I, O]( LOG.debug(s"""Generated select:\n$code""") compiler.cook(new StringReader(code)) val clazz = compiler.getClassLoader().loadClass(generatedName) - clazz.newInstance().asInstanceOf[MapFunction[I, O]] + val constructor = clazz.getConstructor(classOf[TableConfig]) + constructor.newInstance(config).asInstanceOf[MapFunction[I, O]] } } http://git-wip-us.apache.org/repos/asf/flink/blob/31a2de86/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala index 5654649..f909cab 100644 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala @@ -17,6 +17,7 @@ */ package org.apache.flink.api.table.expressions +import java.util.Date import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.api.scala.table.ImplicitExpressionOperations @@ -28,6 +29,7 
@@ object Literal { case f: Float => Literal(f, BasicTypeInfo.FLOAT_TYPE_INFO) case str: String => Literal(str, BasicTypeInfo.STRING_TYPE_INFO) case bool: Boolean => Literal(bool, BasicTypeInfo.BOOLEAN_TYPE_INFO) + case date: Date => Literal(date, BasicTypeInfo.DATE_TYPE_INFO) } } http://git-wip-us.apache.org/repos/asf/flink/blob/31a2de86/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala index 075f070..2cbd8fa 100644 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala @@ -117,7 +117,8 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { atom <~ ".cast(DOUBLE)" ^^ { e => Cast(e, BasicTypeInfo.DOUBLE_TYPE_INFO) } | atom <~ ".cast(BOOL)" ^^ { e => Cast(e, BasicTypeInfo.BOOLEAN_TYPE_INFO) } | atom <~ ".cast(BOOLEAN)" ^^ { e => Cast(e, BasicTypeInfo.BOOLEAN_TYPE_INFO) } | - atom <~ ".cast(STRING)" ^^ { e => Cast(e, BasicTypeInfo.STRING_TYPE_INFO) } + atom <~ ".cast(STRING)" ^^ { e => Cast(e, BasicTypeInfo.STRING_TYPE_INFO) } | + atom <~ ".cast(DATE)" ^^ { e => Cast(e, BasicTypeInfo.DATE_TYPE_INFO) } lazy val as: PackratParser[Expression] = atom ~ ".as(" ~ fieldReference ~ ")" ^^ { case e ~ _ ~ as ~ _ => Naming(e, as.name) http://git-wip-us.apache.org/repos/asf/flink/blob/31a2de86/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala index f1ba847..4e50272 100644 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala @@ -19,6 +19,7 @@ package org.apache.flink.api.table.runtime import org.apache.flink.api.common.functions.{FilterFunction, RichFilterFunction} import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.table.TableConfig import org.apache.flink.api.table.codegen.GenerateFilter import org.apache.flink.api.table.expressions.Expression import org.apache.flink.configuration.Configuration @@ -29,16 +30,18 @@ import org.apache.flink.configuration.Configuration */ class ExpressionFilterFunction[T]( predicate: Expression, - inputType: CompositeType[T]) extends RichFilterFunction[T] { + inputType: CompositeType[T], + config: TableConfig = TableConfig.DEFAULT) extends RichFilterFunction[T] { var compiledFilter: FilterFunction[T] = null - override def open(config: Configuration): Unit = { + override def open(c: Configuration): Unit = { if (compiledFilter == null) { val codegen = new GenerateFilter[T]( inputType, predicate, - getRuntimeContext.getUserCodeClassLoader) + getRuntimeContext.getUserCodeClassLoader, + config) compiledFilter = codegen.generate() } } http://git-wip-us.apache.org/repos/asf/flink/blob/31a2de86/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala index 5743211..cf2c90f 100644 --- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala @@ -19,6 +19,7 @@ package org.apache.flink.api.table.runtime import org.apache.flink.api.common.functions.{FlatJoinFunction, RichFlatJoinFunction} import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.table.TableConfig import org.apache.flink.api.table.codegen.GenerateJoin import org.apache.flink.api.table.expressions.Expression import org.apache.flink.configuration.Configuration @@ -33,18 +34,20 @@ class ExpressionJoinFunction[L, R, O]( leftType: CompositeType[L], rightType: CompositeType[R], resultType: CompositeType[O], - outputFields: Seq[Expression]) extends RichFlatJoinFunction[L, R, O] { + outputFields: Seq[Expression], + config: TableConfig = TableConfig.DEFAULT) extends RichFlatJoinFunction[L, R, O] { var compiledJoin: FlatJoinFunction[L, R, O] = null - override def open(config: Configuration): Unit = { + override def open(c: Configuration): Unit = { val codegen = new GenerateJoin[L, R, O]( leftType, rightType, resultType, predicate, outputFields, - getRuntimeContext.getUserCodeClassLoader) + getRuntimeContext.getUserCodeClassLoader, + config) compiledJoin = codegen.generate() } http://git-wip-us.apache.org/repos/asf/flink/blob/31a2de86/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala index 32098c3..ab7adb1 100644 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala +++ 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala @@ -17,6 +17,7 @@ */ package org.apache.flink.api.table.runtime +import org.apache.flink.api.table.TableConfig import org.apache.flink.api.table.expressions.Expression import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} import org.apache.flink.api.common.typeutils.CompositeType @@ -30,18 +31,20 @@ import org.apache.flink.configuration.Configuration class ExpressionSelectFunction[I, O]( inputType: CompositeType[I], resultType: CompositeType[O], - outputFields: Seq[Expression]) extends RichMapFunction[I, O] { + outputFields: Seq[Expression], + config: TableConfig = TableConfig.DEFAULT) extends RichMapFunction[I, O] { var compiledSelect: MapFunction[I, O] = null - override def open(config: Configuration): Unit = { + override def open(c: Configuration): Unit = { if (compiledSelect == null) { val resultCodegen = new GenerateSelect[I, O]( inputType, resultType, outputFields, - getRuntimeContext.getUserCodeClassLoader) + getRuntimeContext.getUserCodeClassLoader, + config) compiledSelect = resultCodegen.generate() } http://git-wip-us.apache.org/repos/asf/flink/blob/31a2de86/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java index aa4ef2a..6a83d17 100644 --- a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java +++ b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java @@ -19,6 +19,7 @@ package org.apache.flink.api.java.table.test; import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple4; import 
org.apache.flink.api.table.Table; import org.apache.flink.api.table.Row; import org.apache.flink.api.java.DataSet; @@ -122,5 +123,27 @@ public class CastingITCase extends MultipleProgramsTestBase { String expected = "1,1,1,1,2.0,2.0,true\n"; compareResultAsText(results, expected); } + + @Test + public void testCastDateFromString() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = new TableEnvironment(); + + DataSource<Tuple4<String, String, String, String>> input = + env.fromElements(new Tuple4<>("2011-05-03", "15:51:36", "2011-05-03 15:51:36.000", "1446473775")); + + Table table = + tableEnv.fromDataSet(input); + + Table result = table + .select("f0.cast(DATE) AS f0, f1.cast(DATE) AS f1, f2.cast(DATE) AS f2, f3.cast(DATE) AS f3") + .select("f0.cast(STRING), f1.cast(STRING), f2.cast(STRING), f3.cast(STRING)"); + + DataSet<Row> ds = tableEnv.toDataSet(result, Row.class); + List<Row> results = ds.collect(); + String expected = "2011-05-03 00:00:00.000,1970-01-01 15:51:36.000,2011-05-03 15:51:36.000," + + "1970-01-17 17:47:53.775\n"; + compareResultAsText(results, expected); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/31a2de86/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala index 9c58b64..77fd7b7 100644 --- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala +++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala @@ -18,6 +18,8 @@ package org.apache.flink.api.scala.table.test +import java.util.Date + import org.junit._ import org.junit.runner.RunWith import 
org.junit.runners.Parameterized @@ -38,10 +40,10 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo def testAutoCastToString(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d)).toTable - .select('_1 + "b", '_2 + "s", '_3 + "i", '_4 + "L", '_5 + "f", '_6 + "d") + val ds = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, new Date(0))).toTable + .select('_1 + "b", '_2 + "s", '_3 + "i", '_4 + "L", '_5 + "f", '_6 + "d", '_7 + "Date") .toDataSet[Row] - val expected = "1b,1s,1i,1L,1.0f,1.0d" + val expected = "1b,1s,1i,1L,1.0f,1.0d,1970-01-01 00:00:00.000Date" val results = ds.collect() TestBaseUtils.compareResultAsText(results.asJava, expected) } @@ -80,7 +82,9 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo def testCastFromString: Unit = { val env = ExecutionEnvironment.getExecutionEnvironment - val ds = env.fromElements(("1", "true", "2.0")).toTable + val ds = env.fromElements(("1", "true", "2.0", + "2011-05-03", "15:51:36", "2011-05-03 15:51:36.000", "1446473775")) + .toTable .select( '_1.cast(BasicTypeInfo.BYTE_TYPE_INFO), '_1.cast(BasicTypeInfo.SHORT_TYPE_INFO), @@ -88,9 +92,15 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo '_1.cast(BasicTypeInfo.LONG_TYPE_INFO), '_3.cast(BasicTypeInfo.DOUBLE_TYPE_INFO), '_3.cast(BasicTypeInfo.FLOAT_TYPE_INFO), - '_2.cast(BasicTypeInfo.BOOLEAN_TYPE_INFO)) + '_2.cast(BasicTypeInfo.BOOLEAN_TYPE_INFO), + '_4.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO), + '_5.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO), + '_6.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO), + '_7.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO)) .toDataSet[Row] - val expected = "1,1,1,1,2.0,2.0,true\n" + val expected = "1,1,1,1,2.0,2.0,true," + + "2011-05-03 
00:00:00.000,1970-01-01 15:51:36.000,2011-05-03 15:51:36.000," + + "1970-01-17 17:47:53.775\n" val results = ds.collect() TestBaseUtils.compareResultAsText(results.asJava, expected) } http://git-wip-us.apache.org/repos/asf/flink/blob/31a2de86/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala index f5d7a4d..017cbf1 100644 --- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala +++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala @@ -18,6 +18,10 @@ package org.apache.flink.api.scala.table.test +import java.util.Date + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.api.table.expressions.Literal import org.apache.flink.api.table.{Row, ExpressionException} import org.apache.flink.api.scala._ import org.apache.flink.api.scala.table._ @@ -113,4 +117,18 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas TestBaseUtils.compareResultAsText(results.asJava, expected) } + @Test + def testDateLiteral(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + + val ds = env.fromElements((0L, "test")).as('a, 'b) + .select('a, + Literal(new Date(0)).cast(BasicTypeInfo.STRING_TYPE_INFO), + 'a.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO)) + .toDataSet[Row] + val expected = "0,1970-01-01 00:00:00.000,1970-01-01 00:00:00.000" + val results = ds.collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + }