AnishMahto commented on code in PR #56113:
URL: https://github.com/apache/spark/pull/56113#discussion_r3305124702


##########
python/pyspark/pipelines/flow.py:
##########
@@ -51,19 +51,21 @@ class AutoCdcFlow:
     An Auto CDC flow applies Change Data Capture (CDC) events from a source to a target
     streaming table.
 
-    :param name: Optional name of the flow. When None, defaults to the target name.
+    :param name: The name of the flow. The API layer defaults this to the target name when
+        the user does not supply one.
     :param target: The name of the target streaming table.
     :param source: The name of the CDC source to stream from.
     :param keys: Column(s) that uniquely identify a row in source and target data.
     :param sequence_by: Expression used to order the source data.
-    :param apply_as_deletes: Optional delete condition for the merged operation.
+    :param apply_as_deletes: Optional delete condition for the merge operation.
     :param column_list: Optional columns to include in the output table.
     :param except_column_list: Optional columns to exclude from the output table.
-    :param stored_as_scd_type: Optional SCD type for the target table. Only 1 is supported.
+    :param stored_as_scd_type: Optional SCD type for the target table. Only 1 (or "1") is \
+        supported.
     :param source_code_location: The location of the source code that created this flow.
     """
     """
 
-    name: Optional[str]
+    name: str

Review Comment:
   This should still be optional.
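
   One way to read this suggestion, as a minimal sketch: the field stays
   `Optional[str]`, and the fallback to the target name happens wherever the
   name is consumed. `AutoCdcFlowSketch` and `effective_flow_name` are
   hypothetical names used for illustration only; they are not part of
   `pyspark.pipelines`, and the real class has more fields.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class AutoCdcFlowSketch:
    """Hypothetical stand-in for AutoCdcFlow with only the two relevant fields."""

    # Kept optional: None means "the user did not name this flow".
    name: Optional[str]
    target: str


def effective_flow_name(flow: AutoCdcFlowSketch) -> str:
    """Resolve the flow name, falling back to the target name when unset."""
    return flow.name if flow.name is not None else flow.target
```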



##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala:
##########
@@ -394,25 +391,6 @@ case class Scd1BatchProcessor(
       .insertAll()
       .merge()
   }
-
-  private def validateCdcMetadataColumnNotPresent(microbatch: DataFrame): Unit = {
-    val microbatchSqlConf = microbatch.sparkSession.sessionState.conf
-    val resolver = microbatchSqlConf.resolver
-
-    microbatch.schema.fieldNames
-      .find(resolver(_, Scd1BatchProcessor.cdcMetadataColName))
-      .foreach { conflictingColumnName =>
-        throw new AnalysisException(
-          errorClass = "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT",
-          messageParameters = Map(
-            "caseSensitivity" -> CaseSensitivityLabels.of(microbatchSqlConf.caseSensitiveAnalysis),
-            "columnName" -> conflictingColumnName,
-            "schemaName" -> "microbatch",
-            "reservedColumnName" -> Scd1BatchProcessor.cdcMetadataColName
-          )
-        )
-      }
-  }

Review Comment:
   Thanks, this was supposed to be removed in the other PR, but I think it was
accidentally added back when I rebased the stack.



##########
python/pyspark/pipelines/api.py:
##########
@@ -614,7 +617,21 @@ def create_auto_cdc_flow(
     if name is None:
         name = target
 
+    if column_list is not None and except_column_list is not None:
+        raise PySparkValueError(
+            errorClass="INVALID_MULTIPLE_ARGUMENT_CONDITIONS",
+            messageParameters={
+                "arg_names": "column_list, except_column_list",
+                "condition": "specified together",
+            },
+        )
+

Review Comment:
   I don't feel strongly about this, but I still think this validation should
be done server side at flow registration, not client side, for consistency
across all language interfaces (e.g. SQL).
   
   Empty keys are already validated on the server side, and I will add
validation against specifying both `column_list` and `except_column_list` in
the next PR, which registers flows from the RPCs that are received.
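
   To illustrate the server-side placement, here is a minimal sketch in which
   one registration entry point runs the check, so every front end (Python,
   SQL, and so on) gets the same behavior. All names here
   (`FlowSpecError`, `validate_column_selection`, `register_auto_cdc_flow`, the
   dict-shaped spec) are hypothetical and do not reflect the actual
   registration code, which lives on the Scala side.

```python
from typing import Any, Dict, Optional, Sequence


class FlowSpecError(ValueError):
    """Hypothetical error type; a real server would use its own error class."""


def validate_column_selection(
    column_list: Optional[Sequence[str]],
    except_column_list: Optional[Sequence[str]],
) -> None:
    """Reject specs that set both the include list and the exclude list."""
    if column_list is not None and except_column_list is not None:
        raise FlowSpecError(
            "column_list and except_column_list cannot be specified together"
        )


def register_auto_cdc_flow(spec: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical registration entry point shared by all language front ends."""
    validate_column_selection(
        spec.get("column_list"), spec.get("except_column_list")
    )
    return spec
```

   With this shape, a client that skips its own check still fails at
   registration, and the client-side check becomes an optional early error
   rather than the only line of defense.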



##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala:
##########
@@ -130,6 +130,9 @@ case class FlowFunctionResult(
 
 /** A [[Flow]] whose output schema and dependencies aren't known. */
 sealed trait UnresolvedFlow extends Flow {
+  /** Optional user-supplied comment attached to this flow at definition time. */
+  def comment: Option[String]
+

Review Comment:
   Actually, can we go the opposite direction and drop `comment` from 
`AutoCdcFlow`?
   
   I remember that we intentionally dropped `comment` from all flows a while 
ago, and I accidentally added it to `AutoCdcFlow` when I first introduced the 
flow object. It shouldn't have it, and neither should any other 
`UnresolvedFlow`.
   
   https://github.com/apache/spark/pull/51406



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

