gengliangwang commented on code in PR #54667:
URL: https://github.com/apache/spark/pull/54667#discussion_r2900014913


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala:
##########
@@ -862,6 +864,34 @@ case class CatalogStatistics(
     }
   }
 
+  /**
+   * Convert [[CatalogStatistics]] to v2 connector [[V2Statistics]], matching 
column stats to
+   * schema columns by name. All available statistics are returned 
unconditionally; callers are
+   * responsible for gating on CBO / planStats settings before using numRows 
or columnStats.
+   *
+   * @param schema combined data + partition schema, used to resolve the 
DataType for each column
+   *               when deserializing min/max string values
+   */
+  def toV2Stats(schema: StructType): V2Statistics = {
+    val typeMap = schema.fields.map(f => f.name -> f.dataType).toMap
+    val colStatsMap: Map[NamedReference, ColumnStatistics] = colStats.flatMap 
{ case (name, stat) =>
+      typeMap.get(name).map { dt =>
+        FieldReference.apply(name) -> stat.toV2ColStat(name, dt)

Review Comment:
   `FieldReference.apply(name)` runs through 
`CatalystSqlParser.parseMultipartIdentifier`, which splits dotted names and 
interprets backticks. For a column named `a.b`, this produces a multi-part 
reference `["a", "b"]` instead of `["a.b"]`. `DescribeColumnExec` uses 
`FieldReference.column(name)` to avoid this — consider doing the same here.



##########
sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala:
##########
@@ -425,4 +427,55 @@ abstract class StatisticsCollectionTestBase extends 
QueryTest with SQLTestUtils
       assert(relation.stats.attributeStats.isEmpty)
     }
   }
+
+  test("CatalogStatistics.toV2Stats") {
+    val schema = StructType(Seq(
+      StructField("id", IntegerType),
+      StructField("name", StringType)))
+
+    val idColStat = CatalogColumnStat(
+      distinctCount = Some(10),
+      min = Some("1"),
+      max = Some("100"),
+      nullCount = Some(0),
+      avgLen = Some(4),
+      maxLen = Some(4))
+
+    val catalogStats = CatalogStatistics(
+      sizeInBytes = 1024,
+      rowCount = Some(10),
+      colStats = Map(
+        "id" -> idColStat,
+        // "extra" is not in schema — should be silently skipped
+        "extra" -> CatalogColumnStat(distinctCount = Some(5))))
+
+    val v2Stats = catalogStats.toV2Stats(schema)
+
+    // sizeInBytes is always populated
+    assert(v2Stats.sizeInBytes().getAsLong === 1024L)
+
+    // numRows is present when rowCount is defined
+    assert(v2Stats.numRows().isPresent)
+    assert(v2Stats.numRows().getAsLong === 10L)
+
+    // only columns present in schema are returned; "extra" is skipped
+    val colStats = v2Stats.columnStats()
+    assert(colStats.size() === 1, "only 'id' is in schema; 'extra' should be 
skipped")
+    val idV2 = colStats.get(FieldReference.apply("id"))

Review Comment:
   The test looks up column stats via `FieldReference.apply("id")`. If the 
implementation switches to `FieldReference.column` (as suggested above), this 
would need to change too. Also worth adding a test case for histogram 
round-tripping.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala:
##########
@@ -936,6 +966,29 @@ case class CatalogColumnStat(
       maxLen = maxLen,
       histogram = histogram,
       version = version)
+
+  /**
+   * Convert [[CatalogColumnStat]] to a v2 connector [[ColumnStatistics]].
+   * min/max are deserialized from their external string representation using 
the column's DataType.
+   */
+  def toV2ColStat(colName: String, dataType: DataType): ColumnStatistics = {
+    val parsedMin = min.map(CatalogColumnStat.fromExternalString(_, colName, 
dataType, version))
+    val parsedMax = max.map(CatalogColumnStat.fromExternalString(_, colName, 
dataType, version))
+    val v2DistinctCount =
+      distinctCount.map(v => 
OptionalLong.of(v.longValue)).getOrElse(OptionalLong.empty())
+    val v2NullCount =
+      nullCount.map(v => 
OptionalLong.of(v.longValue)).getOrElse(OptionalLong.empty())
+    val v2AvgLen = avgLen.map(OptionalLong.of).getOrElse(OptionalLong.empty())
+    val v2MaxLen = maxLen.map(OptionalLong.of).getOrElse(OptionalLong.empty())
+    new ColumnStatistics {
+      override def distinctCount(): OptionalLong = v2DistinctCount
+      override def min(): Optional[Object] = 
Optional.ofNullable(parsedMin.orNull)
+      override def max(): Optional[Object] = 
Optional.ofNullable(parsedMax.orNull)
+      override def nullCount(): OptionalLong = v2NullCount
+      override def avgLen(): OptionalLong = v2AvgLen
+      override def maxLen(): OptionalLong = v2MaxLen

Review Comment:
   `histogram` is not converted. `CatalogColumnStat` has a `histogram` field, 
the V2 `ColumnStatistics` interface has a `histogram()` method, and 
`DataSourceV2Relation.transformV2Stats` reads it. Dropping it here means 
connectors that use this conversion silently lose histogram data. The sibling 
`toPlanStat` does propagate the histogram.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to