szehon-ho commented on code in PR #56069:
URL: https://github.com/apache/spark/pull/56069#discussion_r3291852630
##########
python/pyspark/pipelines/api.py:
##########
@@ -525,3 +527,109 @@ def create_sink(
comment=None,
)
get_active_graph_element_registry().register_output(sink)
+
+
+def create_auto_cdc_flow(
+ target: str,
+ source: str,
+ keys: Union[List[str], List[Column]],
+ sequence_by: Union[str, Column],
+ apply_as_deletes: Optional[Union[str, Column]] = None,
+ column_list: Optional[Union[List[str], List[Column]]] = None,
+ except_column_list: Optional[Union[List[str], List[Column]]] = None,
+ stored_as_scd_type: Optional[Literal[1, "1"]] = None,
+ name: Optional[str] = None,
+) -> None:
+ """
+ Create an Auto CDC flow into the target table from the Change Data Capture
(CDC) source.
+ Target table must have already been created using create_streaming_table
function. Only one
+ of column_list and except_column_list can be specified.
+
+ Example:
+ create_auto_cdc_flow(
+ target = "target",
Review Comment:
Nit: the example uses `target = "target"` (spaces around `=`). The
implementation and PEP 8 style elsewhere use `target="target"`. Worth aligning
the example for consistency.
##########
python/pyspark/pipelines/api.py:
##########
@@ -525,3 +527,109 @@ def create_sink(
comment=None,
)
get_active_graph_element_registry().register_output(sink)
+
+
+def create_auto_cdc_flow(
+ target: str,
+ source: str,
+ keys: Union[List[str], List[Column]],
+ sequence_by: Union[str, Column],
+ apply_as_deletes: Optional[Union[str, Column]] = None,
+ column_list: Optional[Union[List[str], List[Column]]] = None,
+ except_column_list: Optional[Union[List[str], List[Column]]] = None,
+ stored_as_scd_type: Optional[Literal[1, "1"]] = None,
+ name: Optional[str] = None,
+) -> None:
+ """
+ Create an Auto CDC flow into the target table from the Change Data Capture
(CDC) source.
+ Target table must have already been created using create_streaming_table
function. Only one
+ of column_list and except_column_list can be specified.
+
+ Example:
+ create_auto_cdc_flow(
+ target = "target",
+ source = "source",
+ keys = ["key"],
+ sequence_by = "sequence_expr",
+ column_list = ["key", "value"],
+ )
+
+ Note that for keys, sequence_by, column_list, and except_column_list the
arguments have to
+ be column identifiers without qualifiers, e.g. they cannot be
col("sourceTable.keyId").
+
+ :param target: The name of the target table that receives the Auto CDC
flow.
+ :param source: The name of the CDC source to stream from.
+ :param keys: The column or combination of columns that uniquely identify a
row in the source \
+ data. This is used to identify which CDC events apply to specific
records in the target \
+ table. These keys also identify records in the target table, e.g., if
there exists a record \
+ for given keys and the CDC source has an UPSERT operation for the same
keys, we will update \
+ the existing record. At least one key must be provided. This should be
a list of column \
+ identifiers without qualifiers, expressed as either Python strings or
PySpark Columns.
+ :param sequence_by: An expression that we use to order the source data.
This can be expressed \
Review Comment:
Nit: the `sequence_by` doc says "PySpark Expression", while the other params
say "PySpark Column". Since string args become `F.expr(...)`, something like
"a SQL expression string or `Column`" might be clearer and consistent with
`apply_as_deletes`.
##########
python/pyspark/pipelines/tests/test_graph_element_registry.py:
##########
@@ -97,6 +100,98 @@ def flow2():
self.assertEqual(sink_obj.options["key1"], "value1")
assert
sink_obj.source_code_location.filename.endswith("test_graph_element_registry.py")
+ def test_create_auto_cdc_flow(self):
+ registry = LocalGraphElementRegistry()
+ with graph_element_registration_context(registry):
+ dp.create_streaming_table("target")
+ dp.create_auto_cdc_flow(
+ target="target",
+ source="source",
+ keys=[col("key")],
+ sequence_by=expr("seq"),
+ )
+
+ self.assertEqual(len(registry.outputs), 1)
+ self.assertEqual(len(registry.auto_cdc_flows), 1)
+
+ flow = cast(AutoCdcFlow, registry.auto_cdc_flows[0])
+ self.assertEqual(flow.target, "target")
+ self.assertEqual(flow.source, "source")
+ self.assertIsNone(flow.name)
+ self.assertIsNone(flow.stored_as_scd_type)
+ self.assertIsNone(flow.apply_as_deletes)
+ assert
flow.source_code_location.filename.endswith("test_graph_element_registry.py")
+
+ def test_create_auto_cdc_flow_with_all_args(self):
Review Comment:
`column_list` is covered in `test_create_auto_cdc_flow_with_all_args` and
the string-args test, but `except_column_list` has no test. Could you add a
case that passes `except_column_list` (and optionally one that sets both
`column_list` and `except_column_list` and expects an error, once client-side
validation is in)?
##########
python/pyspark/pipelines/spark_connect_graph_element_registry.py:
##########
@@ -133,6 +133,41 @@ def register_flow(self, flow: Flow) -> None:
command.pipeline_command.define_flow.CopyFrom(inner_command)
self._client.execute_command(command)
+ def register_auto_cdc_flow(self, flow: AutoCdcFlow) -> None:
+ from pyspark.sql.connect.column import Column as ConnectColumn
+
+ def to_plan(col: Column) -> Any:
+ return cast(ConnectColumn, col).to_plan(self._client)
+
+ def to_plans(cols: Optional[List[Column]]) -> list:
+ return [] if cols is None else [to_plan(c) for c in cols]
+
+ auto_cdc_details = pb2.PipelineCommand.DefineFlow.AutoCdcFlowDetails(
+ source=flow.source,
+ keys=to_plans(flow.keys),
+ sequence_by=to_plan(flow.sequence_by),
+ column_list=to_plans(flow.column_list),
+ except_column_list=to_plans(flow.except_column_list),
+ )
+ if flow.stored_as_scd_type is not None:
+ auto_cdc_details.stored_as_scd_type =
pb2.PipelineCommand.DefineFlow.SCDType.SCD_TYPE_1
+ if flow.apply_as_deletes is not None:
+
auto_cdc_details.apply_as_deletes.CopyFrom(to_plan(flow.apply_as_deletes))
+
+ inner_command = pb2.PipelineCommand.DefineFlow(
+ dataflow_graph_id=self._dataflow_graph_id,
+ target_dataset_name=flow.target,
+ auto_cdc_flow_details=auto_cdc_details,
+ sql_conf={},
+
source_code_location=source_code_location_to_proto(flow.source_code_location),
+ )
+ if flow.name is not None:
Review Comment:
When `name` is omitted, the docstring says the flow name defaults to the
target name. Here `flow_name` is only set when `flow.name is not None`, so the
proto may leave `flow_name` unset (empty on the server).
`PipelinesHandler.defineFlow` always uses `flow.getFlowName()` (e.g. for
implicit-flow detection).
Should we always send `flow_name`, e.g. `inner_command.flow_name =
flow.target if flow.name is None else flow.name`, so Connect matches the
documented default once `AutoCdcFlowDetails` is implemented?
##########
python/pyspark/pipelines/tests/test_graph_element_registry.py:
##########
@@ -97,6 +100,98 @@ def flow2():
self.assertEqual(sink_obj.options["key1"], "value1")
assert
sink_obj.source_code_location.filename.endswith("test_graph_element_registry.py")
+ def test_create_auto_cdc_flow(self):
+ registry = LocalGraphElementRegistry()
+ with graph_element_registration_context(registry):
+ dp.create_streaming_table("target")
+ dp.create_auto_cdc_flow(
+ target="target",
+ source="source",
+ keys=[col("key")],
+ sequence_by=expr("seq"),
+ )
+
+ self.assertEqual(len(registry.outputs), 1)
+ self.assertEqual(len(registry.auto_cdc_flows), 1)
+
+ flow = cast(AutoCdcFlow, registry.auto_cdc_flows[0])
+ self.assertEqual(flow.target, "target")
+ self.assertEqual(flow.source, "source")
+ self.assertIsNone(flow.name)
+ self.assertIsNone(flow.stored_as_scd_type)
+ self.assertIsNone(flow.apply_as_deletes)
+ assert
flow.source_code_location.filename.endswith("test_graph_element_registry.py")
+
+ def test_create_auto_cdc_flow_with_all_args(self):
+ registry = LocalGraphElementRegistry()
+ with graph_element_registration_context(registry):
+ dp.create_streaming_table("tgt")
+ dp.create_auto_cdc_flow(
+ target="tgt",
+ source="src",
+ keys=[col("id")],
+ sequence_by=expr("ts"),
+ apply_as_deletes=expr("op = 'DELETE'"),
+ column_list=[col("id"), col("val")],
+ stored_as_scd_type=1,
+ name="my_flow",
+ )
+
+ flow = cast(AutoCdcFlow, registry.auto_cdc_flows[0])
+ self.assertEqual(flow.name, "my_flow")
+ self.assertEqual(flow.stored_as_scd_type, 1)
+
+ def test_create_auto_cdc_flow_with_string_args(self):
+ # Verify that string forms of column / expression arguments are
normalized to
+ # PySpark Columns, equivalent to passing col(...) / expr(...) directly.
+ registry = LocalGraphElementRegistry()
+ with graph_element_registration_context(registry):
+ dp.create_streaming_table("tgt")
+ dp.create_auto_cdc_flow(
+ target="tgt",
+ source="src",
+ keys=["id"],
+ sequence_by="ts",
+ apply_as_deletes="op = 'DELETE'",
+ column_list=["id", "val"],
+ )
+
+ flow = cast(AutoCdcFlow, registry.auto_cdc_flows[0])
+ for k in flow.keys:
+ self.assertIsInstance(k, Column)
+ self.assertIsInstance(flow.sequence_by, Column)
+ self.assertIsInstance(flow.apply_as_deletes, Column)
+ assert flow.column_list is not None
+ for c in flow.column_list:
+ self.assertIsInstance(c, Column)
+
+ def test_create_auto_cdc_flow_stored_as_scd_type_string(self):
Review Comment:
Nit: the other `create_auto_cdc_flow` tests call `create_streaming_table` on
the target first; this one does not. Harmless for `LocalGraphElementRegistry`,
but adding it would match the other tests and the documented requirement that
the target table must already have been created with `create_streaming_table`.
##########
python/pyspark/pipelines/api.py:
##########
@@ -525,3 +527,109 @@ def create_sink(
comment=None,
)
get_active_graph_element_registry().register_output(sink)
+
+
+def create_auto_cdc_flow(
+ target: str,
+ source: str,
+ keys: Union[List[str], List[Column]],
+ sequence_by: Union[str, Column],
+ apply_as_deletes: Optional[Union[str, Column]] = None,
+ column_list: Optional[Union[List[str], List[Column]]] = None,
+ except_column_list: Optional[Union[List[str], List[Column]]] = None,
+ stored_as_scd_type: Optional[Literal[1, "1"]] = None,
+ name: Optional[str] = None,
+) -> None:
+ """
+ Create an Auto CDC flow into the target table from the Change Data Capture
(CDC) source.
+ Target table must have already been created using create_streaming_table
function. Only one
+ of column_list and except_column_list can be specified.
+
+ Example:
+ create_auto_cdc_flow(
+ target = "target",
+ source = "source",
+ keys = ["key"],
+ sequence_by = "sequence_expr",
+ column_list = ["key", "value"],
+ )
+
+ Note that for keys, sequence_by, column_list, and except_column_list the
arguments have to
+ be column identifiers without qualifiers, e.g. they cannot be
col("sourceTable.keyId").
+
+ :param target: The name of the target table that receives the Auto CDC
flow.
+ :param source: The name of the CDC source to stream from.
+ :param keys: The column or combination of columns that uniquely identify a
row in the source \
+ data. This is used to identify which CDC events apply to specific
records in the target \
+ table. These keys also identify records in the target table, e.g., if
there exists a record \
+ for given keys and the CDC source has an UPSERT operation for the same
keys, we will update \
+ the existing record. At least one key must be provided. This should be
a list of column \
+ identifiers without qualifiers, expressed as either Python strings or
PySpark Columns.
+ :param sequence_by: An expression that we use to order the source data.
This can be expressed \
+ as either a Python string or PySpark Expression.
+ :param apply_as_deletes: Delete condition for the merged operation. This
should be a string of \
+ expression e.g. "operation = 'DELETE'"
+ :param column_list: Columns that will be included in the output table.
This should be a list \
+ of column identifiers without qualifiers, expressed as either Python
strings or PySpark \
+ Column. Only one of column_list and except_column_list can be
specified.
+ :param except_column_list: Columns that will be excluded in the output
table. This should be a \
+ list of column identifiers without qualifiers, expressed as either
Python strings or \
+ PySpark Column. Only one of column_list and except_column_list can be
specified. When this \
+ is specified, all columns in the dataframe of the target table except
those in this list \
+ will be in the output table.
+ :param stored_as_scd_type: The SCD type for the target table. Only 1 (or
"1") is supported. \
+ When not specified the server default applies.
+ :param name: The name of the flow for this create_auto_cdc_flow command.
When unspecified this \
+ will build a "default flow" with name equal to the target name.
+ """
+ keys = _normalize_column_list(keys)
Review Comment:
Other SDP APIs validate inputs up front (`create_streaming_table` checks
`type(name) is not str`, etc.). Consider similar checks here before building
`AutoCdcFlow`:
- `target` / `source`: `str` (same pattern as `name` in
`create_streaming_table`)
- `keys`: non-empty list; reject mixed `[str, Column]` if you want to match
`Union[List[str], List[Column]]`
- `column_list` vs `except_column_list`: error if both are set (doc already
says only one is allowed)
- `_normalize_column_list`: reject elements that are neither `str` nor
`Column` with `PySparkTypeError` instead of passing them through
Happy to keep this minimal for the first cut, but matching existing SDP
validation would give clearer errors at definition time.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]