sryza commented on code in PR #52538:
URL: https://github.com/apache/spark/pull/52538#discussion_r2442499320


##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -5015,7 +5015,7 @@
   "RUN_EMPTY_PIPELINE" : {
     "message" : [
       "Pipelines are expected to have at least one non-temporary dataset 
defined (tables, persisted views) but no non-temporary datasets were found in 
your pipeline.",
-      "Please verify that you have included the expected source files, and 
that your source code includes table definitions (e.g., CREATE MATERIALIZED 
VIEW in SQL code, @sdp.table in python code)."
+      "Please verify that you have included the expected source files, and 
that your source code includes table definitions (e.g., CREATE MATERIALIZED 
VIEW in SQL code, @dp.table in python code)."

Review Comment:
   👍 



##########
python/pyspark/pipelines/cli.py:
##########
@@ -321,12 +323,11 @@ def run(
         default_database=spec.database,
         sql_conf=spec.configuration,
     )
+    log_with_curr_timestamp(f"Dataflow graph created (ID: 
{dataflow_graph_id}).")

Review Comment:
   I'm worried that output is already a little verbose, so would lean towards 
excluding this unless there's a strong reason



##########
python/pyspark/pipelines/cli.py:
##########
@@ -321,12 +323,11 @@ def run(
         default_database=spec.database,
         sql_conf=spec.configuration,
     )
+    log_with_curr_timestamp(f"Dataflow graph created (ID: 
{dataflow_graph_id}).")
 
     log_with_curr_timestamp("Registering graph elements...")
     registry = SparkConnectGraphElementRegistry(spark, dataflow_graph_id)
     register_definitions(spec_path, registry, spec)
-
-    log_with_curr_timestamp("Starting run...")

Review Comment:
   Why take this out?



##########
sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala:
##########
@@ -329,7 +332,7 @@ private[connect] object PipelinesHandler extends Logging {
             flow.getSourceCodeLocation.getLineNumber),
           objectType = Option(QueryOriginType.Flow.toString),
           objectName = Option(flowIdentifier.unquotedString),
-          language = Option(Python()))))
+          language = Some(Python()))))

Review Comment:
   Why switch to `Some`?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala:
