AnishMahto commented on code in PR #56113:
URL: https://github.com/apache/spark/pull/56113#discussion_r3305124702
##########
python/pyspark/pipelines/flow.py:
##########
@@ -51,19 +51,21 @@ class AutoCdcFlow:
An Auto CDC flow applies Change Data Capture (CDC) events from a source to
a target
streaming table.
- :param name: Optional name of the flow. When None, defaults to the target name.
+ :param name: The name of the flow. The API layer defaults this to the target name when
+ the user does not supply one.
:param target: The name of the target streaming table.
:param source: The name of the CDC source to stream from.
:param keys: Column(s) that uniquely identify a row in source and target data.
:param sequence_by: Expression used to order the source data.
- :param apply_as_deletes: Optional delete condition for the merged operation.
+ :param apply_as_deletes: Optional delete condition for the merge operation.
:param column_list: Optional columns to include in the output table.
:param except_column_list: Optional columns to exclude from the output table.
- :param stored_as_scd_type: Optional SCD type for the target table. Only 1 is supported.
+ :param stored_as_scd_type: Optional SCD type for the target table. Only 1 (or "1") is \
+ supported.
:param source_code_location: The location of the source code that created this flow.
"""
- name: Optional[str]
+ name: str
Review Comment:
This should still be optional.
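A minimal sketch of the shape the reviewer asks for, assuming a plain dataclass: the `name` field keeps its `Optional[str]` annotation, and the API layer fills in the target name when it is `None`, mirroring the `if name is None: name = target` lines in `api.py`. `AutoCdcFlowSketch` and `build_flow` are hypothetical, trimmed stand-ins, not the real `AutoCdcFlow` class or `create_auto_cdc_flow` signature.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class AutoCdcFlowSketch:
    """Hypothetical, trimmed stand-in for AutoCdcFlow (three fields only)."""

    # Kept as Optional[str], as requested in the review.
    name: Optional[str]
    target: str
    source: str


def build_flow(
    target: str, source: str, name: Optional[str] = None
) -> AutoCdcFlowSketch:
    """Hypothetical API-layer helper that defaults the flow name to the target."""
    if name is None:
        name = target
    return AutoCdcFlowSketch(name=name, target=target, source=source)
```

With this split, `build_flow("orders", "orders_cdc").name` is `"orders"`, while constructing `AutoCdcFlowSketch(name=None, ...)` directly is still well-typed because the field admits `None`.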
##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala:
##########
@@ -394,25 +391,6 @@ case class Scd1BatchProcessor(
.insertAll()
.merge()
}
-
- private def validateCdcMetadataColumnNotPresent(microbatch: DataFrame): Unit = {
- val microbatchSqlConf = microbatch.sparkSession.sessionState.conf
- val resolver = microbatchSqlConf.resolver
-
- microbatch.schema.fieldNames
- .find(resolver(_, Scd1BatchProcessor.cdcMetadataColName))
- .foreach { conflictingColumnName =>
- throw new AnalysisException(
- errorClass = "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT",
- messageParameters = Map(
- "caseSensitivity" -> CaseSensitivityLabels.of(microbatchSqlConf.caseSensitiveAnalysis),
- "columnName" -> conflictingColumnName,
- "schemaName" -> "microbatch",
- "reservedColumnName" -> Scd1BatchProcessor.cdcMetadataColName
- )
- )
- }
- }
Review Comment:
Thanks, this was supposed to be removed in the other PR, but I think it was
accidentally added back when I rebased the stack!
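For context, the removed helper rejects a microbatch whose schema already contains the reserved CDC metadata column, comparing names with the session's resolver so that the check respects `caseSensitiveAnalysis`. The Python sketch below illustrates that idea only. All function names are hypothetical, and `str.lower()` approximates Spark's case-insensitive resolution (which uses `equalsIgnoreCase`).

```python
from typing import Callable, Iterable, Optional

# A resolver decides whether two identifiers refer to the same column.
Resolver = Callable[[str, str], bool]


def make_resolver(case_sensitive: bool) -> Resolver:
    """Approximates Spark's resolver: exact match, or case-insensitive match."""
    if case_sensitive:
        return lambda a, b: a == b
    return lambda a, b: a.lower() == b.lower()


def find_reserved_conflict(
    field_names: Iterable[str], reserved: str, case_sensitive: bool
) -> Optional[str]:
    """Return the first field name the resolver matches to `reserved`, if any."""
    resolver = make_resolver(case_sensitive)
    return next((n for n in field_names if resolver(n, reserved)), None)


def validate_reserved_column_absent(
    field_names: Iterable[str], reserved: str, case_sensitive: bool
) -> None:
    """Raise if the schema already contains the reserved column name.

    The Scala original raises AnalysisException with error class
    AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT; ValueError stands in for it here.
    """
    conflict = find_reserved_conflict(field_names, reserved, case_sensitive)
    if conflict is not None:
        sensitivity = "case-sensitive" if case_sensitive else "case-insensitive"
        raise ValueError(
            f"Column {conflict!r} conflicts with reserved column {reserved!r} "
            f"({sensitivity} comparison)."
        )
```

Under case-insensitive analysis a field named `__META__` collides with a (hypothetical) reserved name `__meta__`; under case-sensitive analysis it does not.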
##########
python/pyspark/pipelines/api.py:
##########
@@ -614,7 +617,21 @@ def create_auto_cdc_flow(
if name is None:
name = target
+ if column_list is not None and except_column_list is not None:
+ raise PySparkValueError(
+ errorClass="INVALID_MULTIPLE_ARGUMENT_CONDITIONS",
+ messageParameters={
+ "arg_names": "column_list, except_column_list",
+ "condition": "specified together",
+ },
+ )
+
Review Comment:
I don't feel super strongly, but I'm still of the opinion that this validation
should be done server side on flow registration, not client side, for
consistency across all language interfaces (e.g. SQL).
Empty keys are already validated on the server side, and I will be adding
validation against specifying both `column_list` and `except_column_list` in
the next PR, which registers flows from the RPCs the server receives.
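A minimal sketch of the registration-time check the reviewer describes, assuming every client (Python, SQL, or another language) sends the same language-neutral flow definition to the server. The real server side lives in Scala under `sql/pipelines`; `AutoCdcFlowSpec`, `FlowRegistrationError`, and `validate_on_registration` are hypothetical names used only to illustrate why running the checks once at registration keeps behavior consistent across clients.

```python
from dataclasses import dataclass
from typing import List, Optional


class FlowRegistrationError(ValueError):
    """Hypothetical error raised when a flow definition is rejected."""


@dataclass
class AutoCdcFlowSpec:
    """Hypothetical language-neutral flow definition, as received by the server."""

    name: str
    keys: List[str]
    column_list: Optional[List[str]] = None
    except_column_list: Optional[List[str]] = None


def validate_on_registration(spec: AutoCdcFlowSpec) -> None:
    """Checks run once at registration, whichever client sent the flow."""
    if not spec.keys:
        raise FlowRegistrationError(f"Flow {spec.name!r}: keys must not be empty.")
    if spec.column_list is not None and spec.except_column_list is not None:
        raise FlowRegistrationError(
            f"Flow {spec.name!r}: column_list and except_column_list "
            "cannot be specified together."
        )
```

Because the check sits behind the registration boundary, a client-side `PySparkValueError` like the one in the diff becomes redundant: a SQL-defined flow and a Python-defined flow hit the same error path.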
##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala:
##########
@@ -130,6 +130,9 @@ case class FlowFunctionResult(
/** A [[Flow]] whose output schema and dependencies aren't known. */
sealed trait UnresolvedFlow extends Flow {
+ /** Optional user-supplied comment attached to this flow at definition time. */
+ def comment: Option[String]
+
Review Comment:
Actually, can we go the opposite direction and drop `comment` from
`AutoCdcFlow`?
I remember that we intentionally dropped `comment` from all flows a while
ago, and I accidentally added it to `AutoCdcFlow` when I first introduced the
flow object. It shouldn't have it, and neither should any other
`UnresolvedFlow`.
https://github.com/apache/spark/pull/51406
--
This is an automated message from the Apache Git Service.