tomvanbussel commented on code in PR #38979:
URL: https://github.com/apache/spark/pull/38979#discussion_r1044271450


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -380,7 +396,26 @@ class SparkConnectPlanner(session: SparkSession) {
     }
     val attributes = structType.toAttributes
     val proj = UnsafeProjection.create(attributes, attributes)
-    new logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq)
+    val relation = logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq)
+
+    if (rel.hasDatatype || rel.hasDatatypeStr) {
+      // rename columns and update datatypes
+      val schema = if (rel.hasDatatype) {
+        DataTypeProtoConverter
+          .toCatalystType(rel.getDatatype)
+          .asInstanceOf[StructType]
+      } else {
+        parseDatatypeString(rel.getDatatypeStr)
+          .asInstanceOf[StructType]
+      }
+      Dataset
+        .ofRows(session, logicalPlan = relation)
+        .toDF(schema.names: _*)

Review Comment:
   This throws away the names of nested fields. Perhaps we can do something like the following instead:
   ```scala
       val (rows, inferredSchema) = ArrowConverters.fromBatchWithSchemaIterator(
         Iterator(rel.getData.toByteArray),
         TaskContext.get())
       if (inferredSchema == null) {
         throw InvalidPlanInput("Input data for LocalRelation does not produce a schema.")
       }
   
       val schemaType = if (rel.hasDataType) {
         DataTypeProtoConverter.toCatalystType(rel.getDataType)
       } else if (rel.hasDataTypeString) {
         parseDatatypeString(rel.getDataTypeString)
       } else {
         inferredSchema
       }
   
       val schemaStruct = schemaType match {
         case s: StructType => s
         case d => StructType(Seq(StructField("value", d)))
       }
   
       val attributes = schemaStruct.toAttributes
       val proj = UnsafeProjection.create(attributes, attributes)
       new logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq)
   ```
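   
   For context, a minimal repro sketch of why `toDF` is lossy for nested names (assumes a local `SparkSession` in scope as `spark`; not code from this PR):
   ```scala
   import spark.implicits._
   
   val df = Seq((1, (2, 3))).toDF("a", "b")
   df.schema("b").dataType.simpleString      // struct<_1:int,_2:int>
   
   val renamed = df.toDF("x", "y")
   renamed.schema("y").dataType.simpleString // still struct<_1:int,_2:int>
   // toDF only renames top-level columns; the nested names (_1, _2) are
   // untouched, so nested names from a user-provided schema would be lost.
   ```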



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -380,7 +396,26 @@ class SparkConnectPlanner(session: SparkSession) {
     }
     val attributes = structType.toAttributes
     val proj = UnsafeProjection.create(attributes, attributes)
-    new logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq)
+    val relation = logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq)
+
+    if (rel.hasDatatype || rel.hasDatatypeStr) {
+      // rename columns and update datatypes
+      val schema = if (rel.hasDatatype) {
+        DataTypeProtoConverter
+          .toCatalystType(rel.getDatatype)
+          .asInstanceOf[StructType]

Review Comment:
   This is not guaranteed to be a `StructType`. `createDataFrame` also allows an `AtomicType` to be used, and in that case `"value"` will be used as the column name.
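   
   A small sketch of the failure mode and the match-based handling suggested above (the `IntegerType` value is illustrative, not from this PR):
   ```scala
   import org.apache.spark.sql.types._
   
   val dt: DataType = IntegerType   // e.g. from createDataFrame(data, "int")
   // dt.asInstanceOf[StructType]   // would throw ClassCastException at runtime
   
   // Pattern-match instead of casting, defaulting the column name to "value":
   val struct = dt match {
     case s: StructType => s
     case d => StructType(Seq(StructField("value", d)))
   }
   ```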



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -371,6 +371,22 @@ class SparkConnectPlanner(session: SparkSession) {
     }
   }
 
+  private def parseDatatypeString(sqlText: String): DataType = {
+    var dataType: DataType = null
+    try {
+      dataType = session.sessionState.sqlParser.parseTableSchema(sqlText)
+    } catch {
+      case e1: ParseException =>
+        try {
+          dataType = session.sessionState.sqlParser.parseDataType(sqlText)
+        } catch {
+          case e2: ParseException =>
+            dataType = session.sessionState.sqlParser.parseDataType(s"struct<${sqlText.strip}>")
+        }
+    }
+    dataType

Review Comment:
   Nit: This can be simplified.
   ```scala
       try {
         session.sessionState.sqlParser.parseTableSchema(sqlText)
       } catch {
         case _: ParseException =>
           try {
             session.sessionState.sqlParser.parseDataType(sqlText)
           } catch {
             case _: ParseException =>
               session.sessionState.sqlParser.parseDataType(s"struct<${sqlText.strip}>")
           }
       }
   ```
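   
   For reference, example inputs for each level (hedged; based on how these parsers usually behave):
   ```scala
   parseDatatypeString("a INT, b STRING")    // handled by parseTableSchema
   parseDatatypeString("array<int>")         // falls through to parseDataType
   parseDatatypeString("a: int, b: string")  // only parses after the struct<...> wrap
   ```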



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -380,7 +396,26 @@ class SparkConnectPlanner(session: SparkSession) {
     }
     val attributes = structType.toAttributes
     val proj = UnsafeProjection.create(attributes, attributes)
-    new logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq)
+    val relation = logical.LocalRelation(attributes, rows.map(r => proj(r).copy()).toSeq)
+
+    if (rel.hasDatatype || rel.hasDatatypeStr) {
+      // rename columns and update datatypes
+      val schema = if (rel.hasDatatype) {
+        DataTypeProtoConverter
+          .toCatalystType(rel.getDatatype)
+          .asInstanceOf[StructType]
+      } else {
+        parseDatatypeString(rel.getDatatypeStr)
+          .asInstanceOf[StructType]

Review Comment:
   Same issue here: this can be any `DataType`, not just a `StructType`.
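   
   For instance (illustrative), `parseDatatypeString("array<int>")` returns an `ArrayType`, so the `asInstanceOf[StructType]` on this branch would also throw a `ClassCastException` at runtime.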


