grundprinzip commented on code in PR #38659:
URL: https://github.com/apache/spark/pull/38659#discussion_r1027046810


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -271,8 +273,12 @@ class SparkConnectPlanner(session: SparkSession) {
   }
 
   private def transformLocalRelation(rel: proto.LocalRelation): LogicalPlan = {
-    val attributes = 
rel.getAttributesList.asScala.map(transformAttribute(_)).toSeq
-    new org.apache.spark.sql.catalyst.plans.logical.LocalRelation(attributes)
+    val (rows, structType) = ArrowConverters.fromBatchWithSchemaIterator(

Review Comment:
   This looks very good. 



##########
connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala:
##########
@@ -354,4 +365,16 @@ class SparkConnectPlannerSuite extends SparkFunSuite with 
SparkConnectPlanTest {
       transform(proto.Relation.newBuilder.setSetOp(intersect).build()))
     assert(e2.getMessage.contains("Intersect does not support union_by_name"))
   }
+
+  test("transform LocalRelation") {
+    val inputRows = (0 until 10).map(InternalRow(_))

Review Comment:
   The test here is kind of bare bones. Before we fully approve the PR we need 
to extend the test coverage a bit. 



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala:
##########
@@ -76,21 +72,26 @@ private[sql] object ArrowConverters extends Logging {
       schema: StructType,
       maxRecordsPerBatch: Long,
       timeZoneId: String,
-      context: TaskContext) extends Iterator[Array[Byte]] {
+      context: TaskContext)
+      extends Iterator[Array[Byte]] {

Review Comment:
   why these changes?



##########
connector/connect/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -271,9 +271,7 @@ message Deduplicate {
 
 // A relation that does not need to be qualified by name.
 message LocalRelation {
-  // (Optional) A list qualified attributes.
-  repeated Expression.QualifiedAttribute attributes = 1;
-  // TODO: support local data.
+  bytes data = 1;

Review Comment:
   Please add a comment mentioning that the data is stored as arrow IPC message 
streams and that since the ipc streams contain the schema we don't need to 
qualify it. 



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala:
##########
@@ -37,10 +34,9 @@ import 
org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.ArrowUtils
-import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch, 
ColumnVector}
+import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnVector, 
ColumnarBatch}

Review Comment:
   afaik this change should break scala Style as CB is before CV



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to