[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r459194715 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala ## @@ -87,17 +181,72 @@ trait BaseScriptTransformationExec extends UnaryExecNode { } } } + + private lazy val outputFieldWriters: Seq[String => Any] = output.map { attr => +val converter = CatalystTypeConverters.createToCatalystConverter(attr.dataType) +attr.dataType match { + case StringType => wrapperConvertException(data => data, converter) + case BooleanType => wrapperConvertException(data => data.toBoolean, converter) + case ByteType => wrapperConvertException(data => data.toByte, converter) + case BinaryType => +wrapperConvertException(data => UTF8String.fromString(data).getBytes, converter) + case IntegerType => wrapperConvertException(data => data.toInt, converter) + case ShortType => wrapperConvertException(data => data.toShort, converter) + case LongType => wrapperConvertException(data => data.toLong, converter) + case FloatType => wrapperConvertException(data => data.toFloat, converter) + case DoubleType => wrapperConvertException(data => data.toDouble, converter) + case _: DecimalType => wrapperConvertException(data => BigDecimal(data), converter) + case DateType if conf.datetimeJava8ApiEnabled => +wrapperConvertException(data => DateTimeUtils.stringToDate( + UTF8String.fromString(data), + DateTimeUtils.getZoneId(conf.sessionLocalTimeZone)) + .map(DateTimeUtils.daysToLocalDate).orNull, converter) + case DateType => wrapperConvertException(data => DateTimeUtils.stringToDate( +UTF8String.fromString(data), +DateTimeUtils.getZoneId(conf.sessionLocalTimeZone)) +.map(DateTimeUtils.toJavaDate).orNull, converter) + case TimestampType if conf.datetimeJava8ApiEnabled => + wrapperConvertException(data => DateTimeUtils.stringToTimestamp( + UTF8String.fromString(data), + DateTimeUtils.getZoneId(conf.sessionLocalTimeZone)) + 
.map(DateTimeUtils.microsToInstant).orNull, converter) + case TimestampType => wrapperConvertException(data => DateTimeUtils.stringToTimestamp( +UTF8String.fromString(data), +DateTimeUtils.getZoneId(conf.sessionLocalTimeZone)) +.map(DateTimeUtils.toJavaTimestamp).orNull, converter) + case CalendarIntervalType => wrapperConvertException( +data => IntervalUtils.stringToInterval(UTF8String.fromString(data)), +converter) + case udt: UserDefinedType[_] => +wrapperConvertException(data => udt.deserialize(data), converter) + case dt => +throw new SparkException("TRANSFORM without serde does not support " + Review comment: > nit: `TRANSFORM` -> `s"$nodeName...` Done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
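For readers following the `outputFieldWriters` hunk above, here is a minimal sketch of the per-type writer pattern. It assumes that `wrapperConvertException` turns a parse failure into `null`, which is what the `return null when return string incompatible (no serde)` case in `transform.sql` suggests; the PR's actual helper is not shown in this thread. `FieldType`, `wrapConvertException`, `writerFor` and `convertLine` are hypothetical names that stand in for Spark's own types.

```scala
import scala.util.control.NonFatal

object OutputFieldWriterSketch {
  // Hypothetical stand-in for Spark's DataType hierarchy; only what the sketch needs.
  sealed trait FieldType
  case object StringField extends FieldType
  case object IntField extends FieldType
  case object BooleanField extends FieldType
  case object DoubleField extends FieldType

  // Wraps a parser so that any non-fatal parse failure yields null
  // instead of an exception (assumed behavior, not confirmed by the thread).
  def wrapConvertException(parse: String => Any): String => Any = { data =>
    try {
      parse(data)
    } catch {
      case NonFatal(_) => null
    }
  }

  // Chooses one writer per output field, analogous to outputFieldWriters.
  def writerFor(fieldType: FieldType): String => Any = fieldType match {
    case StringField  => wrapConvertException(data => data)
    case IntField     => wrapConvertException(data => data.toInt)
    case BooleanField => wrapConvertException(data => data.toBoolean)
    case DoubleField  => wrapConvertException(data => data.toDouble)
  }

  // Converts one tab-separated line of script output into typed values.
  def convertLine(line: String, schema: Seq[FieldType]): Seq[Any] =
    line.split("\t", -1).toSeq.zip(schema.map(writerFor)).map {
      case (raw, writer) => writer(raw)
    }
}
```

With this sketch, `convertLine("1\ttrue\tx", Seq(IntField, BooleanField, IntField))` gives `1`, `true` and `null`, because `"x"` cannot be parsed as an integer.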
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r459189448 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala ## @@ -87,17 +181,72 @@ trait BaseScriptTransformationExec extends UnaryExecNode { } } } + + private lazy val outputFieldWriters: Seq[String => Any] = output.map { attr => +val converter = CatalystTypeConverters.createToCatalystConverter(attr.dataType) +attr.dataType match { + case StringType => wrapperConvertException(data => data, converter) + case BooleanType => wrapperConvertException(data => data.toBoolean, converter) + case ByteType => wrapperConvertException(data => data.toByte, converter) + case BinaryType => +wrapperConvertException(data => UTF8String.fromString(data).getBytes, converter) + case IntegerType => wrapperConvertException(data => data.toInt, converter) + case ShortType => wrapperConvertException(data => data.toShort, converter) + case LongType => wrapperConvertException(data => data.toLong, converter) + case FloatType => wrapperConvertException(data => data.toFloat, converter) + case DoubleType => wrapperConvertException(data => data.toDouble, converter) + case _: DecimalType => wrapperConvertException(data => BigDecimal(data), converter) + case DateType if conf.datetimeJava8ApiEnabled => +wrapperConvertException(data => DateTimeUtils.stringToDate( + UTF8String.fromString(data), + DateTimeUtils.getZoneId(conf.sessionLocalTimeZone)) + .map(DateTimeUtils.daysToLocalDate).orNull, converter) + case DateType => wrapperConvertException(data => DateTimeUtils.stringToDate( +UTF8String.fromString(data), +DateTimeUtils.getZoneId(conf.sessionLocalTimeZone)) +.map(DateTimeUtils.toJavaDate).orNull, converter) + case TimestampType if conf.datetimeJava8ApiEnabled => + wrapperConvertException(data => DateTimeUtils.stringToTimestamp( + UTF8String.fromString(data), + DateTimeUtils.getZoneId(conf.sessionLocalTimeZone)) + 
.map(DateTimeUtils.microsToInstant).orNull, converter) + case TimestampType => wrapperConvertException(data => DateTimeUtils.stringToTimestamp( +UTF8String.fromString(data), +DateTimeUtils.getZoneId(conf.sessionLocalTimeZone)) +.map(DateTimeUtils.toJavaTimestamp).orNull, converter) + case CalendarIntervalType => wrapperConvertException( +data => IntervalUtils.stringToInterval(UTF8String.fromString(data)), +converter) + case udt: UserDefinedType[_] => +wrapperConvertException(data => udt.deserialize(data), converter) + case dt => +throw new SparkException("TRANSFORM without serde does not support " + + s"${dt.getClass.getSimpleName} as output data type") Review comment: > `dt.getClass.getSimpleName` -> `dt.catalogString` That is not general enough: for `ArrayType` it will show `array`, and for `StructType` it will show `struct`. WDYT?
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r459189586 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala ## @@ -1063,6 +1063,9 @@ private[hive] trait HiveInspectors { case DateType => dateTypeInfo case TimestampType => timestampTypeInfo case NullType => voidTypeInfo + case dt => +throw new AnalysisException("HiveInspectors does not support convert " + Review comment: > nit: `s"${dt.catalogString}" cannot be converted to Hive TypeInfo"` Same reason as in https://github.com/apache/spark/pull/29085#discussion_r459189448
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r459182797 ## File path: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala ## @@ -1031,4 +1031,96 @@ class PlanParserSuite extends AnalysisTest { assertEqual("select a, b from db.c;;;", table("db", "c").select('a, 'b)) assertEqual("select a, b from db.c; ;; ;", table("db", "c").select('a, 'b)) } + + test("SPARK-32106: TRANSFORM without serde") { Review comment: > Also, could you check `ROW FORMAT SERDE`, too? Added a UT.
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r459182038 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala ## @@ -54,87 +52,110 @@ case class HiveScriptTransformationExec( script: String, output: Seq[Attribute], child: SparkPlan, -ioschema: HiveScriptIOSchema) - extends BaseScriptTransformationExec { +ioschema: ScriptTransformationIOSchema) + extends BaseScriptTransformationExec with HiveInspectors { - override def processIterator( - inputIterator: Iterator[InternalRow], - hadoopConf: Configuration): Iterator[InternalRow] = { -val cmd = List("/bin/bash", "-c", script) -val builder = new ProcessBuilder(cmd.asJava) - -val proc = builder.start() -val inputStream = proc.getInputStream -val outputStream = proc.getOutputStream -val errorStream = proc.getErrorStream - -// In order to avoid deadlocks, we need to consume the error output of the child process. -// To avoid issues caused by large error output, we use a circular buffer to limit the amount -// of error output that we retain. See SPARK-7862 for more discussion of the deadlock / hang -// that motivates this. 
-val stderrBuffer = new CircularBuffer(2048) -new RedirectThread( - errorStream, - stderrBuffer, - "Thread-ScriptTransformation-STDERR-Consumer").start() + private def initInputSerDe( + input: Seq[Expression]): Option[(AbstractSerDe, StructObjectInspector)] = { +ioschema.inputSerdeClass.map { serdeClass => + val (columns, columnTypes) = parseAttrs(input) + val serde = initSerDe(serdeClass, columns, columnTypes, ioschema.inputSerdeProps) + val fieldObjectInspectors = columnTypes.map(toInspector) + val objectInspector = ObjectInspectorFactory +.getStandardStructObjectInspector(columns.asJava, fieldObjectInspectors.asJava) + (serde, objectInspector) +} + } -val outputProjection = new InterpretedProjection(input, child.output) + private def initOutputSerDe( + output: Seq[Attribute]): Option[(AbstractSerDe, StructObjectInspector)] = { +ioschema.outputSerdeClass.map { serdeClass => + val (columns, columnTypes) = parseAttrs(output) + val serde = initSerDe(serdeClass, columns, columnTypes, ioschema.outputSerdeProps) + val structObjectInspector = serde.getObjectInspector().asInstanceOf[StructObjectInspector] + (serde, structObjectInspector) +} + } -// This nullability is a performance optimization in order to avoid an Option.foreach() call -// inside of a loop -@Nullable val (inputSerde, inputSoi) = ioschema.initInputSerDe(input).getOrElse((null, null)) + private def parseAttrs(attrs: Seq[Expression]): (Seq[String], Seq[DataType]) = { +val columns = attrs.zipWithIndex.map(e => s"${e._1.prettyName}_${e._2}") +val columnTypes = attrs.map(_.dataType) +(columns, columnTypes) + } -// This new thread will consume the ScriptTransformation's input rows and write them to the -// external process. That process's output will be read by this current thread. 
-val writerThread = new HiveScriptTransformationWriterThread( - inputIterator.map(outputProjection), - input.map(_.dataType), - inputSerde, - inputSoi, - ioschema, - outputStream, - proc, - stderrBuffer, - TaskContext.get(), - hadoopConf -) + private def initSerDe( Review comment: > Sorry for the confusion, but, on second thought, it's better to pull out the hive-serde related functions from `HiveScriptTransformationExec` and then create a companion object holding them for readability [maropu@972775b](https://github.com/maropu/spark/commit/972775b821406d81d3c1ba1c718de3037a0ca068). WDYT? Agreed; that way ScriptTransformationExec only handles data processing.
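The removed `processIterator` code in the hunk above notes that the child process's stderr has to be consumed to avoid deadlocks, and that a circular buffer bounds how much error output is retained. The following is only a rough sketch of that idea using plain JDK classes; `BoundedTail` and `drain` are hypothetical names and are not Spark's `CircularBuffer` or `RedirectThread`.

```scala
import java.io.InputStream

object StderrConsumerSketch {
  // Hypothetical bounded buffer: keeps only the last `capacity` bytes written.
  final class BoundedTail(capacity: Int) {
    private val buf = new Array[Byte](capacity)
    private var pos = 0
    private var filled = false

    def write(b: Int): Unit = synchronized {
      buf(pos) = b.toByte
      pos = (pos + 1) % capacity
      if (pos == 0) filled = true
    }

    // Returns the retained bytes in the order they were written.
    override def toString: String = synchronized {
      val bytes = if (filled) buf.drop(pos) ++ buf.take(pos) else buf.take(pos)
      new String(bytes, "UTF-8")
    }
  }

  // Drains `in` on a daemon thread so the writer on the other end
  // never blocks on a full pipe.
  def drain(in: InputStream, tail: BoundedTail): Thread = {
    val t = new Thread(() => {
      var b = in.read()
      while (b != -1) {
        tail.write(b)
        b = in.read()
      }
    }, "stderr-consumer-sketch")
    t.setDaemon(true)
    t.start()
    t
  }
}
```

In a real setup the stream would come from `Process.getErrorStream`; the sketch takes any `InputStream` so it can be tried without spawning a process.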
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r459178460 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ## @@ -532,6 +532,21 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } } + object SparkScripts extends Strategy { +def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + case logical.ScriptTransformation(input, script, output, child, ioschema) +if ioschema.inputSerdeClass.isEmpty && ioschema.outputSerdeClass.isEmpty => Review comment: > We need to check this here? Seems like it has been checked in https://github.com/apache/spark/pull/29085/files#diff-9847f5cef7cf7fbc5830fbc6b779ee10R783-R784 ? Yea, it is not needed now.
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r459175410 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala ## @@ -1063,6 +1063,9 @@ private[hive] trait HiveInspectors { case DateType => dateTypeInfo case TimestampType => timestampTypeInfo case NullType => voidTypeInfo + case dt => +throw new AnalysisException("TRANSFORM with hive serde does not support " + Review comment: Done
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r459175002 ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/SparkScriptTransformationSuite.scala ## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution + +import org.apache.spark.{SparkException, TestUtils} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} +import org.apache.spark.sql.catalyst.parser.ParseException +import org.apache.spark.sql.test.SharedSparkSession + +class SparkScriptTransformationSuite extends BaseScriptTransformationSuite with SharedSparkSession { + import testImplicits._ + + override def isHive23OrSpark: Boolean = true + + override def createScriptTransformationExec( + input: Seq[Expression], + script: String, + output: Seq[Attribute], + child: SparkPlan, + ioschema: ScriptTransformationIOSchema): BaseScriptTransformationExec = { +SparkScriptTransformationExec( + input = input, + script = script, + output = output, + child = child, + ioschema = ioschema +) + } + + test("SPARK-32106: TRANSFORM with serde without hive should throw exception") { +assume(TestUtils.testCommandAvailable("/bin/bash")) +withTempView("v") { + val df = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") + df.createTempView("v") + + val e = intercept[ParseException] { +sql( + """ +|SELECT TRANSFORM (a) +|ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' +|USING 'cat' AS (a) +|ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' +|FROM v + """.stripMargin) + }.getMessage + assert(e.contains("TRANSFORM with serde is only supported in hive mode")) +} + } + + test("TRANSFORM doesn't support ArrayType/MapType/StructType as output data type (no serde)") { Review comment: Done
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r459174831 ## File path: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala ## @@ -206,75 +169,147 @@ class HiveScriptTransformationSuite extends SparkPlanTest with SQLTestUtils with val query = sql( s""" - |SELECT - |TRANSFORM(a, b, c, d, e) - |USING 'python $scriptFilePath' AS (a, b, c, d, e) - |FROM v + |SELECT TRANSFORM(a, b, c, d, e) + |USING 'python ${scriptFilePath}' + |FROM v """.stripMargin) - // In Hive 1.2, the string representation of a decimal omits trailing zeroes. - // But in Hive 2.3, it is always padded to 18 digits with trailing zeroes if necessary. - val decimalToString: Column => Column = if (HiveUtils.isHive23) { -c => c.cast("string") - } else { -c => c.cast("decimal(1, 0)").cast("string") - } - checkAnswer(query, identity, df.select( -'a.cast("string"), -'b.cast("string"), -'c.cast("string"), -decimalToString('d), -'e.cast("string")).collect()) + // In hive default serde mode, if we don't define output schema, it will choose first + // two column as output schema (key: String, value: String) + checkAnswer( +query, +identity, +df.select( + 'a.cast("string").as("key"), + 'b.cast("string").as("value")).collect()) } } - test("SPARK-30973: TRANSFORM should wait for the termination of the script (no serde)") { + testBasicInputDataTypesWith(hiveIOSchema, "hive serde") + + test("SPARK-32106: TRANSFORM supports complex data types type (hive serde)") { assume(TestUtils.testCommandAvailable("/bin/bash")) +withTempView("v") { + val df = Seq( +(1, "1", Array(0, 1, 2), Map("a" -> 1)), +(2, "2", Array(3, 4, 5), Map("b" -> 2))) +.toDF("a", "b", "c", "d") + .select('a, 'b, 'c, 'd, struct('a, 'b).as("e")) + df.createTempView("v") -val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") -val e = intercept[SparkException] { - val plan = -new HiveScriptTransformationExec( - input = 
Seq(rowsDf.col("a").expr), - script = "some_non_existent_command", - output = Seq(AttributeReference("a", StringType)()), - child = rowsDf.queryExecution.sparkPlan, - ioschema = noSerdeIOSchema) - SparkPlanTest.executePlan(plan, hiveContext) + // Hive serde support ArrayType/MapType/StructType as input and output data type + checkAnswer( +df, +(child: SparkPlan) => createScriptTransformationExec( + input = Seq( +df.col("c").expr, +df.col("d").expr, +df.col("e").expr), + script = "cat", + output = Seq( +AttributeReference("c", ArrayType(IntegerType))(), +AttributeReference("d", MapType(StringType, IntegerType))(), +AttributeReference("e", StructType( + Seq( +StructField("col1", IntegerType, false), +StructField("col2", StringType, true))))()), + child = child, + ioschema = hiveIOSchema +), +df.select('c, 'd, 'e).collect()) } -assert(e.getMessage.contains("Subprocess exited with status")) -assert(uncaughtExceptionHandler.exception.isEmpty) } - test("SPARK-30973: TRANSFORM should wait for the termination of the script (with serde)") { + test("SPARK-32106: TRANSFORM supports complex data types end to end (hive serde) ") { Review comment: Done
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r459174654 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala ## @@ -33,14 +32,13 @@ import org.apache.hadoop.hive.serde2.objectinspector._ import org.apache.hadoop.io.Writable import org.apache.spark.TaskContext -import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.logical.ScriptInputOutputSchema import org.apache.spark.sql.execution._ import org.apache.spark.sql.hive.HiveInspectors import org.apache.spark.sql.hive.HiveShim._ -import org.apache.spark.sql.types.DataType -import org.apache.spark.util.{CircularBuffer, RedirectThread, Utils} +import org.apache.spark.sql.types.{DataType, StringType} Review comment: Done
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r459173720 ## File path: sql/core/src/test/resources/sql-tests/inputs/transform.sql ## @@ -0,0 +1,146 @@ +-- Test data. +CREATE OR REPLACE TEMPORARY VIEW t AS SELECT * FROM VALUES +('1', true, unhex('537061726B2053514C'), tinyint(1), 1, smallint(100), bigint(1), float(1.0), 1.0, Decimal(1.0), timestamp('1997-01-02'), date('2000-04-01')), +('2', false, unhex('537061726B2053514C'), tinyint(2), 2, smallint(200), bigint(2), float(2.0), 2.0, Decimal(2.0), timestamp('1997-01-02 03:04:05'), date('2000-04-02')), +('3', true, unhex('537061726B2053514C'), tinyint(3), 3, smallint(300), bigint(3), float(3.0), 3.0, Decimal(3.0), timestamp('1997-02-10 17:32:01-08'), date('2000-04-03')) +AS t(a, b, c, d, e, f, g, h, i, j, k, l); + +SELECT TRANSFORM(a) +USING 'cat' AS (a) +FROM t; + +-- with non-exist command +SELECT TRANSFORM(a) +USING 'some_non_existent_command' AS (a) +FROM t; + +-- with non-exist file +SELECT TRANSFORM(a) +USING 'python some_non_existent_file' AS (a) +FROM t; + +-- common supported data types between no serde and serde transform +SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM ( +SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k, l) +USING 'cat' AS ( +a string, +b boolean, +c binary, +d tinyint, +e int, +f smallint, +g long, +h float, +i double, +j decimal(38, 18), +k timestamp, +l date) +FROM t +) tmp; + +-- common supported data types between no serde and serde transform +SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM ( +SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k, l) +USING 'cat' AS ( +a string, +b string, +c string, +d string, +e string, +f string, +g string, +h string, +i string, +j string, +k string, +l string) +FROM t +) tmp; + +-- SPARK-32388 handle schema less +SELECT TRANSFORM(a) +USING 'cat' +FROM t; + +SELECT TRANSFORM(a, b) +USING 'cat' +FROM t; + +SELECT TRANSFORM(a, b, c) +USING 'cat' +FROM t; + 
+-- return null when return string incompatible (no serde) +SELECT TRANSFORM(a, b, c, d, e, f, g, h, i) +USING 'cat' AS (a int, b short, c long, d byte, e float, f double, g decimal(38, 18), h date, i timestamp) +FROM VALUES +('a','','1231a','a','213.21a','213.21a','0a.21d','2000-04-01123','1997-0102 00:00:') tmp(a, b, c, d, e, f, g, h, i); + +-- SPARK-28227: transform can't run with aggregation +SELECT TRANSFORM(b, max(a), sum(f)) +USING 'cat' AS (a, b) +FROM t +GROUP BY b; + +-- transform use MAP +MAP a, b USING 'cat' AS (a, b) FROM t; + +-- transform use REDUCE +REDUCE a, b USING 'cat' AS (a, b) FROM t; + +-- transform with defined row format delimit +SELECT TRANSFORM(a, b, c, null) +ROW FORMAT DELIMITED +FIELDS TERMINATED BY '|' +LINES TERMINATED BY '\n' +NULL DEFINED AS 'NULL' +USING 'cat' AS (a, b, c, d) +ROW FORMAT DELIMITED +FIELDS TERMINATED BY '|' +LINES TERMINATED BY '\n' +NULL DEFINED AS 'NULL' +FROM t; + + +SELECT TRANSFORM(a, b, c, null) +ROW FORMAT DELIMITED +FIELDS TERMINATED BY '|' +LINES TERMINATED BY '\n' +NULL DEFINED AS 'NULL' +USING 'cat' AS (d) +ROW FORMAT DELIMITED +FIELDS TERMINATED BY '||' +LINES TERMINATED BY '\n' +NULL DEFINED AS 'NULL' +FROM t; + +-- SPARK-31937 transform with defined row format delimit Review comment: > Is this JIRA related to this query? I read it, though I'm not sure about the relationship. What kind of exceptions does this query throw? It is a test for Array/Map/Struct support. I have removed it for now and will add it in that PR.
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r459173424 ## File path: sql/core/src/test/resources/sql-tests/inputs/transform.sql ## @@ -0,0 +1,146 @@ +-- Test data. +CREATE OR REPLACE TEMPORARY VIEW t AS SELECT * FROM VALUES +('1', true, unhex('537061726B2053514C'), tinyint(1), 1, smallint(100), bigint(1), float(1.0), 1.0, Decimal(1.0), timestamp('1997-01-02'), date('2000-04-01')), +('2', false, unhex('537061726B2053514C'), tinyint(2), 2, smallint(200), bigint(2), float(2.0), 2.0, Decimal(2.0), timestamp('1997-01-02 03:04:05'), date('2000-04-02')), +('3', true, unhex('537061726B2053514C'), tinyint(3), 3, smallint(300), bigint(3), float(3.0), 3.0, Decimal(3.0), timestamp('1997-02-10 17:32:01-08'), date('2000-04-03')) +AS t(a, b, c, d, e, f, g, h, i, j, k, l); + +SELECT TRANSFORM(a) +USING 'cat' AS (a) +FROM t; + +-- with non-exist command +SELECT TRANSFORM(a) +USING 'some_non_existent_command' AS (a) +FROM t; + +-- with non-exist file +SELECT TRANSFORM(a) +USING 'python some_non_existent_file' AS (a) +FROM t; + +-- common supported data types between no serde and serde transform +SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM ( +SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k, l) +USING 'cat' AS ( +a string, +b boolean, +c binary, +d tinyint, +e int, +f smallint, +g long, +h float, +i double, +j decimal(38, 18), +k timestamp, +l date) +FROM t +) tmp; + +-- common supported data types between no serde and serde transform +SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM ( +SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k, l) +USING 'cat' AS ( +a string, +b string, +c string, +d string, +e string, +f string, +g string, +h string, +i string, +j string, +k string, +l string) +FROM t +) tmp; + +-- SPARK-32388 handle schema less +SELECT TRANSFORM(a) +USING 'cat' +FROM t; + +SELECT TRANSFORM(a, b) +USING 'cat' +FROM t; + +SELECT TRANSFORM(a, b, c) +USING 'cat' +FROM t; + 
+-- return null when return string incompatible (no serde) +SELECT TRANSFORM(a, b, c, d, e, f, g, h, i) +USING 'cat' AS (a int, b short, c long, d byte, e float, f double, g decimal(38, 18), h date, i timestamp) +FROM VALUES +('a','','1231a','a','213.21a','213.21a','0a.21d','2000-04-01123','1997-0102 00:00:') tmp(a, b, c, d, e, f, g, h, i); + +-- SPARK-28227: transform can't run with aggregation +SELECT TRANSFORM(b, max(a), sum(f)) +USING 'cat' AS (a, b) +FROM t +GROUP BY b; + +-- transform use MAP +MAP a, b USING 'cat' AS (a, b) FROM t; + +-- transform use REDUCE +REDUCE a, b USING 'cat' AS (a, b) FROM t; + +-- transform with defined row format delimit +SELECT TRANSFORM(a, b, c, null) +ROW FORMAT DELIMITED +FIELDS TERMINATED BY '|' +LINES TERMINATED BY '\n' +NULL DEFINED AS 'NULL' +USING 'cat' AS (a, b, c, d) +ROW FORMAT DELIMITED +FIELDS TERMINATED BY '|' +LINES TERMINATED BY '\n' +NULL DEFINED AS 'NULL' +FROM t; + + Review comment: Done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r459172282 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala ## @@ -744,8 +744,29 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging selectClause.hints.asScala.foldRight(withWindow)(withHints) } + // Decode and input/output format. + type Format = (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String]) Review comment: Done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r458829828 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ## @@ -679,64 +680,78 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { "Unsupported operation: Used defined record reader/writer classes.", ctx) } -// Decode and input/output format. -type Format = (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String]) -def format( -fmt: RowFormatContext, -configKey: String, -defaultConfigValue: String): Format = fmt match { - case c: RowFormatDelimitedContext => -// TODO we should use the visitRowFormatDelimited function here. However HiveScriptIOSchema -// expects a seq of pairs in which the old parsers' token names are used as keys. -// Transforming the result of visitRowFormatDelimited would be quite a bit messier than -// retrieving the key value pairs ourselves. -def entry(key: String, value: Token): Seq[(String, String)] = { - Option(value).map(t => key -> t.getText).toSeq -} -val entries = entry("TOK_TABLEROWFORMATFIELD", c.fieldsTerminatedBy) ++ - entry("TOK_TABLEROWFORMATCOLLITEMS", c.collectionItemsTerminatedBy) ++ - entry("TOK_TABLEROWFORMATMAPKEYS", c.keysTerminatedBy) ++ - entry("TOK_TABLEROWFORMATLINES", c.linesSeparatedBy) ++ - entry("TOK_TABLEROWFORMATNULL", c.nullDefinedAs) - -(entries, None, Seq.empty, None) +if (!conf.getConf(CATALOG_IMPLEMENTATION).equals("hive")) { + super.withScriptIOSchema( +ctx, +inRowFormat, +recordWriter, +outRowFormat, +recordReader, +schemaLess) +} else { - case c: RowFormatSerdeContext => -// Use a serde format. -val CatalogStorageFormat(None, None, None, Some(name), _, props) = visitRowFormatSerde(c) + // Decode and input/output format. 
+ type Format = (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String]) + + def format( + fmt: RowFormatContext, + configKey: String, + defaultConfigValue: String): Format = fmt match { +case c: RowFormatDelimitedContext => + // TODO we should use visitRowFormatDelimited function here. However HiveScriptIOSchema + // expects a seq of pairs in which the old parsers' token names are used as keys. + // Transforming the result of visitRowFormatDelimited would be quite a bit messier than + // retrieving the key value pairs ourselves. + def entry(key: String, value: Token): Seq[(String, String)] = { +Option(value).map(t => key -> t.getText).toSeq + } + + val entries = entry("TOK_TABLEROWFORMATFIELD", c.fieldsTerminatedBy) ++ +entry("TOK_TABLEROWFORMATCOLLITEMS", c.collectionItemsTerminatedBy) ++ +entry("TOK_TABLEROWFORMATMAPKEYS", c.keysTerminatedBy) ++ +entry("TOK_TABLEROWFORMATLINES", c.linesSeparatedBy) ++ +entry("TOK_TABLEROWFORMATNULL", c.nullDefinedAs)
Review comment:
> hm... how about this? [maropu@7954d0a](https://github.com/maropu/spark/commit/7954d0a075b1f43f5e36e27ceb526e65d8df2abc)

Extracting the function is OK, but using `getOrElse` is not elegant. How about this instead:
```
protected def getRowFormatDelimited(ctx: RowFormatDelimitedContext): Format = {
  def entry(key: String, value: Token): Seq[(String, String)] = {
    Option(value).map(t => key -> t.getText).toSeq
  }

  val entries = entry("TOK_TABLEROWFORMATFIELD", ctx.fieldsTerminatedBy) ++
    entry("TOK_TABLEROWFORMATCOLLITEMS", ctx.collectionItemsTerminatedBy) ++
    entry("TOK_TABLEROWFORMATMAPKEYS", ctx.keysTerminatedBy) ++
    entry("TOK_TABLEROWFORMATLINES", ctx.linesSeparatedBy) ++
    entry("TOK_TABLEROWFORMATNULL", ctx.nullDefinedAs)

  (entries, None, Seq.empty, None)
}
```
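For readers following the thread: the `entry` helper quoted above relies on `Option(value)` turning a null token into `None`, so a clause that is absent from the query contributes no key/value pair. Below is a minimal standalone sketch of that idiom. It assumes a hypothetical `Tok` class in place of ANTLR's `Token` and a hypothetical `Delimited` holder in place of `RowFormatDelimitedContext`; neither is part of Spark, and only three of the five keys are shown.

```scala
// Hypothetical stand-ins for the ANTLR types used in the PR; not Spark's API.
final case class Tok(getText: String)
final case class Delimited(
    fieldsTerminatedBy: Tok = null,
    linesSeparatedBy: Tok = null,
    nullDefinedAs: Tok = null)

object RowFormatEntries {
  // Option(null) is None, so a missing clause yields an empty Seq.
  def entry(key: String, value: Tok): Seq[(String, String)] =
    Option(value).map(t => key -> t.getText).toSeq

  // Concatenate the per-clause results; absent clauses simply drop out.
  def entries(c: Delimited): Seq[(String, String)] =
    entry("TOK_TABLEROWFORMATFIELD", c.fieldsTerminatedBy) ++
      entry("TOK_TABLEROWFORMATLINES", c.linesSeparatedBy) ++
      entry("TOK_TABLEROWFORMATNULL", c.nullDefinedAs)

  def main(args: Array[String]): Unit = {
    // Only the FIELDS TERMINATED BY clause is present in this example.
    println(entries(Delimited(fieldsTerminatedBy = Tok("|"))))
  }
}
```

Because every clause goes through the same `Option`-based path, no `getOrElse` default is needed at the call site.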
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r458822655 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala ## @@ -745,7 +745,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } /** - * Create a (Hive based) [[ScriptInputOutputSchema]]. + * Create a [[ScriptInputOutputSchema]]. */ protected def withScriptIOSchema( Review comment: > Plz add tests in `PlanParserSuite` Done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r458804197 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ## @@ -679,64 +680,78 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { "Unsupported operation: Used defined record reader/writer classes.", ctx) } -// Decode and input/output format. -type Format = (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String]) -def format( -fmt: RowFormatContext, -configKey: String, -defaultConfigValue: String): Format = fmt match { - case c: RowFormatDelimitedContext => -// TODO we should use the visitRowFormatDelimited function here. However HiveScriptIOSchema -// expects a seq of pairs in which the old parsers' token names are used as keys. -// Transforming the result of visitRowFormatDelimited would be quite a bit messier than -// retrieving the key value pairs ourselves. -def entry(key: String, value: Token): Seq[(String, String)] = { - Option(value).map(t => key -> t.getText).toSeq -} -val entries = entry("TOK_TABLEROWFORMATFIELD", c.fieldsTerminatedBy) ++ - entry("TOK_TABLEROWFORMATCOLLITEMS", c.collectionItemsTerminatedBy) ++ - entry("TOK_TABLEROWFORMATMAPKEYS", c.keysTerminatedBy) ++ - entry("TOK_TABLEROWFORMATLINES", c.linesSeparatedBy) ++ - entry("TOK_TABLEROWFORMATNULL", c.nullDefinedAs) - -(entries, None, Seq.empty, None) +if (!conf.getConf(CATALOG_IMPLEMENTATION).equals("hive")) { + super.withScriptIOSchema( +ctx, +inRowFormat, +recordWriter, +outRowFormat, +recordReader, +schemaLess) +} else { - case c: RowFormatSerdeContext => -// Use a serde format. -val CatalogStorageFormat(None, None, None, Some(name), _, props) = visitRowFormatSerde(c) + // Decode and input/output format. 
+ type Format = (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String]) + + def format( + fmt: RowFormatContext, + configKey: String, + defaultConfigValue: String): Format = fmt match { +case c: RowFormatDelimitedContext =>
Review comment:
> We already have tests for this code path?

No. I noticed this too; we need to add more tests, starting with `PlanParserSuite`.
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r458782794 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala ## @@ -87,17 +181,72 @@ trait BaseScriptTransformationExec extends UnaryExecNode { } } } + + private lazy val fieldWriters: Seq[String => Any] = output.map { attr => Review comment: > On second thought, `outputFieldWriters` looks better? more accurate, done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r458778858 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ## @@ -679,64 +680,78 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { "Unsupported operation: Used defined record reader/writer classes.", ctx) } -// Decode and input/output format. -type Format = (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String]) -def format( -fmt: RowFormatContext, -configKey: String, -defaultConfigValue: String): Format = fmt match { - case c: RowFormatDelimitedContext => -// TODO we should use the visitRowFormatDelimited function here. However HiveScriptIOSchema -// expects a seq of pairs in which the old parsers' token names are used as keys. -// Transforming the result of visitRowFormatDelimited would be quite a bit messier than -// retrieving the key value pairs ourselves. -def entry(key: String, value: Token): Seq[(String, String)] = { - Option(value).map(t => key -> t.getText).toSeq -} -val entries = entry("TOK_TABLEROWFORMATFIELD", c.fieldsTerminatedBy) ++ - entry("TOK_TABLEROWFORMATCOLLITEMS", c.collectionItemsTerminatedBy) ++ - entry("TOK_TABLEROWFORMATMAPKEYS", c.keysTerminatedBy) ++ - entry("TOK_TABLEROWFORMATLINES", c.linesSeparatedBy) ++ - entry("TOK_TABLEROWFORMATNULL", c.nullDefinedAs) - -(entries, None, Seq.empty, None) +if (!conf.getConf(CATALOG_IMPLEMENTATION).equals("hive")) { + super.withScriptIOSchema( +ctx, +inRowFormat, +recordWriter, +outRowFormat, +recordReader, +schemaLess) +} else { - case c: RowFormatSerdeContext => -// Use a serde format. -val CatalogStorageFormat(None, None, None, Some(name), _, props) = visitRowFormatSerde(c) + // Decode and input/output format. 
+ type Format = (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String]) + + def format( + fmt: RowFormatContext, + configKey: String, + defaultConfigValue: String): Format = fmt match { +case c: RowFormatDelimitedContext => + // TODO we should use visitRowFormatDelimited function here. However HiveScriptIOSchema + // expects a seq of pairs in which the old parsers' token names are used as keys. + // Transforming the result of visitRowFormatDelimited would be quite a bit messier than + // retrieving the key value pairs ourselves. + def entry(key: String, value: Token): Seq[(String, String)] = { +Option(value).map(t => key -> t.getText).toSeq + } + + val entries = entry("TOK_TABLEROWFORMATFIELD", c.fieldsTerminatedBy) ++ +entry("TOK_TABLEROWFORMATCOLLITEMS", c.collectionItemsTerminatedBy) ++ +entry("TOK_TABLEROWFORMATMAPKEYS", c.keysTerminatedBy) ++ +entry("TOK_TABLEROWFORMATLINES", c.linesSeparatedBy) ++ +entry("TOK_TABLEROWFORMATNULL", c.nullDefinedAs)
Review comment:
> We cannot share this code part with `AstBuilder` one?

No. Hive allows a serde to be defined for only the input or only the output, so we need to keep both parts.
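To illustrate the point about input and output being independent, here is a rough sketch in which each side of the script I/O schema is resolved on its own, so one side can be delimited while the other uses a serde. The `RowFormat`, `DelimitedFormat`, `SerdeFormat`, and `resolve` names are hypothetical and exist only for this example; only the error message is taken from the PR.

```scala
// Hypothetical model of the two row-format kinds; names are illustrative only.
sealed trait RowFormat
final case class DelimitedFormat(props: Seq[(String, String)]) extends RowFormat
final case class SerdeFormat(className: String) extends RowFormat

object IoSchemaSketch {
  // Resolve one side (input or output) on its own.
  // Returns (delimiter properties, optional serde class name).
  def resolve(
      fmt: RowFormat,
      hiveEnabled: Boolean): (Seq[(String, String)], Option[String]) =
    fmt match {
      case DelimitedFormat(props) => (props, None)
      case SerdeFormat(name) if hiveEnabled => (Seq.empty, Some(name))
      case SerdeFormat(_) =>
        throw new IllegalArgumentException(
          "TRANSFORM with serde is only supported in hive mode")
    }

  def main(args: Array[String]): Unit = {
    // Input is delimited, output uses a serde: both are valid in hive mode.
    val in = resolve(DelimitedFormat(Seq("TOK_TABLEROWFORMATFIELD" -> "|")), hiveEnabled = true)
    val out = resolve(SerdeFormat("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"), hiveEnabled = true)
    println(in)
    println(out)
  }
}
```

Since the two calls do not depend on each other, the Hive path has to handle the delimited case itself even though the non-Hive parser handles it too.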
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r458776685 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala ## @@ -87,17 +181,72 @@ trait BaseScriptTransformationExec extends UnaryExecNode { } } } + + private lazy val fieldWriters: Seq[String => Any] = output.map { attr => +val converter = CatalystTypeConverters.createToCatalystConverter(attr.dataType) +attr.dataType match { + case StringType => wrapperConvertException(data => data, converter) + case BooleanType => wrapperConvertException(data => data.toBoolean, converter) + case ByteType => wrapperConvertException(data => data.toByte, converter) + case BinaryType => wrapperConvertException(data => data.getBytes, converter) + case IntegerType => wrapperConvertException(data => data.toInt, converter) + case ShortType => wrapperConvertException(data => data.toShort, converter) + case LongType => wrapperConvertException(data => data.toLong, converter) + case FloatType => wrapperConvertException(data => data.toFloat, converter) + case DoubleType => wrapperConvertException(data => data.toDouble, converter) + case _: DecimalType => wrapperConvertException(data => BigDecimal(data), converter) + case DateType if conf.datetimeJava8ApiEnabled => +wrapperConvertException(data => DateTimeUtils.stringToDate( + UTF8String.fromString(data), + DateTimeUtils.getZoneId(conf.sessionLocalTimeZone)) + .map(DateTimeUtils.daysToLocalDate).orNull, converter) + case DateType => wrapperConvertException(data => DateTimeUtils.stringToDate( +UTF8String.fromString(data), +DateTimeUtils.getZoneId(conf.sessionLocalTimeZone)) +.map(DateTimeUtils.toJavaDate).orNull, converter) + case TimestampType if conf.datetimeJava8ApiEnabled => + wrapperConvertException(data => DateTimeUtils.stringToTimestamp( + UTF8String.fromString(data), + DateTimeUtils.getZoneId(conf.sessionLocalTimeZone)) + .map(DateTimeUtils.microsToInstant).orNull, 
converter) + case TimestampType => wrapperConvertException(data => DateTimeUtils.stringToTimestamp( +UTF8String.fromString(data), +DateTimeUtils.getZoneId(conf.sessionLocalTimeZone)) +.map(DateTimeUtils.toJavaTimestamp).orNull, converter) + case CalendarIntervalType => wrapperConvertException( +data => IntervalUtils.stringToInterval(UTF8String.fromString(data)), +converter) + case udt: UserDefinedType[_] => +wrapperConvertException(data => udt.deserialize(data), converter) + case ArrayType(_, _) | MapType(_, _, _) | StructType(_) => Review comment: Done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
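The diff above routes every per-type parser through `wrapperConvertException`, whose body is not shown in this hunk. As a hedged sketch only, assuming it behaves the way the "return null when return string incompatible (no serde)" test in `transform.sql` suggests, such a wrapper might look like the following. The name `wrapConvert` is hypothetical and this is not necessarily the PR's implementation.

```scala
import scala.util.control.NonFatal

object ConvertSketch {
  // Wrap a String parser so that any non-fatal failure produces null
  // instead of failing the task; `converter` post-processes the parsed value.
  def wrapConvert(parse: String => Any, converter: Any => Any): String => Any =
    (data: String) => converter {
      try parse(data) catch { case NonFatal(_) => null }
    }

  def main(args: Array[String]): Unit = {
    // Identity converter stands in for the Catalyst type converter.
    val toInt = wrapConvert(_.toInt, (x: Any) => x)
    println(toInt("42"))
    println(toInt("1231a"))
  }
}
```

With this shape, a script output such as `1231a` for an `int` column yields a null field rather than an exception, which matches what the SQL test expects.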
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r458776096 ## File path: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala ## @@ -20,78 +20,44 @@ package org.apache.spark.sql.hive.execution import java.sql.Timestamp import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe -import org.scalatest.Assertions._ -import org.scalatest.BeforeAndAfterEach import org.scalatest.exceptions.TestFailedException -import org.apache.spark.{SparkException, TaskContext, TestUtils} -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.Column -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} -import org.apache.spark.sql.catalyst.plans.physical.Partitioning -import org.apache.spark.sql.execution.{SparkPlan, SparkPlanTest, UnaryExecNode} +import org.apache.spark.{SparkException, TestUtils} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression} +import org.apache.spark.sql.execution._ +import org.apache.spark.sql.functions._ import org.apache.spark.sql.hive.HiveUtils import org.apache.spark.sql.hive.test.TestHiveSingleton -import org.apache.spark.sql.test.SQLTestUtils -import org.apache.spark.sql.types.StringType - -class HiveScriptTransformationSuite extends SparkPlanTest with SQLTestUtils with TestHiveSingleton - with BeforeAndAfterEach { - import spark.implicits._ - - private val noSerdeIOSchema = HiveScriptIOSchema( -inputRowFormat = Seq.empty, -outputRowFormat = Seq.empty, -inputSerdeClass = None, -outputSerdeClass = None, -inputSerdeProps = Seq.empty, -outputSerdeProps = Seq.empty, -recordReaderClass = None, -recordWriterClass = None, -schemaLess = false - ) - - private val serdeIOSchema = noSerdeIOSchema.copy( -inputSerdeClass = Some(classOf[LazySimpleSerDe].getCanonicalName), -outputSerdeClass = 
Some(classOf[LazySimpleSerDe].getCanonicalName) - ) - - private var defaultUncaughtExceptionHandler: Thread.UncaughtExceptionHandler = _ - - private val uncaughtExceptionHandler = new TestUncaughtExceptionHandler - - protected override def beforeAll(): Unit = { -super.beforeAll() -defaultUncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler -Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler) +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + +class HiveScriptTransformationSuite extends BaseScriptTransformationSuite with TestHiveSingleton { + import testImplicits._ + + import ScriptTransformationIOSchema._ + + override def isHive23OrSpark: Boolean = HiveUtils.isHive23 + + override def createScriptTransformationExec( + input: Seq[Expression], + script: String, + output: Seq[Attribute], + child: SparkPlan, + ioschema: ScriptTransformationIOSchema): BaseScriptTransformationExec = { +HiveScriptTransformationExec( + input = input, + script = script, + output = output, + child = child, + ioschema = ioschema +) } - protected override def afterAll(): Unit = { -super.afterAll() -Thread.setDefaultUncaughtExceptionHandler(defaultUncaughtExceptionHandler) - } - - override protected def afterEach(): Unit = { -super.afterEach() -uncaughtExceptionHandler.cleanStatus() - } - - test("cat without SerDe") { -assume(TestUtils.testCommandAvailable("/bin/bash")) - -val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") -checkAnswer( - rowsDf, - (child: SparkPlan) => new HiveScriptTransformationExec( -input = Seq(rowsDf.col("a").expr), -script = "cat", -output = Seq(AttributeReference("a", StringType)()), -child = child, -ioschema = noSerdeIOSchema - ), - rowsDf.collect()) -assert(uncaughtExceptionHandler.exception.isEmpty) + private val serdeIOSchema: ScriptTransformationIOSchema = { Review comment: Done
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r458777932 ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala ## @@ -0,0 +1,364 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution + +import java.sql.{Date, Timestamp} + +import org.json4s.DefaultFormats +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods._ +import org.scalatest.Assertions._ +import org.scalatest.BeforeAndAfterEach +import org.scalatest.exceptions.TestFailedException + +import org.apache.spark.{SparkException, TaskContext, TestUtils} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.Column +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, GenericInternalRow} +import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + +abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestUtils + with BeforeAndAfterEach { + import testImplicits._ + import ScriptTransformationIOSchema._ + + protected val uncaughtExceptionHandler = new TestUncaughtExceptionHandler + + private var defaultUncaughtExceptionHandler: Thread.UncaughtExceptionHandler = _ + + protected override def beforeAll(): Unit = { +super.beforeAll() +defaultUncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler +Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler) + } + + protected override def afterAll(): Unit = { +super.afterAll() +Thread.setDefaultUncaughtExceptionHandler(defaultUncaughtExceptionHandler) + } + + override protected def afterEach(): Unit = { +super.afterEach() +uncaughtExceptionHandler.cleanStatus() + } + + def isHive23OrSpark: Boolean + + def createScriptTransformationExec( + input: Seq[Expression], + script: String, + output: Seq[Attribute], + child: SparkPlan, + ioschema: ScriptTransformationIOSchema): BaseScriptTransformationExec + + test("cat without SerDe") { 
+assume(TestUtils.testCommandAvailable("/bin/bash")) + +val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") +checkAnswer( + rowsDf, + (child: SparkPlan) => createScriptTransformationExec( +input = Seq(rowsDf.col("a").expr), +script = "cat", +output = Seq(AttributeReference("a", StringType)()), +child = child, +ioschema = defaultIOSchema + ), + rowsDf.collect()) +assert(uncaughtExceptionHandler.exception.isEmpty) + } + + test("script transformation should not swallow errors from upstream operators (no serde)") { +assume(TestUtils.testCommandAvailable("/bin/bash")) + +val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") +val e = intercept[TestFailedException] { + checkAnswer( +rowsDf, +(child: SparkPlan) => createScriptTransformationExec( + input = Seq(rowsDf.col("a").expr), + script = "cat", + output = Seq(AttributeReference("a", StringType)()), + child = ExceptionInjectingOperator(child), + ioschema = defaultIOSchema +), +rowsDf.collect()) +} +assert(e.getMessage().contains("intentional exception")) +// Before SPARK-25158, uncaughtExceptionHandler will catch IllegalArgumentException +assert(uncaughtExceptionHandler.exception.isEmpty) + } + + test("SPARK-25990: TRANSFORM should handle different data types correctly") { +assume(TestUtils.testCommandAvailable("python")) +val scriptFilePath = getTestResourcePath("test_script.py") + +withTempView("v") { + val df = Seq( +(1, "1", 1.0, BigDecimal(1.0), new Timestamp(1)), +(2, "2", 2.0, BigDecimal(2.0), new Timestamp(2)), +(3, "3", 3.0, BigDecimal(3.0), new Timestamp(3)) + ).toDF("a", "b", "c", "d", "e") // Note column
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r458777082 ## File path: sql/core/src/test/resources/sql-tests/inputs/transform.sql ## @@ -0,0 +1,89 @@ +-- Test data. +CREATE OR REPLACE TEMPORARY VIEW t AS SELECT * FROM VALUES +('1', true, unhex('537061726B2053514C'), tinyint(1), 1, smallint(100), bigint(1), float(1.0), 1.0, Decimal(1.0), timestamp('1997-01-02'), date('2000-04-01')), +('2', false, unhex('537061726B2053514C'), tinyint(2), 2, smallint(200), bigint(2), float(2.0), 2.0, Decimal(2.0), timestamp('1997-01-02 03:04:05'), date('2000-04-02')), +('3', true, unhex('537061726B2053514C'), tinyint(3), 3, smallint(300), bigint(3), float(3.0), 3.0, Decimal(3.0), timestamp('1997-02-10 17:32:01-08'), date('2000-04-03')) +AS t(a, b, c, d, e, f, g, h, i, j, k, l); + +SELECT TRANSFORM(a) +USING 'cat' AS (a) +FROM t; + +-- with non-exist command +SELECT TRANSFORM(a) +USING 'some_non_existent_command' AS (a) +FROM t; + +-- with non-exist file +SELECT TRANSFORM(a) +USING 'python some_non_existent_file' AS (a) +FROM t; + +-- common supported data types between no serde and serde transform +SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM ( +SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k, l) +USING 'cat' AS ( +a string, +b boolean, +c binary, +d tinyint, +e int, +f smallint, +g long, +h float, +i double, +j decimal(38, 18), +k timestamp, +l date) +FROM t +) tmp; + +-- common supported data types between no serde and serde transform +SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM ( +SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k, l) +USING 'cat' AS ( +a string, +b string, +c string, +d string, +e string, +f string, +g string, +h string, +i string, +j string, +k string, +l string) +FROM t +) tmp; + +-- handle schema less +SELECT TRANSFORM(a) Review comment: Done This is an automated message from the Apache Git Service. 
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r458775685 ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/SparkScriptTransformationSuite.scala ## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution + +import org.apache.spark.{SparkException, TestUtils} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} +import org.apache.spark.sql.catalyst.parser.ParseException +import org.apache.spark.sql.test.SharedSparkSession + +class SparkScriptTransformationSuite extends BaseScriptTransformationSuite with SharedSparkSession { + import testImplicits._ + + override def isHive23OrSpark: Boolean = true + + override def createScriptTransformationExec( + input: Seq[Expression], + script: String, + output: Seq[Attribute], + child: SparkPlan, + ioschema: ScriptTransformationIOSchema): BaseScriptTransformationExec = { +SparkScriptTransformationExec( + input = input, + script = script, + output = output, + child = child, + ioschema = ioschema +) + } + + test("SPARK-32106: TRANSFORM with serde without hive should throw exception") { +assume(TestUtils.testCommandAvailable("/bin/bash")) +withTempView("v") { + val df = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") + df.createTempView("v") + + val e = intercept[ParseException] { +sql( + """ +|SELECT TRANSFORM (a) +|ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' +|USING 'cat' AS (a) +|ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' +|FROM v + """.stripMargin) + }.getMessage + assert(e.contains("TRANSFORM with serde is only supported in hive mode")) +} + } + + test("TRANSFORM don't support ArrayType/MapType/StructType as output data type (no serde)") { Review comment: Done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r458712933 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala ## @@ -754,7 +755,80 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging outRowFormat: RowFormatContext, recordReader: Token, schemaLess: Boolean): ScriptInputOutputSchema = { -throw new ParseException("Script Transform is not supported", ctx) +if (recordWriter != null || recordReader != null) { + // TODO: what does this message mean? + throw new ParseException( +"Unsupported operation: Used defined record reader/writer classes.", ctx) +} + +// Decode and input/output format. +type Format = (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String]) + +def format( +fmt: RowFormatContext, +configKey: String, +defaultConfigValue: String): Format = fmt match { + case c: RowFormatDelimitedContext => +// TODO we should use the visitRowFormatDelimited function here. However HiveScriptIOSchema +// expects a seq of pairs in which the old parsers' token names are used as keys. +// Transforming the result of visitRowFormatDelimited would be quite a bit messier than +// retrieving the key value pairs ourselves. +def entry(key: String, value: Token): Seq[(String, String)] = { + Option(value).map(t => key -> t.getText).toSeq +} + +val entries = entry("TOK_TABLEROWFORMATFIELD", c.fieldsTerminatedBy) ++ + entry("TOK_TABLEROWFORMATCOLLITEMS", c.collectionItemsTerminatedBy) ++ + entry("TOK_TABLEROWFORMATMAPKEYS", c.keysTerminatedBy) ++ + entry("TOK_TABLEROWFORMATLINES", c.linesSeparatedBy) ++ + entry("TOK_TABLEROWFORMATNULL", c.nullDefinedAs) + +(entries, None, Seq.empty, None) + + case c: RowFormatContext if !conf.getConf(CATALOG_IMPLEMENTATION).equals("hive") => +throw new ParseException("TRANSFORM with serde is only supported in hive mode", ctx) + + case c: RowFormatSerdeContext => +// Use a serde format. 
+val CatalogStorageFormat(None, None, None, Some(name), _, props) = visitRowFormatSerde(c) + +// SPARK-10310: Special cases LazySimpleSerDe +val recordHandler = if (name == "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") { + Option(conf.getConfString(configKey, defaultConfigValue)) +} else { + None +} +(Seq.empty, Option(name), props.toSeq, recordHandler) + + case null if conf.getConf(CATALOG_IMPLEMENTATION).equals("hive") => Review comment: > The hive-specific code part should be placed in `SparkSqlParser`. Could you check related code around again? e.g., `visitCreateTable`. Done
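For readers following the thread, here is a minimal sketch of the dispatch quoted in the diff above: delimited formats are handled first, a guard rejects any serde format when the catalog implementation is not `hive`, and only then is the serde case reached. This is not the PR's code. `RowFormat`, `Delimited`, `Serde`, and the `Either` return type are hypothetical stand-ins for the ANTLR context classes and the `ParseException` used in `AstBuilder`, and only two of the delimiter keys are modelled.

```scala
object RowFormatSketch {
  // Hypothetical stand-ins for RowFormatDelimitedContext / RowFormatSerdeContext.
  sealed trait RowFormat
  final case class Delimited(
      fieldsTerminatedBy: Option[String],
      linesSeparatedBy: Option[String]) extends RowFormat
  final case class Serde(className: String, props: Map[String, String]) extends RowFormat

  // (delimiter entries, serde class name, serde properties)
  type Format = (Seq[(String, String)], Option[String], Seq[(String, String)])

  // Left(...) plays the role of the ParseException thrown in the real parser.
  def format(fmt: RowFormat, catalogImpl: String): Either[String, Format] = fmt match {
    case Delimited(fields, lines) =>
      // Only delimiters that were actually specified produce an entry.
      val entries =
        fields.map("TOK_TABLEROWFORMATFIELD" -> _).toSeq ++
          lines.map("TOK_TABLEROWFORMATLINES" -> _).toSeq
      Right((entries, None, Seq.empty))

    // The guard must come before the plain Serde case, otherwise it is never reached.
    case _: Serde if catalogImpl != "hive" =>
      Left("TRANSFORM with serde is only supported in hive mode")

    case Serde(name, props) =>
      Right((Seq.empty, Some(name), props.toSeq))
  }
}
```

The ordering of the cases is the point of the sketch: moving the guarded case below the unguarded `Serde` case would silently accept serde formats in non-Hive mode.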
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r458697351 ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala ## @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution + +import java.sql.{Date, Timestamp} + +import org.json4s.DefaultFormats +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods._ +import org.scalatest.Assertions._ +import org.scalatest.BeforeAndAfterEach +import org.scalatest.exceptions.TestFailedException + +import org.apache.spark.{SparkException, TaskContext, TestUtils} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.Column +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, GenericInternalRow} +import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + +abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestUtils + with BeforeAndAfterEach { + import testImplicits._ + import ScriptTransformationIOSchema._ + + protected val uncaughtExceptionHandler = new TestUncaughtExceptionHandler + + private var defaultUncaughtExceptionHandler: Thread.UncaughtExceptionHandler = _ + + protected override def beforeAll(): Unit = { +super.beforeAll() +defaultUncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler +Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler) + } + + protected override def afterAll(): Unit = { +super.afterAll() +Thread.setDefaultUncaughtExceptionHandler(defaultUncaughtExceptionHandler) + } + + override protected def afterEach(): Unit = { +super.afterEach() +uncaughtExceptionHandler.cleanStatus() + } + + def isHive23OrSpark: Boolean + + def createScriptTransformationExec( + input: Seq[Expression], + script: String, + output: Seq[Attribute], + child: SparkPlan, + ioschema: ScriptTransformationIOSchema): BaseScriptTransformationExec + + test("cat without SerDe") { 
+assume(TestUtils.testCommandAvailable("/bin/bash")) + +val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") +checkAnswer( + rowsDf, + (child: SparkPlan) => createScriptTransformationExec( +input = Seq(rowsDf.col("a").expr), +script = "cat", +output = Seq(AttributeReference("a", StringType)()), +child = child, +ioschema = defaultIOSchema + ), + rowsDf.collect()) +assert(uncaughtExceptionHandler.exception.isEmpty) + } + + test("script transformation should not swallow errors from upstream operators (no serde)") { +assume(TestUtils.testCommandAvailable("/bin/bash")) + +val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") +val e = intercept[TestFailedException] { + checkAnswer( +rowsDf, +(child: SparkPlan) => createScriptTransformationExec( + input = Seq(rowsDf.col("a").expr), + script = "cat", + output = Seq(AttributeReference("a", StringType)()), + child = ExceptionInjectingOperator(child), + ioschema = defaultIOSchema +), +rowsDf.collect()) +} +assert(e.getMessage().contains("intentional exception")) +// Before SPARK-25158, uncaughtExceptionHandler will catch IllegalArgumentException +assert(uncaughtExceptionHandler.exception.isEmpty) + } + + test("SPARK-25990: TRANSFORM should handle different data types correctly") { +assume(TestUtils.testCommandAvailable("python")) +val scriptFilePath = getTestResourcePath("test_script.py") + +withTempView("v") { + val df = Seq( +(1, "1", 1.0, BigDecimal(1.0), new Timestamp(1)), +(2, "2", 2.0, BigDecimal(2.0), new Timestamp(2)), +(3, "3", 3.0, BigDecimal(3.0), new Timestamp(3)) + ).toDF("a", "b", "c", "d", "e") // Note column
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r458696650 ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala ## @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution + +import java.sql.{Date, Timestamp} + +import org.json4s.DefaultFormats +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods._ +import org.scalatest.Assertions._ +import org.scalatest.BeforeAndAfterEach +import org.scalatest.exceptions.TestFailedException + +import org.apache.spark.{SparkException, TaskContext, TestUtils} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.Column +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, GenericInternalRow} +import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + +abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestUtils + with BeforeAndAfterEach { + import testImplicits._ + import ScriptTransformationIOSchema._ + + protected val uncaughtExceptionHandler = new TestUncaughtExceptionHandler + + private var defaultUncaughtExceptionHandler: Thread.UncaughtExceptionHandler = _ + + protected override def beforeAll(): Unit = { +super.beforeAll() +defaultUncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler +Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler) + } + + protected override def afterAll(): Unit = { +super.afterAll() +Thread.setDefaultUncaughtExceptionHandler(defaultUncaughtExceptionHandler) + } + + override protected def afterEach(): Unit = { +super.afterEach() +uncaughtExceptionHandler.cleanStatus() + } + + def isHive23OrSpark: Boolean + + def createScriptTransformationExec( + input: Seq[Expression], + script: String, + output: Seq[Attribute], + child: SparkPlan, + ioschema: ScriptTransformationIOSchema): BaseScriptTransformationExec + + test("cat without SerDe") { 
+assume(TestUtils.testCommandAvailable("/bin/bash")) + +val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") +checkAnswer( + rowsDf, + (child: SparkPlan) => createScriptTransformationExec( +input = Seq(rowsDf.col("a").expr), +script = "cat", +output = Seq(AttributeReference("a", StringType)()), +child = child, +ioschema = defaultIOSchema + ), + rowsDf.collect()) +assert(uncaughtExceptionHandler.exception.isEmpty) + } + + test("script transformation should not swallow errors from upstream operators (no serde)") { +assume(TestUtils.testCommandAvailable("/bin/bash")) + +val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") +val e = intercept[TestFailedException] { + checkAnswer( +rowsDf, +(child: SparkPlan) => createScriptTransformationExec( + input = Seq(rowsDf.col("a").expr), + script = "cat", + output = Seq(AttributeReference("a", StringType)()), + child = ExceptionInjectingOperator(child), + ioschema = defaultIOSchema +), +rowsDf.collect()) +} +assert(e.getMessage().contains("intentional exception")) +// Before SPARK-25158, uncaughtExceptionHandler will catch IllegalArgumentException +assert(uncaughtExceptionHandler.exception.isEmpty) + } + + test("SPARK-25990: TRANSFORM should handle different data types correctly") { +assume(TestUtils.testCommandAvailable("python")) +val scriptFilePath = getTestResourcePath("test_script.py") + +withTempView("v") { + val df = Seq( +(1, "1", 1.0, BigDecimal(1.0), new Timestamp(1)), +(2, "2", 2.0, BigDecimal(2.0), new Timestamp(2)), +(3, "3", 3.0, BigDecimal(3.0), new Timestamp(3)) + ).toDF("a", "b", "c", "d", "e") // Note column
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r458691259 ## File path: sql/core/src/test/resources/sql-tests/results/transform.sql.out ## @@ -0,0 +1,160 @@ +-- Automatically generated by SQLQueryTestSuite +-- Number of queries: 12 + + +-- !query +CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES +('1', true, unhex('537061726B2053514C'), tinyint(1), 1, smallint(100), bigint(1), float(1.0), 1.0, Decimal(1.0), timestamp('1997-01-02'), date('2000-04-01')), +('2', false, unhex('537061726B2053514C'), tinyint(2), 2, smallint(200), bigint(2), float(2.0), 2.0, Decimal(2.0), timestamp('1997-01-02 03:04:05'), date('2000-04-02')), +('3', true, unhex('537061726B2053514C'), tinyint(3), 3, smallint(300), bigint(3), float(3.0), 3.0, Decimal(3.0), timestamp('1997-02-10 17:32:01-08'), date('2000-04-03')) +as t1(a, b, c, d, e, f, g, h, i, j, k, l) +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT TRANSFORM(a) +USING 'cat' AS (a) +FROM t1 +-- !query schema +struct +-- !query output +1 +2 +3 + + +-- !query +SELECT TRANSFORM(a) +USING 'some_non_existent_command' AS (a) +FROM t1 +-- !query schema +struct<> +-- !query output +org.apache.spark.SparkException +Subprocess exited with status 127. Error: /bin/bash: some_non_existent_command: command not found + + +-- !query +SELECT TRANSFORM(a) +USING 'python some_non_existent_file' AS (a) +FROM t1 +-- !query schema +struct<> +-- !query output +org.apache.spark.SparkException +Subprocess exited with status 2. 
Error: python: can't open file 'some_non_existent_file': [Errno 2] No such file or directory + + +-- !query +SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM ( +SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k, l) +USING 'cat' AS ( +a string, +b boolean, +c binary, +d tinyint, +e int, +f smallint, +g long, +h float, +i double, +j decimal(38, 18), +k timestamp, +l date) +FROM t1 +) tmp +-- !query schema +struct +-- !query output +1 trueSpark SQL 1 1 100 1 1.0 1.0 1.001997-01-02 00:00:00 2000-04-01 +2 false Spark SQL 2 2 200 2 2.0 2.0 2.001997-01-02 03:04:05 2000-04-02 +3 trueSpark SQL 3 3 300 3 3.0 3.0 3.001997-02-10 17:32:01 2000-04-03 + + +-- !query +SELECT TRANSFORM(a) +USING 'cat' +FROM t1 +-- !query schema +struct<> +-- !query output +java.lang.ArrayIndexOutOfBoundsException Review comment: > Which code throws this exception? ``` Caused by: java.lang.ArrayIndexOutOfBoundsException: 1 at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.genericGet(rows.scala:201) at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getAs(rows.scala:35) at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.isNullAt(rows.scala:36) at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.isNullAt$(rows.scala:36) at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.isNullAt(rows.scala:195) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(generated.java:40) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(generated.java:22) at scala.collection.Iterator$$anon$10.next(Iterator.scala:459) at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:341) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:127) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:464) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1403) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:467) ... 3 more ```
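One plausible reading of the trace above, not confirmed in the thread: when no output schema is given, the output falls back to two columns (the Hive suite in this PR describes the default as `key: String, value: String`), but `SELECT TRANSFORM(a) USING 'cat'` makes the script print a single field per line. A row built from just that one field then fails when the projection reads index 1. The sketch below reproduces that shape in plain Scala. `naiveRow`, `paddedRow`, and `project` are hypothetical helpers, not Spark APIs, and padding with `null` is only one possible remedy.

```scala
object SchemaLessSketch {
  // Assumed default output columns when TRANSFORM has no AS clause.
  val defaultOutput: Seq[String] = Seq("key", "value")

  // Naive: the row is exactly as wide as the fields the script printed.
  def naiveRow(line: String): Array[Any] =
    line.split("\t").map(s => s: Any)

  // Padded: the row is always as wide as the output schema, missing fields become null.
  def paddedRow(line: String, width: Int): Array[Any] = {
    val fields = line.split("\t", -1)
    Array.tabulate[Any](width)(i => if (i < fields.length) fields(i) else null)
  }

  // Stand-in for the projection that reads every output column by index.
  def project(row: Array[Any], width: Int): Seq[Any] =
    (0 until width).map(i => row(i))
}
```

With a one-field line, `project(naiveRow("1"), defaultOutput.length)` throws `ArrayIndexOutOfBoundsException` on index 1, while the padded variant yields a row whose second column is `null`.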
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r458689199 ## File path: sql/core/src/test/resources/sql-tests/results/transform.sql.out ## @@ -0,0 +1,160 @@ +-- Automatically generated by SQLQueryTestSuite +-- Number of queries: 12 + + +-- !query +CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES +('1', true, unhex('537061726B2053514C'), tinyint(1), 1, smallint(100), bigint(1), float(1.0), 1.0, Decimal(1.0), timestamp('1997-01-02'), date('2000-04-01')), +('2', false, unhex('537061726B2053514C'), tinyint(2), 2, smallint(200), bigint(2), float(2.0), 2.0, Decimal(2.0), timestamp('1997-01-02 03:04:05'), date('2000-04-02')), +('3', true, unhex('537061726B2053514C'), tinyint(3), 3, smallint(300), bigint(3), float(3.0), 3.0, Decimal(3.0), timestamp('1997-02-10 17:32:01-08'), date('2000-04-03')) +as t1(a, b, c, d, e, f, g, h, i, j, k, l) +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT TRANSFORM(a) +USING 'cat' AS (a) +FROM t1 +-- !query schema +struct +-- !query output +1 +2 +3 + + +-- !query +SELECT TRANSFORM(a) +USING 'some_non_existent_command' AS (a) +FROM t1 +-- !query schema +struct<> +-- !query output +org.apache.spark.SparkException +Subprocess exited with status 127. Error: /bin/bash: some_non_existent_command: command not found + + +-- !query +SELECT TRANSFORM(a) +USING 'python some_non_existent_file' AS (a) +FROM t1 +-- !query schema +struct<> +-- !query output +org.apache.spark.SparkException +Subprocess exited with status 2. 
Error: python: can't open file 'some_non_existent_file': [Errno 2] No such file or directory + + +-- !query +SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM ( +SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k, l) +USING 'cat' AS ( +a string, +b boolean, +c binary, +d tinyint, +e int, +f smallint, +g long, +h float, +i double, +j decimal(38, 18), +k timestamp, +l date) +FROM t1 +) tmp +-- !query schema +struct +-- !query output +1 trueSpark SQL 1 1 100 1 1.0 1.0 1.001997-01-02 00:00:00 2000-04-01 +2 false Spark SQL 2 2 200 2 2.0 2.0 2.001997-01-02 03:04:05 2000-04-02 +3 trueSpark SQL 3 3 300 3 3.0 3.0 3.001997-02-10 17:32:01 2000-04-03 + + +-- !query +SELECT TRANSFORM(a) +USING 'cat' +FROM t1 +-- !query schema +struct<> +-- !query output +java.lang.ArrayIndexOutOfBoundsException +1 + + +-- !query +SELECT TRANSFORM(a, b) +USING 'cat' +FROM t1 +-- !query schema +struct +-- !query output +1 true +2 false +3 true + + +-- !query +SELECT TRANSFORM(a, b, c) Review comment: > Yea, could you file jira now and add the ID in the comment? Sure https://issues.apache.org/jira/browse/SPARK-32388
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r458688383 ## File path: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala ## @@ -206,75 +169,83 @@ class HiveScriptTransformationSuite extends SparkPlanTest with SQLTestUtils with val query = sql( s""" - |SELECT - |TRANSFORM(a, b, c, d, e) - |USING 'python $scriptFilePath' AS (a, b, c, d, e) - |FROM v + |SELECT TRANSFORM(a, b, c, d, e) + |USING 'python ${scriptFilePath}' + |FROM v """.stripMargin) - // In Hive 1.2, the string representation of a decimal omits trailing zeroes. - // But in Hive 2.3, it is always padded to 18 digits with trailing zeroes if necessary. - val decimalToString: Column => Column = if (HiveUtils.isHive23) { -c => c.cast("string") - } else { -c => c.cast("decimal(1, 0)").cast("string") - } - checkAnswer(query, identity, df.select( -'a.cast("string"), -'b.cast("string"), -'c.cast("string"), -decimalToString('d), -'e.cast("string")).collect()) + // In hive default serde mode, if we don't define output schema, it will choose first + // two column as output schema (key: String, value: String) + checkAnswer( +query, +identity, +df.select( + 'a.cast("string").as("key"), + 'b.cast("string").as("value")).collect()) } } - test("SPARK-30973: TRANSFORM should wait for the termination of the script (no serde)") { + test("SPARK-32106: TRANSFORM support complex data types as input and ouput type (hive serde)") { assume(TestUtils.testCommandAvailable("/bin/bash")) +withTempView("v") { + val df = Seq( +(1, "1", Array(0, 1, 2), Map("a" -> 1)), +(2, "2", Array(3, 4, 5), Map("b" -> 2))) +.toDF("a", "b", "c", "d") + .select('a, 'b, 'c, 'd, struct('a, 'b).as("e")) + df.createTempView("v") -val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") -val e = intercept[SparkException] { - val plan = -new HiveScriptTransformationExec( - input = Seq(rowsDf.col("a").expr), - script = 
"some_non_existent_command", - output = Seq(AttributeReference("a", StringType)()), - child = rowsDf.queryExecution.sparkPlan, - ioschema = noSerdeIOSchema) - SparkPlanTest.executePlan(plan, hiveContext) + // Hive serde support ArrayType/MapType/StructType as input and output data type + checkAnswer( +df, +(child: SparkPlan) => createScriptTransformationExec( + input = Seq( +df.col("c").expr, +df.col("d").expr, +df.col("e").expr), + script = "cat", + output = Seq( +AttributeReference("c", ArrayType(IntegerType))(), +AttributeReference("d", MapType(StringType, IntegerType))(), +AttributeReference("e", StructType( + Seq( +StructField("col1", IntegerType, false), +StructField("col2", StringType, true()), + child = child, + ioschema = serdeIOSchema +), +df.select('c, 'd, 'e).collect()) } -assert(e.getMessage.contains("Subprocess exited with status")) -assert(uncaughtExceptionHandler.exception.isEmpty) } - test("SPARK-30973: TRANSFORM should wait for the termination of the script (with serde)") { + test("SPARK-32106: TRANSFORM don't support CalenderIntervalType/UserDefinedType (hive serde)") { assume(TestUtils.testCommandAvailable("/bin/bash")) +withTempView("v") { + val df = Seq( +(1, new CalendarInterval(7, 1, 1000), new TestUDT.MyDenseVector(Array(1, 2, 3))), +(1, new CalendarInterval(7, 1, 1000), new TestUDT.MyDenseVector(Array(1, 2, 3 +.toDF("a", "b", "c") + df.createTempView("v") -val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") -val e = intercept[SparkException] { - val plan = -new HiveScriptTransformationExec( - input = Seq(rowsDf.col("a").expr), - script = "some_non_existent_command", - output = Seq(AttributeReference("a", StringType)()), - child = rowsDf.queryExecution.sparkPlan, - ioschema = serdeIOSchema) - SparkPlanTest.executePlan(plan, hiveContext) + val e1 = intercept[Exception] { +sql( + """ +|SELECT TRANSFORM(a, b) USING 'cat' AS (a, b) +|FROM v + """.stripMargin).collect() + } + assert(e1.getMessage.contains("scala.MatchError: 
CalendarIntervalType")) Review comment: > Ah, I see. which code throws this match error? HiveInspectors$typeInfoConversions.toTypeInfo ``` def toTypeInfo: TypeInfo = dt match { case
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r458681551 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala ## @@ -87,17 +178,69 @@ trait BaseScriptTransformationExec extends UnaryExecNode { } } } + + private lazy val fieldWriters: Seq[String => Any] = output.map { attr => +val converter = CatalystTypeConverters.createToCatalystConverter(attr.dataType) +attr.dataType match { + case StringType => wrapperConvertException(data => data, converter) + case BooleanType => wrapperConvertException(data => data.toBoolean, converter) + case ByteType => wrapperConvertException(data => data.toByte, converter) + case BinaryType => wrapperConvertException(data => data.getBytes, converter) + case IntegerType => wrapperConvertException(data => data.toInt, converter) + case ShortType => wrapperConvertException(data => data.toShort, converter) + case LongType => wrapperConvertException(data => data.toLong, converter) + case FloatType => wrapperConvertException(data => data.toFloat, converter) + case DoubleType => wrapperConvertException(data => data.toDouble, converter) + case _: DecimalType => wrapperConvertException(data => BigDecimal(data), converter) + case DateType if conf.datetimeJava8ApiEnabled => +wrapperConvertException(data => DateTimeUtils.stringToDate( + UTF8String.fromString(data), + DateTimeUtils.getZoneId(conf.sessionLocalTimeZone)) + .map(DateTimeUtils.daysToLocalDate).orNull, converter) + case DateType => wrapperConvertException(data => DateTimeUtils.stringToDate( +UTF8String.fromString(data), +DateTimeUtils.getZoneId(conf.sessionLocalTimeZone)) +.map(DateTimeUtils.toJavaDate).orNull, converter) + case TimestampType if conf.datetimeJava8ApiEnabled => + wrapperConvertException(data => DateTimeUtils.stringToTimestamp( + UTF8String.fromString(data), + DateTimeUtils.getZoneId(conf.sessionLocalTimeZone)) + .map(DateTimeUtils.microsToInstant).orNull, 
converter) + case TimestampType => wrapperConvertException(data => DateTimeUtils.stringToTimestamp( +UTF8String.fromString(data), +DateTimeUtils.getZoneId(conf.sessionLocalTimeZone)) +.map(DateTimeUtils.toJavaTimestamp).orNull, converter) + case CalendarIntervalType => wrapperConvertException( +data => IntervalUtils.stringToInterval(UTF8String.fromString(data)), +converter) + case udt: UserDefinedType[_] => +wrapperConvertException(data => udt.deserialize(data), converter) + case _ => wrapperConvertException(data => data, converter) Review comment: Done
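The `fieldWriters` table quoted above builds one `String => Any` function per output column, chosen by the column's data type. A minimal sketch of that idea in plain Scala follows. It is not the PR's implementation: `ColType` and its cases are hypothetical stand-ins for Spark's `DataType`, and `safeConvert` only assumes that `wrapperConvertException` turns a failed parse into `null`, which the quoted diff does not show.

```scala
import scala.util.control.NonFatal

object FieldWriterSketch {
  // Hypothetical stand-ins for StringType, IntegerType, BooleanType, DoubleType.
  sealed trait ColType
  case object StrCol extends ColType
  case object IntCol extends ColType
  case object BoolCol extends ColType
  case object DoubleCol extends ColType

  // Assumed behaviour: a value that cannot be parsed becomes null instead of failing the task.
  def safeConvert(parse: String => Any): String => Any =
    (data: String) =>
      try parse(data)
      catch { case NonFatal(_) => null }

  // One writer per column type, chosen once by pattern matching.
  def writerFor(t: ColType): String => Any = t match {
    case StrCol    => safeConvert(s => s)
    case IntCol    => safeConvert(_.toInt)
    case BoolCol   => safeConvert(_.toBoolean)
    case DoubleCol => safeConvert(_.toDouble)
  }

  // Split one line of script output and convert each field with its column's writer.
  def parseRow(line: String, schema: Seq[ColType], sep: String = "\t"): Seq[Any] = {
    val writers = schema.map(writerFor)
    val fields = line.split(sep, -1)
    writers.zipWithIndex.map { case (write, i) =>
      if (i < fields.length) write(fields(i)) else null
    }
  }
}
```

Building the writers once per schema rather than matching on the type for every field mirrors the `lazy val` in the diff; whether unparseable values should become `null` or raise an error is a design choice this sketch does not settle.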
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r458654492 ## File path: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala ## @@ -206,75 +169,83 @@ class HiveScriptTransformationSuite extends SparkPlanTest with SQLTestUtils with val query = sql( s""" - |SELECT - |TRANSFORM(a, b, c, d, e) - |USING 'python $scriptFilePath' AS (a, b, c, d, e) - |FROM v + |SELECT TRANSFORM(a, b, c, d, e) + |USING 'python ${scriptFilePath}' + |FROM v """.stripMargin) - // In Hive 1.2, the string representation of a decimal omits trailing zeroes. - // But in Hive 2.3, it is always padded to 18 digits with trailing zeroes if necessary. - val decimalToString: Column => Column = if (HiveUtils.isHive23) { -c => c.cast("string") - } else { -c => c.cast("decimal(1, 0)").cast("string") - } - checkAnswer(query, identity, df.select( -'a.cast("string"), -'b.cast("string"), -'c.cast("string"), -decimalToString('d), -'e.cast("string")).collect()) + // In hive default serde mode, if we don't define output schema, it will choose first + // two column as output schema (key: String, value: String) + checkAnswer( +query, +identity, +df.select( + 'a.cast("string").as("key"), + 'b.cast("string").as("value")).collect()) } } - test("SPARK-30973: TRANSFORM should wait for the termination of the script (no serde)") { + test("SPARK-32106: TRANSFORM support complex data types as input and ouput type (hive serde)") { assume(TestUtils.testCommandAvailable("/bin/bash")) +withTempView("v") { + val df = Seq( +(1, "1", Array(0, 1, 2), Map("a" -> 1)), +(2, "2", Array(3, 4, 5), Map("b" -> 2))) +.toDF("a", "b", "c", "d") + .select('a, 'b, 'c, 'd, struct('a, 'b).as("e")) + df.createTempView("v") -val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") -val e = intercept[SparkException] { - val plan = -new HiveScriptTransformationExec( - input = Seq(rowsDf.col("a").expr), - script = 
"some_non_existent_command", - output = Seq(AttributeReference("a", StringType)()), - child = rowsDf.queryExecution.sparkPlan, - ioschema = noSerdeIOSchema) - SparkPlanTest.executePlan(plan, hiveContext) + // Hive serde support ArrayType/MapType/StructType as input and output data type + checkAnswer( +df, +(child: SparkPlan) => createScriptTransformationExec( Review comment: > Why is the test method different between this test unit and ` test("[SPARK-32106](https://issues.apache.org/jira/browse/SPARK-32106): TRANSFORM don't support CalenderIntervalType/UserDefinedType (hive serde)") {` ? This is a plan test and the other one is an end-2-end test? I think we need both test cases (plan tests and end-2-end tests) though. Yea, Done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r458649024 ## File path: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala ## @@ -206,75 +169,83 @@ class HiveScriptTransformationSuite extends SparkPlanTest with SQLTestUtils with val query = sql( s""" - |SELECT - |TRANSFORM(a, b, c, d, e) - |USING 'python $scriptFilePath' AS (a, b, c, d, e) - |FROM v + |SELECT TRANSFORM(a, b, c, d, e) + |USING 'python ${scriptFilePath}' + |FROM v """.stripMargin) - // In Hive 1.2, the string representation of a decimal omits trailing zeroes. - // But in Hive 2.3, it is always padded to 18 digits with trailing zeroes if necessary. - val decimalToString: Column => Column = if (HiveUtils.isHive23) { -c => c.cast("string") - } else { -c => c.cast("decimal(1, 0)").cast("string") - } - checkAnswer(query, identity, df.select( -'a.cast("string"), -'b.cast("string"), -'c.cast("string"), -decimalToString('d), -'e.cast("string")).collect()) + // In hive default serde mode, if we don't define output schema, it will choose first + // two column as output schema (key: String, value: String) + checkAnswer( +query, +identity, +df.select( + 'a.cast("string").as("key"), + 'b.cast("string").as("value")).collect()) } } - test("SPARK-30973: TRANSFORM should wait for the termination of the script (no serde)") { + test("SPARK-32106: TRANSFORM support complex data types as input and ouput type (hive serde)") { assume(TestUtils.testCommandAvailable("/bin/bash")) +withTempView("v") { + val df = Seq( +(1, "1", Array(0, 1, 2), Map("a" -> 1)), +(2, "2", Array(3, 4, 5), Map("b" -> 2))) +.toDF("a", "b", "c", "d") + .select('a, 'b, 'c, 'd, struct('a, 'b).as("e")) + df.createTempView("v") -val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") -val e = intercept[SparkException] { - val plan = -new HiveScriptTransformationExec( - input = Seq(rowsDf.col("a").expr), - script = 
"some_non_existent_command", - output = Seq(AttributeReference("a", StringType)()), - child = rowsDf.queryExecution.sparkPlan, - ioschema = noSerdeIOSchema) - SparkPlanTest.executePlan(plan, hiveContext) + // Hive serde support ArrayType/MapType/StructType as input and output data type + checkAnswer( +df, +(child: SparkPlan) => createScriptTransformationExec( + input = Seq( +df.col("c").expr, +df.col("d").expr, +df.col("e").expr), + script = "cat", + output = Seq( +AttributeReference("c", ArrayType(IntegerType))(), +AttributeReference("d", MapType(StringType, IntegerType))(), +AttributeReference("e", StructType( + Seq( +StructField("col1", IntegerType, false), +StructField("col2", StringType, true()), + child = child, + ioschema = serdeIOSchema +), +df.select('c, 'd, 'e).collect()) } -assert(e.getMessage.contains("Subprocess exited with status")) -assert(uncaughtExceptionHandler.exception.isEmpty) } - test("SPARK-30973: TRANSFORM should wait for the termination of the script (with serde)") { + test("SPARK-32106: TRANSFORM don't support CalenderIntervalType/UserDefinedType (hive serde)") { assume(TestUtils.testCommandAvailable("/bin/bash")) +withTempView("v") { + val df = Seq( +(1, new CalendarInterval(7, 1, 1000), new TestUDT.MyDenseVector(Array(1, 2, 3))), +(1, new CalendarInterval(7, 1, 1000), new TestUDT.MyDenseVector(Array(1, 2, 3 +.toDF("a", "b", "c") + df.createTempView("v") -val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") -val e = intercept[SparkException] { - val plan = -new HiveScriptTransformationExec( - input = Seq(rowsDf.col("a").expr), - script = "some_non_existent_command", - output = Seq(AttributeReference("a", StringType)()), - child = rowsDf.queryExecution.sparkPlan, - ioschema = serdeIOSchema) - SparkPlanTest.executePlan(plan, hiveContext) + val e1 = intercept[Exception] { +sql( + """ +|SELECT TRANSFORM(a, b) USING 'cat' AS (a, b) +|FROM v + """.stripMargin).collect() + } + assert(e1.getMessage.contains("scala.MatchError: 
CalendarIntervalType")) Review comment: It's a SparkException, but the root cause is a MatchError.
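Note on the comment above: the test matches on the message text because the `MatchError` is not the exception the caller catches. When the cause is chained on the exception object, it can also be reached by walking `getCause`. The sketch below shows that idea only; it assumes a chained cause, uses a plain `RuntimeException` as a stand-in for `SparkException`, and `rootCause` and `wrapped` are hypothetical names. Spark may only embed the cause's text in the message, which is what the assertion in the diff relies on.

```scala
import scala.annotation.tailrec

object RootCauseSketch {
  // Follow getCause until it ends (or points back at the same exception).
  @tailrec
  def rootCause(t: Throwable): Throwable = {
    val cause = t.getCause
    if (cause == null || (cause eq t)) t else rootCause(cause)
  }

  // Hypothetical stand-in: a MatchError wrapped the way a failed job might wrap it.
  def wrapped(): Throwable =
    new RuntimeException("Job aborted", new MatchError("CalendarIntervalType"))
}
```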
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085:
URL: https://github.com/apache/spark/pull/29085#discussion_r458631113

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ##
@@ -713,13 +714,18 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
         }
         (Seq.empty, Option(name), props.toSeq, recordHandler)

-      case null =>
+      case null if conf.getConf(CATALOG_IMPLEMENTATION).equals("hive") =>
         // Use default (serde) format.
         val name = conf.getConfString("hive.script.serde",
           "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
         val props = Seq("field.delim" -> "\t")
         val recordHandler = Option(conf.getConfString(configKey, defaultConfigValue))
         (Nil, Option(name), props, recordHandler)

Review comment:
> Also, could we throw an exception when custom serde given without hive?

Done, and I will add a unit test.
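Note on the request above: the decision being discussed has four cases, depending on whether a custom serde is given and whether the catalog implementation is Hive. The sketch below models it outside the parser. `resolveSerde` is a hypothetical function, the exception type and message are assumptions, and only the default serde class name is taken from the diff.

```scala
object SerdeGuardSketch {
  // Hypothetical model of the row-format decision.
  // Returns the serde class to use, or None for the no-serde path.
  def resolveSerde(customSerde: Option[String], catalogImpl: String): Option[String] =
    (customSerde, catalogImpl) match {
      case (Some(name), "hive") => Some(name)
      case (Some(name), _) =>
        // Assumed behavior: reject a custom serde when Hive support is absent.
        throw new IllegalArgumentException(s"Custom serde $name requires Hive support")
      case (None, "hive") =>
        // Default serde named in the diff.
        Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
      case (None, _) => None
    }
}
```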
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r458624125 ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala ## @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution + +import java.sql.{Date, Timestamp} + +import org.json4s.DefaultFormats +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods._ +import org.scalatest.Assertions._ +import org.scalatest.BeforeAndAfterEach +import org.scalatest.exceptions.TestFailedException + +import org.apache.spark.{SparkException, TaskContext, TestUtils} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.Column +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, GenericInternalRow} +import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + +abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestUtils + with BeforeAndAfterEach { + import testImplicits._ + import ScriptTransformationIOSchema._ + + protected val uncaughtExceptionHandler = new TestUncaughtExceptionHandler + + private var defaultUncaughtExceptionHandler: Thread.UncaughtExceptionHandler = _ + + protected override def beforeAll(): Unit = { +super.beforeAll() +defaultUncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler +Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler) + } + + protected override def afterAll(): Unit = { +super.afterAll() +Thread.setDefaultUncaughtExceptionHandler(defaultUncaughtExceptionHandler) + } + + override protected def afterEach(): Unit = { +super.afterEach() +uncaughtExceptionHandler.cleanStatus() + } + + def isHive23OrSpark: Boolean + + def createScriptTransformationExec( + input: Seq[Expression], + script: String, + output: Seq[Attribute], + child: SparkPlan, + ioschema: ScriptTransformationIOSchema): BaseScriptTransformationExec + + test("cat without SerDe") { 
+assume(TestUtils.testCommandAvailable("/bin/bash")) + +val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") +checkAnswer( + rowsDf, + (child: SparkPlan) => createScriptTransformationExec( +input = Seq(rowsDf.col("a").expr), +script = "cat", +output = Seq(AttributeReference("a", StringType)()), +child = child, +ioschema = defaultIOSchema + ), + rowsDf.collect()) +assert(uncaughtExceptionHandler.exception.isEmpty) + } + + test("script transformation should not swallow errors from upstream operators (no serde)") { +assume(TestUtils.testCommandAvailable("/bin/bash")) + +val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") +val e = intercept[TestFailedException] { + checkAnswer( +rowsDf, +(child: SparkPlan) => createScriptTransformationExec( + input = Seq(rowsDf.col("a").expr), + script = "cat", + output = Seq(AttributeReference("a", StringType)()), + child = ExceptionInjectingOperator(child), + ioschema = defaultIOSchema +), +rowsDf.collect()) +} +assert(e.getMessage().contains("intentional exception")) +// Before SPARK-25158, uncaughtExceptionHandler will catch IllegalArgumentException +assert(uncaughtExceptionHandler.exception.isEmpty) + } + + test("SPARK-25990: TRANSFORM should handle different data types correctly") { +assume(TestUtils.testCommandAvailable("python")) +val scriptFilePath = getTestResourcePath("test_script.py") + +withTempView("v") { + val df = Seq( +(1, "1", 1.0, BigDecimal(1.0), new Timestamp(1)), +(2, "2", 2.0, BigDecimal(2.0), new Timestamp(2)), +(3, "3", 3.0, BigDecimal(3.0), new Timestamp(3)) + ).toDF("a", "b", "c", "d", "e") // Note column
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r458620082 ## File path: sql/core/src/test/resources/sql-tests/inputs/transform.sql ## @@ -0,0 +1,72 @@ +-- Test data. +CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES Review comment: Done ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala ## @@ -56,10 +69,88 @@ trait BaseScriptTransformationExec extends UnaryExecNode { } } - def processIterator( + protected def initProc: (OutputStream, Process, InputStream, CircularBuffer) = { +val cmd = List("/bin/bash", "-c", script) +val builder = new ProcessBuilder(cmd.asJava) + +val proc = builder.start() +val inputStream = proc.getInputStream +val outputStream = proc.getOutputStream +val errorStream = proc.getErrorStream + +// In order to avoid deadlocks, we need to consume the error output of the child process. +// To avoid issues caused by large error output, we use a circular buffer to limit the amount +// of error output that we retain. See SPARK-7862 for more discussion of the deadlock / hang +// that motivates this. 
+val stderrBuffer = new CircularBuffer(2048) +new RedirectThread( + errorStream, + stderrBuffer, + s"Thread-${this.getClass.getSimpleName}-STDERR-Consumer").start() +(outputStream, proc, inputStream, stderrBuffer) + } + + protected def processIterator( inputIterator: Iterator[InternalRow], hadoopConf: Configuration): Iterator[InternalRow] + protected def createOutputIteratorWithoutSerde( + writerThread: BaseScriptTransformationWriterThread, + inputStream: InputStream, + proc: Process, + stderrBuffer: CircularBuffer): Iterator[InternalRow] = { +new Iterator[InternalRow] { + var curLine: String = null + val reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8)) + + val outputRowFormat = ioschema.outputRowFormatMap("TOK_TABLEROWFORMATFIELD") + val processRowWithoutSerde = if (!ioschema.schemaLess) { +prevLine: String => + new GenericInternalRow( +prevLine.split(outputRowFormat) + .zip(fieldWriters) + .map { case (data, writer) => writer(data) }) + } else { +// In schema less mode, hive default serde will choose first two output column as output +// if output column size less then 2, it will throw ArrayIndexOutOfBoundsException. +// Here we change spark's behavior same as hive's default serde +prevLine: String => + new GenericInternalRow( +prevLine.split(outputRowFormat).slice(0, 2) + .map(CatalystTypeConverters.createToCatalystConverter(StringType))) Review comment: Done
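Note on `initProc` in the diff above: the code comment explains that the child's stderr must be consumed to avoid a deadlock, and that only a bounded amount of it is retained. A minimal sketch of that idea follows. `TailBuffer` and `drain` are hypothetical stand-ins written for illustration; they are not Spark's `CircularBuffer` or `RedirectThread`, whose implementations are not shown in this thread.

```scala
import java.io.InputStream
import java.nio.charset.StandardCharsets

object StderrDrainSketch {
  // Hypothetical bounded buffer: keeps only the last `capacity` bytes written.
  final class TailBuffer(capacity: Int) {
    private val buf = new Array[Byte](capacity)
    private var pos = 0
    private var filled = false

    def write(b: Int): Unit = synchronized {
      buf(pos) = b.toByte
      pos = (pos + 1) % capacity
      if (pos == 0) filled = true
    }

    override def toString: String = synchronized {
      // Once wrapped, the oldest retained byte sits at `pos`.
      val bytes = if (filled) buf.drop(pos) ++ buf.take(pos) else buf.take(pos)
      new String(bytes, StandardCharsets.UTF_8)
    }
  }

  // Drain `in` on a daemon thread so the child never blocks on a full stderr pipe.
  def drain(in: InputStream, sink: TailBuffer): Thread = {
    val t = new Thread(new Runnable {
      def run(): Unit = {
        var b = in.read()
        while (b != -1) { sink.write(b); b = in.read() }
      }
    })
    t.setDaemon(true)
    t.start()
    t
  }
}
```

With a real process, the stream passed to `drain` would be `proc.getErrorStream`, and the buffer contents would be appended to the error message once the process exits with a non-zero status.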
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r458619650 ## File path: sql/core/src/test/resources/sql-tests/inputs/transform.sql ## @@ -0,0 +1,72 @@ +-- Test data. +CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES +('1', true, unhex('537061726B2053514C'), tinyint(1), 1, smallint(100), bigint(1), float(1.0), 1.0, Decimal(1.0), timestamp('1997-01-02'), date('2000-04-01')), +('2', false, unhex('537061726B2053514C'), tinyint(2), 2, smallint(200), bigint(2), float(2.0), 2.0, Decimal(2.0), timestamp('1997-01-02 03:04:05'), date('2000-04-02')), +('3', true, unhex('537061726B2053514C'), tinyint(3), 3, smallint(300), bigint(3), float(3.0), 3.0, Decimal(3.0), timestamp('1997-02-10 17:32:01-08'), date('2000-04-03')) +as t1(a, b, c, d, e, f, g, h, i, j, k, l); Review comment: Done ## File path: sql/core/src/test/resources/sql-tests/inputs/transform.sql ## @@ -0,0 +1,72 @@ +-- Test data. +CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES +('1', true, unhex('537061726B2053514C'), tinyint(1), 1, smallint(100), bigint(1), float(1.0), 1.0, Decimal(1.0), timestamp('1997-01-02'), date('2000-04-01')), +('2', false, unhex('537061726B2053514C'), tinyint(2), 2, smallint(200), bigint(2), float(2.0), 2.0, Decimal(2.0), timestamp('1997-01-02 03:04:05'), date('2000-04-02')), +('3', true, unhex('537061726B2053514C'), tinyint(3), 3, smallint(300), bigint(3), float(3.0), 3.0, Decimal(3.0), timestamp('1997-02-10 17:32:01-08'), date('2000-04-03')) +as t1(a, b, c, d, e, f, g, h, i, j, k, l); + +SELECT TRANSFORM(a) +USING 'cat' AS (a) +FROM t1; + +-- with non-exist command +SELECT TRANSFORM(a) +USING 'some_non_existent_command' AS (a) +FROM t1; + +-- with non-exist file +SELECT TRANSFORM(a) +USING 'python some_non_existent_file' AS (a) +FROM t1; + +-- common supported data types between no serde and serde transform +SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM ( +SELECT TRANSFORM(a, b, c, d, 
e, f, g, h, i, j, k, l) +USING 'cat' AS ( +a string, +b boolean, +c binary, +d tinyint, +e int, +f smallint, +g long, +h float, +i double, +j decimal(38, 18), +k timestamp, +l date) +FROM t1 +) tmp; + +-- handle schema less +SELECT TRANSFORM(a) +USING 'cat' +FROM t1; + +SELECT TRANSFORM(a, b) +USING 'cat' +FROM t1; + +SELECT TRANSFORM(a, b, c) +USING 'cat' +FROM t1; + +-- return null when return string incompatible (no serde) +SELECT TRANSFORM(a, b, c, d, e, f, g, h, i) +USING 'cat' as (a int, b short, c long, d byte, e float, f double, g decimal(38, 18), h date, i timestamp) +FROM VALUES +('a','','1231a','a','213.21a','213.21a','0a.21d','2000-04-01123','1997-0102 00:00:') tmp(a, b, c, d, e, f, g, h, i); + +-- SPARK-28227: transform can't run with aggregation +SELECT TRANSFORM(b, max(a), sum(f)) +USING 'cat' AS (a, b) +FROM t1 +GROUP BY b; + +-- transform use MAP +MAP a, b USING 'cat' AS (a, b) FROM t1; + +-- transform use REDUCE +REDUCE a, b USING 'cat' AS (a, b) FROM t1; + + Review comment: Done
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r458618057 ## File path: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala ## @@ -206,75 +169,83 @@ class HiveScriptTransformationSuite extends SparkPlanTest with SQLTestUtils with val query = sql( s""" - |SELECT - |TRANSFORM(a, b, c, d, e) - |USING 'python $scriptFilePath' AS (a, b, c, d, e) - |FROM v + |SELECT TRANSFORM(a, b, c, d, e) + |USING 'python ${scriptFilePath}' + |FROM v """.stripMargin) - // In Hive 1.2, the string representation of a decimal omits trailing zeroes. - // But in Hive 2.3, it is always padded to 18 digits with trailing zeroes if necessary. - val decimalToString: Column => Column = if (HiveUtils.isHive23) { -c => c.cast("string") - } else { -c => c.cast("decimal(1, 0)").cast("string") - } - checkAnswer(query, identity, df.select( -'a.cast("string"), -'b.cast("string"), -'c.cast("string"), -decimalToString('d), -'e.cast("string")).collect()) + // In hive default serde mode, if we don't define output schema, it will choose first + // two column as output schema (key: String, value: String) + checkAnswer( +query, +identity, +df.select( + 'a.cast("string").as("key"), + 'b.cast("string").as("value")).collect()) } } - test("SPARK-30973: TRANSFORM should wait for the termination of the script (no serde)") { + test("SPARK-32106: TRANSFORM support complex data types as input and ouput type (hive serde)") { assume(TestUtils.testCommandAvailable("/bin/bash")) +withTempView("v") { + val df = Seq( +(1, "1", Array(0, 1, 2), Map("a" -> 1)), +(2, "2", Array(3, 4, 5), Map("b" -> 2))) +.toDF("a", "b", "c", "d") + .select('a, 'b, 'c, 'd, struct('a, 'b).as("e")) + df.createTempView("v") -val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") -val e = intercept[SparkException] { - val plan = -new HiveScriptTransformationExec( - input = Seq(rowsDf.col("a").expr), - script = 
"some_non_existent_command", - output = Seq(AttributeReference("a", StringType)()), - child = rowsDf.queryExecution.sparkPlan, - ioschema = noSerdeIOSchema) - SparkPlanTest.executePlan(plan, hiveContext) + // Hive serde support ArrayType/MapType/StructType as input and output data type + checkAnswer( +df, +(child: SparkPlan) => createScriptTransformationExec( + input = Seq( +df.col("c").expr, +df.col("d").expr, +df.col("e").expr), + script = "cat", + output = Seq( +AttributeReference("c", ArrayType(IntegerType))(), +AttributeReference("d", MapType(StringType, IntegerType))(), +AttributeReference("e", StructType( + Seq( +StructField("col1", IntegerType, false), +StructField("col2", StringType, true()), + child = child, + ioschema = serdeIOSchema +), +df.select('c, 'd, 'e).collect()) } -assert(e.getMessage.contains("Subprocess exited with status")) -assert(uncaughtExceptionHandler.exception.isEmpty) } - test("SPARK-30973: TRANSFORM should wait for the termination of the script (with serde)") { + test("SPARK-32106: TRANSFORM don't support CalenderIntervalType/UserDefinedType (hive serde)") { Review comment: Done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r458617941 ## File path: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala ## @@ -206,75 +169,83 @@ class HiveScriptTransformationSuite extends SparkPlanTest with SQLTestUtils with val query = sql( s""" - |SELECT - |TRANSFORM(a, b, c, d, e) - |USING 'python $scriptFilePath' AS (a, b, c, d, e) - |FROM v + |SELECT TRANSFORM(a, b, c, d, e) + |USING 'python ${scriptFilePath}' + |FROM v """.stripMargin) - // In Hive 1.2, the string representation of a decimal omits trailing zeroes. - // But in Hive 2.3, it is always padded to 18 digits with trailing zeroes if necessary. - val decimalToString: Column => Column = if (HiveUtils.isHive23) { -c => c.cast("string") - } else { -c => c.cast("decimal(1, 0)").cast("string") - } - checkAnswer(query, identity, df.select( -'a.cast("string"), -'b.cast("string"), -'c.cast("string"), -decimalToString('d), -'e.cast("string")).collect()) + // In hive default serde mode, if we don't define output schema, it will choose first + // two column as output schema (key: String, value: String) + checkAnswer( +query, +identity, +df.select( + 'a.cast("string").as("key"), + 'b.cast("string").as("value")).collect()) } } - test("SPARK-30973: TRANSFORM should wait for the termination of the script (no serde)") { + test("SPARK-32106: TRANSFORM support complex data types as input and ouput type (hive serde)") { Review comment: Done
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085:
URL: https://github.com/apache/spark/pull/29085#discussion_r458617007

## File path: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala ##
@@ -192,7 +137,25 @@ class HiveScriptTransformationSuite extends SparkPlanTest with SQLTestUtils with
     assert(uncaughtExceptionHandler.exception.isEmpty)
   }

-  test("SPARK-25990: TRANSFORM should handle different data types correctly") {
+  test("SPARK-30973: TRANSFORM should wait for the termination of the script (with serde)") {

Review comment:
> how about `with serde` -> `hive serde` in the existing tests names?

All right, that's better.
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r458616565 ## File path: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala ## @@ -206,75 +169,83 @@ class HiveScriptTransformationSuite extends SparkPlanTest with SQLTestUtils with val query = sql( s""" - |SELECT - |TRANSFORM(a, b, c, d, e) - |USING 'python $scriptFilePath' AS (a, b, c, d, e) - |FROM v + |SELECT TRANSFORM(a, b, c, d, e) + |USING 'python ${scriptFilePath}' + |FROM v """.stripMargin) - // In Hive 1.2, the string representation of a decimal omits trailing zeroes. - // But in Hive 2.3, it is always padded to 18 digits with trailing zeroes if necessary. - val decimalToString: Column => Column = if (HiveUtils.isHive23) { -c => c.cast("string") - } else { -c => c.cast("decimal(1, 0)").cast("string") - } - checkAnswer(query, identity, df.select( -'a.cast("string"), -'b.cast("string"), -'c.cast("string"), -decimalToString('d), -'e.cast("string")).collect()) + // In hive default serde mode, if we don't define output schema, it will choose first + // two column as output schema (key: String, value: String) + checkAnswer( +query, +identity, +df.select( + 'a.cast("string").as("key"), + 'b.cast("string").as("value")).collect()) } } - test("SPARK-30973: TRANSFORM should wait for the termination of the script (no serde)") { + test("SPARK-32106: TRANSFORM support complex data types as input and ouput type (hive serde)") { assume(TestUtils.testCommandAvailable("/bin/bash")) +withTempView("v") { + val df = Seq( +(1, "1", Array(0, 1, 2), Map("a" -> 1)), +(2, "2", Array(3, 4, 5), Map("b" -> 2))) +.toDF("a", "b", "c", "d") + .select('a, 'b, 'c, 'd, struct('a, 'b).as("e")) + df.createTempView("v") -val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") -val e = intercept[SparkException] { - val plan = -new HiveScriptTransformationExec( - input = Seq(rowsDf.col("a").expr), - script = 
"some_non_existent_command", - output = Seq(AttributeReference("a", StringType)()), - child = rowsDf.queryExecution.sparkPlan, - ioschema = noSerdeIOSchema) - SparkPlanTest.executePlan(plan, hiveContext) + // Hive serde support ArrayType/MapType/StructType as input and output data type + checkAnswer( +df, +(child: SparkPlan) => createScriptTransformationExec( + input = Seq( +df.col("c").expr, +df.col("d").expr, +df.col("e").expr), + script = "cat", + output = Seq( +AttributeReference("c", ArrayType(IntegerType))(), +AttributeReference("d", MapType(StringType, IntegerType))(), +AttributeReference("e", StructType( + Seq( +StructField("col1", IntegerType, false), +StructField("col2", StringType, true()), + child = child, + ioschema = serdeIOSchema +), +df.select('c, 'd, 'e).collect()) } -assert(e.getMessage.contains("Subprocess exited with status")) -assert(uncaughtExceptionHandler.exception.isEmpty) } - test("SPARK-30973: TRANSFORM should wait for the termination of the script (with serde)") { + test("SPARK-32106: TRANSFORM don't support CalenderIntervalType/UserDefinedType (hive serde)") { Review comment: Done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r458616070 ## File path: sql/core/src/test/resources/sql-tests/results/transform.sql.out ## @@ -0,0 +1,160 @@ +-- Automatically generated by SQLQueryTestSuite +-- Number of queries: 12 + + +-- !query +CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES +('1', true, unhex('537061726B2053514C'), tinyint(1), 1, smallint(100), bigint(1), float(1.0), 1.0, Decimal(1.0), timestamp('1997-01-02'), date('2000-04-01')), +('2', false, unhex('537061726B2053514C'), tinyint(2), 2, smallint(200), bigint(2), float(2.0), 2.0, Decimal(2.0), timestamp('1997-01-02 03:04:05'), date('2000-04-02')), +('3', true, unhex('537061726B2053514C'), tinyint(3), 3, smallint(300), bigint(3), float(3.0), 3.0, Decimal(3.0), timestamp('1997-02-10 17:32:01-08'), date('2000-04-03')) +as t1(a, b, c, d, e, f, g, h, i, j, k, l) +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT TRANSFORM(a) +USING 'cat' AS (a) +FROM t1 +-- !query schema +struct +-- !query output +1 +2 +3 + + +-- !query +SELECT TRANSFORM(a) +USING 'some_non_existent_command' AS (a) +FROM t1 +-- !query schema +struct<> +-- !query output +org.apache.spark.SparkException +Subprocess exited with status 127. Error: /bin/bash: some_non_existent_command: command not found + + +-- !query +SELECT TRANSFORM(a) +USING 'python some_non_existent_file' AS (a) +FROM t1 +-- !query schema +struct<> +-- !query output +org.apache.spark.SparkException +Subprocess exited with status 2. 
Error: python: can't open file 'some_non_existent_file': [Errno 2] No such file or directory
+
+
+-- !query
+SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM (
+SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k, l)
+USING 'cat' AS (
+a string,
+b boolean,
+c binary,
+d tinyint,
+e int,
+f smallint,
+g long,
+h float,
+i double,
+j decimal(38, 18),
+k timestamp,
+l date)
+FROM t1
+) tmp
+-- !query schema
+struct
+-- !query output
+1 trueSpark SQL 1 1 100 1 1.0 1.0 1.001997-01-02 00:00:00 2000-04-01
+2 false Spark SQL 2 2 200 2 2.0 2.0 2.001997-01-02 03:04:05 2000-04-02
+3 trueSpark SQL 3 3 300 3 3.0 3.0 3.001997-02-10 17:32:01 2000-04-03
+
+
+-- !query
+SELECT TRANSFORM(a)
+USING 'cat'
+FROM t1
+-- !query schema
+struct<>
+-- !query output
+java.lang.ArrayIndexOutOfBoundsException
+1
+
+
+-- !query
+SELECT TRANSFORM(a, b)
+USING 'cat'
+FROM t1
+-- !query schema
+struct
+-- !query output
+1 true
+2 false
+3 true
+
+
+-- !query
+SELECT TRANSFORM(a, b, c)

Review comment:
> Yea, I know the current behaviour follows the v3.0.0 one though.

I tested it in Hive 1.1, with the result shown in this screenshot:
![image](https://user-images.githubusercontent.com/46485123/88152189-453ac080-cc36-11ea-9eca-b73a1d8100a1.png)
Spark converts data for the Hive SerDe through HiveInspector, so this is HiveInspector's behavior. If we want to stay consistent with Hive, I will raise a new PR.
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r458612159 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala ## @@ -87,17 +178,69 @@ trait BaseScriptTransformationExec extends UnaryExecNode { } } } + + private lazy val fieldWriters: Seq[String => Any] = output.map { attr => +val converter = CatalystTypeConverters.createToCatalystConverter(attr.dataType) +attr.dataType match { + case StringType => wrapperConvertException(data => data, converter) + case BooleanType => wrapperConvertException(data => data.toBoolean, converter) + case ByteType => wrapperConvertException(data => data.toByte, converter) + case BinaryType => wrapperConvertException(data => data.getBytes, converter) + case IntegerType => wrapperConvertException(data => data.toInt, converter) + case ShortType => wrapperConvertException(data => data.toShort, converter) + case LongType => wrapperConvertException(data => data.toLong, converter) + case FloatType => wrapperConvertException(data => data.toFloat, converter) + case DoubleType => wrapperConvertException(data => data.toDouble, converter) + case _: DecimalType => wrapperConvertException(data => BigDecimal(data), converter) + case DateType if conf.datetimeJava8ApiEnabled => +wrapperConvertException(data => DateTimeUtils.stringToDate( + UTF8String.fromString(data), + DateTimeUtils.getZoneId(conf.sessionLocalTimeZone)) + .map(DateTimeUtils.daysToLocalDate).orNull, converter) + case DateType => wrapperConvertException(data => DateTimeUtils.stringToDate( +UTF8String.fromString(data), +DateTimeUtils.getZoneId(conf.sessionLocalTimeZone)) +.map(DateTimeUtils.toJavaDate).orNull, converter) + case TimestampType if conf.datetimeJava8ApiEnabled => + wrapperConvertException(data => DateTimeUtils.stringToTimestamp( + UTF8String.fromString(data), + DateTimeUtils.getZoneId(conf.sessionLocalTimeZone)) + .map(DateTimeUtils.microsToInstant).orNull, 
converter)
+ case TimestampType => wrapperConvertException(data => DateTimeUtils.stringToTimestamp(
+UTF8String.fromString(data),
+DateTimeUtils.getZoneId(conf.sessionLocalTimeZone))
+.map(DateTimeUtils.toJavaTimestamp).orNull, converter)
+ case CalendarIntervalType => wrapperConvertException(
+data => IntervalUtils.stringToInterval(UTF8String.fromString(data)),
+converter)
+ case udt: UserDefinedType[_] =>
+wrapperConvertException(data => udt.deserialize(data), converter)
+ case _ => wrapperConvertException(data => data, converter)
+}
+ }
+
+ // Keep consistent with Hive `LazySimpleSerde`, when there is a type case error, return null
+ private val wrapperConvertException: (String => Any, Any => Any) => String => Any =
+(f: String => Any, converter: Any => Any) =>
+ (data: String) => converter {
+try {
+ f(data)
+} catch {
+ case _: Exception => null

Review comment:
Done
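Editorial sketch of the null-on-failure pattern in the hunk above, assuming nothing beyond the Scala standard library. `nullOnFailure`, `toInt` and `toDouble` are hypothetical names, not Spark APIs, and the Catalyst converter step from the original is left out.

```scala
object NullOnFailure {
  // Hypothetical helper (not Spark's API): wrap a String parser so that any
  // parse failure yields null instead of propagating the exception.
  def nullOnFailure(parse: String => Any): String => Any =
    (data: String) => try parse(data) catch { case _: Exception => null }

  // Per-type writers built from the wrapper, in the spirit of `fieldWriters`.
  val toInt: String => Any = nullOnFailure(_.toInt)
  val toDouble: String => Any = nullOnFailure(_.toDouble)

  def main(args: Array[String]): Unit = {
    println(toInt("42"))         // well-formed input is converted
    println(toInt("1231a"))      // malformed input becomes null
    println(toDouble("213.21a")) // likewise for doubles
  }
}
```

The malformed strings are taken from the "return null when return string incompatible" query in transform.sql; in the real operator the result is additionally passed through a Catalyst converter.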
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZhuuuu commented on a change in pull request #29085:
URL: https://github.com/apache/spark/pull/29085#discussion_r458611727

## File path: sql/core/src/test/resources/sql-tests/inputs/transform.sql ##
@@ -0,0 +1,72 @@
+-- Test data.
+CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES
+('1', true, unhex('537061726B2053514C'), tinyint(1), 1, smallint(100), bigint(1), float(1.0), 1.0, Decimal(1.0), timestamp('1997-01-02'), date('2000-04-01')),
+('2', false, unhex('537061726B2053514C'), tinyint(2), 2, smallint(200), bigint(2), float(2.0), 2.0, Decimal(2.0), timestamp('1997-01-02 03:04:05'), date('2000-04-02')),
+('3', true, unhex('537061726B2053514C'), tinyint(3), 3, smallint(300), bigint(3), float(3.0), 3.0, Decimal(3.0), timestamp('1997-02-10 17:32:01-08'), date('2000-04-03'))
+as t1(a, b, c, d, e, f, g, h, i, j, k, l);
+
+SELECT TRANSFORM(a)
+USING 'cat' AS (a)
+FROM t1;
+
+-- with non-exist command
+SELECT TRANSFORM(a)
+USING 'some_non_existent_command' AS (a)
+FROM t1;
+
+-- with non-exist file
+SELECT TRANSFORM(a)
+USING 'python some_non_existent_file' AS (a)
+FROM t1;
+

Review comment:
Done
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZhuuuu commented on a change in pull request #29085:
URL: https://github.com/apache/spark/pull/29085#discussion_r458608523

## File path: sql/core/src/test/resources/sql-tests/inputs/transform.sql ##
@@ -0,0 +1,72 @@
+-- Test data.
+CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES

Review comment:
Done
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r458608012 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala ## @@ -56,10 +69,88 @@ trait BaseScriptTransformationExec extends UnaryExecNode { } } - def processIterator( + protected def initProc: (OutputStream, Process, InputStream, CircularBuffer) = { +val cmd = List("/bin/bash", "-c", script) +val builder = new ProcessBuilder(cmd.asJava) + +val proc = builder.start() +val inputStream = proc.getInputStream +val outputStream = proc.getOutputStream +val errorStream = proc.getErrorStream + +// In order to avoid deadlocks, we need to consume the error output of the child process. +// To avoid issues caused by large error output, we use a circular buffer to limit the amount +// of error output that we retain. See SPARK-7862 for more discussion of the deadlock / hang +// that motivates this. +val stderrBuffer = new CircularBuffer(2048) +new RedirectThread( + errorStream, + stderrBuffer, + s"Thread-${this.getClass.getSimpleName}-STDERR-Consumer").start() +(outputStream, proc, inputStream, stderrBuffer) + } + + protected def processIterator( inputIterator: Iterator[InternalRow], hadoopConf: Configuration): Iterator[InternalRow] + protected def createOutputIteratorWithoutSerde( + writerThread: BaseScriptTransformationWriterThread, + inputStream: InputStream, + proc: Process, + stderrBuffer: CircularBuffer): Iterator[InternalRow] = { +new Iterator[InternalRow] { + var curLine: String = null + val reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8)) + + val outputRowFormat = ioschema.outputRowFormatMap("TOK_TABLEROWFORMATFIELD") + val processRowWithoutSerde = if (!ioschema.schemaLess) { +prevLine: String => + new GenericInternalRow( +prevLine.split(outputRowFormat) + .zip(fieldWriters) + .map { case (data, writer) => writer(data) }) + } else { +// In schema less mode, hive 
default serde will choose first two output column as output
+// if output column size less then 2, it will throw ArrayIndexOutOfBoundsException.
+// Here we change spark's behavior same as hive's default serde
+prevLine: String =>
+ new GenericInternalRow(
+prevLine.split(outputRowFormat).slice(0, 2)
+ .map(CatalystTypeConverters.createToCatalystConverter(StringType)))

Review comment:
Done
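Editorial sketch of the schemaless branch quoted above, assuming a tab as the field delimiter (the real value comes from `ioschema.outputRowFormatMap`). `firstTwoColumns` is a hypothetical name, and the Catalyst conversion to `StringType` is left out.

```scala
object SchemalessSketch {
  // Hypothetical stand-in for the schemaless branch: keep only the first two
  // delimited fields of an output line, as plain strings.
  def firstTwoColumns(line: String, delimiter: String = "\t"): Array[String] =
    line.split(delimiter).slice(0, 2)

  def main(args: Array[String]): Unit = {
    // Three fields in, two fields out.
    println(firstTwoColumns("1\ttrue\tSpark SQL").mkString("|"))
    // A single field stays a single field; slice itself does not throw.
    println(firstTwoColumns("1").length)
  }
}
```

`slice(0, 2)` on a one-element array just returns that one element. So the `ArrayIndexOutOfBoundsException` recorded in transform.sql.out for `SELECT TRANSFORM(a) USING 'cat'` presumably surfaces later, when the missing second column is read.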
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r458608774 ## File path: sql/core/src/test/resources/sql-tests/inputs/transform.sql ## @@ -0,0 +1,72 @@ +-- Test data. +CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES +('1', true, unhex('537061726B2053514C'), tinyint(1), 1, smallint(100), bigint(1), float(1.0), 1.0, Decimal(1.0), timestamp('1997-01-02'), date('2000-04-01')), +('2', false, unhex('537061726B2053514C'), tinyint(2), 2, smallint(200), bigint(2), float(2.0), 2.0, Decimal(2.0), timestamp('1997-01-02 03:04:05'), date('2000-04-02')), +('3', true, unhex('537061726B2053514C'), tinyint(3), 3, smallint(300), bigint(3), float(3.0), 3.0, Decimal(3.0), timestamp('1997-02-10 17:32:01-08'), date('2000-04-03')) +as t1(a, b, c, d, e, f, g, h, i, j, k, l); + +SELECT TRANSFORM(a) +USING 'cat' AS (a) +FROM t1; + +-- with non-exist command +SELECT TRANSFORM(a) +USING 'some_non_existent_command' AS (a) +FROM t1; + +-- with non-exist file +SELECT TRANSFORM(a) +USING 'python some_non_existent_file' AS (a) +FROM t1; + +-- common supported data types between no serde and serde transform +SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM ( +SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k, l) +USING 'cat' AS ( +a string, +b boolean, +c binary, +d tinyint, +e int, +f smallint, +g long, +h float, +i double, +j decimal(38, 18), +k timestamp, +l date) +FROM t1 +) tmp; + +-- handle schema less +SELECT TRANSFORM(a) +USING 'cat' +FROM t1; + +SELECT TRANSFORM(a, b) +USING 'cat' +FROM t1; + +SELECT TRANSFORM(a, b, c) +USING 'cat' +FROM t1; + +-- return null when return string incompatible (no serde) +SELECT TRANSFORM(a, b, c, d, e, f, g, h, i) +USING 'cat' as (a int, b short, c long, d byte, e float, f double, g decimal(38, 18), h date, i timestamp) +FROM VALUES +('a','','1231a','a','213.21a','213.21a','0a.21d','2000-04-01123','1997-0102 00:00:') tmp(a, b, c, d, e, f, g, h, i); + +-- SPARK-28227: 
transform can't run with aggregation
+SELECT TRANSFORM(b, max(a), sum(f))
+USING 'cat' AS (a, b)
+FROM t1
+GROUP BY b;
+
+-- transform use MAP
+MAP a, b USING 'cat' AS (a, b) FROM t1;
+
+-- transform use REDUCE
+REDUCE a, b USING 'cat' AS (a, b) FROM t1;
+
+

Review comment:
Done
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r458606278 ## File path: sql/core/src/test/resources/sql-tests/results/transform.sql.out ## @@ -0,0 +1,160 @@ +-- Automatically generated by SQLQueryTestSuite +-- Number of queries: 12 + + +-- !query +CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES +('1', true, unhex('537061726B2053514C'), tinyint(1), 1, smallint(100), bigint(1), float(1.0), 1.0, Decimal(1.0), timestamp('1997-01-02'), date('2000-04-01')), +('2', false, unhex('537061726B2053514C'), tinyint(2), 2, smallint(200), bigint(2), float(2.0), 2.0, Decimal(2.0), timestamp('1997-01-02 03:04:05'), date('2000-04-02')), +('3', true, unhex('537061726B2053514C'), tinyint(3), 3, smallint(300), bigint(3), float(3.0), 3.0, Decimal(3.0), timestamp('1997-02-10 17:32:01-08'), date('2000-04-03')) +as t1(a, b, c, d, e, f, g, h, i, j, k, l) +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT TRANSFORM(a) +USING 'cat' AS (a) +FROM t1 +-- !query schema +struct +-- !query output +1 +2 +3 + + +-- !query +SELECT TRANSFORM(a) +USING 'some_non_existent_command' AS (a) +FROM t1 +-- !query schema +struct<> +-- !query output +org.apache.spark.SparkException +Subprocess exited with status 127. Error: /bin/bash: some_non_existent_command: command not found + + +-- !query +SELECT TRANSFORM(a) +USING 'python some_non_existent_file' AS (a) +FROM t1 +-- !query schema +struct<> +-- !query output +org.apache.spark.SparkException +Subprocess exited with status 2. 
Error: python: can't open file 'some_non_existent_file': [Errno 2] No such file or directory
+
+
+-- !query
+SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM (
+SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k, l)
+USING 'cat' AS (
+a string,
+b boolean,
+c binary,
+d tinyint,
+e int,
+f smallint,
+g long,
+h float,
+i double,
+j decimal(38, 18),
+k timestamp,
+l date)
+FROM t1
+) tmp
+-- !query schema
+struct
+-- !query output
+1 trueSpark SQL 1 1 100 1 1.0 1.0 1.001997-01-02 00:00:00 2000-04-01
+2 false Spark SQL 2 2 200 2 2.0 2.0 2.001997-01-02 03:04:05 2000-04-02
+3 trueSpark SQL 3 3 300 3 3.0 3.0 3.001997-02-10 17:32:01 2000-04-03
+
+
+-- !query
+SELECT TRANSFORM(a)
+USING 'cat'
+FROM t1
+-- !query schema
+struct<>
+-- !query output
+java.lang.ArrayIndexOutOfBoundsException

Review comment:
> What's the hive behaviour in this case?

Same.
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZhuuuu commented on a change in pull request #29085:
URL: https://github.com/apache/spark/pull/29085#discussion_r458540858

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala ##
@@ -87,17 +175,69 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
 }
 }
 }
+
+ private lazy val fieldWriters: Seq[String => Any] = output.map { attr =>
+val converter = CatalystTypeConverters.createToCatalystConverter(attr.dataType)
+attr.dataType match {
+ case StringType => wrapperConvertException(data => data, converter)
+ case BooleanType => wrapperConvertException(data => data.toBoolean, converter)
+ case ByteType => wrapperConvertException(data => data.toByte, converter)
+ case BinaryType => wrapperConvertException(data => data.getBytes, converter)
+ case IntegerType => wrapperConvertException(data => data.toInt, converter)
+ case ShortType => wrapperConvertException(data => data.toShort, converter)
+ case LongType => wrapperConvertException(data => data.toLong, converter)
+ case FloatType => wrapperConvertException(data => data.toFloat, converter)
+ case DoubleType => wrapperConvertException(data => data.toDouble, converter)
+ case _: DecimalType => wrapperConvertException(data => BigDecimal(data), converter)
+ case DateType if conf.datetimeJava8ApiEnabled =>

Review comment:
> Could you add tests in `BaseScriptTransformationSuite.scala` with this flag enabled/disabled?

Done, see
```
test("SPARK-32106: TRANSFORM should support all data types as input (no serde)") {
  assume(TestUtils.testCommandAvailable("python"))
  Array(false, true).foreach { java8AapiEnable =>
    withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8AapiEnable.toString) {
      withTempView("v") {
        ...
      }
    }
  }
}
```
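Editorial sketch of why the test runs under both flag values: the external type produced for a date depends on the Java 8 API setting. It assumes strict ISO parsing via `java.time.LocalDate.parse` as a simplified stand-in for `DateTimeUtils.stringToDate`, which is more lenient. `parseDate` and its `java8Api` parameter are hypothetical; `java8Api` plays the role of `SQLConf.DATETIME_JAVA8API_ENABLED`.

```scala
import java.sql.Date
import java.time.LocalDate

object DateApiSketch {
  // Hypothetical simplification of the two DateType branches: return a
  // java.time.LocalDate when the Java 8 API flag is on, a java.sql.Date
  // otherwise, and null when the text cannot be parsed.
  def parseDate(data: String, java8Api: Boolean): Any =
    try {
      val local = LocalDate.parse(data)
      if (java8Api) local else Date.valueOf(local)
    } catch {
      case _: Exception => null
    }

  def main(args: Array[String]): Unit = {
    // Mirror the test's loop over both flag values.
    Seq(false, true).foreach { flag =>
      println(parseDate("2000-04-01", flag).getClass.getName)
      println(parseDate("2000-04-01123", flag))
    }
  }
}
```

Looping over both values, as the quoted test does, exercises both external representations with the same input rows.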
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r458537184 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala ## @@ -56,10 +69,85 @@ trait BaseScriptTransformationExec extends UnaryExecNode { } } - def processIterator( + protected def initProc: (OutputStream, Process, InputStream, CircularBuffer) = { +val cmd = List("/bin/bash", "-c", script) +val builder = new ProcessBuilder(cmd.asJava) + +val proc = builder.start() +val inputStream = proc.getInputStream +val outputStream = proc.getOutputStream +val errorStream = proc.getErrorStream + +// In order to avoid deadlocks, we need to consume the error output of the child process. +// To avoid issues caused by large error output, we use a circular buffer to limit the amount +// of error output that we retain. See SPARK-7862 for more discussion of the deadlock / hang +// that motivates this. +val stderrBuffer = new CircularBuffer(2048) +new RedirectThread( + errorStream, + stderrBuffer, + s"Thread-${this.getClass.getSimpleName}-STDERR-Consumer").start() +(outputStream, proc, inputStream, stderrBuffer) + } + + protected def processIterator( inputIterator: Iterator[InternalRow], hadoopConf: Configuration): Iterator[InternalRow] + protected def createOutputIteratorWithoutSerde( + writerThread: BaseScriptTransformationWriterThread, + inputStream: InputStream, + proc: Process, + stderrBuffer: CircularBuffer): Iterator[InternalRow] = { +new Iterator[InternalRow] { + var curLine: String = null + val reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8)) + + val outputRowFormat = ioschema.outputRowFormatMap("TOK_TABLEROWFORMATFIELD") + val processRowWithoutSerde = if (!ioschema.schemaLess) { +prevLine: String => + new GenericInternalRow( +prevLine.split(outputRowFormat) + .zip(fieldWriters) + .map { case (data, writer) => writer(data) }) + } else { +prevLine: String => + new 
GenericInternalRow(
+prevLine.split(outputRowFormat, 2)
+ .map(CatalystTypeConverters.convertToCatalyst))

Review comment:
> Probably, we need to add some comments about how to handle the schemaless mode here.

I made a small change so that schemaless mode produces the same result as the Hive serde:
```
// In schema less mode, hive default serde will choose first two output column as output
// if output column size less then 2, it will throw ArrayIndexOutOfBoundsException.
// Here we change spark's behavior same as hive's default serde
prevLine: String =>
  new GenericInternalRow(
    prevLine.split(outputRowFormat).slice(0, 2)
      .map(CatalystTypeConverters.createToCatalystConverter(StringType)))
```
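Editorial sketch of the process setup in `initProc` above: launch the script through `/bin/bash -c` and consume stderr on a dedicated thread so the child cannot block on a full pipe. It assumes only the JDK and that `/bin/bash` is available. `StderrDrainSketch.run` is a hypothetical helper, and it discards stderr where the original keeps the last 2048 bytes in a `CircularBuffer`.

```scala
import java.io.{BufferedReader, InputStreamReader}
import java.nio.charset.StandardCharsets

object StderrDrainSketch {
  // Hypothetical helper: feed `input` lines to `script` and return the
  // script's stdout lines together with its exit status.
  def run(script: String, input: Seq[String]): (Seq[String], Int) = {
    val proc = new ProcessBuilder("/bin/bash", "-c", script).start()

    // Drain stderr on its own thread so a chatty child never stalls.
    val stderrDrainer = new Thread(() => {
      val err = proc.getErrorStream
      val buf = new Array[Byte](2048)
      while (err.read(buf) != -1) {}
    }, "stderr-consumer")
    stderrDrainer.setDaemon(true)
    stderrDrainer.start()

    // Write the input on a separate thread, then close stdin to signal EOF.
    val writer = new Thread(() => {
      val out = proc.getOutputStream
      try input.foreach(l => out.write((l + "\n").getBytes(StandardCharsets.UTF_8)))
      finally out.close()
    }, "stdin-writer")
    writer.start()

    // Read stdout on the calling thread until the child closes it.
    val reader = new BufferedReader(
      new InputStreamReader(proc.getInputStream, StandardCharsets.UTF_8))
    val lines = Iterator.continually(reader.readLine()).takeWhile(_ != null).toList
    (lines, proc.waitFor())
  }
}
```

Separate threads for stdin, stdout and stderr mean none of the three pipes can fill up while another is being serviced. That is the deadlock the SPARK-7862 reference in the code comment is about.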
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZhuuuu commented on a change in pull request #29085:
URL: https://github.com/apache/spark/pull/29085#discussion_r458534534

## File path: sql/core/src/test/resources/sql-tests/inputs/transform.sql ##
@@ -0,0 +1,72 @@
+-- Test data.
+CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES
+('1', true, unhex('537061726B2053514C'), tinyint(1), smallint(100), array_position(array(3, 2, 1), 1), float(1.0), 1.0, Decimal(1.0), timestamp('1997-01-02'), date('2000-04-01')),
+('2', false, unhex('537061726B2053514C'), tinyint(2), smallint(200), array_position(array(3, 2, 1), 2), float(2.0), 2.0, Decimal(2.0), timestamp('1997-01-02 03:04:05'), date('2000-04-02')),
+('3', true, unhex('537061726B2053514C'), tinyint(3), smallint(300), array_position(array(3, 2, 1), 1), float(3.0), 3.0, Decimal(3.0), timestamp('1997-02-10 17:32:01-08'), date('2000-04-03'))
+as t1(a, b, c, d, e, f, g, h, i, j, k);
+
+SELECT TRANSFORM(a)
+USING 'cat' AS (a)
+FROM t1;
+
+
+-- with non-exist command
+SELECT TRANSFORM(a)
+USING 'some_non_existent_command' AS (a)
+FROM t1;
+
+-- with non-exist file
+SELECT TRANSFORM(a)
+USING 'python some_non_existent_file' AS (a)
+FROM t1;
+
+
+-- support different data type
+SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k FROM (
+SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k)
+USING 'cat' AS (
+a string,
+b boolean,
+c binary,
+d tinyint,
+e smallint,
+f long,
+g float,
+h double,
+i decimal(38, 18),
+j timestamp,
+k date)
+FROM t1
+) tmp;
+
+
+-- handle schema less
+SELECT TRANSFORM(a, b)

Review comment:
> schemaless

Done
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZhuuuu commented on a change in pull request #29085:
URL: https://github.com/apache/spark/pull/29085#discussion_r458532562

## File path: sql/core/src/test/resources/sql-tests/inputs/transform.sql ##
@@ -0,0 +1,72 @@
+-- Test data.
+CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES
+('1', true, unhex('537061726B2053514C'), tinyint(1), smallint(100), array_position(array(3, 2, 1), 1), float(1.0), 1.0, Decimal(1.0), timestamp('1997-01-02'), date('2000-04-01')),
+('2', false, unhex('537061726B2053514C'), tinyint(2), smallint(200), array_position(array(3, 2, 1), 2), float(2.0), 2.0, Decimal(2.0), timestamp('1997-01-02 03:04:05'), date('2000-04-02')),
+('3', true, unhex('537061726B2053514C'), tinyint(3), smallint(300), array_position(array(3, 2, 1), 1), float(3.0), 3.0, Decimal(3.0), timestamp('1997-02-10 17:32:01-08'), date('2000-04-03'))
+as t1(a, b, c, d, e, f, g, h, i, j, k);
+
+SELECT TRANSFORM(a)
+USING 'cat' AS (a)
+FROM t1;
+
+
+-- with non-exist command
+SELECT TRANSFORM(a)
+USING 'some_non_existent_command' AS (a)
+FROM t1;
+
+-- with non-exist file
+SELECT TRANSFORM(a)
+USING 'python some_non_existent_file' AS (a)
+FROM t1;
+
+
+-- support different data type

Review comment:
Done
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZhuuuu commented on a change in pull request #29085:
URL: https://github.com/apache/spark/pull/29085#discussion_r458532644

## File path: sql/core/src/test/resources/sql-tests/inputs/transform.sql ##
@@ -0,0 +1,72 @@
+-- Test data.
+CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES
+('1', true, unhex('537061726B2053514C'), tinyint(1), smallint(100), array_position(array(3, 2, 1), 1), float(1.0), 1.0, Decimal(1.0), timestamp('1997-01-02'), date('2000-04-01')),
+('2', false, unhex('537061726B2053514C'), tinyint(2), smallint(200), array_position(array(3, 2, 1), 2), float(2.0), 2.0, Decimal(2.0), timestamp('1997-01-02 03:04:05'), date('2000-04-02')),
+('3', true, unhex('537061726B2053514C'), tinyint(3), smallint(300), array_position(array(3, 2, 1), 1), float(3.0), 3.0, Decimal(3.0), timestamp('1997-02-10 17:32:01-08'), date('2000-04-03'))

Review comment:
> Where's `int`? And, we need `array_position(array(3, 2, 1), 1)` for tests? `array_position(array(3, 2, 1), 1)` -> `bigint(3)`?

Done
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r458532129 ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala ## @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution + +import java.sql.{Date, Timestamp} + +import org.json4s.DefaultFormats +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods._ +import org.scalatest.Assertions._ +import org.scalatest.BeforeAndAfterEach +import org.scalatest.exceptions.TestFailedException + +import org.apache.spark.{SparkException, TaskContext, TestUtils} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Column, Row} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, GenericInternalRow} +import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + +abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestUtils + with BeforeAndAfterEach { + import testImplicits._ + import ScriptTransformationIOSchema._ + + protected val uncaughtExceptionHandler = new TestUncaughtExceptionHandler + + private var defaultUncaughtExceptionHandler: Thread.UncaughtExceptionHandler = _ + + protected override def beforeAll(): Unit = { +super.beforeAll() +defaultUncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler +Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler) + } + + protected override def afterAll(): Unit = { +super.afterAll() +Thread.setDefaultUncaughtExceptionHandler(defaultUncaughtExceptionHandler) + } + + override protected def afterEach(): Unit = { +super.afterEach() +uncaughtExceptionHandler.cleanStatus() + } + + def isHive23OrSpark: Boolean + + def createScriptTransformationExec( + input: Seq[Expression], + script: String, + output: Seq[Attribute], + child: SparkPlan, + ioschema: ScriptTransformationIOSchema): BaseScriptTransformationExec + + test("cat without SerDe") { 
+assume(TestUtils.testCommandAvailable("/bin/bash")) + +val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") +checkAnswer( + rowsDf, + (child: SparkPlan) => createScriptTransformationExec( +input = Seq(rowsDf.col("a").expr), +script = "cat", +output = Seq(AttributeReference("a", StringType)()), +child = child, +ioschema = defaultIOSchema + ), + rowsDf.collect()) +assert(uncaughtExceptionHandler.exception.isEmpty) + } + + test("script transformation should not swallow errors from upstream operators (no serde)") { +assume(TestUtils.testCommandAvailable("/bin/bash")) + +val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") +val e = intercept[TestFailedException] { + checkAnswer( +rowsDf, +(child: SparkPlan) => createScriptTransformationExec( + input = Seq(rowsDf.col("a").expr), + script = "cat", + output = Seq(AttributeReference("a", StringType)()), + child = ExceptionInjectingOperator(child), + ioschema = defaultIOSchema +), +rowsDf.collect()) +} +assert(e.getMessage().contains("intentional exception")) +// Before SPARK-25158, uncaughtExceptionHandler will catch IllegalArgumentException +assert(uncaughtExceptionHandler.exception.isEmpty) + } + + test("SPARK-25990: TRANSFORM should handle different data types correctly") { +assume(TestUtils.testCommandAvailable("python")) +val scriptFilePath = getTestResourcePath("test_script.py") + +withTempView("v") { + val df = Seq( +(1, "1", 1.0, BigDecimal(1.0), new Timestamp(1)), +(2, "2", 2.0, BigDecimal(2.0), new Timestamp(2)), +(3, "3", 3.0, BigDecimal(3.0), new Timestamp(3)) + ).toDF("a", "b", "c", "d", "e") // Note column d's data type is Decimal(38, 18) +
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r458524392 ## File path: sql/core/src/test/resources/sql-tests/inputs/transform.sql ## @@ -0,0 +1,72 @@ +-- Test data. +CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES +('1', true, unhex('537061726B2053514C'), tinyint(1), smallint(100), array_position(array(3, 2, 1), 1), float(1.0), 1.0, Decimal(1.0), timestamp('1997-01-02'), date('2000-04-01')), +('2', false, unhex('537061726B2053514C'), tinyint(2), smallint(200), array_position(array(3, 2, 1), 2), float(2.0), 2.0, Decimal(2.0), timestamp('1997-01-02 03:04:05'), date('2000-04-02')), +('3', true, unhex('537061726B2053514C'), tinyint(3), smallint(300), array_position(array(3, 2, 1), 1), float(3.0), 3.0, Decimal(3.0), timestamp('1997-02-10 17:32:01-08'), date('2000-04-03')) +as t1(a, b, c, d, e, f, g, h, i, j, k); + +SELECT TRANSFORM(a) +USING 'cat' AS (a) +FROM t1; + + Review comment: Done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r458525381 ## File path: sql/core/src/test/resources/sql-tests/inputs/transform.sql ## @@ -0,0 +1,72 @@ +-- Test data. +CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES +('1', true, unhex('537061726B2053514C'), tinyint(1), smallint(100), array_position(array(3, 2, 1), 1), float(1.0), 1.0, Decimal(1.0), timestamp('1997-01-02'), date('2000-04-01')), +('2', false, unhex('537061726B2053514C'), tinyint(2), smallint(200), array_position(array(3, 2, 1), 2), float(2.0), 2.0, Decimal(2.0), timestamp('1997-01-02 03:04:05'), date('2000-04-02')), +('3', true, unhex('537061726B2053514C'), tinyint(3), smallint(300), array_position(array(3, 2, 1), 1), float(3.0), 3.0, Decimal(3.0), timestamp('1997-02-10 17:32:01-08'), date('2000-04-03')) +as t1(a, b, c, d, e, f, g, h, i, j, k); + +SELECT TRANSFORM(a) +USING 'cat' AS (a) +FROM t1; + + +-- with non-exist command +SELECT TRANSFORM(a) +USING 'some_non_existent_command' AS (a) +FROM t1; + +-- with non-exist file +SELECT TRANSFORM(a) +USING 'python some_non_existent_file' AS (a) +FROM t1; + + +-- support different data type +SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k FROM ( +SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k) +USING 'cat' AS ( +a string, +b boolean, +c binary, +d tinyint, +e smallint, +f long, +g float, +h double, +i decimal(38, 18), +j timestamp, +k date) +FROM t1 +) tmp; + + +-- handle schema less +SELECT TRANSFORM(a, b) +USING 'cat' +FROM t1; + +-- return null when return string incompatible (no serde) +SELECT TRANSFORM(a, b, c) +USING 'cat' as (a int, b int , c int) +FROM ( +SELECT Review comment: > We need the subquery for this test? personal habit, removed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r458524287 ## File path: sql/core/src/test/resources/sql-tests/inputs/transform.sql ## @@ -0,0 +1,72 @@ +-- Test data. +CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES +('1', true, unhex('537061726B2053514C'), tinyint(1), smallint(100), array_position(array(3, 2, 1), 1), float(1.0), 1.0, Decimal(1.0), timestamp('1997-01-02'), date('2000-04-01')), +('2', false, unhex('537061726B2053514C'), tinyint(2), smallint(200), array_position(array(3, 2, 1), 2), float(2.0), 2.0, Decimal(2.0), timestamp('1997-01-02 03:04:05'), date('2000-04-02')), +('3', true, unhex('537061726B2053514C'), tinyint(3), smallint(300), array_position(array(3, 2, 1), 1), float(3.0), 3.0, Decimal(3.0), timestamp('1997-02-10 17:32:01-08'), date('2000-04-03')) +as t1(a, b, c, d, e, f, g, h, i, j, k); + +SELECT TRANSFORM(a) +USING 'cat' AS (a) +FROM t1; + + +-- with non-exist command +SELECT TRANSFORM(a) +USING 'some_non_existent_command' AS (a) +FROM t1; + +-- with non-exist file +SELECT TRANSFORM(a) +USING 'python some_non_existent_file' AS (a) +FROM t1; + + +-- support different data type +SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k FROM ( +SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k) +USING 'cat' AS ( +a string, +b boolean, +c binary, +d tinyint, +e smallint, +f long, +g float, +h double, +i decimal(38, 18), +j timestamp, +k date) +FROM t1 +) tmp; + + +-- handle schema less +SELECT TRANSFORM(a, b) +USING 'cat' +FROM t1; + +-- return null when return string incompatible (no serde) +SELECT TRANSFORM(a, b, c) +USING 'cat' as (a int, b int , c int) Review comment: Done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
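The last test case quoted above expects NULL when the script's output string cannot be read as the declared column type (here, `cat` echoing non-numeric text into `int` columns). A minimal sketch of that "parse or null" behaviour follows. `parseOrNull` is a hypothetical helper in the spirit of the `wrapperConvertException` calls shown earlier in this thread; it is not the PR's code and it skips the Catalyst converter step.

```scala
import scala.util.Try

object FieldParsingSketch {
  // Hypothetical helper: wraps a String => T parser so that any parse failure yields null
  // instead of propagating the exception.
  def parseOrNull[T](parse: String => T): String => Any =
    data => Try(parse(data)).getOrElse(null)

  // Example writers for two of the declared output types.
  val toInt: String => Any = parseOrNull(_.toInt)
  val toBoolean: String => Any = parseOrNull(_.toBoolean)
}
```

With this shape, `FieldParsingSketch.toInt("a")` produces null while `FieldParsingSketch.toInt("1")` produces the integer, which matches the intent stated in the test comment.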
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r458524490 ## File path: sql/core/src/test/resources/sql-tests/inputs/transform.sql ## @@ -0,0 +1,72 @@ +-- Test data. +CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES +('1', true, unhex('537061726B2053514C'), tinyint(1), smallint(100), array_position(array(3, 2, 1), 1), float(1.0), 1.0, Decimal(1.0), timestamp('1997-01-02'), date('2000-04-01')), +('2', false, unhex('537061726B2053514C'), tinyint(2), smallint(200), array_position(array(3, 2, 1), 2), float(2.0), 2.0, Decimal(2.0), timestamp('1997-01-02 03:04:05'), date('2000-04-02')), +('3', true, unhex('537061726B2053514C'), tinyint(3), smallint(300), array_position(array(3, 2, 1), 1), float(3.0), 3.0, Decimal(3.0), timestamp('1997-02-10 17:32:01-08'), date('2000-04-03')) +as t1(a, b, c, d, e, f, g, h, i, j, k); + +SELECT TRANSFORM(a) +USING 'cat' AS (a) +FROM t1; + + +-- with non-exist command +SELECT TRANSFORM(a) +USING 'some_non_existent_command' AS (a) +FROM t1; + +-- with non-exist file +SELECT TRANSFORM(a) +USING 'python some_non_existent_file' AS (a) +FROM t1; + + +-- support different data type +SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k FROM ( +SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k) +USING 'cat' AS ( +a string, +b boolean, +c binary, +d tinyint, +e smallint, +f long, +g float, +h double, +i decimal(38, 18), +j timestamp, +k date) +FROM t1 +) tmp; + + +-- handle schema less +SELECT TRANSFORM(a, b) +USING 'cat' +FROM t1; + +-- return null when return string incompatible (no serde) +SELECT TRANSFORM(a, b, c) +USING 'cat' as (a int, b int , c int) +FROM ( +SELECT +1 AS a, +"a" AS b, +CAST(2000 AS timestamp) AS c +) tmp; + + +-- transform can't run with aggregation Review comment: Done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r458522569 ## File path: sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala ## @@ -258,6 +258,7 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper newLine.startsWith("--") && !newLine.startsWith("--QUERY-DELIMITER") } +assume(TestUtils.testCommandAvailable("/bin/bash")) Review comment: Done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
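The added `assume(...)` line cancels the SQL query tests when `/bin/bash` is not available rather than letting them fail. A rough sketch of such an availability probe follows, using only `scala.sys.process`. `shellAvailable` is a hypothetical name, and this is not the implementation of Spark's `TestUtils.testCommandAvailable`, which is not shown in this thread.

```scala
import scala.sys.process.{Process, ProcessLogger}
import scala.util.Try

object ShellCheckSketch {
  // Hypothetical probe: tries to run `<shellPath> -c "exit 0"` and reports whether it
  // started and exited with status 0. A missing binary makes process creation throw,
  // which Try turns into `false`.
  def shellAvailable(shellPath: String): Boolean = {
    val silent = ProcessLogger(_ => (), _ => ()) // discard stdout and stderr
    Try(Process(Seq(shellPath, "-c", "exit 0")).!(silent)).toOption.contains(0)
  }
}
```

In a ScalaTest suite the result would typically be passed to `assume`, so a missing shell marks the test as canceled instead of failed.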
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r457025605 ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala ## @@ -0,0 +1,390 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution + +import java.sql.{Date, Timestamp} + +import org.json4s.DefaultFormats +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods._ +import org.scalatest.Assertions._ +import org.scalatest.BeforeAndAfterEach +import org.scalatest.exceptions.TestFailedException + +import org.apache.spark.{SparkException, TaskContext, TestUtils} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Column, Row} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, GenericInternalRow} +import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + +abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestUtils + with BeforeAndAfterEach { + import testImplicits._ + import ScriptTransformationIOSchema._ + + protected val uncaughtExceptionHandler = new TestUncaughtExceptionHandler + + private var defaultUncaughtExceptionHandler: Thread.UncaughtExceptionHandler = _ + + protected override def beforeAll(): Unit = { +super.beforeAll() +defaultUncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler +Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler) + } + + protected override def afterAll(): Unit = { +super.afterAll() +Thread.setDefaultUncaughtExceptionHandler(defaultUncaughtExceptionHandler) + } + + override protected def afterEach(): Unit = { +super.afterEach() +uncaughtExceptionHandler.cleanStatus() + } + + def isHive23OrSpark: Boolean + + def createScriptTransformationExec( + input: Seq[Expression], + script: String, + output: Seq[Attribute], + child: SparkPlan, + ioschema: ScriptTransformationIOSchema): BaseScriptTransformationExec + + test("cat without SerDe") { 
+assume(TestUtils.testCommandAvailable("/bin/bash")) + +val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") +checkAnswer( + rowsDf, + (child: SparkPlan) => createScriptTransformationExec( +input = Seq(rowsDf.col("a").expr), +script = "cat", +output = Seq(AttributeReference("a", StringType)()), +child = child, +ioschema = defaultIOSchema + ), + rowsDf.collect()) +assert(uncaughtExceptionHandler.exception.isEmpty) + } + + test("script transformation should not swallow errors from upstream operators (no serde)") { +assume(TestUtils.testCommandAvailable("/bin/bash")) + +val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") +val e = intercept[TestFailedException] { + checkAnswer( +rowsDf, +(child: SparkPlan) => createScriptTransformationExec( + input = Seq(rowsDf.col("a").expr), + script = "cat", + output = Seq(AttributeReference("a", StringType)()), + child = ExceptionInjectingOperator(child), + ioschema = defaultIOSchema +), +rowsDf.collect()) +} +assert(e.getMessage().contains("intentional exception")) +// Before SPARK-25158, uncaughtExceptionHandler will catch IllegalArgumentException +assert(uncaughtExceptionHandler.exception.isEmpty) + } + + test("SPARK-25990: TRANSFORM should handle different data types correctly") { +assume(TestUtils.testCommandAvailable("python")) +val scriptFilePath = getTestResourcePath("test_script.py") + +withTempView("v") { + val df = Seq( +(1, "1", 1.0, BigDecimal(1.0), new Timestamp(1)), +(2, "2", 2.0, BigDecimal(2.0), new Timestamp(2)), +(3, "3", 3.0, BigDecimal(3.0), new Timestamp(3)) + ).toDF("a", "b", "c", "d", "e") // Note column d's data type is Decimal(38, 18) +
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r457071605 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ## @@ -713,13 +714,18 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { } (Seq.empty, Option(name), props.toSeq, recordHandler) - case null => + case null if conf.getConf(CATALOG_IMPLEMENTATION).equals("hive") => // Use default (serde) format. val name = conf.getConfString("hive.script.serde", "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") val props = Seq("field.delim" -> "\t") val recordHandler = Option(conf.getConfString(configKey, defaultConfigValue)) (Nil, Option(name), props, recordHandler) + + // SPARK-32106: When there is no definition about format, we return empty result + // to use a built-in default Serde in SparkScriptTransformationExec. + case null => +(Nil, None, Seq.empty, None) Review comment: > > CalendarIntervalType/ArrayType/MapType/StructType as input of hive default serde will throw error > > btw, we already have end-to-end tests for the unsupported cases in the hive side? Added
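The parser change above picks a default row format in two steps: with the Hive catalog it falls back to the Hive default serde, and otherwise it returns an empty format so that the built-in handling in `SparkScriptTransformationExec` applies. A simplified sketch of that decision follows. `RowFormat`, `defaultFormat`, and its parameters are hypothetical names for illustration; the real code matches on the parsed row-format context and reads these values from `SQLConf`.

```scala
object RowFormatSketch {
  // Hypothetical alias for the 4-tuple in the diff:
  // (field properties, serde class, serde properties, record handler).
  type RowFormat =
    (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String])

  val HiveDefaultSerde = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"

  // `catalogImpl` and `recordHandler` are illustrative inputs, not Spark's API.
  def defaultFormat(catalogImpl: String, recordHandler: String): RowFormat =
    catalogImpl match {
      case "hive" =>
        // Hive catalog: default serde with a tab field delimiter.
        (Nil, Some(HiveDefaultSerde), Seq("field.delim" -> "\t"), Some(recordHandler))
      case _ =>
        // No Hive: an empty format signals "use the built-in default".
        (Nil, None, Seq.empty, None)
    }
}
```

Returning `None` for the serde class, rather than a placeholder string, lets downstream code distinguish the no-serde path with a plain `Option` check.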
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r457025094 ## File path: sql/core/src/test/resources/sql-tests/inputs/transform.sql ## @@ -0,0 +1,49 @@ +-- Test data. +CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES +('a'), ('b'), ('v') +as t1(a); + +CREATE OR REPLACE TEMPORARY VIEW t2 AS SELECT * FROM VALUES +('1', true, unhex('537061726B2053514C'), tinyint(1), array_position(array(3, 2, 1), 1), float(1.0), 1.0, Decimal(1.0), timestamp(1), current_date), +('2', false, unhex('537061726B2053514C'), tinyint(2), array_position(array(3, 2, 1), 2), float(2.0), 2.0, Decimal(2.0), timestamp(2), current_date), +('3', true, unhex('537061726B2053514C'), tinyint(3), array_position(array(3, 2, 1), 1), float(3.0), 3.0, Decimal(3.0), timestamp(3), current_date) +as t2(a,b,c,d,e,f,g,h,i,j); + +SELECT TRANSFORM(a) Review comment: Added some cases without serde.
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r457025094 ## File path: sql/core/src/test/resources/sql-tests/inputs/transform.sql ## @@ -0,0 +1,49 @@ +-- Test data. +CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES +('a'), ('b'), ('v') +as t1(a); + +CREATE OR REPLACE TEMPORARY VIEW t2 AS SELECT * FROM VALUES +('1', true, unhex('537061726B2053514C'), tinyint(1), array_position(array(3, 2, 1), 1), float(1.0), 1.0, Decimal(1.0), timestamp(1), current_date), +('2', false, unhex('537061726B2053514C'), tinyint(2), array_position(array(3, 2, 1), 2), float(2.0), 2.0, Decimal(2.0), timestamp(2), current_date), +('3', true, unhex('537061726B2053514C'), tinyint(3), array_position(array(3, 2, 1), 1), float(3.0), 3.0, Decimal(3.0), timestamp(3), current_date) +as t2(a,b,c,d,e,f,g,h,i,j); + +SELECT TRANSFORM(a) Review comment: Added some cases without serde. With serde, the results differ depending on whether Hive is used.
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r456997363 ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala ## @@ -0,0 +1,390 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution + +import java.sql.{Date, Timestamp} + +import org.json4s.DefaultFormats +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods._ +import org.scalatest.Assertions._ +import org.scalatest.BeforeAndAfterEach +import org.scalatest.exceptions.TestFailedException + +import org.apache.spark.{SparkException, TaskContext, TestUtils} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Column, Row} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, GenericInternalRow} +import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + +abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestUtils + with BeforeAndAfterEach { + import testImplicits._ + import ScriptTransformationIOSchema._ + + protected val uncaughtExceptionHandler = new TestUncaughtExceptionHandler + + private var defaultUncaughtExceptionHandler: Thread.UncaughtExceptionHandler = _ + + protected override def beforeAll(): Unit = { +super.beforeAll() +defaultUncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler +Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler) + } + + protected override def afterAll(): Unit = { +super.afterAll() +Thread.setDefaultUncaughtExceptionHandler(defaultUncaughtExceptionHandler) + } + + override protected def afterEach(): Unit = { +super.afterEach() +uncaughtExceptionHandler.cleanStatus() + } + + def isHive23OrSpark: Boolean + + def createScriptTransformationExec( + input: Seq[Expression], + script: String, + output: Seq[Attribute], + child: SparkPlan, + ioschema: ScriptTransformationIOSchema): BaseScriptTransformationExec + + test("cat without SerDe") { 
+assume(TestUtils.testCommandAvailable("/bin/bash")) + +val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") +checkAnswer( + rowsDf, + (child: SparkPlan) => createScriptTransformationExec( +input = Seq(rowsDf.col("a").expr), +script = "cat", +output = Seq(AttributeReference("a", StringType)()), +child = child, +ioschema = defaultIOSchema + ), + rowsDf.collect()) +assert(uncaughtExceptionHandler.exception.isEmpty) + } + + test("script transformation should not swallow errors from upstream operators (no serde)") { +assume(TestUtils.testCommandAvailable("/bin/bash")) + +val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") +val e = intercept[TestFailedException] { + checkAnswer( +rowsDf, +(child: SparkPlan) => createScriptTransformationExec( + input = Seq(rowsDf.col("a").expr), + script = "cat", + output = Seq(AttributeReference("a", StringType)()), + child = ExceptionInjectingOperator(child), + ioschema = defaultIOSchema +), +rowsDf.collect()) +} +assert(e.getMessage().contains("intentional exception")) +// Before SPARK-25158, uncaughtExceptionHandler will catch IllegalArgumentException +assert(uncaughtExceptionHandler.exception.isEmpty) + } + + test("SPARK-25990: TRANSFORM should handle different data types correctly") { +assume(TestUtils.testCommandAvailable("python")) +val scriptFilePath = getTestResourcePath("test_script.py") + +withTempView("v") { + val df = Seq( +(1, "1", 1.0, BigDecimal(1.0), new Timestamp(1)), +(2, "2", 2.0, BigDecimal(2.0), new Timestamp(2)), +(3, "3", 3.0, BigDecimal(3.0), new Timestamp(3)) + ).toDF("a", "b", "c", "d", "e") // Note column d's data type is Decimal(38, 18) +
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r456995841 ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala ## @@ -0,0 +1,390 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution + +import java.sql.{Date, Timestamp} + +import org.json4s.DefaultFormats +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods._ +import org.scalatest.Assertions._ +import org.scalatest.BeforeAndAfterEach +import org.scalatest.exceptions.TestFailedException + +import org.apache.spark.{SparkException, TaskContext, TestUtils} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Column, Row} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, GenericInternalRow} +import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + +abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestUtils + with BeforeAndAfterEach { + import testImplicits._ + import ScriptTransformationIOSchema._ + + protected val uncaughtExceptionHandler = new TestUncaughtExceptionHandler + + private var defaultUncaughtExceptionHandler: Thread.UncaughtExceptionHandler = _ + + protected override def beforeAll(): Unit = { +super.beforeAll() +defaultUncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler +Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler) + } + + protected override def afterAll(): Unit = { +super.afterAll() +Thread.setDefaultUncaughtExceptionHandler(defaultUncaughtExceptionHandler) + } + + override protected def afterEach(): Unit = { +super.afterEach() +uncaughtExceptionHandler.cleanStatus() + } + + def isHive23OrSpark: Boolean + + def createScriptTransformationExec( + input: Seq[Expression], + script: String, + output: Seq[Attribute], + child: SparkPlan, + ioschema: ScriptTransformationIOSchema): BaseScriptTransformationExec + + test("cat without SerDe") { 
+assume(TestUtils.testCommandAvailable("/bin/bash")) + +val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") +checkAnswer( + rowsDf, + (child: SparkPlan) => createScriptTransformationExec( +input = Seq(rowsDf.col("a").expr), +script = "cat", +output = Seq(AttributeReference("a", StringType)()), +child = child, +ioschema = defaultIOSchema + ), + rowsDf.collect()) +assert(uncaughtExceptionHandler.exception.isEmpty) + } + + test("script transformation should not swallow errors from upstream operators (no serde)") { +assume(TestUtils.testCommandAvailable("/bin/bash")) + +val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") +val e = intercept[TestFailedException] { + checkAnswer( +rowsDf, +(child: SparkPlan) => createScriptTransformationExec( + input = Seq(rowsDf.col("a").expr), + script = "cat", + output = Seq(AttributeReference("a", StringType)()), + child = ExceptionInjectingOperator(child), + ioschema = defaultIOSchema +), +rowsDf.collect()) +} +assert(e.getMessage().contains("intentional exception")) +// Before SPARK-25158, uncaughtExceptionHandler will catch IllegalArgumentException +assert(uncaughtExceptionHandler.exception.isEmpty) + } + + test("SPARK-25990: TRANSFORM should handle different data types correctly") { +assume(TestUtils.testCommandAvailable("python")) +val scriptFilePath = getTestResourcePath("test_script.py") + +withTempView("v") { + val df = Seq( +(1, "1", 1.0, BigDecimal(1.0), new Timestamp(1)), +(2, "2", 2.0, BigDecimal(2.0), new Timestamp(2)), +(3, "3", 3.0, BigDecimal(3.0), new Timestamp(3)) + ).toDF("a", "b", "c", "d", "e") // Note column d's data type is Decimal(38, 18) +
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZhuuuu commented on a change in pull request #29085:
URL: https://github.com/apache/spark/pull/29085#discussion_r456994439

## File path: sql/core/src/test/resources/sql-tests/inputs/transform.sql ##

@@ -0,0 +1,49 @@
+-- Test data.
+CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES
+('a'), ('b'), ('v')
+as t1(a);
+
+CREATE OR REPLACE TEMPORARY VIEW t2 AS SELECT * FROM VALUES

Review comment:
> we need the two tests for the following tests?

These can be merged. Done.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r456994119 ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala ## @@ -0,0 +1,390 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution + +import java.sql.{Date, Timestamp} + +import org.json4s.DefaultFormats +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods._ +import org.scalatest.Assertions._ +import org.scalatest.BeforeAndAfterEach +import org.scalatest.exceptions.TestFailedException + +import org.apache.spark.{SparkException, TaskContext, TestUtils} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Column, Row} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, GenericInternalRow} +import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + +abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestUtils + with BeforeAndAfterEach { + import testImplicits._ + import ScriptTransformationIOSchema._ + + protected val uncaughtExceptionHandler = new TestUncaughtExceptionHandler + + private var defaultUncaughtExceptionHandler: Thread.UncaughtExceptionHandler = _ + + protected override def beforeAll(): Unit = { +super.beforeAll() +defaultUncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler +Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler) + } + + protected override def afterAll(): Unit = { +super.afterAll() +Thread.setDefaultUncaughtExceptionHandler(defaultUncaughtExceptionHandler) + } + + override protected def afterEach(): Unit = { +super.afterEach() +uncaughtExceptionHandler.cleanStatus() + } + + def isHive23OrSpark: Boolean + + def createScriptTransformationExec( + input: Seq[Expression], + script: String, + output: Seq[Attribute], + child: SparkPlan, + ioschema: ScriptTransformationIOSchema): BaseScriptTransformationExec + + test("cat without SerDe") { 
+assume(TestUtils.testCommandAvailable("/bin/bash")) + +val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") +checkAnswer( + rowsDf, + (child: SparkPlan) => createScriptTransformationExec( +input = Seq(rowsDf.col("a").expr), +script = "cat", +output = Seq(AttributeReference("a", StringType)()), +child = child, +ioschema = defaultIOSchema + ), + rowsDf.collect()) +assert(uncaughtExceptionHandler.exception.isEmpty) + } + + test("script transformation should not swallow errors from upstream operators (no serde)") { +assume(TestUtils.testCommandAvailable("/bin/bash")) + +val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") +val e = intercept[TestFailedException] { + checkAnswer( +rowsDf, +(child: SparkPlan) => createScriptTransformationExec( + input = Seq(rowsDf.col("a").expr), + script = "cat", + output = Seq(AttributeReference("a", StringType)()), + child = ExceptionInjectingOperator(child), + ioschema = defaultIOSchema +), +rowsDf.collect()) +} +assert(e.getMessage().contains("intentional exception")) +// Before SPARK-25158, uncaughtExceptionHandler will catch IllegalArgumentException +assert(uncaughtExceptionHandler.exception.isEmpty) + } + + test("SPARK-25990: TRANSFORM should handle different data types correctly") { +assume(TestUtils.testCommandAvailable("python")) +val scriptFilePath = getTestResourcePath("test_script.py") + +withTempView("v") { + val df = Seq( +(1, "1", 1.0, BigDecimal(1.0), new Timestamp(1)), +(2, "2", 2.0, BigDecimal(2.0), new Timestamp(2)), +(3, "3", 3.0, BigDecimal(3.0), new Timestamp(3)) + ).toDF("a", "b", "c", "d", "e") // Note column d's data type is Decimal(38, 18) +
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZhuuuu commented on a change in pull request #29085:
URL: https://github.com/apache/spark/pull/29085#discussion_r456992984

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ##

@@ -713,13 +714,18 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
         }
         (Seq.empty, Option(name), props.toSeq, recordHandler)

-      case null =>
+      case null if conf.getConf(CATALOG_IMPLEMENTATION).equals("hive") =>
         // Use default (serde) format.
         val name = conf.getConfString("hive.script.serde",
           "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
         val props = Seq("field.delim" -> "\t")
         val recordHandler = Option(conf.getConfString(configKey, defaultConfigValue))
         (Nil, Option(name), props, recordHandler)
+
+      // SPARK-32106: When there is no definition about format, we return empty result
+      // to use a built-in default Serde in SparkScriptTransformationExec.
+      case null =>
+        (Nil, None, Seq.empty, None)

Review comment:
> If so, why does the tests pass in `HiveScriptTransformationSuite`?
> [#29085 (comment)](https://github.com/apache/spark/pull/29085#discussion_r456974391)

Because that suite builds the `SparkPlan` directly and does not go through the SQL parser.
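To make the fallback order in the diff above concrete, here is a minimal standalone sketch of the two `case null` branches. It is not the parser code itself: `DefaultRowFormat`, `defaultFormat`, and both parameters are hypothetical names introduced for illustration, and the real code reads the serde name and record handler from `SQLConf` rather than taking them as arguments.

```scala
object DefaultRowFormat {
  // (row format, serde class, serde properties, record handler), as in the diff's tuple.
  type Format = (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String])

  // Hypothetical helper: pick the format used when the query specifies no ROW FORMAT.
  def defaultFormat(catalogImpl: String, recordHandler: String): Format =
    if (catalogImpl == "hive") {
      // Hive catalog: fall back to Hive's default serde with a tab delimiter.
      (Nil, Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"),
        Seq("field.delim" -> "\t"), Some(recordHandler))
    } else {
      // Otherwise return an empty result so the built-in (no-serde) path is used.
      (Nil, None, Nil, None)
    }
}
```

A suite that constructs the physical plan by hand never calls this selection logic, which is consistent with the reply above about `HiveScriptTransformationSuite`.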
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r456991315 ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala ## @@ -0,0 +1,390 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution + +import java.sql.{Date, Timestamp} + +import org.json4s.DefaultFormats +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods._ +import org.scalatest.Assertions._ +import org.scalatest.BeforeAndAfterEach +import org.scalatest.exceptions.TestFailedException + +import org.apache.spark.{SparkException, TaskContext, TestUtils} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Column, Row} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, GenericInternalRow} +import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + +abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestUtils + with BeforeAndAfterEach { + import testImplicits._ + import ScriptTransformationIOSchema._ + + protected val uncaughtExceptionHandler = new TestUncaughtExceptionHandler + + private var defaultUncaughtExceptionHandler: Thread.UncaughtExceptionHandler = _ + + protected override def beforeAll(): Unit = { +super.beforeAll() +defaultUncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler +Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler) + } + + protected override def afterAll(): Unit = { +super.afterAll() +Thread.setDefaultUncaughtExceptionHandler(defaultUncaughtExceptionHandler) + } + + override protected def afterEach(): Unit = { +super.afterEach() +uncaughtExceptionHandler.cleanStatus() + } + + def isHive23OrSpark: Boolean + + def createScriptTransformationExec( + input: Seq[Expression], + script: String, + output: Seq[Attribute], + child: SparkPlan, + ioschema: ScriptTransformationIOSchema): BaseScriptTransformationExec + + test("cat without SerDe") { 
+assume(TestUtils.testCommandAvailable("/bin/bash")) + +val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") +checkAnswer( + rowsDf, + (child: SparkPlan) => createScriptTransformationExec( +input = Seq(rowsDf.col("a").expr), +script = "cat", +output = Seq(AttributeReference("a", StringType)()), +child = child, +ioschema = defaultIOSchema + ), + rowsDf.collect()) +assert(uncaughtExceptionHandler.exception.isEmpty) + } + + test("script transformation should not swallow errors from upstream operators (no serde)") { +assume(TestUtils.testCommandAvailable("/bin/bash")) + +val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") +val e = intercept[TestFailedException] { + checkAnswer( +rowsDf, +(child: SparkPlan) => createScriptTransformationExec( + input = Seq(rowsDf.col("a").expr), + script = "cat", + output = Seq(AttributeReference("a", StringType)()), + child = ExceptionInjectingOperator(child), + ioschema = defaultIOSchema +), +rowsDf.collect()) +} +assert(e.getMessage().contains("intentional exception")) +// Before SPARK-25158, uncaughtExceptionHandler will catch IllegalArgumentException +assert(uncaughtExceptionHandler.exception.isEmpty) + } + + test("SPARK-25990: TRANSFORM should handle different data types correctly") { +assume(TestUtils.testCommandAvailable("python")) +val scriptFilePath = getTestResourcePath("test_script.py") + +withTempView("v") { + val df = Seq( +(1, "1", 1.0, BigDecimal(1.0), new Timestamp(1)), +(2, "2", 2.0, BigDecimal(2.0), new Timestamp(2)), +(3, "3", 3.0, BigDecimal(3.0), new Timestamp(3)) + ).toDF("a", "b", "c", "d", "e") // Note column d's data type is Decimal(38, 18) +
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZhuuuu commented on a change in pull request #29085:
URL: https://github.com/apache/spark/pull/29085#discussion_r456991221

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ##

@@ -713,13 +714,18 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
         }
         (Seq.empty, Option(name), props.toSeq, recordHandler)

-      case null =>
+      case null if conf.getConf(CATALOG_IMPLEMENTATION).equals("hive") =>
         // Use default (serde) format.
         val name = conf.getConfString("hive.script.serde",
           "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
         val props = Seq("field.delim" -> "\t")
         val recordHandler = Option(conf.getConfString(configKey, defaultConfigValue))
         (Nil, Option(name), props, recordHandler)
+
+      // SPARK-32106: When there is no definition about format, we return empty result
+      // to use a built-in default Serde in SparkScriptTransformationExec.
+      case null =>
+        (Nil, None, Seq.empty, None)

Review comment:
@maropu Regarding your question in https://github.com/apache/spark/pull/29085#discussion_r456974391: passing CalendarIntervalType/ArrayType/MapType/StructType as input to the Hive default serde throws an error, whereas the Spark default (no-serde) path does not.
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZhuuuu commented on a change in pull request #29085:
URL: https://github.com/apache/spark/pull/29085#discussion_r456990164

## File path: sql/core/src/test/resources/sql-tests/inputs/transform.sql ##

@@ -0,0 +1,49 @@
+-- Test data.
+CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES
+('a'), ('b'), ('v')
+as t1(a);
+
+CREATE OR REPLACE TEMPORARY VIEW t2 AS SELECT * FROM VALUES
+('1', true, unhex('537061726B2053514C'), tinyint(1), array_position(array(3, 2, 1), 1), float(1.0), 1.0, Decimal(1.0), timestamp(1), current_date),
+('2', false, unhex('537061726B2053514C'), tinyint(2), array_position(array(3, 2, 1), 2), float(2.0), 2.0, Decimal(2.0), timestamp(2), current_date),
+('3', true, unhex('537061726B2053514C'), tinyint(3), array_position(array(3, 2, 1), 1), float(3.0), 3.0, Decimal(3.0), timestamp(3), current_date)
+as t2(a,b,c,d,e,f,g,h,i,j);
+
+SELECT TRANSFORM(a)
+USING 'cat' AS (a)
+FROM t1;
+
+
+-- with non-exist command
+SELECT TRANSFORM(a)
+USING 'some_non_existent_command' AS (a)
+FROM t1;
+
+-- with non-exist file
+SELECT TRANSFORM(a)
+USING 'python some_non_existent_file' AS (a)
+FROM t1;
+
+
+-- support different data type
+SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j FROM (
+SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j)
+USING 'cat' AS (a string, b boolean, c binary, d tinyint, e long, f float, g double, h decimal(38, 18), i timestamp, j date)
+FROM t2
+) tmp;
+
+
+-- handle schema less
+SELECT TRANSFORM(a, b)
+USING 'cat'

Review comment:
Yea
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZhuuuu commented on a change in pull request #29085:
URL: https://github.com/apache/spark/pull/29085#discussion_r456989221

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala ##

@@ -56,10 +65,84 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
     }
   }

-  def processIterator(
+  protected def initProc: (OutputStream, Process, InputStream, CircularBuffer) = {
+    val cmd = List("/bin/bash", "-c", script)
+    val builder = new ProcessBuilder(cmd.asJava)
+
+    val proc = builder.start()
+    val inputStream = proc.getInputStream
+    val outputStream = proc.getOutputStream
+    val errorStream = proc.getErrorStream
+
+    // In order to avoid deadlocks, we need to consume the error output of the child process.
+    // To avoid issues caused by large error output, we use a circular buffer to limit the amount
+    // of error output that we retain. See SPARK-7862 for more discussion of the deadlock / hang
+    // that motivates this.
+    val stderrBuffer = new CircularBuffer(2048)
+    new RedirectThread(
+      errorStream,
+      stderrBuffer,
+      s"Thread-${this.getClass.getSimpleName}-STDERR-Consumer").start()
+    (outputStream, proc, inputStream, stderrBuffer)
+  }
+
+  protected def processIterator(
       inputIterator: Iterator[InternalRow],
       hadoopConf: Configuration): Iterator[InternalRow]

+  protected def createOutputIteratorWithoutSerde(
+      writerThread: BaseScriptTransformationWriterThread,
+      inputStream: InputStream,
+      proc: Process,
+      stderrBuffer: CircularBuffer): Iterator[InternalRow] = {
+    new Iterator[InternalRow] {
+      var curLine: String = null
+      val reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))
+
+      val processRowWithoutSerde = if (!ioschema.schemaLess) {
+        prevLine: String =>
+          new GenericInternalRow(
+            prevLine.split(ioschema.outputRowFormatMap("TOK_TABLEROWFORMATFIELD"))

Review comment:
Done
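The comment in `initProc` explains why the child's stderr must be consumed and why only a bounded amount is retained. The sketch below illustrates that idea with plain JDK classes; it is not Spark's `CircularBuffer` or `RedirectThread`. `StderrDrain`, `TailBuffer`, and `drain` are hypothetical names, and the byte-at-a-time loop is chosen for brevity rather than speed.

```scala
import java.io.InputStream
import java.nio.charset.StandardCharsets

object StderrDrain {
  // Hypothetical stand-in for a circular buffer: retains only the last `capacity` bytes.
  final class TailBuffer(capacity: Int) {
    private val buf = new Array[Byte](capacity)
    private var pos = 0
    private var filled = false

    def write(b: Int): Unit = synchronized {
      buf(pos) = b.toByte
      pos = (pos + 1) % capacity
      if (pos == 0) filled = true
    }

    // Once the buffer has wrapped, the oldest retained byte sits at `pos`.
    override def toString: String = synchronized {
      val bytes = if (filled) buf.drop(pos) ++ buf.take(pos) else buf.take(pos)
      new String(bytes, StandardCharsets.UTF_8)
    }
  }

  // Read `in` to the end on a daemon thread so the child process never blocks
  // on a full stderr pipe while the parent is busy with stdin/stdout.
  def drain(in: InputStream, sink: TailBuffer): Thread = {
    val t = new Thread(new Runnable {
      override def run(): Unit = {
        var b = in.read()
        while (b != -1) {
          sink.write(b)
          b = in.read()
        }
      }
    }, "stderr-consumer")
    t.setDaemon(true)
    t.start()
    t
  }
}
```

With a real subprocess, the same helper would be given `proc.getErrorStream` right after `ProcessBuilder.start()`, and the retained tail could then be appended to the error message when the script exits with a non-zero status.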
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r456989111 ## File path: sql/core/src/test/resources/sql-tests/results/transform.sql.out ## @@ -0,0 +1,99 @@ +-- Automatically generated by SQLQueryTestSuite +-- Number of queries: 8 + + +-- !query +CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES +('a'), ('b'), ('v') +as t1(a) +-- !query schema +struct<> +-- !query output + + + +-- !query +CREATE OR REPLACE TEMPORARY VIEW t2 AS SELECT * FROM VALUES +('1', true, unhex('537061726B2053514C'), tinyint(1), array_position(array(3, 2, 1), 1), float(1.0), 1.0, Decimal(1.0), timestamp(1), current_date), +('2', false, unhex('537061726B2053514C'), tinyint(2), array_position(array(3, 2, 1), 2), float(2.0), 2.0, Decimal(2.0), timestamp(2), current_date), +('3', true, unhex('537061726B2053514C'), tinyint(3), array_position(array(3, 2, 1), 1), float(3.0), 3.0, Decimal(3.0), timestamp(3), current_date) +as t2(a,b,c,d,e,f,g,h,i,j) +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT TRANSFORM(a) +USING 'cat' AS (a) +FROM t1 +-- !query schema +struct +-- !query output +a +b +v + + +-- !query +SELECT TRANSFORM(a) +USING 'some_non_existent_command' AS (a) +FROM t1 +-- !query schema +struct<> +-- !query output +org.apache.spark.SparkException +Subprocess exited with status 127. Error: /bin/bash: some_non_existent_command: command not found + + +-- !query +SELECT TRANSFORM(a) +USING 'python some_non_existent_file' AS (a) +FROM t1 +-- !query schema +struct<> +-- !query output +org.apache.spark.SparkException +Subprocess exited with status 2. 
Error: python: can't open file 'some_non_existent_file': [Errno 2] No such file or directory + + +-- !query +SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j FROM ( +SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j) +USING 'cat' AS (a string, b boolean, c binary, d tinyint, e long, f float, g double, h decimal(38, 18), i timestamp, j date) +FROM t2 +) tmp +-- !query schema +struct Review comment: > where is `smallint`? Done
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r456989025 ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala ## @@ -0,0 +1,390 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution + +import java.sql.{Date, Timestamp} + +import org.json4s.DefaultFormats +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods._ +import org.scalatest.Assertions._ +import org.scalatest.BeforeAndAfterEach +import org.scalatest.exceptions.TestFailedException + +import org.apache.spark.{SparkException, TaskContext, TestUtils} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Column, Row} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, GenericInternalRow} +import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + +abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestUtils + with BeforeAndAfterEach { + import testImplicits._ + import ScriptTransformationIOSchema._ + + protected val uncaughtExceptionHandler = new TestUncaughtExceptionHandler + + private var defaultUncaughtExceptionHandler: Thread.UncaughtExceptionHandler = _ + + protected override def beforeAll(): Unit = { +super.beforeAll() +defaultUncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler +Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler) + } + + protected override def afterAll(): Unit = { +super.afterAll() +Thread.setDefaultUncaughtExceptionHandler(defaultUncaughtExceptionHandler) + } + + override protected def afterEach(): Unit = { +super.afterEach() +uncaughtExceptionHandler.cleanStatus() + } + + def isHive23OrSpark: Boolean + + def createScriptTransformationExec( + input: Seq[Expression], + script: String, + output: Seq[Attribute], + child: SparkPlan, + ioschema: ScriptTransformationIOSchema): BaseScriptTransformationExec + + test("cat without SerDe") { 
+assume(TestUtils.testCommandAvailable("/bin/bash")) + +val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") +checkAnswer( + rowsDf, + (child: SparkPlan) => createScriptTransformationExec( +input = Seq(rowsDf.col("a").expr), +script = "cat", +output = Seq(AttributeReference("a", StringType)()), +child = child, +ioschema = defaultIOSchema + ), + rowsDf.collect()) +assert(uncaughtExceptionHandler.exception.isEmpty) + } + + test("script transformation should not swallow errors from upstream operators (no serde)") { +assume(TestUtils.testCommandAvailable("/bin/bash")) + +val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") +val e = intercept[TestFailedException] { + checkAnswer( +rowsDf, +(child: SparkPlan) => createScriptTransformationExec( + input = Seq(rowsDf.col("a").expr), + script = "cat", + output = Seq(AttributeReference("a", StringType)()), + child = ExceptionInjectingOperator(child), + ioschema = defaultIOSchema +), +rowsDf.collect()) +} +assert(e.getMessage().contains("intentional exception")) +// Before SPARK-25158, uncaughtExceptionHandler will catch IllegalArgumentException +assert(uncaughtExceptionHandler.exception.isEmpty) + } + + test("SPARK-25990: TRANSFORM should handle different data types correctly") { +assume(TestUtils.testCommandAvailable("python")) +val scriptFilePath = getTestResourcePath("test_script.py") + +withTempView("v") { + val df = Seq( +(1, "1", 1.0, BigDecimal(1.0), new Timestamp(1)), +(2, "2", 2.0, BigDecimal(2.0), new Timestamp(2)), +(3, "3", 3.0, BigDecimal(3.0), new Timestamp(3)) + ).toDF("a", "b", "c", "d", "e") // Note column d's data type is Decimal(38, 18) +
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085:
URL: https://github.com/apache/spark/pull/29085#discussion_r456988297

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala ##
@@ -87,17 +170,59 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
       }
     }
   }
+
+  private lazy val fieldWriters: Seq[String => Any] = output.map { attr =>
+    val converter = CatalystTypeConverters.createToCatalystConverter(attr.dataType)
+    attr.dataType match {
+      case StringType => wrapperConvertException(data => data, converter)
+      case BooleanType => wrapperConvertException(data => data.toBoolean, converter)
+      case ByteType => wrapperConvertException(data => data.toByte, converter)
+      case BinaryType => wrapperConvertException(data => data.getBytes, converter)
+      case IntegerType => wrapperConvertException(data => data.toInt, converter)
+      case ShortType => wrapperConvertException(data => data.toShort, converter)
+      case LongType => wrapperConvertException(data => data.toLong, converter)
+      case FloatType => wrapperConvertException(data => data.toFloat, converter)
+      case DoubleType => wrapperConvertException(data => data.toDouble, converter)
+      case _: DecimalType => wrapperConvertException(data => BigDecimal(data), converter)
+      case DateType => wrapperConvertException(data => DateTimeUtils.stringToDate(
+        UTF8String.fromString(data),
+        DateTimeUtils.getZoneId(conf.sessionLocalTimeZone))
+        .map(DateTimeUtils.toJavaDate).orNull, converter)
+      case TimestampType => wrapperConvertException(data => DateTimeUtils.stringToTimestamp(
+        UTF8String.fromString(data),
+        DateTimeUtils.getZoneId(conf.sessionLocalTimeZone))
+        .map(DateTimeUtils.toJavaTimestamp).orNull, converter)
+      case CalendarIntervalType => wrapperConvertException(
+        data => IntervalUtils.stringToInterval(UTF8String.fromString(data)),
+        converter)
+      case udt: UserDefinedType[_] =>
+        wrapperConvertException(data => udt.deserialize(data), converter)
+      case _: DataType =>
+        wrapperConvertException(data => data, converter)
+    }
+  }
+
+  // Keep consistent with Hive `LazySimpleSerde`, when there is a type case error, return null
+  val wrapperConvertException: (String => Any, Any => Any) => String => Any =

Review comment:
Done

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
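The hunk above is cut off right after the `wrapperConvertException` signature, so its body is not visible in this message. Based only on the code comment ("when there is a type case error, return null"), one plausible shape is sketched below. This is an illustration under stated assumptions, not the PR's implementation: `identityConverter` is a hypothetical stand-in for the Catalyst converter, and the object and helper names are invented for the example.

```scala
import scala.util.control.NonFatal

object NullOnConvertErrorSketch {
  // Same type as the signature quoted in the diff: given a String parser and a
  // converter, build a String => Any field writer.
  // Assumption: a failed parse yields null, which is then handed to the converter.
  val wrapperConvertException: (String => Any, Any => Any) => String => Any =
    (parse, converter) => (data: String) =>
      converter {
        try parse(data) catch { case NonFatal(_) => null }
      }

  // Hypothetical stand-in for CatalystTypeConverters.createToCatalystConverter(...);
  // identity keeps the sketch free of Spark dependencies.
  val identityConverter: Any => Any = v => v

  val toInt: String => Any = wrapperConvertException(_.toInt, identityConverter)
  val toBool: String => Any = wrapperConvertException(_.toBoolean, identityConverter)

  def main(args: Array[String]): Unit = {
    println(toInt("42"))    // a parsable value passes through
    println(toInt("abc"))   // NumberFormatException is swallowed, result is null
    println(toBool("true"))
  }
}
```

Catching only `NonFatal` keeps fatal errors such as `OutOfMemoryError` visible while turning ordinary parse failures into nulls, which matches the behaviour the comment attributes to Hive's `LazySimpleSerde`.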
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085:
URL: https://github.com/apache/spark/pull/29085#discussion_r456988259

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala ##
@@ -87,17 +170,59 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
       }
     }
   }
+
+  private lazy val fieldWriters: Seq[String => Any] = output.map { attr =>
+    val converter = CatalystTypeConverters.createToCatalystConverter(attr.dataType)
+    attr.dataType match {
+      case StringType => wrapperConvertException(data => data, converter)
+      case BooleanType => wrapperConvertException(data => data.toBoolean, converter)
+      case ByteType => wrapperConvertException(data => data.toByte, converter)
+      case BinaryType => wrapperConvertException(data => data.getBytes, converter)
+      case IntegerType => wrapperConvertException(data => data.toInt, converter)
+      case ShortType => wrapperConvertException(data => data.toShort, converter)
+      case LongType => wrapperConvertException(data => data.toLong, converter)
+      case FloatType => wrapperConvertException(data => data.toFloat, converter)
+      case DoubleType => wrapperConvertException(data => data.toDouble, converter)
+      case _: DecimalType => wrapperConvertException(data => BigDecimal(data), converter)
+      case DateType => wrapperConvertException(data => DateTimeUtils.stringToDate(
+        UTF8String.fromString(data),
+        DateTimeUtils.getZoneId(conf.sessionLocalTimeZone))
+        .map(DateTimeUtils.toJavaDate).orNull, converter)
+      case TimestampType => wrapperConvertException(data => DateTimeUtils.stringToTimestamp(
+        UTF8String.fromString(data),
+        DateTimeUtils.getZoneId(conf.sessionLocalTimeZone))
+        .map(DateTimeUtils.toJavaTimestamp).orNull, converter)
+      case CalendarIntervalType => wrapperConvertException(
+        data => IntervalUtils.stringToInterval(UTF8String.fromString(data)),
+        converter)
+      case udt: UserDefinedType[_] =>
+        wrapperConvertException(data => udt.deserialize(data), converter)
+      case _: DataType =>
+        wrapperConvertException(data => data, converter)

Review comment:
Done
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085:
URL: https://github.com/apache/spark/pull/29085#discussion_r456987977

## File path: sql/core/src/test/resources/sql-tests/inputs/transform.sql ##
@@ -0,0 +1,49 @@
+-- Test data.
+CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES
+('a'), ('b'), ('v')
+as t1(a);
+
+CREATE OR REPLACE TEMPORARY VIEW t2 AS SELECT * FROM VALUES
+('1', true, unhex('537061726B2053514C'), tinyint(1), array_position(array(3, 2, 1), 1), float(1.0), 1.0, Decimal(1.0), timestamp(1), current_date),
+('2', false, unhex('537061726B2053514C'), tinyint(2), array_position(array(3, 2, 1), 2), float(2.0), 2.0, Decimal(2.0), timestamp(2), current_date),
+('3', true, unhex('537061726B2053514C'), tinyint(3), array_position(array(3, 2, 1), 1), float(3.0), 3.0, Decimal(3.0), timestamp(3), current_date)
+as t2(a,b,c,d,e,f,g,h,i,j);
+
+SELECT TRANSFORM(a)
+USING 'cat' AS (a)
+FROM t1;
+
+
+-- with non-exist command
+SELECT TRANSFORM(a)
+USING 'some_non_existent_command' AS (a)
+FROM t1;
+
+-- with non-exist file
+SELECT TRANSFORM(a)
+USING 'python some_non_existent_file' AS (a)
+FROM t1;
+
+
+-- support different data type
+SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j FROM (
+SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j)
+USING 'cat' AS (a string, b boolean, c binary, d tinyint, e long, f float, g double, h decimal(38, 18), i timestamp, j date)
+FROM t2
+) tmp;
+
+
+-- handle schema less
+SELECT TRANSFORM(a, b)
+USING 'cat'
+FROM t2;
+
+-- return null when return string incompatible(no serde)

Review comment:
Done
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085:
URL: https://github.com/apache/spark/pull/29085#discussion_r456987899

## File path: sql/core/src/test/resources/sql-tests/inputs/transform.sql ##
@@ -0,0 +1,49 @@
+-- Test data.
+CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES
+('a'), ('b'), ('v')
+as t1(a);
+
+CREATE OR REPLACE TEMPORARY VIEW t2 AS SELECT * FROM VALUES
+('1', true, unhex('537061726B2053514C'), tinyint(1), array_position(array(3, 2, 1), 1), float(1.0), 1.0, Decimal(1.0), timestamp(1), current_date),
+('2', false, unhex('537061726B2053514C'), tinyint(2), array_position(array(3, 2, 1), 2), float(2.0), 2.0, Decimal(2.0), timestamp(2), current_date),
+('3', true, unhex('537061726B2053514C'), tinyint(3), array_position(array(3, 2, 1), 1), float(3.0), 3.0, Decimal(3.0), timestamp(3), current_date)

Review comment:
Done
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r456986270 ## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala ## @@ -172,66 +193,95 @@ case class HiveScriptTransformationExec( if (!hasNext) { throw new NoSuchElementException } -if (outputSerde == null) { - val prevLine = curLine - curLine = reader.readLine() - if (!ioschema.schemaLess) { -new GenericInternalRow( - prevLine.split(ioschema.outputRowFormatMap("TOK_TABLEROWFORMATFIELD")) -.map(CatalystTypeConverters.convertToCatalyst)) +val raw = outputSerde.deserialize(scriptOutputWritable) +scriptOutputWritable = null +val dataList = outputSoi.getStructFieldsDataAsList(raw) +var i = 0 +while (i < dataList.size()) { + if (dataList.get(i) == null) { +mutableRow.setNullAt(i) } else { -new GenericInternalRow( - prevLine.split(ioschema.outputRowFormatMap("TOK_TABLEROWFORMATFIELD"), 2) -.map(CatalystTypeConverters.convertToCatalyst)) - } -} else { - val raw = outputSerde.deserialize(scriptOutputWritable) - scriptOutputWritable = null - val dataList = outputSoi.getStructFieldsDataAsList(raw) - var i = 0 - while (i < dataList.size()) { -if (dataList.get(i) == null) { - mutableRow.setNullAt(i) -} else { - unwrappers(i)(dataList.get(i), mutableRow, i) -} -i += 1 +unwrappers(i)(dataList.get(i), mutableRow, i) } - mutableRow + i += 1 } +mutableRow } } + } + + override def processIterator( + inputIterator: Iterator[InternalRow], + hadoopConf: Configuration): Iterator[InternalRow] = { + +val (outputStream, proc, inputStream, stderrBuffer) = initProc + +// This nullability is a performance optimization in order to avoid an Option.foreach() call +// inside of a loop +@Nullable val (inputSerde, inputSoi) = initInputSerDe(input).getOrElse((null, null)) + +// For HiveScriptTransformationExec, if inputSerde == null, but outputSerde != null +// We will use StringBuffer to pass data, in this case, we should cast data 
as string too.
+    val finalInput = if (inputSerde == null) {
+      input.map(Cast(_, StringType).withTimeZone(conf.sessionLocalTimeZone))

Review comment:
Add
```
protected lazy val inputExpressionsWithoutSerde: Seq[Expression] = {
  input.map(Cast(_, StringType).withTimeZone(conf.sessionLocalTimeZone))
}
```
to BaseScriptTransformationExec
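The hunk in this message casts every input to a string when no input SerDe is configured, and the removed lines read script output back with `split` on the field delimiter, using a limit of 2 in schema-less mode. The following dependency-free sketch illustrates that round trip. It assumes a tab delimiter and non-null fields; `toLine`, `splitWithSchema`, and `splitSchemaLess` are hypothetical names used only for this example, not Spark APIs.

```scala
object NoSerdeLineSketch {
  // Assumed delimiter for illustration; the diff looks it up via
  // ioschema.outputRowFormatMap("TOK_TABLEROWFORMATFIELD").
  val fieldDelimiter = "\t"

  // Render each (non-null) input value as a string and join the fields into
  // one line, mirroring the idea of casting all inputs to StringType first.
  def toLine(fields: Seq[Any]): String =
    fields.map(_.toString).mkString(fieldDelimiter)

  // With a declared output schema, split on every delimiter.
  def splitWithSchema(line: String): Array[String] =
    line.split(fieldDelimiter)

  // Schema-less output: split with limit 2, so anything after the first
  // delimiter stays in the second column.
  def splitSchemaLess(line: String): Array[String] =
    line.split(fieldDelimiter, 2)

  def main(args: Array[String]): Unit = {
    val line = toLine(Seq(1, true, 2.5))
    println(splitWithSchema(line).mkString("[", ", ", "]"))
    println(splitSchemaLess(line).length)
  }
}
```

Because the script only ever sees text in this mode, casting on the way in and parsing per output type on the way out are two halves of the same contract.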
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085:
URL: https://github.com/apache/spark/pull/29085#discussion_r456860293

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala ##
@@ -87,17 +170,59 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
       }
     }
   }
+
+  private lazy val fieldWriters: Seq[String => Any] = output.map { attr =>
+    val converter = CatalystTypeConverters.createToCatalystConverter(attr.dataType)
+    attr.dataType match {
+      case StringType => wrapperConvertException(data => data, converter)
+      case BooleanType => wrapperConvertException(data => data.toBoolean, converter)
+      case ByteType => wrapperConvertException(data => data.toByte, converter)
+      case BinaryType => wrapperConvertException(data => data.getBytes, converter)
+      case IntegerType => wrapperConvertException(data => data.toInt, converter)

Review comment:
@marope Added type conversion for BooleanType and BinaryType, and covered both in the UT `test("SPARK-32106: TRANSFORM should support more data types (no serde)")`
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085:
URL: https://github.com/apache/spark/pull/29085#discussion_r456860128

## File path: sql/core/src/test/resources/sql-tests/inputs/transform.sql ##
@@ -0,0 +1,47 @@
+-- Test data.
+CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES
+('a'), ('b'), ('v')
+as t1(a);
+
+CREATE OR REPLACE TEMPORARY VIEW t2 AS SELECT * FROM VALUES
+(1, '1', 1.0, Decimal(1.0), timestamp(1)),
+(2, '2', 2.0, Decimal(2.0), timestamp(2)),
+(2, '2', 3.0, Decimal(3.0), timestamp(3))

Review comment:
> Could you add all the typed data for tests where possible, e.g., `date` and `float`.

Updated.
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r456793532 ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala ## @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution + +import java.sql.{Date, Timestamp} + +import org.scalatest.Assertions._ +import org.scalatest.BeforeAndAfterEach +import org.scalatest.exceptions.TestFailedException + +import org.apache.spark.{SparkException, TaskContext, TestUtils} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Column, Row} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + +abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestUtils + with BeforeAndAfterEach { + import testImplicits._ + import ScriptTransformationIOSchema._ + + protected val uncaughtExceptionHandler = new TestUncaughtExceptionHandler + + private var defaultUncaughtExceptionHandler: Thread.UncaughtExceptionHandler = _ + + protected override def beforeAll(): Unit = { +super.beforeAll() +defaultUncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler +Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler) + } + + protected override def afterAll(): Unit = { +super.afterAll() +Thread.setDefaultUncaughtExceptionHandler(defaultUncaughtExceptionHandler) + } + + override protected def afterEach(): Unit = { +super.afterEach() +uncaughtExceptionHandler.cleanStatus() + } + + def isHive23OrSpark: Boolean + + def createScriptTransformationExec( + input: Seq[Expression], + script: String, + output: Seq[Attribute], + child: SparkPlan, + ioschema: ScriptTransformationIOSchema): BaseScriptTransformationExec + + test("cat without SerDe") { +assume(TestUtils.testCommandAvailable("/bin/bash")) + +val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") +checkAnswer( + rowsDf, + (child: SparkPlan) => 
createScriptTransformationExec( +input = Seq(rowsDf.col("a").expr), +script = "cat", +output = Seq(AttributeReference("a", StringType)()), +child = child, +ioschema = defaultIOSchema + ), + rowsDf.collect()) +assert(uncaughtExceptionHandler.exception.isEmpty) + } + + test("script transformation should not swallow errors from upstream operators (no serde)") { +assume(TestUtils.testCommandAvailable("/bin/bash")) + +val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") +val e = intercept[TestFailedException] { + checkAnswer( +rowsDf, +(child: SparkPlan) => createScriptTransformationExec( + input = Seq(rowsDf.col("a").expr), + script = "cat", + output = Seq(AttributeReference("a", StringType)()), + child = ExceptionInjectingOperator(child), + ioschema = defaultIOSchema +), +rowsDf.collect()) +} +assert(e.getMessage().contains("intentional exception")) +// Before SPARK-25158, uncaughtExceptionHandler will catch IllegalArgumentException +assert(uncaughtExceptionHandler.exception.isEmpty) + } + + test("SPARK-25990: TRANSFORM should handle different data types correctly") { +assume(TestUtils.testCommandAvailable("python")) +val scriptFilePath = getTestResourcePath("test_script.py") + +withTempView("v") { + val df = Seq( +(1, "1", 1.0, BigDecimal(1.0), new Timestamp(1)), +(2, "2", 2.0, BigDecimal(2.0), new Timestamp(2)), +(3, "3", 3.0, BigDecimal(3.0), new Timestamp(3)) + ).toDF("a", "b", "c", "d", "e") // Note column d's data type is Decimal(38, 18) + df.createTempView("v") + + val query = sql( +s""" + |SELECT + |TRANSFORM(a, b, c, d, e)
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r456780423 ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala ## @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution + +import java.sql.{Date, Timestamp} + +import org.scalatest.Assertions._ +import org.scalatest.BeforeAndAfterEach +import org.scalatest.exceptions.TestFailedException + +import org.apache.spark.{SparkException, TaskContext, TestUtils} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.Column +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + +abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestUtils + with BeforeAndAfterEach { + import testImplicits._ + import ScriptTransformationIOSchema._ + + protected val uncaughtExceptionHandler = new TestUncaughtExceptionHandler + + private var defaultUncaughtExceptionHandler: Thread.UncaughtExceptionHandler = _ + + protected override def beforeAll(): Unit = { +super.beforeAll() +defaultUncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler +Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler) + } + + protected override def afterAll(): Unit = { +super.afterAll() +Thread.setDefaultUncaughtExceptionHandler(defaultUncaughtExceptionHandler) + } + + override protected def afterEach(): Unit = { +super.afterEach() +uncaughtExceptionHandler.cleanStatus() + } + + def isHive23OrSpark: Boolean + + def createScriptTransformationExec( + input: Seq[Expression], + script: String, + output: Seq[Attribute], + child: SparkPlan, + ioschema: ScriptTransformationIOSchema): BaseScriptTransformationExec + + test("cat without SerDe") { +assume(TestUtils.testCommandAvailable("/bin/bash")) + +val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") +checkAnswer( + rowsDf, + (child: SparkPlan) => 
createScriptTransformationExec( +input = Seq(rowsDf.col("a").expr), +script = "cat", +output = Seq(AttributeReference("a", StringType)()), +child = child, +ioschema = defaultIOSchema + ), + rowsDf.collect()) +assert(uncaughtExceptionHandler.exception.isEmpty) + } + + test("script transformation should not swallow errors from upstream operators (no serde)") { +assume(TestUtils.testCommandAvailable("/bin/bash")) + +val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") +val e = intercept[TestFailedException] { + checkAnswer( +rowsDf, +(child: SparkPlan) => createScriptTransformationExec( + input = Seq(rowsDf.col("a").expr), + script = "cat", + output = Seq(AttributeReference("a", StringType)()), + child = ExceptionInjectingOperator(child), + ioschema = defaultIOSchema +), +rowsDf.collect()) +} +assert(e.getMessage().contains("intentional exception")) +// Before SPARK-25158, uncaughtExceptionHandler will catch IllegalArgumentException +assert(uncaughtExceptionHandler.exception.isEmpty) + } + + test("SPARK-25990: TRANSFORM should handle different data types correctly") { +assume(TestUtils.testCommandAvailable("python")) +val scriptFilePath = getTestResourcePath("test_script.py") + +withTempView("v") { + val df = Seq( +(1, "1", 1.0, BigDecimal(1.0), new Timestamp(1)), +(2, "2", 2.0, BigDecimal(2.0), new Timestamp(2)), +(3, "3", 3.0, BigDecimal(3.0), new Timestamp(3)) + ).toDF("a", "b", "c", "d", "e") // Note column d's data type is Decimal(38, 18) + df.createTempView("v") + + val query = sql( +s""" + |SELECT + |TRANSFORM(a, b, c, d, e) +
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r456770946 ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala ## @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution + +import java.sql.{Date, Timestamp} + +import org.scalatest.Assertions._ +import org.scalatest.BeforeAndAfterEach +import org.scalatest.exceptions.TestFailedException + +import org.apache.spark.{SparkException, TaskContext, TestUtils} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.Column +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + +abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestUtils + with BeforeAndAfterEach { + import testImplicits._ + import ScriptTransformationIOSchema._ + + protected val uncaughtExceptionHandler = new TestUncaughtExceptionHandler + + private var defaultUncaughtExceptionHandler: Thread.UncaughtExceptionHandler = _ + + protected override def beforeAll(): Unit = { +super.beforeAll() +defaultUncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler +Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler) + } + + protected override def afterAll(): Unit = { +super.afterAll() +Thread.setDefaultUncaughtExceptionHandler(defaultUncaughtExceptionHandler) + } + + override protected def afterEach(): Unit = { +super.afterEach() +uncaughtExceptionHandler.cleanStatus() + } + + def isHive23OrSpark: Boolean + + def createScriptTransformationExec( + input: Seq[Expression], + script: String, + output: Seq[Attribute], + child: SparkPlan, + ioschema: ScriptTransformationIOSchema): BaseScriptTransformationExec + + test("cat without SerDe") { +assume(TestUtils.testCommandAvailable("/bin/bash")) + +val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") +checkAnswer( + rowsDf, + (child: SparkPlan) => 
createScriptTransformationExec( +input = Seq(rowsDf.col("a").expr), +script = "cat", +output = Seq(AttributeReference("a", StringType)()), +child = child, +ioschema = defaultIOSchema + ), + rowsDf.collect()) +assert(uncaughtExceptionHandler.exception.isEmpty) + } + + test("script transformation should not swallow errors from upstream operators (no serde)") { +assume(TestUtils.testCommandAvailable("/bin/bash")) + +val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") +val e = intercept[TestFailedException] { + checkAnswer( +rowsDf, +(child: SparkPlan) => createScriptTransformationExec( + input = Seq(rowsDf.col("a").expr), + script = "cat", + output = Seq(AttributeReference("a", StringType)()), + child = ExceptionInjectingOperator(child), + ioschema = defaultIOSchema +), +rowsDf.collect()) +} +assert(e.getMessage().contains("intentional exception")) +// Before SPARK-25158, uncaughtExceptionHandler will catch IllegalArgumentException +assert(uncaughtExceptionHandler.exception.isEmpty) + } + + test("SPARK-25990: TRANSFORM should handle different data types correctly") { +assume(TestUtils.testCommandAvailable("python")) +val scriptFilePath = getTestResourcePath("test_script.py") + +withTempView("v") { + val df = Seq( +(1, "1", 1.0, BigDecimal(1.0), new Timestamp(1)), +(2, "2", 2.0, BigDecimal(2.0), new Timestamp(2)), +(3, "3", 3.0, BigDecimal(3.0), new Timestamp(3)) + ).toDF("a", "b", "c", "d", "e") // Note column d's data type is Decimal(38, 18) + df.createTempView("v") + + val query = sql( +s""" + |SELECT + |TRANSFORM(a, b, c, d, e) +
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085:
URL: https://github.com/apache/spark/pull/29085#discussion_r456770897

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala ##
@@ -87,17 +171,55 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
       }
     }
   }
+
+  private lazy val fieldWriters: Seq[String => Any] = output.map { attr =>
+    val converter = CatalystTypeConverters.createToCatalystConverter(attr.dataType)
+    attr.dataType match {
+      case StringType => (data: String) => converter(data)
+      case ByteType => (data: String) => converter(data.toByte)
+      case IntegerType => (data: String) => converter(data.toInt)
+      case ShortType => (data: String) => converter(data.toShort)
+      case LongType => (data: String) => converter(data.toLong)
+      case FloatType => (data: String) => converter(data.toFloat)
+      case DoubleType => (data: String) => converter(data.toDouble)
+      case decimal: DecimalType => (data: String) => converter(BigDecimal(data))
+      case DateType if conf.datetimeJava8ApiEnabled => (data: String) =>
+        converter(DateTimeUtils.stringToDate(
+          UTF8String.fromString(data),
+          DateTimeUtils.getZoneId(conf.sessionLocalTimeZone))
+          .map(DateTimeUtils.daysToLocalDate).orNull)

Review comment:
> Need this? Seems like not.

Tested; it does not seem to be needed, so I removed it.
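For context on the branch being questioned: the hunk hands the converter a `java.time.LocalDate` when `conf.datetimeJava8ApiEnabled` is set, and the `DateType` case quoted elsewhere in this thread hands it a `java.sql.Date` otherwise. The sketch below illustrates those two external representations using only the JDK. It is an approximation under stated assumptions: `java8ApiEnabled` is a hypothetical flag standing in for the Spark conf, `LocalDate.parse` stands in for `DateTimeUtils.stringToDate` (which accepts more formats), and time zones are ignored.

```scala
import java.time.LocalDate
import scala.util.Try

object DateFieldSketch {
  // Parse an ISO-8601 date string; return null when it cannot be parsed,
  // following the same `.orNull` convention as the diff.
  def parseDate(data: String, java8ApiEnabled: Boolean): Any = {
    val parsed: Option[LocalDate] = Try(LocalDate.parse(data.trim)).toOption
    if (java8ApiEnabled) {
      parsed.orNull // external type: java.time.LocalDate
    } else {
      parsed.map(d => java.sql.Date.valueOf(d)).orNull // external type: java.sql.Date
    }
  }

  def main(args: Array[String]): Unit = {
    println(parseDate("2020-07-20", java8ApiEnabled = true))
    println(parseDate("2020-07-20", java8ApiEnabled = false))
    println(parseDate("not-a-date", java8ApiEnabled = true))
  }
}
```

Whether both branches are required depends on what the Catalyst converter accepts for `DateType`, which is the point the reviewer raised and which this sketch does not settle.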
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r456770213 ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala ## @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution + +import java.sql.{Date, Timestamp} + +import org.scalatest.Assertions._ +import org.scalatest.BeforeAndAfterEach +import org.scalatest.exceptions.TestFailedException + +import org.apache.spark.{SparkException, TaskContext, TestUtils} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.Column +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + +abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestUtils + with BeforeAndAfterEach { + import testImplicits._ + import ScriptTransformationIOSchema._ + + protected val uncaughtExceptionHandler = new TestUncaughtExceptionHandler + + private var defaultUncaughtExceptionHandler: Thread.UncaughtExceptionHandler = _ + + protected override def beforeAll(): Unit = { +super.beforeAll() +defaultUncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler +Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler) + } + + protected override def afterAll(): Unit = { +super.afterAll() +Thread.setDefaultUncaughtExceptionHandler(defaultUncaughtExceptionHandler) + } + + override protected def afterEach(): Unit = { +super.afterEach() +uncaughtExceptionHandler.cleanStatus() + } + + def isHive23OrSpark: Boolean + + def createScriptTransformationExec( + input: Seq[Expression], + script: String, + output: Seq[Attribute], + child: SparkPlan, + ioschema: ScriptTransformationIOSchema): BaseScriptTransformationExec + + test("cat without SerDe") { +assume(TestUtils.testCommandAvailable("/bin/bash")) + +val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") +checkAnswer( + rowsDf, + (child: SparkPlan) => 
createScriptTransformationExec( +input = Seq(rowsDf.col("a").expr), +script = "cat", +output = Seq(AttributeReference("a", StringType)()), +child = child, +ioschema = defaultIOSchema + ), + rowsDf.collect()) +assert(uncaughtExceptionHandler.exception.isEmpty) + } + + test("script transformation should not swallow errors from upstream operators (no serde)") { +assume(TestUtils.testCommandAvailable("/bin/bash")) + +val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") +val e = intercept[TestFailedException] { + checkAnswer( +rowsDf, +(child: SparkPlan) => createScriptTransformationExec( + input = Seq(rowsDf.col("a").expr), + script = "cat", + output = Seq(AttributeReference("a", StringType)()), + child = ExceptionInjectingOperator(child), + ioschema = defaultIOSchema +), +rowsDf.collect()) +} +assert(e.getMessage().contains("intentional exception")) +// Before SPARK-25158, uncaughtExceptionHandler will catch IllegalArgumentException +assert(uncaughtExceptionHandler.exception.isEmpty) + } + + test("SPARK-25990: TRANSFORM should handle different data types correctly") { +assume(TestUtils.testCommandAvailable("python")) +val scriptFilePath = getTestResourcePath("test_script.py") + +withTempView("v") { + val df = Seq( +(1, "1", 1.0, BigDecimal(1.0), new Timestamp(1)), +(2, "2", 2.0, BigDecimal(2.0), new Timestamp(2)), +(3, "3", 3.0, BigDecimal(3.0), new Timestamp(3)) + ).toDF("a", "b", "c", "d", "e") // Note column d's data type is Decimal(38, 18) + df.createTempView("v") + + val query = sql( +s""" + |SELECT + |TRANSFORM(a, b, c, d, e) +
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r456770157 ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala ## @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution + +import java.sql.{Date, Timestamp} + +import org.scalatest.Assertions._ +import org.scalatest.BeforeAndAfterEach +import org.scalatest.exceptions.TestFailedException + +import org.apache.spark.{SparkException, TaskContext, TestUtils} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.Column +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + +abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestUtils + with BeforeAndAfterEach { + import testImplicits._ + import ScriptTransformationIOSchema._ + + protected val uncaughtExceptionHandler = new TestUncaughtExceptionHandler + + private var defaultUncaughtExceptionHandler: Thread.UncaughtExceptionHandler = _ + + protected override def beforeAll(): Unit = { +super.beforeAll() +defaultUncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler +Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler) + } + + protected override def afterAll(): Unit = { +super.afterAll() +Thread.setDefaultUncaughtExceptionHandler(defaultUncaughtExceptionHandler) + } + + override protected def afterEach(): Unit = { +super.afterEach() +uncaughtExceptionHandler.cleanStatus() + } + + def isHive23OrSpark: Boolean + + def createScriptTransformationExec( + input: Seq[Expression], + script: String, + output: Seq[Attribute], + child: SparkPlan, + ioschema: ScriptTransformationIOSchema): BaseScriptTransformationExec + + test("cat without SerDe") { +assume(TestUtils.testCommandAvailable("/bin/bash")) + +val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") +checkAnswer( + rowsDf, + (child: SparkPlan) => 
createScriptTransformationExec( +input = Seq(rowsDf.col("a").expr), +script = "cat", +output = Seq(AttributeReference("a", StringType)()), +child = child, +ioschema = defaultIOSchema + ), + rowsDf.collect()) +assert(uncaughtExceptionHandler.exception.isEmpty) + } + + test("script transformation should not swallow errors from upstream operators (no serde)") { +assume(TestUtils.testCommandAvailable("/bin/bash")) + +val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") +val e = intercept[TestFailedException] { + checkAnswer( +rowsDf, +(child: SparkPlan) => createScriptTransformationExec( + input = Seq(rowsDf.col("a").expr), + script = "cat", + output = Seq(AttributeReference("a", StringType)()), + child = ExceptionInjectingOperator(child), + ioschema = defaultIOSchema +), +rowsDf.collect()) +} +assert(e.getMessage().contains("intentional exception")) +// Before SPARK-25158, uncaughtExceptionHandler will catch IllegalArgumentException +assert(uncaughtExceptionHandler.exception.isEmpty) + } + + test("SPARK-25990: TRANSFORM should handle different data types correctly") { +assume(TestUtils.testCommandAvailable("python")) +val scriptFilePath = getTestResourcePath("test_script.py") + +withTempView("v") { + val df = Seq( +(1, "1", 1.0, BigDecimal(1.0), new Timestamp(1)), +(2, "2", 2.0, BigDecimal(2.0), new Timestamp(2)), +(3, "3", 3.0, BigDecimal(3.0), new Timestamp(3)) + ).toDF("a", "b", "c", "d", "e") // Note column d's data type is Decimal(38, 18) + df.createTempView("v") + + val query = sql( +s""" + |SELECT + |TRANSFORM(a, b, c, d, e) +
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r456769354 ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala ## @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution + +import java.sql.{Date, Timestamp} + +import org.scalatest.Assertions._ +import org.scalatest.BeforeAndAfterEach +import org.scalatest.exceptions.TestFailedException + +import org.apache.spark.{SparkException, TaskContext, TestUtils} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.Column +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + +abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestUtils + with BeforeAndAfterEach { + import testImplicits._ + import ScriptTransformationIOSchema._ + + protected val uncaughtExceptionHandler = new TestUncaughtExceptionHandler + + private var defaultUncaughtExceptionHandler: Thread.UncaughtExceptionHandler = _ + + protected override def beforeAll(): Unit = { +super.beforeAll() +defaultUncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler +Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler) + } + + protected override def afterAll(): Unit = { +super.afterAll() +Thread.setDefaultUncaughtExceptionHandler(defaultUncaughtExceptionHandler) + } + + override protected def afterEach(): Unit = { +super.afterEach() +uncaughtExceptionHandler.cleanStatus() + } + + def isHive23OrSpark: Boolean + + def createScriptTransformationExec( + input: Seq[Expression], + script: String, + output: Seq[Attribute], + child: SparkPlan, + ioschema: ScriptTransformationIOSchema): BaseScriptTransformationExec + + test("cat without SerDe") { +assume(TestUtils.testCommandAvailable("/bin/bash")) + +val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") +checkAnswer( + rowsDf, + (child: SparkPlan) => 
createScriptTransformationExec( +input = Seq(rowsDf.col("a").expr), +script = "cat", +output = Seq(AttributeReference("a", StringType)()), +child = child, +ioschema = defaultIOSchema + ), + rowsDf.collect()) +assert(uncaughtExceptionHandler.exception.isEmpty) + } + + test("script transformation should not swallow errors from upstream operators (no serde)") { +assume(TestUtils.testCommandAvailable("/bin/bash")) + +val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") +val e = intercept[TestFailedException] { + checkAnswer( +rowsDf, +(child: SparkPlan) => createScriptTransformationExec( + input = Seq(rowsDf.col("a").expr), + script = "cat", + output = Seq(AttributeReference("a", StringType)()), + child = ExceptionInjectingOperator(child), + ioschema = defaultIOSchema +), +rowsDf.collect()) +} +assert(e.getMessage().contains("intentional exception")) +// Before SPARK-25158, uncaughtExceptionHandler will catch IllegalArgumentException +assert(uncaughtExceptionHandler.exception.isEmpty) + } + + test("SPARK-25990: TRANSFORM should handle different data types correctly") { +assume(TestUtils.testCommandAvailable("python")) +val scriptFilePath = getTestResourcePath("test_script.py") + +withTempView("v") { + val df = Seq( +(1, "1", 1.0, BigDecimal(1.0), new Timestamp(1)), +(2, "2", 2.0, BigDecimal(2.0), new Timestamp(2)), +(3, "3", 3.0, BigDecimal(3.0), new Timestamp(3)) + ).toDF("a", "b", "c", "d", "e") // Note column d's data type is Decimal(38, 18) + df.createTempView("v") + + val query = sql( +s""" + |SELECT + |TRANSFORM(a, b, c, d, e) +
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r456766188 ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala ## @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution + +import java.sql.{Date, Timestamp} + +import org.scalatest.Assertions._ +import org.scalatest.BeforeAndAfterEach +import org.scalatest.exceptions.TestFailedException + +import org.apache.spark.{SparkException, TaskContext, TestUtils} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.Column +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + +abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestUtils + with BeforeAndAfterEach { + import testImplicits._ + import ScriptTransformationIOSchema._ + + protected val uncaughtExceptionHandler = new TestUncaughtExceptionHandler + + private var defaultUncaughtExceptionHandler: Thread.UncaughtExceptionHandler = _ + + protected override def beforeAll(): Unit = { +super.beforeAll() +defaultUncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler +Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler) + } + + protected override def afterAll(): Unit = { +super.afterAll() +Thread.setDefaultUncaughtExceptionHandler(defaultUncaughtExceptionHandler) + } + + override protected def afterEach(): Unit = { +super.afterEach() +uncaughtExceptionHandler.cleanStatus() + } + + def isHive23OrSpark: Boolean + + def createScriptTransformationExec( + input: Seq[Expression], + script: String, + output: Seq[Attribute], + child: SparkPlan, + ioschema: ScriptTransformationIOSchema): BaseScriptTransformationExec + + test("cat without SerDe") { +assume(TestUtils.testCommandAvailable("/bin/bash")) + +val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") +checkAnswer( + rowsDf, + (child: SparkPlan) => 
createScriptTransformationExec( +input = Seq(rowsDf.col("a").expr), +script = "cat", +output = Seq(AttributeReference("a", StringType)()), +child = child, +ioschema = defaultIOSchema + ), + rowsDf.collect()) +assert(uncaughtExceptionHandler.exception.isEmpty) + } + + test("script transformation should not swallow errors from upstream operators (no serde)") { +assume(TestUtils.testCommandAvailable("/bin/bash")) + +val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") +val e = intercept[TestFailedException] { + checkAnswer( +rowsDf, +(child: SparkPlan) => createScriptTransformationExec( + input = Seq(rowsDf.col("a").expr), + script = "cat", + output = Seq(AttributeReference("a", StringType)()), + child = ExceptionInjectingOperator(child), + ioschema = defaultIOSchema +), +rowsDf.collect()) +} +assert(e.getMessage().contains("intentional exception")) +// Before SPARK-25158, uncaughtExceptionHandler will catch IllegalArgumentException +assert(uncaughtExceptionHandler.exception.isEmpty) + } + + test("SPARK-25990: TRANSFORM should handle different data types correctly") { +assume(TestUtils.testCommandAvailable("python")) +val scriptFilePath = getTestResourcePath("test_script.py") + +withTempView("v") { + val df = Seq( +(1, "1", 1.0, BigDecimal(1.0), new Timestamp(1)), +(2, "2", 2.0, BigDecimal(2.0), new Timestamp(2)), +(3, "3", 3.0, BigDecimal(3.0), new Timestamp(3)) + ).toDF("a", "b", "c", "d", "e") // Note column d's data type is Decimal(38, 18) + df.createTempView("v") + + val query = sql( +s""" + |SELECT + |TRANSFORM(a, b, c, d, e) +
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085:
URL: https://github.com/apache/spark/pull/29085#discussion_r456751726

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala ##

@@ -56,10 +65,85 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
     }
   }

-  def processIterator(
+  protected def initProc: (OutputStream, Process, InputStream, CircularBuffer) = {
+    val cmd = List("/bin/bash", "-c", script)
+    val builder = new ProcessBuilder(cmd.asJava)
+
+    val proc = builder.start()
+    val inputStream = proc.getInputStream
+    val outputStream = proc.getOutputStream
+    val errorStream = proc.getErrorStream
+
+    // In order to avoid deadlocks, we need to consume the error output of the child process.
+    // To avoid issues caused by large error output, we use a circular buffer to limit the amount
+    // of error output that we retain. See SPARK-7862 for more discussion of the deadlock / hang
+    // that motivates this.
+    val stderrBuffer = new CircularBuffer(2048)
+    new RedirectThread(
+      errorStream,
+      stderrBuffer,
+      s"Thread-${this.getClass.getSimpleName}-STDERR-Consumer").start()
+    (outputStream, proc, inputStream, stderrBuffer)
+  }
+
+  protected def processIterator(
       inputIterator: Iterator[InternalRow],
       hadoopConf: Configuration): Iterator[InternalRow]

+  protected def createOutputIteratorWithoutSerde(
+      writerThread: BaseScriptTransformationWriterThread,
+      inputStream: InputStream,
+      proc: Process,
+      stderrBuffer: CircularBuffer): Iterator[InternalRow] = {
+    new Iterator[InternalRow] {
+      var curLine: String = null
+      val reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))
+
+      val processRowWithoutSerde = if (!ioschema.schemaLess) {
+        prevLine: String =>
+          new GenericInternalRow(
+            prevLine.split(ioschema.outputRowFormatMap("TOK_TABLEROWFORMATFIELD"))
+              .zip(fieldWriters)
+              .map { case (data, writer) => writer(data) })
+      } else {
+        prevLine: String =>
+          new GenericInternalRow(
+            prevLine.split(ioschema.outputRowFormatMap("TOK_TABLEROWFORMATFIELD"), 2)
+              .map(CatalystTypeConverters.convertToCatalyst))
+      }
+

Review comment:
@maropu I changed this to support schema-less mode. In the test case I chose not to use SQL, since the Hive SerDe doesn't handle schema-less mode well the way Spark does it.

```
[info] - SPARK-25990: TRANSFORM should handle schema less correctly *** FAILED *** (360 milliseconds)
[info]   Results do not match for Spark plan:
[info]    HiveScriptTransformation [a#86, b#87, c#88, d#89, e#90], python /Users/angerszhu/Documents/project/AngersZhu/spark/sql/core/target/test-classes/test_script.py, [key#96, value#97], ScriptTransformationIOSchema(List(),List(),Some(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe),Some(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe),List((field.delim, )),List((field.delim, )),Some(org.apache.hadoop.hive.ql.exec.TextRecordReader),Some(org.apache.hadoop.hive.ql.exec.TextRecordWriter),true)
[info]   +- Project [_1#75 AS a#86, _2#76 AS b#87, _3#77 AS c#88, _4#78 AS d#89, _5#79 AS e#90]
[info]      +- LocalTableScan [_1#75, _2#76, _3#77, _4#78, _5#79]
[info]
[info]
[info]    == Results ==
[info]    !== Expected Answer - 3 ==   == Actual Answer - 3 ==
[info]    ![1,1 1.0 1.001969-12-31 16:00:00.001]   [1,1]
[info]    ![2,2 2.0 2.001969-12-31 16:00:00.002]   [2,2]
[info]    ![3,3 3.0 3.001969-12-31 16:00:00.003]   [3,3] (SparkPlanTest.scala:96)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530)
[info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529)
[info]   at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
[i
```

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
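For readers following the schema-less branch in the diff above, here is a minimal sketch of what the `split(..., 2)` call does to one line of script output. `SchemaLessSplitSketch` and `splitRow` are hypothetical names used only for illustration and are not part of the PR; the delimiter is assumed to be a tab. Note that `String.split` treats its first argument as a regular expression.

```scala
object SchemaLessSplitSketch {
  // Hypothetical helper, not part of the PR: split one line of script output.
  // In schema-less mode the limit of 2 keeps everything after the first
  // delimiter together, giving a (key, value) pair.
  def splitRow(line: String, delimiter: String, schemaLess: Boolean): Array[String] =
    if (schemaLess) {
      line.split(delimiter, 2)
    } else {
      line.split(delimiter)
    }

  def main(args: Array[String]): Unit = {
    val line = "1\t1\t1.0\t1.0"
    // Schema-less: two fields, the second still contains the remaining tabs.
    println(splitRow(line, "\t", schemaLess = true).length)
    // With a schema: one field per delimiter-separated column.
    println(splitRow(line, "\t", schemaLess = false).length)
  }
}
```

Under these assumptions the first field becomes the key and the rest of the line becomes the value, which is the shape the expected answer in the log above has.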
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085:
URL: https://github.com/apache/spark/pull/29085#discussion_r456751690

## File path: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SparkScriptTransformationSuite.scala ##

@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.sql.{Date, Timestamp}
+
+import org.apache.spark.TestUtils
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.functions.struct
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.CalendarInterval
+
+class SparkScriptTransformationSuite extends BaseScriptTransformationSuite {
+
+  import spark.implicits._
+
+  override def scriptType: String = "SPARK"
+
+  test("SPARK-32106: SparkScriptTransformExec should handle different data types correctly") {

Review comment:
> This test should be placed in `BaseScriptTransformationSuite`?

Yes, moved.
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085:
URL: https://github.com/apache/spark/pull/29085#discussion_r456751677

## File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala ##

@@ -172,19 +237,17 @@ case class HiveScriptTransformationExec(
         if (!hasNext) {
           throw new NoSuchElementException
         }
-        if (outputSerde == null) {
+        nextRow()
+      }
+
+      val nextRow: () => InternalRow = if (outputSerde == null) {

Review comment:
> hm... could we write it like this here? [maropu@f3e05c6](https://github.com/maropu/spark/commit/f3e05c6e1ea1e195ff2cbc9e3aa70c45cf9cc79f)

Changed.
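The shape of the change in this hunk, where `next()` delegates to a `nextRow` function value that is chosen once, can be sketched in isolation as follows. This is only an illustration under stated assumptions: `NextRowSketch`, `rows`, and the `useSerde` flag are hypothetical names, rows are plain `Seq[String]` rather than `InternalRow`, and the "serde" branch is a placeholder that wraps the whole line.

```scala
object NextRowSketch {
  // Hypothetical stand-in for an output iterator. The row-producing function
  // is selected once when the iterator is built, so next() does not re-check
  // the serde condition on every call.
  def rows(lines: Iterator[String], useSerde: Boolean): Iterator[Seq[String]] =
    new Iterator[Seq[String]] {
      private val nextRow: () => Seq[String] =
        if (!useSerde) {
          // No serde: split the raw line on tabs (assumed delimiter).
          () => lines.next().split("\t").toSeq
        } else {
          // Placeholder for a serde-based path: keep the line as one field.
          () => Seq(lines.next())
        }

      override def hasNext: Boolean = lines.hasNext

      override def next(): Seq[String] = {
        if (!hasNext) {
          throw new NoSuchElementException
        }
        nextRow()
      }
    }
}
```

The design choice is the same one visible in the diff: the branch on the serde lives in the definition of `nextRow`, and `next()` only checks `hasNext` before calling it.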
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085: URL: https://github.com/apache/spark/pull/29085#discussion_r456751646 ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala ## @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution + +import java.sql.{Date, Timestamp} + +import org.scalatest.Assertions._ +import org.scalatest.BeforeAndAfterEach +import org.scalatest.exceptions.TestFailedException + +import org.apache.spark.{SparkException, TaskContext, TestUtils} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.Column +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + +abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestUtils + with BeforeAndAfterEach { + import testImplicits._ + import ScriptTransformationIOSchema._ + + protected val uncaughtExceptionHandler = new TestUncaughtExceptionHandler + + private var defaultUncaughtExceptionHandler: Thread.UncaughtExceptionHandler = _ + + protected override def beforeAll(): Unit = { +super.beforeAll() +defaultUncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler +Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler) + } + + protected override def afterAll(): Unit = { +super.afterAll() +Thread.setDefaultUncaughtExceptionHandler(defaultUncaughtExceptionHandler) + } + + override protected def afterEach(): Unit = { +super.afterEach() +uncaughtExceptionHandler.cleanStatus() + } + + def isHive23OrSpark: Boolean + + def createScriptTransformationExec( + input: Seq[Expression], + script: String, + output: Seq[Attribute], + child: SparkPlan, + ioschema: ScriptTransformationIOSchema): BaseScriptTransformationExec + + test("cat without SerDe") { +assume(TestUtils.testCommandAvailable("/bin/bash")) + +val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") +checkAnswer( + rowsDf, + (child: SparkPlan) => 
createScriptTransformationExec( +input = Seq(rowsDf.col("a").expr), +script = "cat", +output = Seq(AttributeReference("a", StringType)()), +child = child, +ioschema = defaultIOSchema + ), + rowsDf.collect()) +assert(uncaughtExceptionHandler.exception.isEmpty) + } + + test("script transformation should not swallow errors from upstream operators (no serde)") { +assume(TestUtils.testCommandAvailable("/bin/bash")) + +val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") +val e = intercept[TestFailedException] { + checkAnswer( +rowsDf, +(child: SparkPlan) => createScriptTransformationExec( + input = Seq(rowsDf.col("a").expr), + script = "cat", + output = Seq(AttributeReference("a", StringType)()), + child = ExceptionInjectingOperator(child), + ioschema = defaultIOSchema +), +rowsDf.collect()) +} +assert(e.getMessage().contains("intentional exception")) +// Before SPARK-25158, uncaughtExceptionHandler will catch IllegalArgumentException +assert(uncaughtExceptionHandler.exception.isEmpty) + } + + test("SPARK-25990: TRANSFORM should handle different data types correctly") { +assume(TestUtils.testCommandAvailable("python")) +val scriptFilePath = getTestResourcePath("test_script.py") + +withTempView("v") { + val df = Seq( +(1, "1", 1.0, BigDecimal(1.0), new Timestamp(1)), +(2, "2", 2.0, BigDecimal(2.0), new Timestamp(2)), +(3, "3", 3.0, BigDecimal(3.0), new Timestamp(3)) + ).toDF("a", "b", "c", "d", "e") // Note column d's data type is Decimal(38, 18) + df.createTempView("v") + + val query = sql( +s""" + |SELECT + |TRANSFORM(a, b, c, d, e) +
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085:
URL: https://github.com/apache/spark/pull/29085#discussion_r456343066

## File path: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SparkScriptTransformationSuite.scala ##
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.sql.{Date, Timestamp}
+
+import org.apache.spark.TestUtils
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.execution.{ScriptTransformationIOSchema, SparkPlan}
+import org.apache.spark.sql.functions.struct
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.CalendarInterval
+
+class SparkScriptTransformationSuite extends BaseScriptTransformationSuite {
+
+  import spark.implicits._
+
+  override def scriptType: String = "SPARK"
+
+  noSerdeIOSchema = ScriptTransformationIOSchema(
+    inputRowFormat = Seq.empty,
+    outputRowFormat = Seq.empty,
+    inputSerdeClass = None,
+    outputSerdeClass = None,
+    inputSerdeProps = Seq.empty,
+    outputSerdeProps = Seq.empty,
+    recordReaderClass = None,
+    recordWriterClass = None,
+    schemaLess = false

Review comment:
Done

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala ##
@@ -56,10 +65,45 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
     }
   }
-  def processIterator(
+  protected def initProc: (OutputStream, Process, InputStream, CircularBuffer) = {
+    val cmd = List("/bin/bash", "-c", script)
+    val builder = new ProcessBuilder(cmd.asJava)
+
+    val proc = builder.start()
+    val inputStream = proc.getInputStream
+    val outputStream = proc.getOutputStream
+    val errorStream = proc.getErrorStream
+
+    // In order to avoid deadlocks, we need to consume the error output of the child process.
+    // To avoid issues caused by large error output, we use a circular buffer to limit the amount
+    // of error output that we retain. See SPARK-7862 for more discussion of the deadlock / hang
+    // that motivates this.
+    val stderrBuffer = new CircularBuffer(2048)
+    new RedirectThread(
+      errorStream,
+      stderrBuffer,
+      s"Thread-${this.getClass.getSimpleName}-STDERR-Consumer").start()
+    (outputStream, proc, inputStream, stderrBuffer)
+  }
+
+  protected def processIterator(
       inputIterator: Iterator[InternalRow],
       hadoopConf: Configuration): Iterator[InternalRow]
+  protected def processOutputWithoutSerde(prevLine: String, reader: BufferedReader): InternalRow = {
+    if (!ioschema.schemaLess) {
+      new GenericInternalRow(
+        prevLine.split(ioschema.outputRowFormatMap("TOK_TABLEROWFORMATFIELD"))
+          .zip(fieldWriters)
+          .map { case (data, writer) => writer(data) })
+    } else {
+      new GenericInternalRow(
+        prevLine.split(ioschema.outputRowFormatMap("TOK_TABLEROWFORMATFIELD"), 2)
+          .zip(fieldWriters)
+          .map { case (data, writer) => writer(data) })
+    }

Review comment:
Done

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
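The two branches of processOutputWithoutSerde quoted in this message differ only in the limit argument passed to split. A minimal sketch of that difference, in plain Scala with no Spark classes, may help; the object SplitSketch, its method names, and the tab delimiter are assumptions made for illustration and are not part of the PR.

```scala
// Hypothetical helper, not from the PR: shows String.split with and without a limit.
object SplitSketch {
  // Schema-aware case: split on every delimiter, one token per output column.
  def splitAll(line: String, delimiter: String): Seq[String] =
    line.split(delimiter).toSeq

  // Schema-less case: a limit of 2 yields at most two tokens, so everything
  // after the first delimiter stays together in the second token.
  def splitKeyValue(line: String, delimiter: String): Seq[String] =
    line.split(delimiter, 2).toSeq
}
```

With a tab-delimited line "a\tb\tc", splitAll returns three tokens, while splitKeyValue returns "a" and "b\tc". Note that the delimiter is interpreted as a regular expression by String.split, and that zipping the tokens with fieldWriters, as the diff does, silently drops tokens beyond the number of writers.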
[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29085: [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core
AngersZh commented on a change in pull request #29085:
URL: https://github.com/apache/spark/pull/29085#discussion_r456309141

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala ##
@@ -87,17 +131,55 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
       }
     }
   }
+
+  private lazy val fieldWriters: Seq[String => Any] = output.map { attr =>
+    val converter = CatalystTypeConverters.createToCatalystConverter(attr.dataType)
+    attr.dataType match {
+      case StringType => (data: String) => converter(data)
+      case ByteType => (data: String) => converter(data.toByte)
+      case IntegerType => (data: String) => converter(data.toInt)
+      case ShortType => (data: String) => converter(data.toShort)
+      case LongType => (data: String) => converter(data.toLong)
+      case FloatType => (data: String) => converter(data.toFloat)
+      case DoubleType => (data: String) => converter(data.toDouble)
+      case dt: DecimalType => (data: String) => converter(BigDecimal(data))
+      case DateType if conf.datetimeJava8ApiEnabled => (data: String) =>

Review comment:
Yes, I will update this later. As for how to decide whether to use a specific serde or the default serde, I think it is better to decide that in alfozan's PR.
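The fieldWriters value quoted in this message picks one String => Any parser per output attribute up front, so each row only has to apply the pre-selected functions. A rough, Spark-free sketch of that pattern follows; the FieldType hierarchy, the FieldWriters object, and parseRow are hypothetical stand-ins for Spark's DataType objects and are not the PR's API, and the Catalyst converter step is left out.

```scala
// Hypothetical stand-ins for Spark's DataType objects, so the sketch runs without Spark.
sealed trait FieldType
case object StringField extends FieldType
case object IntField extends FieldType
case object LongField extends FieldType
case object DoubleField extends FieldType
case object DecimalField extends FieldType

object FieldWriters {
  // Choose the parser once per column instead of matching on the type for every value.
  def writerFor(fieldType: FieldType): String => Any = fieldType match {
    case StringField  => (data: String) => data
    case IntField     => (data: String) => data.toInt
    case LongField    => (data: String) => data.toLong
    case DoubleField  => (data: String) => data.toDouble
    case DecimalField => (data: String) => BigDecimal(data)
  }

  // Split one line of script output and apply the matching parser to each token.
  def parseRow(line: String, delimiter: String, schema: Seq[FieldType]): Seq[Any] = {
    val writers = schema.map(writerFor)
    line.split(delimiter).toSeq.zip(writers).map { case (data, writer) => writer(data) }
  }
}
```

In this sketch a malformed token such as "abc" for an IntField column throws NumberFormatException; how the PR itself handles conversion failures is not shown in the quoted hunk, so nothing is assumed about it here.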