szehon-ho commented on code in PR #56069:
URL: https://github.com/apache/spark/pull/56069#discussion_r3293756267


##########
python/pyspark/pipelines/api.py:
##########
@@ -525,3 +527,190 @@ def create_sink(
         comment=None,
     )
     get_active_graph_element_registry().register_output(sink)
+
+
+def create_auto_cdc_flow(
+    target: str,
+    source: str,
+    keys: Union[List[str], List[Column]],
+    sequence_by: Union[str, Column],
+    apply_as_deletes: Optional[Union[str, Column]] = None,
+    column_list: Optional[Union[List[str], List[Column]]] = None,
+    except_column_list: Optional[Union[List[str], List[Column]]] = None,
+    stored_as_scd_type: Optional[Literal[1, "1"]] = None,
+    name: Optional[str] = None,
+) -> None:
+    """
+    Create an Auto CDC flow into the target table from the Change Data Capture 
(CDC) source.
+    Target table must have already been created using create_streaming_table 
function. Only one
+    of column_list and except_column_list can be specified.
+
+    Example:
+    create_auto_cdc_flow(
+      target="target",
+      source="source",
+      keys=["key"],
+      sequence_by="sequence_expr",
+      column_list=["key", "value"],
+    )
+
+    Note that for keys, sequence_by, column_list, and except_column_list the 
arguments have to
+    be column identifiers without qualifiers, e.g. they cannot be 
col("sourceTable.keyId").
+
+    :param target: The name of the target table that receives the Auto CDC 
flow.
+    :param source: The name of the CDC source to stream from.
+    :param keys: The column or combination of columns that uniquely identify a 
row in the source \
+        data. This is used to identify which CDC events apply to specific 
records in the target \
+        table. These keys also identify records in the target table, e.g., if 
there exists a record \
+        for given keys and the CDC source has an UPSERT operation for the same 
keys, we will update \
+        the existing record. At least one key must be provided. This should be 
a list of column \
+        identifiers without qualifiers, expressed as either Python strings or 
PySpark Columns.
+    :param sequence_by: An expression that we use to order the source data. 
This can be expressed \
+        as either a SQL expression string or a PySpark Column.
+    :param apply_as_deletes: Delete condition for the merged operation. This 
should be a string of \
+        expression e.g. "operation = 'DELETE'"
+    :param column_list: Columns that will be included in the output table. 
This should be a list \
+        of column identifiers without qualifiers, expressed as either Python 
strings or PySpark \
+        Column. Only one of column_list and except_column_list can be 
specified.
+    :param except_column_list: Columns that will be excluded in the output 
table. This should be a \
+        list of column identifiers without qualifiers, expressed as either 
Python strings or \
+        PySpark Column. Only one of column_list and except_column_list can be 
specified. When this \
+        is specified, all columns in the dataframe of the target table except 
those in this list \
+        will be in the output table.
+    :param stored_as_scd_type: The SCD type for the target table. Only 1 (or 
"1") is supported. \
+        When not specified the server default applies.
+    :param name: The name of the flow for this create_auto_cdc_flow command. 
When unspecified this \
+           will build a "default flow" with name equal to the target name.
+    """
+    if type(target) is not str:
+        raise PySparkTypeError(
+            errorClass="NOT_EXPECTED_TYPE",
+            messageParameters={
+                "arg_name": "target",
+                "expected_type": "str",
+                "arg_type": type(target).__name__,
+            },
+        )
+    if type(source) is not str:
+        raise PySparkTypeError(
+            errorClass="NOT_EXPECTED_TYPE",
+            messageParameters={
+                "arg_name": "source",
+                "expected_type": "str",
+                "arg_type": type(source).__name__,
+            },
+        )
+    if name is not None and type(name) is not str:
+        raise PySparkTypeError(
+            errorClass="NOT_EXPECTED_TYPE",
+            messageParameters={
+                "arg_name": "name",
+                "expected_type": "str",
+                "arg_type": type(name).__name__,
+            },
+        )
+    

Review Comment:
   Several blank lines here have trailing whitespace (e.g. after the `name` 
type check and inside `_normalize_column_list`). Worth stripping — Black/ruff 
may flag these on CI.



##########
python/pyspark/pipelines/tests/test_graph_element_registry.py:
##########
@@ -97,6 +100,146 @@ def flow2():
         self.assertEqual(sink_obj.options["key1"], "value1")
         assert 
sink_obj.source_code_location.filename.endswith("test_graph_element_registry.py")
 
