maropu commented on a change in pull request #29085:
URL: https://github.com/apache/spark/pull/29085#discussion_r459136940
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
##########
@@ -744,8 +744,29 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
selectClause.hints.asScala.foldRight(withWindow)(withHints)
}
+ // Decode and input/output format.
+ type Format = (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String])
Review comment:
`Format` -> `ScriptIOFormat`? Then, could you make the comment above
clearer?
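For example, something along these lines (the component breakdown in the comment is my guess at what the tuple holds; please double-check):
```scala
// Decodes the TRANSFORM clause's ROW FORMAT specs into a common shape:
// (format properties, serde class name, serde properties, record reader/writer class)
type ScriptIOFormat =
  (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String])
```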
##########
File path:
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala
##########
@@ -206,75 +169,147 @@ class HiveScriptTransformationSuite extends SparkPlanTest with SQLTestUtils with
val query = sql(
s"""
- |SELECT
- |TRANSFORM(a, b, c, d, e)
- |USING 'python $scriptFilePath' AS (a, b, c, d, e)
- |FROM v
+ |SELECT TRANSFORM(a, b, c, d, e)
+ |USING 'python ${scriptFilePath}'
+ |FROM v
""".stripMargin)
- // In Hive 1.2, the string representation of a decimal omits trailing zeroes.
- // But in Hive 2.3, it is always padded to 18 digits with trailing zeroes if necessary.
- val decimalToString: Column => Column = if (HiveUtils.isHive23) {
- c => c.cast("string")
- } else {
- c => c.cast("decimal(1, 0)").cast("string")
- }
- checkAnswer(query, identity, df.select(
- 'a.cast("string"),
- 'b.cast("string"),
- 'c.cast("string"),
- decimalToString('d),
- 'e.cast("string")).collect())
+ // In Hive's default serde mode, if we don't define an output schema, it chooses the first
+ // two columns as the output schema (key: String, value: String)
+ checkAnswer(
+ query,
+ identity,
+ df.select(
+ 'a.cast("string").as("key"),
+ 'b.cast("string").as("value")).collect())
}
}
- test("SPARK-30973: TRANSFORM should wait for the termination of the script
(no serde)") {
+ testBasicInputDataTypesWith(hiveIOSchema, "hive serde")
+
+ test("SPARK-32106: TRANSFORM supports complex data types type (hive serde)")
{
assume(TestUtils.testCommandAvailable("/bin/bash"))
+ withTempView("v") {
+ val df = Seq(
+ (1, "1", Array(0, 1, 2), Map("a" -> 1)),
+ (2, "2", Array(3, 4, 5), Map("b" -> 2)))
+ .toDF("a", "b", "c", "d")
+ .select('a, 'b, 'c, 'd, struct('a, 'b).as("e"))
+ df.createTempView("v")
- val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a")
- val e = intercept[SparkException] {
- val plan =
- new HiveScriptTransformationExec(
- input = Seq(rowsDf.col("a").expr),
- script = "some_non_existent_command",
- output = Seq(AttributeReference("a", StringType)()),
- child = rowsDf.queryExecution.sparkPlan,
- ioschema = noSerdeIOSchema)
- SparkPlanTest.executePlan(plan, hiveContext)
+ // Hive serde supports ArrayType/MapType/StructType as input and output data types
+ checkAnswer(
+ df,
+ (child: SparkPlan) => createScriptTransformationExec(
+ input = Seq(
+ df.col("c").expr,
+ df.col("d").expr,
+ df.col("e").expr),
+ script = "cat",
+ output = Seq(
+ AttributeReference("c", ArrayType(IntegerType))(),
+ AttributeReference("d", MapType(StringType, IntegerType))(),
+ AttributeReference("e", StructType(
+ Seq(
+ StructField("col1", IntegerType, false),
+ StructField("col2", StringType, true))))()),
+ child = child,
+ ioschema = hiveIOSchema
+ ),
+ df.select('c, 'd, 'e).collect())
}
- assert(e.getMessage.contains("Subprocess exited with status"))
- assert(uncaughtExceptionHandler.exception.isEmpty)
}
- test("SPARK-30973: TRANSFORM should wait for the termination of the script
(with serde)") {
+ test("SPARK-32106: TRANSFORM supports complex data types end to end (hive
serde) ") {
Review comment:
remove the trailing space at the end of the test name.
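i.e.:
```scala
test("SPARK-32106: TRANSFORM supports complex data types end to end (hive serde)") {
```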
##########
File path: sql/core/src/test/resources/sql-tests/inputs/transform.sql
##########
@@ -0,0 +1,146 @@
+-- Test data.
+CREATE OR REPLACE TEMPORARY VIEW t AS SELECT * FROM VALUES
+('1', true, unhex('537061726B2053514C'), tinyint(1), 1, smallint(100), bigint(1), float(1.0), 1.0, Decimal(1.0), timestamp('1997-01-02'), date('2000-04-01')),
+('2', false, unhex('537061726B2053514C'), tinyint(2), 2, smallint(200), bigint(2), float(2.0), 2.0, Decimal(2.0), timestamp('1997-01-02 03:04:05'), date('2000-04-02')),
+('3', true, unhex('537061726B2053514C'), tinyint(3), 3, smallint(300), bigint(3), float(3.0), 3.0, Decimal(3.0), timestamp('1997-02-10 17:32:01-08'), date('2000-04-03'))
+AS t(a, b, c, d, e, f, g, h, i, j, k, l);
+
+SELECT TRANSFORM(a)
+USING 'cat' AS (a)
+FROM t;
+
+-- with non-exist command
+SELECT TRANSFORM(a)
+USING 'some_non_existent_command' AS (a)
+FROM t;
+
+-- with non-exist file
+SELECT TRANSFORM(a)
+USING 'python some_non_existent_file' AS (a)
+FROM t;
+
+-- common supported data types between no serde and serde transform
+SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM (
+ SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k, l)
+ USING 'cat' AS (
+ a string,
+ b boolean,
+ c binary,
+ d tinyint,
+ e int,
+ f smallint,
+ g long,
+ h float,
+ i double,
+ j decimal(38, 18),
+ k timestamp,
+ l date)
+ FROM t
+) tmp;
+
+-- common supported data types between no serde and serde transform
+SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM (
+ SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k, l)
+ USING 'cat' AS (
+ a string,
+ b string,
+ c string,
+ d string,
+ e string,
+ f string,
+ g string,
+ h string,
+ i string,
+ j string,
+ k string,
+ l string)
+ FROM t
+) tmp;
+
+-- SPARK-32388 handle schema less
+SELECT TRANSFORM(a)
+USING 'cat'
+FROM t;
+
+SELECT TRANSFORM(a, b)
+USING 'cat'
+FROM t;
+
+SELECT TRANSFORM(a, b, c)
+USING 'cat'
+FROM t;
+
+-- return null when return string incompatible (no serde)
+SELECT TRANSFORM(a, b, c, d, e, f, g, h, i)
+USING 'cat' AS (a int, b short, c long, d byte, e float, f double, g decimal(38, 18), h date, i timestamp)
+FROM VALUES
+('a','','1231a','a','213.21a','213.21a','0a.21d','2000-04-01123','1997-0102 00:00:') tmp(a, b, c, d, e, f, g, h, i);
+
+-- SPARK-28227: transform can't run with aggregation
+SELECT TRANSFORM(b, max(a), sum(f))
+USING 'cat' AS (a, b)
+FROM t
+GROUP BY b;
+
+-- transform use MAP
+MAP a, b USING 'cat' AS (a, b) FROM t;
+
+-- transform use REDUCE
+REDUCE a, b USING 'cat' AS (a, b) FROM t;
+
+-- transform with defined row format delimit
+SELECT TRANSFORM(a, b, c, null)
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '|'
+LINES TERMINATED BY '\n'
+NULL DEFINED AS 'NULL'
+USING 'cat' AS (a, b, c, d)
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '|'
+LINES TERMINATED BY '\n'
+NULL DEFINED AS 'NULL'
+FROM t;
+
+
+SELECT TRANSFORM(a, b, c, null)
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '|'
+LINES TERMINATED BY '\n'
+NULL DEFINED AS 'NULL'
+USING 'cat' AS (d)
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '||'
+LINES TERMINATED BY '\n'
+NULL DEFINED AS 'NULL'
+FROM t;
+
+-- SPARK-31937 transform with defined row format delimit
Review comment:
Is this JIRA related to this query? I read it, but I'm not sure about the
relationship. What kind of exception does this query throw?
##########
File path:
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala
##########
@@ -33,14 +32,13 @@ import org.apache.hadoop.hive.serde2.objectinspector._
import org.apache.hadoop.io.Writable
import org.apache.spark.TaskContext
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical.ScriptInputOutputSchema
import org.apache.spark.sql.execution._
import org.apache.spark.sql.hive.HiveInspectors
import org.apache.spark.sql.hive.HiveShim._
-import org.apache.spark.sql.types.DataType
-import org.apache.spark.util.{CircularBuffer, RedirectThread, Utils}
+import org.apache.spark.sql.types.{DataType, StringType}
Review comment:
`StringType` is not used.
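i.e., I think this can go back to:
```scala
import org.apache.spark.sql.types.DataType
```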
##########
File path:
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
##########
@@ -1063,6 +1063,9 @@ private[hive] trait HiveInspectors {
case DateType => dateTypeInfo
case TimestampType => timestampTypeInfo
case NullType => voidTypeInfo
+ case dt =>
+ throw new AnalysisException("TRANSFORM with hive serde does not
support " +
Review comment:
I think `HiveInspectors` is not related to `TRANSFORM`, so could you
make the error message more general?
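For example, something like this (the exact wording is just a suggestion):
```scala
case dt =>
  throw new AnalysisException(s"${dt.catalogString} cannot be converted to a Hive TypeInfo")
```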
##########
File path:
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
##########
@@ -1031,4 +1031,96 @@ class PlanParserSuite extends AnalysisTest {
assertEqual("select a, b from db.c;;;", table("db", "c").select('a, 'b))
assertEqual("select a, b from db.c; ;; ;", table("db", "c").select('a,
'b))
}
+
+ test("SPARK-32106: TRANSFORM without serde") {
Review comment:
Also, could you check `ROW FORMAT SERDE`, too?
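For example (a rough, untested sketch — assuming the `parsePlan` helper this suite already uses, and that the parsed `ScriptTransformation` keeps the serde class in its `ioschema`):
```scala
val plan = parsePlan(
  """SELECT TRANSFORM(a)
    |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
    |USING 'cat' AS (a)
    |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
    |FROM testData""".stripMargin)
// check the serde class survives the round-trip through the parser
val scripts = plan.collect { case s: ScriptTransformation => s }
assert(scripts.head.ioschema.inputSerdeClass
  .contains("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
```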
##########
File path:
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
##########
@@ -1031,4 +1031,96 @@ class PlanParserSuite extends AnalysisTest {
assertEqual("select a, b from db.c;;;", table("db", "c").select('a, 'b))
assertEqual("select a, b from db.c; ;; ;", table("db", "c").select('a,
'b))
}
+
+ test("SPARK-32106: TRANSFORM without serde") {
Review comment:
`TRANSFORM without serde` -> `TRANSFORM plan`?
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
##########
@@ -532,6 +532,21 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}
}
+ object SparkScripts extends Strategy {
+ def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+ case logical.ScriptTransformation(input, script, output, child, ioschema)
+ if ioschema.inputSerdeClass.isEmpty && ioschema.outputSerdeClass.isEmpty =>
Review comment:
Do we need to check this here? It seems this has already been checked in
https://github.com/apache/spark/pull/29085/files#diff-9847f5cef7cf7fbc5830fbc6b779ee10R783-R784.
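If so, the guard could probably be dropped, e.g. (untested; I'm assuming `ScriptTransformationIOSchema` has an `apply` that converts the logical-plan schema):
```scala
object SparkScripts extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    // serde mode is already rejected by the parser, so no guard is needed here
    case logical.ScriptTransformation(input, script, output, child, ioschema) =>
      SparkScriptTransformationExec(
        input, script, output, planLater(child), ScriptTransformationIOSchema(ioschema)) :: Nil
    case _ => Nil
  }
}
```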
##########
File path: sql/core/src/test/resources/sql-tests/inputs/transform.sql
##########
@@ -0,0 +1,146 @@
+-- transform with defined row format delimit
+SELECT TRANSFORM(a, b, c, null)
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '|'
+LINES TERMINATED BY '\n'
+NULL DEFINED AS 'NULL'
+USING 'cat' AS (a, b, c, d)
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '|'
+LINES TERMINATED BY '\n'
+NULL DEFINED AS 'NULL'
+FROM t;
+
+
Review comment:
remove the extra blank lines.
##########
File path:
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala
##########
@@ -54,87 +52,110 @@ case class HiveScriptTransformationExec(
script: String,
output: Seq[Attribute],
child: SparkPlan,
- ioschema: HiveScriptIOSchema)
- extends BaseScriptTransformationExec {
+ ioschema: ScriptTransformationIOSchema)
+ extends BaseScriptTransformationExec with HiveInspectors {
- override def processIterator(
- inputIterator: Iterator[InternalRow],
- hadoopConf: Configuration): Iterator[InternalRow] = {
- val cmd = List("/bin/bash", "-c", script)
- val builder = new ProcessBuilder(cmd.asJava)
-
- val proc = builder.start()
- val inputStream = proc.getInputStream
- val outputStream = proc.getOutputStream
- val errorStream = proc.getErrorStream
-
- // In order to avoid deadlocks, we need to consume the error output of the child process.
- // To avoid issues caused by large error output, we use a circular buffer to limit the amount
- // of error output that we retain. See SPARK-7862 for more discussion of the deadlock / hang
- // that motivates this.
- val stderrBuffer = new CircularBuffer(2048)
- new RedirectThread(
- errorStream,
- stderrBuffer,
- "Thread-ScriptTransformation-STDERR-Consumer").start()
+ private def initInputSerDe(
+ input: Seq[Expression]): Option[(AbstractSerDe, StructObjectInspector)] = {
+ ioschema.inputSerdeClass.map { serdeClass =>
+ val (columns, columnTypes) = parseAttrs(input)
+ val serde = initSerDe(serdeClass, columns, columnTypes, ioschema.inputSerdeProps)
+ val fieldObjectInspectors = columnTypes.map(toInspector)
+ val objectInspector = ObjectInspectorFactory
+ .getStandardStructObjectInspector(columns.asJava, fieldObjectInspectors.asJava)
+ (serde, objectInspector)
+ }
+ }
- val outputProjection = new InterpretedProjection(input, child.output)
+ private def initOutputSerDe(
+ output: Seq[Attribute]): Option[(AbstractSerDe, StructObjectInspector)] = {
+ ioschema.outputSerdeClass.map { serdeClass =>
+ val (columns, columnTypes) = parseAttrs(output)
+ val serde = initSerDe(serdeClass, columns, columnTypes, ioschema.outputSerdeProps)
+ val structObjectInspector = serde.getObjectInspector().asInstanceOf[StructObjectInspector]
+ (serde, structObjectInspector)
+ }
+ }
- // This nullability is a performance optimization in order to avoid an Option.foreach() call
- // inside of a loop
- @Nullable val (inputSerde, inputSoi) = ioschema.initInputSerDe(input).getOrElse((null, null))
+ private def parseAttrs(attrs: Seq[Expression]): (Seq[String], Seq[DataType]) = {
+ val columns = attrs.zipWithIndex.map(e => s"${e._1.prettyName}_${e._2}")
+ val columnTypes = attrs.map(_.dataType)
+ (columns, columnTypes)
+ }
- // This new thread will consume the ScriptTransformation's input rows and write them to the
- // external process. That process's output will be read by this current thread.
- val writerThread = new HiveScriptTransformationWriterThread(
- inputIterator.map(outputProjection),
- input.map(_.dataType),
- inputSerde,
- inputSoi,
- ioschema,
- outputStream,
- proc,
- stderrBuffer,
- TaskContext.get(),
- hadoopConf
- )
+ private def initSerDe(
Review comment:
Sorry for the confusion, but on second thought, it's better to pull the hive-serde
related functions out of `HiveScriptTransformationExec` and put them in a companion
object for readability:
https://github.com/maropu/spark/commit/972775b821406d81d3c1ba1c718de3037a0ca068.
WDYT?
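To sketch the idea, based on the methods above (`initOutputSerDe`, `parseAttrs`, and `initSerDe` would move the same way, with the `ioschema` passed in explicitly):
```scala
object HiveScriptTransformationExec extends HiveInspectors {
  // hive-serde helper pulled out of the exec node itself
  def initInputSerDe(
      ioschema: ScriptTransformationIOSchema,
      input: Seq[Expression]): Option[(AbstractSerDe, StructObjectInspector)] = {
    ioschema.inputSerdeClass.map { serdeClass =>
      val (columns, columnTypes) = parseAttrs(input)
      val serde = initSerDe(serdeClass, columns, columnTypes, ioschema.inputSerdeProps)
      val fieldObjectInspectors = columnTypes.map(toInspector)
      val objectInspector = ObjectInspectorFactory
        .getStandardStructObjectInspector(columns.asJava, fieldObjectInspectors.asJava)
      (serde, objectInspector)
    }
  }
  // initOutputSerDe, parseAttrs, and initSerDe move here as well
}
```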
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/execution/SparkScriptTransformationSuite.scala
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.{SparkException, TestUtils}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.test.SharedSparkSession
+
+class SparkScriptTransformationSuite extends BaseScriptTransformationSuite with SharedSparkSession {
+ import testImplicits._
+
+ override def isHive23OrSpark: Boolean = true
+
+ override def createScriptTransformationExec(
+ input: Seq[Expression],
+ script: String,
+ output: Seq[Attribute],
+ child: SparkPlan,
+ ioschema: ScriptTransformationIOSchema): BaseScriptTransformationExec = {
+ SparkScriptTransformationExec(
+ input = input,
+ script = script,
+ output = output,
+ child = child,
+ ioschema = ioschema
+ )
+ }
+
+ test("SPARK-32106: TRANSFORM with serde without hive should throw
exception") {
+ assume(TestUtils.testCommandAvailable("/bin/bash"))
+ withTempView("v") {
+ val df = Seq("a", "b", "c").map(Tuple1.apply).toDF("a")
+ df.createTempView("v")
+
+ val e = intercept[ParseException] {
+ sql(
+ """
+ |SELECT TRANSFORM (a)
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
+ |USING 'cat' AS (a)
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
+ |FROM v
+ """.stripMargin)
+ }.getMessage
+ assert(e.contains("TRANSFORM with serde is only supported in hive mode"))
+ }
+ }
+
+ test("TRANSFORM doesn't support ArrayType/MapType/StructType as output data
type (no serde)") {
Review comment:
Please prefix the test name with `SPARK-32106:`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]