AnishMahto commented on code in PR #51507:
URL: https://github.com/apache/spark/pull/51507#discussion_r2211483342
##########
python/pyspark/pipelines/cli.py:
##########
@@ -283,6 +341,11 @@ def run(spec_path: Path) -> None:
else:
spec_path = find_pipeline_spec(Path.cwd())
- run(spec_path=spec_path)
+ run(
+ spec_path=spec_path,
+ full_refresh=flatten_table_lists(args.full_refresh),
Review Comment:
Why do we need to flatten `args.full_refresh` and `args.refresh`? I thought
we defined their types with the `parse_table_list` function, which returns
`List[str]`
##########
python/pyspark/pipelines/cli.py:
##########
@@ -242,20 +264,56 @@ def run(spec_path: Path) -> None:
register_definitions(spec_path, registry, spec)
log_with_curr_timestamp("Starting run...")
- result_iter = start_run(spark, dataflow_graph_id)
+ result_iter = start_run(
+ spark,
+ dataflow_graph_id,
+ full_refresh=full_refresh,
+ full_refresh_all=full_refresh_all,
+ refresh=refresh,
+ )
try:
handle_pipeline_events(result_iter)
finally:
spark.stop()
+def parse_table_list(value: str) -> List[str]:
+ """Parse a comma-separated list of table names, handling whitespace."""
+ return [table.strip() for table in value.split(",") if table.strip()]
+
+
+def flatten_table_lists(table_lists: Optional[List[List[str]]]) ->
Optional[List[str]]:
+ """Flatten a list of lists of table names into a single list."""
+ if not table_lists:
+ return None
+ result = []
+ for table_list in table_lists:
+ result.extend(table_list)
+ return result if result else None
+
+
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Pipeline CLI")
subparsers = parser.add_subparsers(dest="command", required=True)
# "run" subcommand
run_parser = subparsers.add_parser("run", help="Run a pipeline.")
run_parser.add_argument("--spec", help="Path to the pipeline spec.")
+ run_parser.add_argument(
+ "--full-refresh",
+ type=parse_table_list,
+ action="append",
+ help="List of datasets to reset and recompute (comma-separated).",
Review Comment:
Here and below, should we document default behavior if this arg is not
specified at all?
##########
python/pyspark/pipelines/cli.py:
##########
@@ -242,20 +264,56 @@ def run(spec_path: Path) -> None:
register_definitions(spec_path, registry, spec)
log_with_curr_timestamp("Starting run...")
- result_iter = start_run(spark, dataflow_graph_id)
+ result_iter = start_run(
+ spark,
+ dataflow_graph_id,
+ full_refresh=full_refresh,
+ full_refresh_all=full_refresh_all,
+ refresh=refresh,
+ )
try:
handle_pipeline_events(result_iter)
finally:
spark.stop()
+def parse_table_list(value: str) -> List[str]:
+ """Parse a comma-separated list of table names, handling whitespace."""
+ return [table.strip() for table in value.split(",") if table.strip()]
+
+
+def flatten_table_lists(table_lists: Optional[List[List[str]]]) ->
Optional[List[str]]:
+ """Flatten a list of lists of table names into a single list."""
+ if not table_lists:
+ return None
+ result = []
+ for table_list in table_lists:
+ result.extend(table_list)
+ return result if result else None
Review Comment:
If result is an empty list, do we still want to return None? Or should we
just return the empty list? What is the implication of either here
##########
sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala:
##########
@@ -224,6 +225,64 @@ private[connect] object PipelinesHandler extends Logging {
sessionHolder: SessionHolder): Unit = {
val dataflowGraphId = cmd.getDataflowGraphId
val graphElementRegistry =
DataflowGraphRegistry.getDataflowGraphOrThrow(dataflowGraphId)
+
Review Comment:
Can we extract all this added logic to deduce the full refresh and regular
refresh table filters into its own function? And then as part of the scala
docs, map the expected filter results depending on what combination of full
refresh and partial refresh is selected
##########
sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala:
##########
@@ -224,6 +225,64 @@ private[connect] object PipelinesHandler extends Logging {
sessionHolder: SessionHolder): Unit = {
val dataflowGraphId = cmd.getDataflowGraphId
val graphElementRegistry =
DataflowGraphRegistry.getDataflowGraphOrThrow(dataflowGraphId)
+
+ // Extract refresh parameters from protobuf command
+ val fullRefreshTables = cmd.getFullRefreshList.asScala.toSeq
+ val fullRefreshAll = cmd.getFullRefreshAll
+ val refreshTables = cmd.getRefreshList.asScala.toSeq
+
+ // Convert table names to fully qualified TableIdentifier objects
+ def parseTableNames(tableNames: Seq[String]): Set[TableIdentifier] = {
+ tableNames.map { name =>
+ GraphIdentifierManager
+ .parseAndQualifyTableIdentifier(
+ rawTableIdentifier =
+ GraphIdentifierManager.parseTableIdentifier(name,
sessionHolder.session),
+ currentCatalog = Some(graphElementRegistry.defaultCatalog),
+ currentDatabase = Some(graphElementRegistry.defaultDatabase))
+ .identifier
+ }.toSet
+ }
+
+ if (fullRefreshTables.nonEmpty && fullRefreshAll) {
+ throw new IllegalArgumentException(
+ "Cannot specify a subset to refresh when full refresh all is set to
true.")
+ }
+
+ if (refreshTables.nonEmpty && fullRefreshAll) {
+ throw new IllegalArgumentException(
+ "Cannot specify a subset to full refresh when full refresh all is set
to true.")
+ }
+ val refreshTableNames = parseTableNames(refreshTables)
+ val fullRefreshTableNames = parseTableNames(fullRefreshTables)
+
+ if (refreshTables.nonEmpty && fullRefreshTables.nonEmpty) {
+ // check if there is an intersection between the subset
+ val intersection = refreshTableNames.intersect(fullRefreshTableNames)
+ if (intersection.nonEmpty) {
+ throw new IllegalArgumentException(
+ "Datasets specified for refresh and full refresh cannot overlap: " +
+ s"${intersection.mkString(", ")}")
+ }
+ }
+
+ val fullRefreshTablesFilter: TableFilter = if (fullRefreshAll) {
+ AllTables
+ } else if (fullRefreshTables.nonEmpty) {
+ SomeTables(fullRefreshTableNames)
+ } else {
+ NoTables
+ }
+
+ val refreshTablesFilter: TableFilter =
+ if (refreshTables.nonEmpty) {
+ SomeTables(refreshTableNames)
+ } else if (fullRefreshTablesFilter != NoTables) {
+ NoTables
+ } else {
+ AllTables
+ }
Review Comment:
just an optional nit, but as a code reader it's difficult for me to reason
about the combinations of `fullRefreshTables` and `refreshTables` when reading
them as sequential but related validation here.
My suggestion would be to restructure this as a match statement, that
explicitly handles each combination. Ex.
```
(fullRefreshTables, refreshTableNames) match {
case (Nil, Nil) => ...
case (fullRefreshTables, Nil) => ...
case ...
}
```
##########
sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto:
##########
@@ -116,6 +116,15 @@ message PipelineCommand {
message StartRun {
// The graph to start.
optional string dataflow_graph_id = 1;
+
+ // List of tables to reset and recompute.
+ repeated string full_refresh = 2;
+
+ // Perform a full graph reset and recompute.
+ optional bool full_refresh_all = 3;
+
+ // List of tables to update.
+ repeated string refresh = 4;
Review Comment:
```suggestion
repeated string refresh_selection = 4;
```
##########
sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto:
##########
@@ -116,6 +116,15 @@ message PipelineCommand {
message StartRun {
// The graph to start.
optional string dataflow_graph_id = 1;
+
+ // List of tables to reset and recompute.
+ repeated string full_refresh = 2;
Review Comment:
```suggestion
repeated string full_refresh_selection = 2;
```
##########
python/pyspark/pipelines/cli.py:
##########
@@ -217,8 +217,30 @@ def change_dir(path: Path) -> Generator[None, None, None]:
os.chdir(prev)
-def run(spec_path: Path) -> None:
- """Run the pipeline defined with the given spec."""
+def run(
+ spec_path: Path,
+ full_refresh: Optional[Sequence[str]] = None,
+ full_refresh_all: bool = False,
+ refresh: Optional[Sequence[str]] = None,
+) -> None:
+ """Run the pipeline defined with the given spec.
+
+ :param spec_path: Path to the pipeline specification file.
+ :param full_refresh: List of datasets to reset and recompute.
+ :param full_refresh_all: Perform a full graph reset and recompute.
+ :param refresh: List of datasets to update.
+ """
+ # Validate conflicting arguments
+ if full_refresh_all:
+ if full_refresh:
+ raise PySparkException(
+ errorClass="CONFLICTING_PIPELINE_REFRESH_OPTIONS",
messageParameters={}
Review Comment:
Thoughts on having sub error classes for mismatched combinations? Or maybe
just pass along which two configs are conflicting as a message parameter?
##########
sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala:
##########
@@ -224,6 +225,64 @@ private[connect] object PipelinesHandler extends Logging {
sessionHolder: SessionHolder): Unit = {
val dataflowGraphId = cmd.getDataflowGraphId
val graphElementRegistry =
DataflowGraphRegistry.getDataflowGraphOrThrow(dataflowGraphId)
+
+ // Extract refresh parameters from protobuf command
+ val fullRefreshTables = cmd.getFullRefreshList.asScala.toSeq
+ val fullRefreshAll = cmd.getFullRefreshAll
+ val refreshTables = cmd.getRefreshList.asScala.toSeq
+
+ // Convert table names to fully qualified TableIdentifier objects
+ def parseTableNames(tableNames: Seq[String]): Set[TableIdentifier] = {
+ tableNames.map { name =>
+ GraphIdentifierManager
+ .parseAndQualifyTableIdentifier(
+ rawTableIdentifier =
+ GraphIdentifierManager.parseTableIdentifier(name,
sessionHolder.session),
+ currentCatalog = Some(graphElementRegistry.defaultCatalog),
+ currentDatabase = Some(graphElementRegistry.defaultDatabase))
+ .identifier
+ }.toSet
+ }
+
+ if (fullRefreshTables.nonEmpty && fullRefreshAll) {
+ throw new IllegalArgumentException(
+ "Cannot specify a subset to refresh when full refresh all is set to
true.")
+ }
+
+ if (refreshTables.nonEmpty && fullRefreshAll) {
+ throw new IllegalArgumentException(
+ "Cannot specify a subset to full refresh when full refresh all is set
to true.")
+ }
Review Comment:
Are the exception messages flipped here?
##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelineUpdateContextImpl.scala:
##########
@@ -24,10 +24,17 @@ import
org.apache.spark.sql.pipelines.logging.{FlowProgressEventLogger, Pipeline
* An implementation of the PipelineUpdateContext trait used in production.
* @param unresolvedGraph The graph (unresolved) to be executed in this update.
* @param eventCallback A callback function to be called when an event is
added to the event buffer.
+ * @param refreshTables Filter for which tables should be refreshed when
performing this update.
+ * @param fullRefreshTables Filter for which tables should be full refreshed
+ * when performing this update.
+ * @param resetCheckpointFlows Filter for which flows should be reset.
*/
class PipelineUpdateContextImpl(
override val unresolvedGraph: DataflowGraph,
- override val eventCallback: PipelineEvent => Unit
+ override val eventCallback: PipelineEvent => Unit,
+ override val refreshTables: TableFilter = AllTables,
+ override val fullRefreshTables: TableFilter = NoTables,
+ override val resetCheckpointFlows: FlowFilter = NoFlows
Review Comment:
Also is it possible this argument always being `NoFlows` now can logically
conflict with the `fullRefreshTables` argument, which is no longer always
`NoTables`?
##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelineUpdateContextImpl.scala:
##########
@@ -24,10 +24,17 @@ import
org.apache.spark.sql.pipelines.logging.{FlowProgressEventLogger, Pipeline
* An implementation of the PipelineUpdateContext trait used in production.
* @param unresolvedGraph The graph (unresolved) to be executed in this update.
* @param eventCallback A callback function to be called when an event is
added to the event buffer.
+ * @param refreshTables Filter for which tables should be refreshed when
performing this update.
+ * @param fullRefreshTables Filter for which tables should be full refreshed
+ * when performing this update.
+ * @param resetCheckpointFlows Filter for which flows should be reset.
*/
class PipelineUpdateContextImpl(
override val unresolvedGraph: DataflowGraph,
- override val eventCallback: PipelineEvent => Unit
+ override val eventCallback: PipelineEvent => Unit,
+ override val refreshTables: TableFilter = AllTables,
+ override val fullRefreshTables: TableFilter = NoTables,
+ override val resetCheckpointFlows: FlowFilter = NoFlows
Review Comment:
Why are we moving `resetCheckpointFlows` into the constructor arg for
`PipelineUpdateContextImpl`?
Not saying I disagree with this decision, but since the argument doesn't
seem to be used by any callers in this PR I'm curious when
`resetCheckpointFlows` will not be the default value of `NoFlows`.
##########
python/pyspark/pipelines/spark_connect_pipeline.py:
##########
@@ -65,12 +65,26 @@ def handle_pipeline_events(iter: Iterator[Dict[str, Any]])
-> None:
log_with_provided_timestamp(event.message, dt)
-def start_run(spark: SparkSession, dataflow_graph_id: str) ->
Iterator[Dict[str, Any]]:
+def start_run(
+ spark: SparkSession,
+ dataflow_graph_id: str,
+ full_refresh: Optional[Sequence[str]] = None,
+ full_refresh_all: bool = False,
+ refresh: Optional[Sequence[str]] = None,
+) -> Iterator[Dict[str, Any]]:
"""Start a run of the dataflow graph in the Spark Connect server.
:param dataflow_graph_id: The ID of the dataflow graph to start.
+ :param full_refresh: List of datasets to reset and recompute.
+ :param full_refresh_all: Perform a full graph reset and recompute.
+ :param refresh: List of datasets to update.
"""
- inner_command =
pb2.PipelineCommand.StartRun(dataflow_graph_id=dataflow_graph_id)
+ inner_command = pb2.PipelineCommand.StartRun(
+ dataflow_graph_id=dataflow_graph_id,
+ full_refresh=full_refresh or [],
Review Comment:
I'm a little confused by this. If `full_refresh` is None, we return an empty
list. But in `flatten_table_lists`, which we use to construct this
`full_refresh` list, we return None if the flattened list is empty. Can we
simplify the logic here or there?
##########
python/pyspark/pipelines/cli.py:
##########
@@ -217,8 +217,30 @@ def change_dir(path: Path) -> Generator[None, None, None]:
os.chdir(prev)
-def run(spec_path: Path) -> None:
- """Run the pipeline defined with the given spec."""
+def run(
Review Comment:
High level question: did we consider putting refresh selection options in
the pipeline spec, rather than as a CLI arg?
More generally, what's the philosophy for whether a configuration should be
accepted as a CLI arg vs a pipeline spec field?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]