+    def test_create_auto_cdc_flow(self):
+        registry = LocalGraphElementRegistry()
+        with graph_element_registration_context(registry):
+            dp.create_streaming_table("target")
+            dp.create_auto_cdc_flow(
+                target="target",
+                source="source",
+                keys=[col("key")],
+                sequence_by=expr("seq"),
+            )
+
+        self.assertEqual(len(registry.outputs), 1)
+        self.assertEqual(len(registry.auto_cdc_flows), 1)
+
+        flow = cast(AutoCdcFlow, registry.auto_cdc_flows[0])
+        self.assertEqual(flow.target, "target")
+        self.assertEqual(flow.source, "source")
+        
+        # When name is not specified, it inherit's the target's name at 
construction time.
+        self.assertEqual(flow.name, "target")
+        self.assertIsNone(flow.stored_as_scd_type)
+        self.assertIsNone(flow.apply_as_deletes)
+        assert 
flow.source_code_location.filename.endswith("test_graph_element_registry.py")
+
+    def test_create_auto_cdc_flow_with_all_args(self):
+        registry = LocalGraphElementRegistry()
+        with graph_element_registration_context(registry):
+            dp.create_streaming_table("tgt")
+            dp.create_auto_cdc_flow(
+                target="tgt",
+                source="src",
+                keys=[col("id")],
+                sequence_by=expr("ts"),
+                apply_as_deletes=expr("op = 'DELETE'"),
+                column_list=[col("id"), col("val")],
+                stored_as_scd_type=1,
+                name="my_flow",
+            )
+
+        flow = cast(AutoCdcFlow, registry.auto_cdc_flows[0])
+        self.assertEqual(flow.name, "my_flow")
+        self.assertEqual(flow.stored_as_scd_type, 1)
+
+    def test_create_auto_cdc_flow_with_string_args(self):
+        # Verify that string forms of column / expression arguments are 
normalized to
+        # PySpark Columns, equivalent to passing col(...) / expr(...) directly.
+        registry = LocalGraphElementRegistry()
+        with graph_element_registration_context(registry):
+            dp.create_streaming_table("tgt")
+            dp.create_auto_cdc_flow(
+                target="tgt",
+                source="src",
+                keys=["id"],
+                sequence_by="ts",
+                apply_as_deletes="op = 'DELETE'",
+                column_list=["id", "val"],
+            )
+
+        flow = cast(AutoCdcFlow, registry.auto_cdc_flows[0])
+        for k in flow.keys:
+            self.assertIsInstance(k, Column)
+        self.assertIsInstance(flow.sequence_by, Column)
+        self.assertIsInstance(flow.apply_as_deletes, Column)
+        assert flow.column_list is not None
+        for c in flow.column_list:
+            self.assertIsInstance(c, Column)
+
+    def test_create_auto_cdc_flow_stored_as_scd_type_string(self):
+        registry = LocalGraphElementRegistry()
+        with graph_element_registration_context(registry):
+            dp.create_streaming_table("t")
+            dp.create_auto_cdc_flow(
+                target="t",
+                source="s",
+                keys=[col("k")],
+                sequence_by=expr("seq"),
+                stored_as_scd_type="1",
+            )
+
+        flow = cast(AutoCdcFlow, registry.auto_cdc_flows[0])
+        self.assertEqual(flow.stored_as_scd_type, "1")
+
+    def test_create_auto_cdc_flow_invalid_scd_type(self):
+        registry = LocalGraphElementRegistry()
+        with graph_element_registration_context(registry):
+            dp.create_streaming_table("t")
+            with self.assertRaises(PySparkTypeError) as ctx:
+                dp.create_auto_cdc_flow(
+                    target="t",
+                    source="s",
+                    keys=[col("k")],
+                    sequence_by=expr("seq"),
+                    stored_as_scd_type=2,  # type: ignore[arg-type]
+                )
+            self.assertEqual(ctx.exception.getCondition(), "NOT_EXPECTED_TYPE")
+
+    def test_create_auto_cdc_flow_with_except_column_list(self):
+        registry = LocalGraphElementRegistry()
+        with graph_element_registration_context(registry):
+            dp.create_streaming_table("tgt")
+            dp.create_auto_cdc_flow(
+                target="tgt",
+                source="src",
+                keys=[col("id")],
+                sequence_by=expr("ts"),
+                except_column_list=["op", "ts"],
+            )
+
+        flow = cast(AutoCdcFlow, registry.auto_cdc_flows[0])
+        self.assertIsNone(flow.column_list)
+        assert flow.except_column_list is not None
+        self.assertEqual(len(flow.except_column_list), 2)
+        for c in flow.except_column_list:
+            self.assertIsInstance(c, Column)
+
+    def test_create_auto_cdc_flow_rejects_non_str_target(self):

Review Comment:
   The other `create_auto_cdc_flow` tests call `create_streaming_table` on the 
