tomvanbussel commented on code in PR #38979:
URL: https://github.com/apache/spark/pull/38979#discussion_r1044271450
##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -380,7 +396,26 @@ class SparkConnectPlanner(session: SparkSession) {
}
val attributes = structType.toAttributes
val proj = UnsafeProjection.create(attributes, attributes)
- new logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq)
+ val relation = logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq)
+
+ if (rel.hasDatatype || rel.hasDatatypeStr) {
+ // rename columns and update datatypes
+ val schema = if (rel.hasDatatype) {
+ DataTypeProtoConverter
+ .toCatalystType(rel.getDatatype)
+ .asInstanceOf[StructType]
+ } else {
+ parseDatatypeString(rel.getDatatypeStr)
+ .asInstanceOf[StructType]
+ }
+ Dataset
+ .ofRows(session, logicalPlan = relation)
+ .toDF(schema.names: _*)
Review Comment:
This throws away the names of nested fields. Perhaps we can do something
like the following instead:
```scala
val (rows, inferredSchema) = ArrowConverters.fromBatchWithSchemaIterator(
Iterator(rel.getData.toByteArray),
TaskContext.get())
if (inferredSchema == null) {
    throw InvalidPlanInput(s"Input data for LocalRelation does not produce a schema.")
}
val schemaType = if (rel.hasDataType) {
DataTypeProtoConverter.toCatalystType(rel.getDataType)
} else if (rel.hasDataTypeString) {
parseDatatypeString(rel.getDataTypeString)
} else {
inferredSchema
}
val schemaStruct = schemaType match {
case s: StructType => s
case d => StructType(Seq(StructField("value", d)))
}
val attributes = schemaStruct.toAttributes
val proj = UnsafeProjection.create(attributes, attributes)
  new logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq)
```
##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -380,7 +396,26 @@ class SparkConnectPlanner(session: SparkSession) {
}
val attributes = structType.toAttributes
val proj = UnsafeProjection.create(attributes, attributes)
- new logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq)
+ val relation = logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq)
+
+ if (rel.hasDatatype || rel.hasDatatypeStr) {
+ // rename columns and update datatypes
+ val schema = if (rel.hasDatatype) {
+ DataTypeProtoConverter
+ .toCatalystType(rel.getDatatype)
+ .asInstanceOf[StructType]
Review Comment:
This is not guaranteed to be a `StructType`. `createDataFrame` also allows
`AtomicType` to be used, and in that case `"value"` will be used as the column
name.
##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -371,6 +371,22 @@ class SparkConnectPlanner(session: SparkSession) {
}
}
+ private def parseDatatypeString(sqlText: String): DataType = {
+ var dataType: DataType = null
+ try {
+ dataType = session.sessionState.sqlParser.parseTableSchema(sqlText)
+ } catch {
+ case e1: ParseException =>
+ try {
+ dataType = session.sessionState.sqlParser.parseDataType(sqlText)
+ } catch {
+ case e2: ParseException =>
+ dataType = session.sessionState.sqlParser.parseDataType(s"struct<${sqlText.strip}>")
+ }
+ }
+ dataType
Review Comment:
Nit: This can be simplified.
```scala
try {
session.sessionState.sqlParser.parseTableSchema(sqlText)
} catch {
case _: ParseException =>
try {
session.sessionState.sqlParser.parseDataType(sqlText)
} catch {
       case _: ParseException =>
         session.sessionState.sqlParser.parseDataType(s"struct<${sqlText.strip}>")
}
}
```
##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -380,7 +396,26 @@ class SparkConnectPlanner(session: SparkSession) {
}
val attributes = structType.toAttributes
val proj = UnsafeProjection.create(attributes, attributes)
- new logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq)
+ val relation = logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq)
+
+ if (rel.hasDatatype || rel.hasDatatypeStr) {
+ // rename columns and update datatypes
+ val schema = if (rel.hasDatatype) {
+ DataTypeProtoConverter
+ .toCatalystType(rel.getDatatype)
+ .asInstanceOf[StructType]
+ } else {
+ parseDatatypeString(rel.getDatatypeStr)
+ .asInstanceOf[StructType]
Review Comment:
Same issue here, this can be any `DataType`, not just a `StructType`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]