sryza commented on code in PR #51208:
URL: https://github.com/apache/spark/pull/51208#discussion_r2159723113


##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphValidations.scala:
##########
@@ -55,6 +55,75 @@ trait GraphValidations extends Logging {
     multiQueryTables
   }
 
+  protected[graph] def validateFlowStreamingness(): Unit = {
+    flowsTo.foreach { case (destTableIdentifier, flows) =>
+      val destTableOpt = table.get(destTableIdentifier)
+
+      // If the destination identifier does not correspond to a table, it must 
be a view.
+      val destViewOpt = destTableOpt.fold(view.get(destTableIdentifier))(_ => 
None)

Review Comment:
   Why not just use `view.get(destTableIdentifier)` here?



##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphValidations.scala:
##########
@@ -55,6 +55,75 @@ trait GraphValidations extends Logging {
     multiQueryTables
   }
 
+  protected[graph] def validateFlowStreamingness(): Unit = {

Review Comment:
   This could use some docstring. It doesn't need to go into the individual 
cases (because that could get out of sync easily), but could be at the level of 
"Some dataset types require that the flows that target them are backed by 
streaming queries. Others require batch queries. This validates that the 
dataset types match whether the flow query is streaming or batch".



##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -2719,6 +2719,33 @@
     ],
     "sqlState" : "42000"
   },
+  "INVALID_FLOW_RELATION_TYPE" : {

Review Comment:
   Any particular reason to use "relation" instead of "query"? I think the 
latter is more common in docs.



##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphValidations.scala:
##########
@@ -55,6 +55,75 @@ trait GraphValidations extends Logging {
     multiQueryTables
   }
 
+  protected[graph] def validateFlowStreamingness(): Unit = {
+    flowsTo.foreach { case (destTableIdentifier, flows) =>
+      val destTableOpt = table.get(destTableIdentifier)
+
+      // If the destination identifier does not correspond to a table, it must 
be a view.
+      val destViewOpt = destTableOpt.fold(view.get(destTableIdentifier))(_ => 
None)
+
+      flows.foreach {

Review Comment:
   Nitpick you can feel free to ignore: this is pretty heavily nested. Could 
remove a level of nesting with:
   
   ```
   val resolvedFlows: Seq[ResolvedFlow] = flows.collect { case rf: ResolvedFlow 
=> rf }
   ```
   



##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -2719,6 +2719,33 @@
     ],
     "sqlState" : "42000"
   },
+  "INVALID_FLOW_RELATION_TYPE" : {
+    "message" : [
+      "Flow <flowIdentifier> returns an invalid relation type."
+    ],
+    "subClass" : {
+      "FOR_MATERIALIZED_VIEW" : {

Review Comment:
   Thoughts on being more descriptive with these like 
`STREAMING_RELATION_FOR_MATERIALIZED_VIEW` / 
`BATCH_RELATION_FOR_STREAMING_TABLE`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to