szehon-ho commented on code in PR #56045:
URL: https://github.com/apache/spark/pull/56045#discussion_r3291377573
##########
python/pyspark/pipelines/api.py:
##########
@@ -525,3 +527,136 @@ def create_sink(
comment=None,
)
get_active_graph_element_registry().register_output(sink)
+
+
+def create_auto_cdc_flow(
+ target: str,
+ source: str,
+ keys: Union[List[str], List[Column]],
+ sequence_by: Union[str, Column],
+ apply_as_deletes: Optional[Union[str, Column]] = None,
+ apply_as_truncates: Optional[Union[str, Column]] = None,
+ column_list: Optional[Union[List[str], List[Column]]] = None,
+ except_column_list: Optional[Union[List[str], List[Column]]] = None,
+ stored_as_scd_type: Optional[Literal[1, "1"]] = None,
+ name: Optional[str] = None,
+ ignore_null_updates_column_list: Optional[Union[List[str], List[Column]]]
= None,
+ ignore_null_updates_except_column_list: Optional[Union[List[str],
List[Column]]] = None,
+) -> None:
+ """
+ Create an Auto CDC flow into the target table from the Change Data Capture
(CDC) source.
+ Target table must have already been created using create_streaming_table
function. Only one
+ of column_list and except_column_list can be specified.
+
+ Example:
+ create_auto_cdc_flow(
+ target = "target",
+ source = "source",
+ keys = ["key"],
+ sequence_by = "sequence_expr",
+ ignore_null_updates_column_list = ["value"],
+ column_list = ["key", "value"],
+ )
+
+ Note that for keys, sequence_by, column_list, except_column_list,
+ ignore_null_updates_column_list, and
ignore_null_updates_except_column_list the arguments
+ have to be column identifiers without qualifiers, e.g. they cannot be
+ col("sourceTable.keyId").
+
+ :param target: The name of the target table that receives the Auto CDC
flow.
+ :param source: The name of the CDC source to stream from.
+ :param keys: The column or combination of columns that uniquely identify a
row in the source \
+ data. This is used to identify which CDC events apply to specific
records in the target \
+ table. These keys also identify records in the target table, e.g., if
there exists a record \
+ for given keys and the CDC source has an UPSERT operation for the same
keys, we will update \
+ the existing record. At least one key must be provided. This should be
a list of column \
+ identifiers without qualifiers, expressed as either Python strings or
Pyspark Columns.
+ :param sequence_by: An expression that we use to order the source data.
This can be expressed \
+ as either a Python string or Pyspark Expression.
+ :param apply_as_deletes: Delete condition for the merged operation. This
should be a string of \
+ expression e.g. "operation = 'DELETE'"
+ :param apply_as_truncates: Truncate condition for the merged operation.
This should be a string \
+ expression e.g. "operation = 'TRUNCATE'"
+ :param column_list: Columns that will be included in the output table.
This should be a list \
+ of column identifiers without qualifiers, expressed as either Python
strings or Pyspark \
+ Column. Only one of column_list and except_column_list can be
specified.
+ :param except_column_list: Columns that will be excluded in the output
table. This should be a \
+ list of column identifiers without qualifiers, expressed as either
Python strings or Pyspark \
+ Column. Only one of column_list and except_column_list can be
specified. When this is \
+ specified, all columns in the dataframe of the target table except
those in this list will \
+ be in the output table.
+ :param stored_as_scd_type: The SCD type for the target table. Only 1 (or
"1") is supported. \
+ When not specified the server default applies.
+ :param name: The name of the flow for this create_auto_cdc_flow command.
When unspecified this \
+ will build a "default flow" with name equal to the target name.
+ :param ignore_null_updates_column_list: Subset of columns to ignore null
values in during \
+ updates. When a source row has a null for one of these columns, the
existing value in the \
+ target is preserved. Only one of ignore_null_updates_column_list and \
+ ignore_null_updates_except_column_list can be specified.
+ :param ignore_null_updates_except_column_list: Columns excluded from
null-update ignoring. \
+ All other columns will have null values ignored during updates. Only
one of \
+ ignore_null_updates_column_list and
ignore_null_updates_except_column_list can be specified.
+ """
+ keys = _normalize_column_list(keys)
+
+ column_list = _normalize_optional_column_list(column_list)
+ except_column_list = _normalize_optional_column_list(except_column_list)
+ ignore_null_updates_column_list = _normalize_optional_column_list(
+ ignore_null_updates_column_list
+ )
+ ignore_null_updates_except_column_list = _normalize_optional_column_list(
+ ignore_null_updates_except_column_list
+ )
Review Comment:
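The docstring says that `column_list`/`except_column_list` and
`ignore_null_updates_column_list`/`ignore_null_updates_except_column_list` are
mutually exclusive and that `keys` must be non-empty, but nothing here enforces
either rule. Validate after normalization (like other SDP APIs) so users get a
clear client-side error.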
##########
python/pyspark/pipelines/api.py:
##########
@@ -525,3 +527,136 @@ def create_sink(
comment=None,
)
get_active_graph_element_registry().register_output(sink)
+
+
+def create_auto_cdc_flow(
+ target: str,
+ source: str,
+ keys: Union[List[str], List[Column]],
+ sequence_by: Union[str, Column],
+ apply_as_deletes: Optional[Union[str, Column]] = None,
+ apply_as_truncates: Optional[Union[str, Column]] = None,
+ column_list: Optional[Union[List[str], List[Column]]] = None,
+ except_column_list: Optional[Union[List[str], List[Column]]] = None,
+ stored_as_scd_type: Optional[Literal[1, "1"]] = None,
+ name: Optional[str] = None,
+ ignore_null_updates_column_list: Optional[Union[List[str], List[Column]]]
= None,
+ ignore_null_updates_except_column_list: Optional[Union[List[str],
List[Column]]] = None,
+) -> None:
+ """
+ Create an Auto CDC flow into the target table from the Change Data Capture
(CDC) source.
+ Target table must have already been created using create_streaming_table
function. Only one
+ of column_list and except_column_list can be specified.
+
+ Example:
+ create_auto_cdc_flow(
+ target = "target",
+ source = "source",
+ keys = ["key"],
+ sequence_by = "sequence_expr",
+ ignore_null_updates_column_list = ["value"],
+ column_list = ["key", "value"],
+ )
+
+ Note that for keys, sequence_by, column_list, except_column_list,
+ ignore_null_updates_column_list, and
ignore_null_updates_except_column_list the arguments
+ have to be column identifiers without qualifiers, e.g. they cannot be
+ col("sourceTable.keyId").
+
+ :param target: The name of the target table that receives the Auto CDC
flow.
+ :param source: The name of the CDC source to stream from.
+ :param keys: The column or combination of columns that uniquely identify a
row in the source \
+ data. This is used to identify which CDC events apply to specific
records in the target \
+ table. These keys also identify records in the target table, e.g., if
there exists a record \
+ for given keys and the CDC source has an UPSERT operation for the same
keys, we will update \
+ the existing record. At least one key must be provided. This should be
a list of column \
+ identifiers without qualifiers, expressed as either Python strings or
Pyspark Columns.
+ :param sequence_by: An expression that we use to order the source data.
This can be expressed \
+ as either a Python string or Pyspark Expression.
+ :param apply_as_deletes: Delete condition for the merged operation. This
should be a string of \
+ expression e.g. "operation = 'DELETE'"
+ :param apply_as_truncates: Truncate condition for the merged operation.
This should be a string \
+ expression e.g. "operation = 'TRUNCATE'"
+ :param column_list: Columns that will be included in the output table.
This should be a list \
+ of column identifiers without qualifiers, expressed as either Python
strings or Pyspark \
+ Column. Only one of column_list and except_column_list can be
specified.
+ :param except_column_list: Columns that will be excluded in the output
table. This should be a \
+ list of column identifiers without qualifiers, expressed as either
Python strings or Pyspark \
+ Column. Only one of column_list and except_column_list can be
specified. When this is \
+ specified, all columns in the dataframe of the target table except
those in this list will \
+ be in the output table.
+ :param stored_as_scd_type: The SCD type for the target table. Only 1 (or
"1") is supported. \
+ When not specified the server default applies.
+ :param name: The name of the flow for this create_auto_cdc_flow command.
When unspecified this \
+ will build a "default flow" with name equal to the target name.
+ :param ignore_null_updates_column_list: Subset of columns to ignore null
values in during \
+ updates. When a source row has a null for one of these columns, the
existing value in the \
+ target is preserved. Only one of ignore_null_updates_column_list and \
+ ignore_null_updates_except_column_list can be specified.
+ :param ignore_null_updates_except_column_list: Columns excluded from
null-update ignoring. \
+ All other columns will have null values ignored during updates. Only
one of \
+ ignore_null_updates_column_list and
ignore_null_updates_except_column_list can be specified.
+ """
+ keys = _normalize_column_list(keys)
+
+ column_list = _normalize_optional_column_list(column_list)
+ except_column_list = _normalize_optional_column_list(except_column_list)
+ ignore_null_updates_column_list = _normalize_optional_column_list(
+ ignore_null_updates_column_list
+ )
+ ignore_null_updates_except_column_list = _normalize_optional_column_list(
+ ignore_null_updates_except_column_list
+ )
+
+ if isinstance(sequence_by, str):
+ sequence_by = F.expr(sequence_by)
+
+ if isinstance(apply_as_deletes, str):
+ apply_as_deletes = F.expr(apply_as_deletes)
+
+ if isinstance(apply_as_truncates, str):
+ apply_as_truncates = F.expr(apply_as_truncates)
+
+ if stored_as_scd_type is not None and str(stored_as_scd_type) != "1":
+ raise PySparkTypeError(
+ errorClass="NOT_EXPECTED_TYPE",
+ messageParameters={
+ "arg_name": "stored_as_scd_type",
+ "expected_type": "Literal[1, '1']",
+ "arg_type": type(stored_as_scd_type).__name__,
+ },
+ )
+
+ source_code_location = get_caller_source_code_location(stacklevel=1)
+
+ flow = AutoCdcFlow(
+ name=name,
+ target=target,
+ source=source,
+ keys=keys,
+ sequence_by=sequence_by,
+ apply_as_deletes=apply_as_deletes,
+ apply_as_truncates=apply_as_truncates,
+ column_list=column_list,
+ except_column_list=except_column_list,
+ stored_as_scd_type=stored_as_scd_type,
+ ignore_null_updates_column_list=ignore_null_updates_column_list,
+
ignore_null_updates_except_column_list=ignore_null_updates_except_column_list,
+ source_code_location=source_code_location,
+ )
+
+ get_active_graph_element_registry().register_auto_cdc_flow(flow)
+
+
+def _normalize_optional_column_list(
+ column_list: Optional[Union[List[str], List[Column]]],
+) -> Optional[List[Column]]:
+ if column_list is None:
+ return None
+ return _normalize_column_list(column_list)
+
+
+def _normalize_column_list(
Review Comment:
Please add tests that pass plain string arguments (e.g. `keys=["id"]`,
`sequence_by="ts"`), not only Connect `col`/`expr` Columns.
##########
python/pyspark/pipelines/spark_connect_graph_element_registry.py:
##########
@@ -17,8 +17,13 @@
from pathlib import Path
from pyspark.errors import PySparkTypeError
-from pyspark.sql import SparkSession
+from pyspark.sql import SparkSession, Column
from pyspark.sql.connect.dataframe import DataFrame as ConnectDataFrame
+from pyspark.sql.connect.types import pyspark_types_to_proto_types
+from pyspark.sql.types import StructType
+from pyspark.pipelines.add_pipeline_analysis_context import
add_pipeline_analysis_context
Review Comment:
Nit: this looks like an import reorder only; consider keeping the prior order
to shrink the diff.
##########
python/pyspark/pipelines/spark_connect_graph_element_registry.py:
##########
@@ -133,6 +134,47 @@ def register_flow(self, flow: Flow) -> None:
command.pipeline_command.define_flow.CopyFrom(inner_command)
self._client.execute_command(command)
+ def register_auto_cdc_flow(self, flow: AutoCdcFlow) -> None:
Review Comment:
Once the server implements `AUTO_CDC_FLOW_DETAILS`, add a Connect registry
test. Until then, please note in the PR that the Connect path still throws
today.
##########
python/pyspark/pipelines/api.py:
##########
@@ -525,3 +527,136 @@ def create_sink(
comment=None,
)
get_active_graph_element_registry().register_output(sink)
+
+
+def create_auto_cdc_flow(
+ target: str,
+ source: str,
+ keys: Union[List[str], List[Column]],
+ sequence_by: Union[str, Column],
+ apply_as_deletes: Optional[Union[str, Column]] = None,
+ apply_as_truncates: Optional[Union[str, Column]] = None,
+ column_list: Optional[Union[List[str], List[Column]]] = None,
+ except_column_list: Optional[Union[List[str], List[Column]]] = None,
+ stored_as_scd_type: Optional[Literal[1, "1"]] = None,
+ name: Optional[str] = None,
+ ignore_null_updates_column_list: Optional[Union[List[str], List[Column]]]
= None,
+ ignore_null_updates_except_column_list: Optional[Union[List[str],
List[Column]]] = None,
+) -> None:
+ """
+ Create an Auto CDC flow into the target table from the Change Data Capture
(CDC) source.
+ Target table must have already been created using create_streaming_table
function. Only one
+ of column_list and except_column_list can be specified.
+
+ Example:
+ create_auto_cdc_flow(
+ target = "target",
+ source = "source",
+ keys = ["key"],
+ sequence_by = "sequence_expr",
+ ignore_null_updates_column_list = ["value"],
+ column_list = ["key", "value"],
+ )
+
+ Note that for keys, sequence_by, column_list, except_column_list,
+ ignore_null_updates_column_list, and
ignore_null_updates_except_column_list the arguments
+ have to be column identifiers without qualifiers, e.g. they cannot be
+ col("sourceTable.keyId").
+
+ :param target: The name of the target table that receives the Auto CDC
flow.
+ :param source: The name of the CDC source to stream from.
+ :param keys: The column or combination of columns that uniquely identify a
row in the source \
+ data. This is used to identify which CDC events apply to specific
records in the target \
+ table. These keys also identify records in the target table, e.g., if
there exists a record \
+ for given keys and the CDC source has an UPSERT operation for the same
keys, we will update \
+ the existing record. At least one key must be provided. This should be
a list of column \
+ identifiers without qualifiers, expressed as either Python strings or
Pyspark Columns.
Review Comment:
Nit: spell it `PySpark`, not `Pyspark`, throughout this docstring (lines 573,
575, 581-585).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]