##########
@@ -1376,7 +1376,7 @@ class SparkSqlAstBuilder extends AstBuilder {
 
     if (colConstraints.nonEmpty) {
       throw operationNotAllowed("Pipeline datasets do not currently support 
column constraints. " +
-        "Please remove and CHECK, UNIQUE, PK, and FK constraints specified on 
the pipeline " +
+        "Please remove any CHECK, UNIQUE, PK, and FK constraints specified on 
the pipeline " +

Review Comment:
   👍 



##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala:
##########
@@ -53,7 +52,7 @@ trait FlowExecution {
   def identifier: TableIdentifier
 
   /**
-   * Returns a user-visible name for the flow.
+   * Returns a user-visible name of this flow.

Review Comment:
   ```suggestion
      * Returns the user-visible name of this flow.
   ```



##########
python/pyspark/pipelines/cli.py:
##########
@@ -386,7 +390,7 @@ def parse_table_list(value: str) -> List[str]:
     # "init" subcommand
     init_parser = subparsers.add_parser(
         "init",
-        help="Generates a simple pipeline project, including a spec file and 
example definitions.",
+        help="Generate a sample pipeline project, with a spec file and example 
transformations.",

Review Comment:
   👍 



##########
python/pyspark/pipelines/cli.py:
##########
@@ -347,8 +348,9 @@ def parse_table_list(value: str) -> List[str]:
     return [table.strip() for table in value.split(",") if table.strip()]
 
 
-if __name__ == "__main__":

Review Comment:
   👍 



##########
sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala:
##########
@@ -135,15 +133,21 @@ private[connect] object PipelinesHandler extends Logging {
     val defaultCatalog = Option
       .when(cmd.hasDefaultCatalog)(cmd.getDefaultCatalog)
       .getOrElse {
-        logInfo(s"No default catalog was supplied. Falling back to the current 
catalog.")
-        sessionHolder.session.catalog.currentCatalog()
+        val currentCatalog = sessionHolder.session.catalog.currentCatalog()

Review Comment:
   👍 



##########
sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala:
##########
@@ -285,22 +289,21 @@ private[connect] object PipelinesHandler extends Logging {
 
     val rawDestinationIdentifier = GraphIdentifierManager
       .parseTableIdentifier(name = flow.getTargetDatasetName, spark = 
sessionHolder.session)
-    val flowWritesToView =
-      graphElementRegistry
-        .getViews()
+    val isFlowWriteToView =

Review Comment:
   Not sure this is an improvement – not exactly proper grammar



##########
sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala:
##########
@@ -135,15 +133,21 @@ private[connect] object PipelinesHandler extends Logging {
     val defaultCatalog = Option
       .when(cmd.hasDefaultCatalog)(cmd.getDefaultCatalog)
       .getOrElse {
-        logInfo(s"No default catalog was supplied. Falling back to the current 
catalog.")
-        sessionHolder.session.catalog.currentCatalog()
+        val currentCatalog = sessionHolder.session.catalog.currentCatalog()
+        logInfo(
+          "No default catalog was supplied. " +
+            s"Falling back to the current catalog: $currentCatalog.")
+        currentCatalog
       }
 
     val defaultDatabase = Option
       .when(cmd.hasDefaultDatabase)(cmd.getDefaultDatabase)
       .getOrElse {
-        logInfo(s"No default database was supplied. Falling back to the 
current database.")
-        sessionHolder.session.catalog.currentDatabase
+        val currentDatabase = sessionHolder.session.catalog.currentDatabase

Review Comment:
   👍 



##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala:
##########
@@ -149,8 +149,10 @@ trait ResolutionCompletedFlow extends Flow {
 }
 
 /** A [[Flow]] whose flow function has failed to resolve. */
-class ResolutionFailedFlow(val flow: UnresolvedFlow, val funcResult: 
FlowFunctionResult)
-    extends ResolutionCompletedFlow {
+class ResolutionFailedFlow(

Review Comment:
   Not a big deal either way, but is splitting this across multiple lines 
better?



##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/CoreDataflowNodeProcessor.scala:
##########
@@ -198,22 +197,20 @@ private class FlowResolver(rawGraph: DataflowGraph) {
               )
           }
       }
-    result
   }
 
   private def convertResolvedToTypedFlow(
       flow: UnresolvedFlow,
       funcResult: FlowFunctionResult): ResolvedFlow = {
-    val typedFlow = flow match {
-      case f: UnresolvedFlow if f.once => new AppendOnceFlow(flow, funcResult)
-      case f: UnresolvedFlow if funcResult.dataFrame.get.isStreaming =>
+    flow match {
+      case _ if flow.once => new AppendOnceFlow(flow, funcResult)

Review Comment:
   Are we confident that not checking for `UnresolvedFlow` doesn't change 
behavior?



##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala:
##########
@@ -252,18 +246,16 @@ class BatchTableWrite(
     val sqlConf: Map[String, String]
 ) extends FlowExecution {
 
-  override def isStreaming: Boolean = false
+  override final def isStreaming: Boolean = false
   override def getOrigin: QueryOrigin = flow.origin
 
-  def executeInternal(): scala.concurrent.Future[Unit] =
+  def executeInternal(): Future[Unit] =
     SparkSessionUtils.withSqlConf(spark, sqlConf.toList: _*) {
       updateContext.flowProgressEventLogger.recordRunning(flow = flow)
       val data = graph.reanalyzeFlow(flow).df
       Future {
         val dataFrameWriter = data.write
-        if (destination.format.isDefined) {
-          dataFrameWriter.format(destination.format.get)
-        }
+        destination.format.foreach(dataFrameWriter.format)

Review Comment:
   [x]



##########
python/pyspark/pipelines/spark_connect_pipeline.py:
##########
@@ -29,7 +29,7 @@ def create_dataflow_graph(
     default_database: Optional[str],
     sql_conf: Optional[Mapping[str, str]],
 ) -> str:
-    """Create a dataflow graph in in the Spark Connect server.
+    """Create a dataflow graph in the Spark Connect server.

Review Comment:
   👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to