grundprinzip commented on code in PR #38086:
URL: https://github.com/apache/spark/pull/38086#discussion_r996480368


##########
connector/connect/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -65,11 +65,21 @@ message SQL {
 message Read {
   oneof read_type {
     NamedTable named_table = 1;
+    DataSource data_source = 2;
   }
 
   message NamedTable {
     repeated string parts = 1;
   }
+
+  message DataSource {
+    // Required. Supported formats include: parquet, orc, text, json, parquet, 
csv, avro.

Review Comment:
   ```suggestion
       // Required. Supported formats may include: parquet, orc, text, json, 
parquet, csv, avro.
   ```
   
   The reason is that the resolution of the data source is happening on the 
server side and depends on which DS classes are available in the classpath.



##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -91,7 +92,25 @@ class SparkConnectPlanner(plan: proto.Relation, session: 
SparkSession) {
         } else {
           child
         }
-      case _ => throw InvalidPlanInput()
+      case proto.Read.ReadTypeCase.DATA_SOURCE =>
+        if (rel.getDataSource.getFormat == "") {
+          throw InvalidPlanInput("DataSource requires a format")
+        }
+        val localMap = 
CaseInsensitiveMap[String](rel.getDataSource.getOptionsMap.asScala.toMap)
+        if (!localMap.contains("path")) {
+          throw InvalidPlanInput("DataSource requires a path")
+        }
+
+        val reader = session.read
+        reader.format(rel.getDataSource.getFormat)
+        for ((k, v) <- localMap) {
+          reader.option(k, v)
+        }
+        if (rel.getDataSource.getSchema != null) {
+          reader.schema(rel.getDataSource.getSchema)
+        }
+        reader.load().queryExecution.analyzed

Review Comment:
   So my question here is, if the execution has already happened or not. What I 
mean is, is `load()` a blocking operation or a logical one? If you call 
`load()` and then return the analyzed plan. What happens if you call collect on 
this plan? Does the load happen again?
   
   Can verify this?
   
   



##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -91,7 +92,25 @@ class SparkConnectPlanner(plan: proto.Relation, session: 
SparkSession) {
         } else {
           child
         }
-      case _ => throw InvalidPlanInput()
+      case proto.Read.ReadTypeCase.DATA_SOURCE =>
+        if (rel.getDataSource.getFormat == "") {
+          throw InvalidPlanInput("DataSource requires a format")
+        }
+        val localMap = 
CaseInsensitiveMap[String](rel.getDataSource.getOptionsMap.asScala.toMap)
+        if (!localMap.contains("path")) {
+          throw InvalidPlanInput("DataSource requires a path")
+        }
+
+        val reader = session.read
+        reader.format(rel.getDataSource.getFormat)
+        for ((k, v) <- localMap) {
+          reader.option(k, v)
+        }

Review Comment:
   ```suggestion
           localMap.foreach { case (key, value) => reader.option(key, value) }
   ```
   
   Not sure what is preferred?



##########
python/pyspark/sql/connect/plan.py:
##########
@@ -101,6 +102,30 @@ def _child_repr_(self) -> str:
         return self._child._repr_html_() if self._child is not None else ""
 
 
+class DataSource(LogicalPlan):

Review Comment:
   Couple of nits:
   
   - doc?
   - repr / repr_html?
   
   Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to