target first. Harmless for `LocalGraphElementRegistry`, but adding it here 
would match the doc requirement and keep the tests consistent.



##########
python/pyspark/pipelines/api.py:
##########
@@ -525,3 +527,190 @@ def create_sink(
         comment=None,
     )
     get_active_graph_element_registry().register_output(sink)
+
+
+def create_auto_cdc_flow(
+    target: str,
+    source: str,
+    keys: Union[List[str], List[Column]],
+    sequence_by: Union[str, Column],
+    apply_as_deletes: Optional[Union[str, Column]] = None,
+    column_list: Optional[Union[List[str], List[Column]]] = None,
+    except_column_list: Optional[Union[List[str], List[Column]]] = None,
+    stored_as_scd_type: Optional[Literal[1, "1"]] = None,
+    name: Optional[str] = None,
+) -> None:
+    """
+    Create an Auto CDC flow into the target table from the Change Data Capture 
(CDC) source.
+    Target table must have already been created using create_streaming_table 
function. Only one
+    of column_list and except_column_list can be specified.
+
+    Example:
+    create_auto_cdc_flow(

Review Comment:
   The example uses 2-space continuation indent for arguments; other SDP doc 
examples (e.g. `create_streaming_table`) use 4 spaces inside the docstring 
block. Worth aligning for consistency:
   
   ```python
       Example:
           create_auto_cdc_flow(
               target="target",
               source="source",
               keys=["key"],
               sequence_by="sequence_expr",
               column_list=["key", "value"],
           )
   ```



##########
python/pyspark/pipelines/tests/test_graph_element_registry.py:
##########
@@ -97,6 +100,146 @@ def flow2():
         self.assertEqual(sink_obj.options["key1"], "value1")
         assert 
sink_obj.source_code_location.filename.endswith("test_graph_element_registry.py")
 
+    def test_create_auto_cdc_flow(self):
+        registry = LocalGraphElementRegistry()
+        with graph_element_registration_context(registry):
+            dp.create_streaming_table("target")
+            dp.create_auto_cdc_flow(
+                target="target",
+                source="source",
+                keys=[col("key")],
+                sequence_by=expr("seq"),
+            )
+
+        self.assertEqual(len(registry.outputs), 1)
+        self.assertEqual(len(registry.auto_cdc_flows), 1)
+
+        flow = cast(AutoCdcFlow, registry.auto_cdc_flows[0])
+        self.assertEqual(flow.target, "target")
+        self.assertEqual(flow.source, "source")
+        
+        # When name is not specified, it inherit's the target's name at 
construction time.
+        self.assertEqual(flow.name, "target")
+        self.assertIsNone(flow.stored_as_scd_type)
+        self.assertIsNone(flow.apply_as_deletes)
+        assert 
flow.source_code_location.filename.endswith("test_graph_element_registry.py")
+
+    def test_create_auto_cdc_flow_with_all_args(self):
+        registry = LocalGraphElementRegistry()
+        with graph_element_registration_context(registry):
+            dp.create_streaming_table("tgt")
+            dp.create_auto_cdc_flow(
+                target="tgt",
+                source="src",
+                keys=[col("id")],
+                sequence_by=expr("ts"),
+                apply_as_deletes=expr("op = 'DELETE'"),
+                column_list=[col("id"), col("val")],
+                stored_as_scd_type=1,
+                name="my_flow",
+            )
+
+        flow = cast(AutoCdcFlow, registry.auto_cdc_flows[0])
+        self.assertEqual(flow.name, "my_flow")
+        self.assertEqual(flow.stored_as_scd_type, 1)
+
+    def test_create_auto_cdc_flow_with_string_args(self):
+        # Verify that string forms of column / expression arguments are 
normalized to
+        # PySpark Columns, equivalent to passing col(...) / expr(...) directly.
+        registry = LocalGraphElementRegistry()
+        with graph_element_registration_context(registry):
+            dp.create_streaming_table("tgt")
+            dp.create_auto_cdc_flow(
+                target="tgt",
+                source="src",
+                keys=["id"],
+                sequence_by="ts",
+                apply_as_deletes="op = 'DELETE'",
+                column_list=["id", "val"],
+            )
+
+        flow = cast(AutoCdcFlow, registry.auto_cdc_flows[0])
+        for k in flow.keys:
+            self.assertIsInstance(k, Column)
+        self.assertIsInstance(flow.sequence_by, Column)
+        self.assertIsInstance(flow.apply_as_deletes, Column)
+        assert flow.column_list is not None
+        for c in flow.column_list:
+            self.assertIsInstance(c, Column)
+
+    def test_create_auto_cdc_flow_stored_as_scd_type_string(self):
+        registry = LocalGraphElementRegistry()
+        with graph_element_registration_context(registry):
+            dp.create_streaming_table("t")
+            dp.create_auto_cdc_flow(
+                target="t",
+                source="s",
+                keys=[col("k")],
+                sequence_by=expr("seq"),
+                stored_as_scd_type="1",
+            )
+
+        flow = cast(AutoCdcFlow, registry.auto_cdc_flows[0])
+        self.assertEqual(flow.stored_as_scd_type, "1")
+
+    def test_create_auto_cdc_flow_invalid_scd_type(self):
+        registry = LocalGraphElementRegistry()
+        with graph_element_registration_context(registry):
+            dp.create_streaming_table("t")
+            with self.assertRaises(PySparkTypeError) as ctx:
+                dp.create_auto_cdc_flow(
+                    target="t",
+                    source="s",
+                    keys=[col("k")],
+                    sequence_by=expr("seq"),
+                    stored_as_scd_type=2,  # type: ignore[arg-type]
+                )
+            self.assertEqual(ctx.exception.getCondition(), "NOT_EXPECTED_TYPE")
+
+    def test_create_auto_cdc_flow_with_except_column_list(self):
+        registry = LocalGraphElementRegistry()
+        with graph_element_registration_context(registry):
+            dp.create_streaming_table("tgt")
+            dp.create_auto_cdc_flow(
+                target="tgt",
+                source="src",
+                keys=[col("id")],
+                sequence_by=expr("ts"),
+                except_column_list=["op", "ts"],
+            )
+
+        flow = cast(AutoCdcFlow, registry.auto_cdc_flows[0])
+        self.assertIsNone(flow.column_list)
+        assert flow.except_column_list is not None
+        self.assertEqual(len(flow.except_column_list), 2)
+        for c in flow.except_column_list:
+            self.assertIsInstance(c, Column)
+
+    def test_create_auto_cdc_flow_rejects_non_str_target(self):
+        registry = LocalGraphElementRegistry()
+        with graph_element_registration_context(registry):
+            with self.assertRaises(PySparkTypeError) as ctx:
+                dp.create_auto_cdc_flow(
+                    target=123,  # type: ignore[arg-type]
+                    source="src",
+                    keys=[col("id")],
+                    sequence_by=expr("ts"),
+                )
+            self.assertEqual(ctx.exception.getCondition(), "NOT_EXPECTED_TYPE")
+
+    def test_create_auto_cdc_flow_rejects_invalid_key_element(self):
+        registry = LocalGraphElementRegistry()
+        with graph_element_registration_context(registry):
+            dp.create_streaming_table("tgt")
+            with self.assertRaises(PySparkTypeError) as ctx:
+                dp.create_auto_cdc_flow(
+                    target="tgt",
+                    source="src",
+                    keys=[123],  # type: ignore[list-item]
+                    sequence_by=expr("ts"),
+                )
+            self.assertEqual(ctx.exception.getCondition(), "NOT_EXPECTED_TYPE")
+
     def test_definition_without_graph_element_registry(self):

Review Comment:
   `test_definition_without_graph_element_registry` covers decorators, 
`create_streaming_table`, and `append_flow`, but not `create_auto_cdc_flow`. 
Could you add a case like:
   
   ```python
   with self.assertRaises(PySparkException) as context:
       dp.create_auto_cdc_flow(
           target="t",
           source="s",
           keys=["k"],
           sequence_by="seq",
       )
   self.assertEqual(
       context.exception.getCondition(),
       "GRAPH_ELEMENT_DEFINED_OUTSIDE_OF_DECLARATIVE_PIPELINE",
   )
   ```



##########
python/pyspark/pipelines/tests/test_graph_element_registry.py:
##########
@@ -97,6 +100,146 @@ def flow2():
         self.assertEqual(sink_obj.options["key1"], "value1")
         assert 
sink_obj.source_code_location.filename.endswith("test_graph_element_registry.py")
 
+    def test_create_auto_cdc_flow(self):
+        registry = LocalGraphElementRegistry()
+        with graph_element_registration_context(registry):
+            dp.create_streaming_table("target")
+            dp.create_auto_cdc_flow(
+                target="target",
+                source="source",
+                keys=[col("key")],
+                sequence_by=expr("seq"),
+            )
+
+        self.assertEqual(len(registry.outputs), 1)
+        self.assertEqual(len(registry.auto_cdc_flows), 1)
+
+        flow = cast(AutoCdcFlow, registry.auto_cdc_flows[0])
+        self.assertEqual(flow.target, "target")
+        self.assertEqual(flow.source, "source")
+        
+        # When name is not specified, it inherit's the target's name at 
construction time.

Review Comment:
   Nit: `inherit's` → `inherits`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to