Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/22626#discussion_r230544717
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
---
@@ -174,3 +176,66 @@ case class SchemaOfCsv(
override def prettyName: String = "schema_of_csv"
}
+
+/**
+ * Converts a [[StructType]] to a CSV output string.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+ usage = "_FUNC_(expr[, options]) - Returns a CSV string with a given
struct value",
+ examples = """
+ Examples:
+ > SELECT _FUNC_(named_struct('a', 1, 'b', 2));
+ 1,2
+ > SELECT _FUNC_(named_struct('time', to_timestamp('2015-08-26',
'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy'));
+ "26/08/2015"
+ """,
+ since = "3.0.0")
+// scalastyle:on line.size.limit
+case class StructsToCsv(
+ options: Map[String, String],
+ child: Expression,
+ timeZoneId: Option[String] = None)
+ extends UnaryExpression with TimeZoneAwareExpression with
CodegenFallback with ExpectsInputTypes {
+ override def nullable: Boolean = true
+
+ def this(options: Map[String, String], child: Expression) =
this(options, child, None)
+
+ // Used in `FunctionRegistry`
+ def this(child: Expression) = this(Map.empty, child, None)
+
+ def this(child: Expression, options: Expression) =
+ this(
+ options = ExprUtils.convertToMapData(options),
+ child = child,
+ timeZoneId = None)
+
+ @transient
+ lazy val writer = new CharArrayWriter()
+
+ @transient
+ lazy val inputSchema: StructType = child.dataType match {
+ case st: StructType => st
+ case other =>
+ throw new IllegalArgumentException(s"Unsupported input type
${other.catalogString}")
+ }
+
+ @transient
+ lazy val gen = new UnivocityGenerator(
+ inputSchema, writer, new CSVOptions(options, columnPruning = true,
timeZoneId.get))
+
+ // This converts rows to the CSV output according to the given schema.
+ @transient
+ lazy val converter: Any => UTF8String = {
+ (row: Any) =>
UTF8String.fromString(gen.writeToString(row.asInstanceOf[InternalRow]))
+ }
+
+ override def dataType: DataType = StringType
+
+ override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
+ copy(timeZoneId = Option(timeZoneId))
+
+ override def nullSafeEval(value: Any): Any = converter(value)
+
+ override def inputTypes: Seq[AbstractDataType] =
TypeCollection(StructType) :: Nil
--- End diff --
I think we can simplify this to `StructType :: Nil`.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]