AnishMahto commented on code in PR #56283:
URL: https://github.com/apache/spark/pull/56283#discussion_r3350654993


##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd2BatchProcessor.scala:
##########
@@ -0,0 +1,592 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.autocdc
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{functions => F}
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.catalyst.util.QuotingUtils
+import org.apache.spark.sql.classic.DataFrame
+import org.apache.spark.sql.types.{DataType, LongType, StructField, StructType}
+import org.apache.spark.util.ArrayImplicits._
+
+/**
+ * Per-microbatch processor for SCD Type 2 AutoCDC flows, complying to the specified
+ * [[changeArgs]] configuration.
+ *
+ * @param changeArgs The CDC flow configuration.
+ * @param resolvedSequencingType The post-analysis [[DataType]] of the sequencing column, derived
+ *                               from the flow's resolved DataFrame at flow setup time.
+ */
+case class Scd2BatchProcessor(
+    changeArgs: ChangeArgs,
+    resolvedSequencingType: DataType) {
+
+  /**
+   * Backtick-quoted key column names. Use when the name flows through an expression parser
+   * (e.g., [[F.col]]), which interprets dotted names as struct-field accesses.
+   */
+  private lazy val keysQuoted: Seq[String] = changeArgs.keys.map(_.quoted)
+
+  /**
+   * Raw key column names. Use when the name is matched literally against a schema field
+   * (e.g., DataFrame `.join(other, usingColumns)`), where backticks are NOT stripped.
+   */
+  private lazy val keysRaw: Seq[String] = changeArgs.keys.map(_.name)
+
+  /**
+   * Reconcile a CDC microbatch into the canonical form the auxiliary- and target-table merges
+   * consume.
+   *
+   * Step ordering is load-bearing: the row-extension steps reference user data columns that
+   * target-column selection is allowed to drop, so selection runs last. Unlike SCD1, no per-key
+   * deduplication step is performed here - SCD2 preserves every event as part of the row's
+   * history, including byte-identical full-event duplicates.
+   *
+   * Duplicate event elimination (e.g., collapsing two identical events at the same sequence),
+   * whether across microbatches or within the same microbatch, is the responsibility of
+   * downstream reconciliation - not preprocessing.
+   *
+   * @param microbatchDf
+   *   the incoming CDC microbatch.
+   * @return
+   *   a dataframe that retains every input row 1:1 - no rows added, dropped, reordered, or
+   *   merged - with the following schema, in column order:
+   *     1. The user columns of `microbatchDf` that survive [[ChangeArgs.columnSelection]], in
+   *        the order they appeared in the input.
+   *     2. [[startAtColName]], populated with the sequence value of the row.
+   *     3. [[endAtColName]], populated with the sequence value of the row IFF it's a delete
+   *        event, null otherwise.
+   *     4. [[cdcMetadataColName]], conforming to [[targetCdcMetadataColSchema]].
+   */

Review Comment:
   An aside on my intentions with these scaladocs here and going forward:
   
   I want each individual, unit-testable function in the SCD2 microbatch processor to have a well-defined contract: it should be clear exactly what DataFrame each function expects and what DataFrame it returns, including the schema and logical invariants of each.
   
   I'm a big believer in correctness by construction, and I would love for there to be a way to encode this information into Scala types that are both self-documenting and compile-time verifiable.
   
   But we only actually know the user's key/data columns at runtime, so we can't leverage the typed `Dataset` API in a meaningful way here.
   
   At best we could introduce case-class wrappers around the `DataFrame` that communicate the intent/invariants of the wrapped `DataFrame` via their name. But that can't prevent someone from incorrectly constructing a wrapped `DataFrame` (e.g., one that violates the declared invariants), so it doesn't buy us much more than just documenting like this.
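   
   To make the wrapper idea concrete, here is a minimal sketch; it is not code from this PR. `Frame` is a hypothetical stand-in for `DataFrame` so the example runs without Spark, and `PreprocessedBatch` and the three column names are invented for illustration. A private constructor plus a checking factory can reject schema mismatches at runtime, but it cannot verify logical invariants such as the 1:1 row guarantee, and nothing is checked at compile time.

```scala
// Hypothetical stand-in for a Spark DataFrame: only the ordered column names
// matter for this sketch, so it runs without a Spark dependency.
final case class Frame(columns: Seq[String])

// The wrapper's name documents the intended invariant. The constructor is
// private, so instances can only be built through `PreprocessedBatch.from`.
final class PreprocessedBatch private (val df: Frame)

object PreprocessedBatch {
  // Assumed names for illustration only; the real names are whatever
  // startAtColName, endAtColName, and cdcMetadataColName resolve to.
  val TrailingCols: Seq[String] = Seq("__start_at", "__end_at", "__cdc_metadata")

  // Runtime schema check: the last three columns must be the CDC columns, in order.
  def from(df: Frame): Either[String, PreprocessedBatch] =
    if (df.columns.takeRight(TrailingCols.length) == TrailingCols) {
      Right(new PreprocessedBatch(df))
    } else {
      Left(s"expected trailing columns [${TrailingCols.mkString(", ")}], " +
        s"got [${df.columns.mkString(", ")}]")
    }
}
```

   A function could then take or return `PreprocessedBatch` instead of a bare `DataFrame`. The type only proves that the schema check passed once at construction; row-level guarantees would still need to live in the scaladoc and the unit tests.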



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
