cloud-fan commented on code in PR #55487:
URL: https://github.com/apache/spark/pull/55487#discussion_r3185852964


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Dependency.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import java.util.List;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * Represents a dependency of a SQL object such as a view or metric view.
+ * <p>
+ * A dependency is one of: {@link TableDependency} or {@link 
FunctionDependency}. The
+ * {@code sealed} declaration enforces this structurally.
+ *
+ * @since 4.2.0
+ */
+@Evolving
+public sealed interface Dependency permits TableDependency, FunctionDependency 
{

Review Comment:
   `FunctionDependency` is in the sealed `permits` list and exposed via the 
`Dependency.function(...)` factory below, but no producer in this PR ever emits 
one -- `MetricViewHelper.collectTableDependencies` only emits 
`TableDependency`. Two options: (a) drop `FunctionDependency` until it has a 
producer (the `@Evolving` annotation is meant to evolve before stabilizing, so 
adding it later is cheap); (b) keep it as groundwork but mention in the PR 
description so reviewers don't trip on the dead surface, and add a sentence to 
this class-level Javadoc noting that consumers may receive only 
`TableDependency` instances today.



##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableDependency.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * A table dependency of a SQL object.
+ * <p>
+ * The dependent table is identified by its structural multi-part name. {@code 
nameParts}
+ * arity matches the catalog's namespace depth plus one for the table name -- 
for a catalog
+ * with single-level namespaces the parts are {@code [catalog, schema, 
table]}; for a catalog
+ * with multi-level namespaces (e.g. Iceberg with {@code db1.db2}) the parts 
are
+ * {@code [catalog, db1, db2, ..., table]}; for v1 sources resolved through 
the session
+ * catalog producers should normalize to {@code [spark_catalog, db, table]} so 
consumers see

Review Comment:
   Missing comma -- as written, "session catalog producers" reads as a compound 
noun.
   
   ```suggestion
    * catalog, producers should normalize to {@code [spark_catalog, db, table]} 
so consumers see
   ```



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala:
##########
@@ -323,6 +323,35 @@ class DataSourceV2Strategy(session: SparkSession) extends 
Strategy with Predicat
       CreateV2ViewExec(catalog.asInstanceOf[ViewCatalog], ident, 
userSpecifiedColumns, comment,
         collation, properties, sqlText, child, allowExisting, replace, 
viewSchemaMode) :: Nil
 
+    // CREATE VIEW ... WITH METRICS on a non-session v2 catalog. Routes the 
metric-view path
+    // through `CreateV2MetricViewExec`, which extends `V2ViewPreparation` to 
share the
+    // `IF NOT EXISTS` short-circuit, `OR REPLACE`, and cross-type-collision 
decoding with
+    // `CreateV2ViewExec`. Session-catalog dispatch stays in 
`CreateMetricViewCommand.run`.
+    case CreateMetricViewCommand(
+        ResolvedIdentifier(catalog, ident), userSpecifiedColumns, comment, 
properties,
+        originalText, allowExisting, replace) if 
!CatalogV2Util.isSessionCatalog(catalog) =>
+      val viewCatalog = catalog match {
+        case vc: ViewCatalog => vc
+        case _ => throw 
QueryCompilationErrors.missingCatalogViewsAbilityError(catalog)
+      }
+      // Parse + analyze the YAML body here (during planning). This mirrors 
the v1 path's
+      // late analysis in `CreateMetricViewCommand.run` -- the metric-view 
source plan is not
+      // a SQL string, so it can't ride along as a regular `query` 
`LogicalPlan` field on the
+      // logical command the way `CreateView` does. Pass the full multi-part 
name so v2 metric
+      // views with multi-level-namespace targets analyze correctly 
(`asTableIdentifier` would
+      // throw `requiresSinglePartNamespaceError` for namespace arity > 1).
+      val nameParts = (catalog.name() +: ident.namespace().toIndexedSeq) :+ 
ident.name()
+      val (analyzed, metricView) = MetricViewHelper.analyzeMetricViewText(
+        session, nameParts, originalText)
+      val mergedProps = properties ++ metricView.getProperties
+      val depParts = MetricViewHelper.collectTableDependencies(analyzed)
+      val deps = if (depParts.nonEmpty) {
+        Some(DependencyList.of(
+          depParts.map(parts => Dependency.table(parts: _*)): _*))
+      } else None

Review Comment:
   The empty-`depParts` case maps to `None` here, which becomes `null` on 
`ViewInfo.viewDependencies()`. Per the Javadoc on `ViewInfo#viewDependencies`, 
`null` means "no dependency list was supplied" while an empty list means 
"provided but the object has none." For a metric view we always *compute* deps, 
so when `collectTableDependencies` returns empty (e.g. `SQLSource("SELECT 1 AS 
x")`) the right value is `Some(DependencyList.of())`, not `None`.
   
   ```suggestion
         val deps = Some(DependencyList.of(
           depParts.map(parts => Dependency.table(parts: _*)): _*))
   ```



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/MetricViewV2CatalogSuite.scala:
##########
@@ -0,0 +1,1031 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.catalyst.analysis.{NoSuchViewException, 
ViewAlreadyExistsException}
+import org.apache.spark.sql.connector.catalog.{Identifier, 
InMemoryTableCatalog, MetadataTable, Table, TableCatalog, TableDependency, 
TableSummary, TableViewCatalog, ViewInfo}
+import org.apache.spark.sql.metricview.serde.{AssetSource, Column, Constants, 
DimensionExpression, MeasureExpression, MetricView, MetricViewFactory, 
SQLSource}
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.Metadata
+
+/**
+ * Tests that exercise 
[[org.apache.spark.sql.execution.command.CreateMetricViewCommand]] on a
+ * non-session V2 catalog. Metric views are persisted through the same 
[[ViewCatalog]] interface
+ * as plain views; the only marker that distinguishes them is `PROP_TABLE_TYPE 
= METRIC_VIEW`
+ * plus the typed `viewDependencies` field on [[ViewInfo]]. The recording 
catalog used here is a
+ * minimal [[TableViewCatalog]] so the same instance can also host the source 
table referenced by
+ * the metric view's YAML.
+ */
+class MetricViewV2CatalogSuite extends QueryTest with SharedSparkSession {
+
+  import testImplicits._
+
+  private val testCatalogName = "testcat"
+  private val testNamespace = "ns"
+  private val sourceTableName = "events"
+  private val fullSourceTableName =
+    s"$testCatalogName.$testNamespace.$sourceTableName"
+  private val metricViewName = "mv"
+  private val fullMetricViewName =
+    s"$testCatalogName.$testNamespace.$metricViewName"
+
+  private val metricViewColumns = Seq(
+    Column("region", DimensionExpression("region"), 0),
+    Column("count_sum", MeasureExpression("sum(count)"), 1))
+
+  private val testTableData = Seq(
+    ("region_1", 1, 5.0),
+    ("region_2", 2, 10.0))
+
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+    spark.conf.set(
+      s"spark.sql.catalog.$testCatalogName",
+      classOf[MetricViewRecordingCatalog].getName)
+    // A catalog that does not implement ViewCatalog - used for the negative 
gate test.
+    spark.conf.set(
+      s"spark.sql.catalog.${MetricViewV2CatalogSuite.noViewCatalogName}",
+      classOf[InMemoryTableCatalog].getName)
+  }
+
+  override protected def afterAll(): Unit = {
+    spark.conf.unset(s"spark.sql.catalog.$testCatalogName")
+    spark.conf.unset(
+      s"spark.sql.catalog.${MetricViewV2CatalogSuite.noViewCatalogName}")
+    super.afterAll()
+  }
+
+  private def withTestCatalogTables(body: => Unit): Unit = {
+    MetricViewRecordingCatalog.reset()
+    testTableData.toDF("region", "count", "price")
+      .createOrReplaceTempView("metric_view_v2_source")
+    try {
+      sql(
+        s"""CREATE TABLE $fullSourceTableName
+           |USING foo AS SELECT * FROM metric_view_v2_source""".stripMargin)
+      body
+    } finally {
+      // The metric-view ident `mv` may have ended up as either a view (most 
tests) or as a
+      // pre-created table (a few negative tests pre-create a table at the 
same ident to
+      // exercise cross-type collisions). Sweep both kinds so subsequent tests 
in the suite
+      // start from a clean catalog state. Wrap each DROP in a Try because:
+      //   - DROP VIEW IF EXISTS on a leftover *table* throws 
WRONG_COMMAND_FOR_OBJECT_TYPE
+      //     under master's new DropViewExec active-rejection contract.
+      //   - DROP TABLE IF EXISTS on a leftover *view* throws the symmetric 
error.
+      //   - On a totally clean state both are silent no-ops.
+      scala.util.Try(sql(s"DROP VIEW IF EXISTS $fullMetricViewName"))
+      scala.util.Try(sql(s"DROP TABLE IF EXISTS $fullMetricViewName"))
+      scala.util.Try(sql(s"DROP TABLE IF EXISTS $fullSourceTableName"))
+      spark.catalog.dropTempView("metric_view_v2_source")
+      MetricViewRecordingCatalog.reset()
+    }
+  }
+
+  private def createMetricView(
+      name: String,
+      metricView: MetricView,
+      comment: Option[String] = None): String = {
+    val yaml = MetricViewFactory.toYAML(metricView)
+    val commentClause = comment.map(c => s"\nCOMMENT '$c'").getOrElse("")
+    sql(
+      s"""CREATE VIEW $name
+         |WITH METRICS$commentClause
+         |LANGUAGE YAML
+         |AS
+         |$$$$
+         |$yaml
+         |$$$$""".stripMargin)
+    yaml
+  }
+
+  private def capturedViewInfo(): ViewInfo = {
+    val ident = Identifier.of(Array(testNamespace), metricViewName)
+    val info = MetricViewRecordingCatalog.capturedViews.get(ident)
+    assert(info != null,
+      s"Expected ViewInfo for $ident to be captured by the V2 catalog")
+    info
+  }
+
+  // ============================================================
+  // Section 1: CREATE-related tests
+  // ============================================================
+
+
+  test("V2 catalog receives METRIC_VIEW table type and view text via 
ViewInfo") {
+    withTestCatalogTables {
+      val metricView = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = None,
+        select = metricViewColumns)
+      val yaml = createMetricView(fullMetricViewName, metricView)
+
+      val info = capturedViewInfo()
+      // PROP_TABLE_TYPE is overwritten to METRIC_VIEW after `ViewInfo`'s 
constructor stamps it
+      // to VIEW; this is the marker `V1Table.toCatalogTable` reads to map the 
round-tripped row
+      // back to `CatalogTableType.METRIC_VIEW`.
+      assert(info.properties().get(TableCatalog.PROP_TABLE_TYPE)
+        === TableSummary.METRIC_VIEW_TABLE_TYPE)
+      // The captured queryText is the raw text between `$$ ... $$` -- 
including the leading
+      // and trailing newline our SQL fixture inserts -- so trim before 
comparing to the
+      // pre-substitution YAML body.
+      assert(info.queryText().trim === yaml.trim)
+
+      val deps = info.viewDependencies()
+      assert(deps != null)
+      assert(deps.dependencies().size() === 1)
+      val tableDep = deps.dependencies().get(0).asInstanceOf[TableDependency]
+      assert(tableDep.nameParts().asScala.toSeq ===
+        Seq(testCatalogName, testNamespace, sourceTableName))
+    }
+  }
+
+  test("V2 catalog path populates metric_view.* + view context + sql configs 
on ViewInfo") {
+    withTestCatalogTables {
+      val metricView = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = Some("count > 0"),
+        select = metricViewColumns)
+      createMetricView(fullMetricViewName, metricView)
+
+      val info = capturedViewInfo()
+      val props = info.properties()
+
+      // metric_view.* descriptive properties (mirrors DBR 
SingleSourceMetricView).

Review Comment:
   "DBR" is internal Databricks-Runtime terminology and shouldn't appear in OSS 
comments.
   
   ```suggestion
         // metric_view.* descriptive properties (mirrors the canonical 
metric-view property layout).
   ```



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/command/metricViewCommands.scala:
##########
@@ -73,25 +94,110 @@ case class CreateMetricViewCommand(
 case class AlterMetricViewCommand(child: LogicalPlan, originalText: String)
 
 object MetricViewHelper {
+
+  /**
+   * Walks the analyzed plan to collect direct table/view dependencies. Each 
dependency is
+   * returned as a structural multi-part name (`Seq[String]`); v1 sources 
(resolved through
+   * the session catalog) are normalized to a stable 3-part shape
+   * `[spark_catalog, db, table]` -- `TableIdentifier.nameParts` returns 1, 2, 
or 3 parts
+   * depending on whether the analyzer captured the catalog / database, so 
without
+   * normalization the same source can produce a different shape across runs. 
v2 sources
+   * already arrive fully qualified (catalog + namespace + table) and are 
returned as-is so
+   * multi-level namespaces survive.
+   *
+   * Stops recursion at relation leaf nodes and persistent `View` nodes so 
only direct
+   * (not transitive) dependencies are recorded.
+   */
+  private[execution] def collectTableDependencies(plan: LogicalPlan): 
Seq[Seq[String]] = {
+    val tables = scala.collection.mutable.ArrayBuffer.empty[Seq[String]]
+    def traverse(p: LogicalPlan): Unit = p match {
+      case v: View if !v.isTempView =>
+        tables += qualifyV1(v.desc.identifier.nameParts)
+      case r: DataSourceV2Relation if r.catalog.isDefined && 
r.identifier.isDefined =>
+        val ident = r.identifier.get
+        // V2 catalogs may have multi-level namespaces; preserve the full 
arity rather than
+        // dot-joining the namespace into a single component.
+        tables += (r.catalog.get.name() +: ident.namespace().toIndexedSeq) :+ 
ident.name()
+      case r: HiveTableRelation =>
+        tables += qualifyV1(r.tableMeta.identifier.nameParts)
+      case r: LogicalRelation if r.catalogTable.isDefined =>
+        tables += qualifyV1(r.catalogTable.get.identifier.nameParts)
+      case other =>
+        other.children.foreach(traverse)
+        other.expressions.foreach(_.foreach {
+          case s: SubqueryExpression => traverse(s.plan)
+          case _ =>
+        })
+    }
+    traverse(plan)
+    tables.distinct.toSeq
+  }
+
+  /**
+   * Normalizes v1 source identifiers to a stable 3-part `[spark_catalog, db, 
table]` shape.
+   * `TableIdentifier.nameParts` may return 1, 2, or 3 parts depending on 
whether the analyzer
+   * captured the catalog / database components, which would otherwise leak 
through to
+   * dependency consumers as nondeterministic arity.
+   */
+  private def qualifyV1(parts: Seq[String]): Seq[String] = parts match {
+    case Seq(t) => Seq(SESSION_CATALOG_NAME, SessionCatalog.DEFAULT_DATABASE, 
t)
+    case Seq(db, t) => Seq(SESSION_CATALOG_NAME, db, t)
+    case Seq(_, _, _) => parts
+    case other => other  // Unexpected arity; pass through unchanged.
+  }
+
+  /**
+   * Analyzes a metric-view YAML body so the create / alter path can capture 
the source plan
+   * and its dependencies. Returns the analyzed plan together with the parsed 
[[MetricView]]
+   * descriptor (the latter is grabbed off the un-analyzed 
[[MetricViewPlaceholder]] before
+   * the analyzer rewrites it away, so callers needing the descriptor for 
property emission
+   * don't have to re-parse the YAML).
+   *
+   * `nameParts` is the multi-part target identifier (catalog + namespace + 
table). The synthetic
+   * [[CatalogTable]] used as analysis context still carries a 
[[TableIdentifier]] (capped at
+   * 3 parts: catalog + database + table); for multi-level v2 namespaces we 
collapse the
+   * intermediate namespace components into the synthetic `database` slot. The 
synthetic identifier
+   * is not used to resolve the view body itself, so this collapse is 
observationally invisible to
+   * the analyzed plan; `verifyTemporaryObjectsNotExists` continues to receive 
the full
+   * `nameParts` so error messages still render the multi-part form.
+   */
   def analyzeMetricViewText(
       session: SparkSession,
-      name: TableIdentifier,
-      viewText: String): LogicalPlan = {
+      nameParts: Seq[String],
+      viewText: String): (LogicalPlan, MetricView) = {
     val analyzer = session.sessionState.analyzer
+    val syntheticIdent = nameParts match {
+      case Seq(table) =>
+        TableIdentifier(table)
+      case Seq(db, table) =>
+        TableIdentifier(table, Some(db))
+      case parts =>
+        // 3+ parts: catalog is the head, table is the last, the middle 
(1..n-1) collapses
+        // into the synthetic `database` slot. We dot-join the intermediate 
components so a
+        // human inspecting the synthetic identifier can still see them.
+        TableIdentifier(
+          parts.last,
+          Some(parts.slice(1, parts.length - 1).mkString(".")),
+          Some(parts.head))
+    }
     // this metadata is used for analysis check, it'll be replaced during 
create/update with
     // more accurate information
     val tableMeta = CatalogTable(
-      identifier = name,
+      identifier = syntheticIdent,
       tableType = CatalogTableType.VIEW,
       storage = CatalogStorageFormat.empty,
       schema = new StructType(),
       viewOriginalText = Some(viewText),
       viewText = Some(viewText))
-    val metricViewNode = MetricViewPlanner.planWrite(
+    val placeholder = MetricViewPlanner.planWrite(
       tableMeta, viewText, session.sessionState.sqlParser)
-    val analyzed = analyzer.executeAndCheck(metricViewNode, new 
QueryPlanningTracker)
+    // Grab the parsed descriptor BEFORE analysis -- the placeholder gets 
replaced by
+    // ResolvedMetricView during analyzer rules, after which `MetricView` is 
no longer
+    // recoverable from the plan tree.
+    val metricView = placeholder.desc
+    val analyzed = analyzer.executeAndCheck(placeholder, new 
QueryPlanningTracker)
     ViewHelper.verifyTemporaryObjectsNotExists(
-      isTemporary = false, name.nameParts, analyzed, Seq.empty)
-    analyzed
+      isTemporary = false, nameParts, analyzed, Seq.empty)

Review Comment:
   `analyzeMetricViewText` runs `verifyTemporaryObjectsNotExists` but skips the 
cyclic-view-reference check that plain `CREATE VIEW` gets via 
`CheckViewReferences.checkCyclicViewReference` (only applied on `replace`). For 
`CREATE OR REPLACE VIEW <mv> WITH METRICS LANGUAGE YAML AS $$ from: <mv> ... 
$$` the cycle currently surfaces only on the next `SELECT`. Pre-existing on the 
v1 path too (the old `createMetricViewInSessionCatalog` didn't call it either), 
so this is a follow-up rather than a blocker for this PR -- but wiring it in 
here means both v1 and v2 metric-view CREATE paths benefit from one fix.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/MetricViewV2CatalogSuite.scala:
##########
@@ -0,0 +1,1031 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.catalyst.analysis.{NoSuchViewException, 
ViewAlreadyExistsException}
+import org.apache.spark.sql.connector.catalog.{Identifier, 
InMemoryTableCatalog, MetadataTable, Table, TableCatalog, TableDependency, 
TableSummary, TableViewCatalog, ViewInfo}
+import org.apache.spark.sql.metricview.serde.{AssetSource, Column, Constants, 
DimensionExpression, MeasureExpression, MetricView, MetricViewFactory, 
SQLSource}
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.Metadata
+
+/**
+ * Tests that exercise 
[[org.apache.spark.sql.execution.command.CreateMetricViewCommand]] on a
+ * non-session V2 catalog. Metric views are persisted through the same 
[[ViewCatalog]] interface
+ * as plain views; the only marker that distinguishes them is `PROP_TABLE_TYPE 
= METRIC_VIEW`
+ * plus the typed `viewDependencies` field on [[ViewInfo]]. The recording 
catalog used here is a
+ * minimal [[TableViewCatalog]] so the same instance can also host the source 
table referenced by
+ * the metric view's YAML.
+ */
+class MetricViewV2CatalogSuite extends QueryTest with SharedSparkSession {
+
+  import testImplicits._
+
+  private val testCatalogName = "testcat"
+  private val testNamespace = "ns"
+  private val sourceTableName = "events"
+  private val fullSourceTableName =
+    s"$testCatalogName.$testNamespace.$sourceTableName"
+  private val metricViewName = "mv"
+  private val fullMetricViewName =
+    s"$testCatalogName.$testNamespace.$metricViewName"
+
+  private val metricViewColumns = Seq(
+    Column("region", DimensionExpression("region"), 0),
+    Column("count_sum", MeasureExpression("sum(count)"), 1))
+
+  private val testTableData = Seq(
+    ("region_1", 1, 5.0),
+    ("region_2", 2, 10.0))
+
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+    spark.conf.set(
+      s"spark.sql.catalog.$testCatalogName",
+      classOf[MetricViewRecordingCatalog].getName)
+    // A catalog that does not implement ViewCatalog - used for the negative 
gate test.
+    spark.conf.set(
+      s"spark.sql.catalog.${MetricViewV2CatalogSuite.noViewCatalogName}",
+      classOf[InMemoryTableCatalog].getName)
+  }
+
+  override protected def afterAll(): Unit = {
+    spark.conf.unset(s"spark.sql.catalog.$testCatalogName")
+    spark.conf.unset(
+      s"spark.sql.catalog.${MetricViewV2CatalogSuite.noViewCatalogName}")
+    super.afterAll()
+  }
+
+  private def withTestCatalogTables(body: => Unit): Unit = {
+    MetricViewRecordingCatalog.reset()
+    testTableData.toDF("region", "count", "price")
+      .createOrReplaceTempView("metric_view_v2_source")
+    try {
+      sql(
+        s"""CREATE TABLE $fullSourceTableName
+           |USING foo AS SELECT * FROM metric_view_v2_source""".stripMargin)
+      body
+    } finally {
+      // The metric-view ident `mv` may have ended up as either a view (most 
tests) or as a
+      // pre-created table (a few negative tests pre-create a table at the 
same ident to
+      // exercise cross-type collisions). Sweep both kinds so subsequent tests 
in the suite
+      // start from a clean catalog state. Wrap each DROP in a Try because:
+      //   - DROP VIEW IF EXISTS on a leftover *table* throws 
WRONG_COMMAND_FOR_OBJECT_TYPE
+      //     under master's new DropViewExec active-rejection contract.
+      //   - DROP TABLE IF EXISTS on a leftover *view* throws the symmetric 
error.
+      //   - On a totally clean state both are silent no-ops.
+      scala.util.Try(sql(s"DROP VIEW IF EXISTS $fullMetricViewName"))
+      scala.util.Try(sql(s"DROP TABLE IF EXISTS $fullMetricViewName"))
+      scala.util.Try(sql(s"DROP TABLE IF EXISTS $fullSourceTableName"))
+      spark.catalog.dropTempView("metric_view_v2_source")
+      MetricViewRecordingCatalog.reset()
+    }
+  }
+
+  private def createMetricView(
+      name: String,
+      metricView: MetricView,
+      comment: Option[String] = None): String = {
+    val yaml = MetricViewFactory.toYAML(metricView)
+    val commentClause = comment.map(c => s"\nCOMMENT '$c'").getOrElse("")
+    sql(
+      s"""CREATE VIEW $name
+         |WITH METRICS$commentClause
+         |LANGUAGE YAML
+         |AS
+         |$$$$
+         |$yaml
+         |$$$$""".stripMargin)
+    yaml
+  }
+
+  private def capturedViewInfo(): ViewInfo = {
+    val ident = Identifier.of(Array(testNamespace), metricViewName)
+    val info = MetricViewRecordingCatalog.capturedViews.get(ident)
+    assert(info != null,
+      s"Expected ViewInfo for $ident to be captured by the V2 catalog")
+    info
+  }
+
+  // ============================================================
+  // Section 1: CREATE-related tests
+  // ============================================================
+
+
+  test("V2 catalog receives METRIC_VIEW table type and view text via 
ViewInfo") {
+    withTestCatalogTables {
+      val metricView = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = None,
+        select = metricViewColumns)
+      val yaml = createMetricView(fullMetricViewName, metricView)
+
+      val info = capturedViewInfo()
+      // PROP_TABLE_TYPE is overwritten to METRIC_VIEW after `ViewInfo`'s 
constructor stamps it
+      // to VIEW; this is the marker `V1Table.toCatalogTable` reads to map the 
round-tripped row
+      // back to `CatalogTableType.METRIC_VIEW`.
+      assert(info.properties().get(TableCatalog.PROP_TABLE_TYPE)
+        === TableSummary.METRIC_VIEW_TABLE_TYPE)
+      // The captured queryText is the raw text between `$$ ... $$` -- 
including the leading
+      // and trailing newline our SQL fixture inserts -- so trim before 
comparing to the
+      // pre-substitution YAML body.
+      assert(info.queryText().trim === yaml.trim)
+
+      val deps = info.viewDependencies()
+      assert(deps != null)
+      assert(deps.dependencies().size() === 1)
+      val tableDep = deps.dependencies().get(0).asInstanceOf[TableDependency]
+      assert(tableDep.nameParts().asScala.toSeq ===
+        Seq(testCatalogName, testNamespace, sourceTableName))
+    }
+  }
+
+  test("V2 catalog path populates metric_view.* + view context + sql configs 
on ViewInfo") {
+    withTestCatalogTables {
+      val metricView = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = Some("count > 0"),
+        select = metricViewColumns)
+      createMetricView(fullMetricViewName, metricView)
+
+      val info = capturedViewInfo()
+      val props = info.properties()
+
+      // metric_view.* descriptive properties (mirrors DBR 
SingleSourceMetricView).
+      assert(props.get(MetricView.PROP_FROM_TYPE) === "ASSET")
+      assert(props.get(MetricView.PROP_FROM_NAME) === fullSourceTableName)
+      assert(props.get(MetricView.PROP_FROM_SQL) === null)
+      assert(props.get(MetricView.PROP_WHERE) === "count > 0")
+
+      // SQL configs and current catalog/namespace are first-class typed 
fields on ViewInfo, no
+      // longer encoded into properties for V2 catalogs.
+      assert(info.sqlConfigs().size > 0,
+        s"Expected at least one captured SQL config; got ${info.sqlConfigs()}")
+      assert(info.currentCatalog() ===
+        spark.sessionState.catalogManager.currentCatalog.name())
+      assert(info.currentNamespace().toSeq ===
+        spark.sessionState.catalogManager.currentNamespace.toSeq)
+    }
+  }
+
+  test("V2 catalog path captures SQL source and comment") {
+    withTestCatalogTables {
+      val metricView = MetricView(
+        "0.1",
+        SQLSource(s"SELECT * FROM $fullSourceTableName"),
+        where = None,
+        select = metricViewColumns)
+      createMetricView(fullMetricViewName, metricView, comment = Some("my mv"))
+
+      val info = capturedViewInfo()
+      val props = info.properties()
+      assert(props.get(TableCatalog.PROP_TABLE_TYPE)
+        === TableSummary.METRIC_VIEW_TABLE_TYPE)
+      assert(props.get(MetricView.PROP_FROM_TYPE) === "SQL")
+      assert(props.get(MetricView.PROP_FROM_NAME) === null)
+      assert(props.get(MetricView.PROP_FROM_SQL) ===
+        s"SELECT * FROM $fullSourceTableName")
+      assert(props.get(TableCatalog.PROP_COMMENT) === "my mv")
+
+      val deps = info.viewDependencies()
+      assert(deps != null && deps.dependencies().size() === 1)
+      val tableDep = deps.dependencies().get(0).asInstanceOf[TableDependency]
+      assert(tableDep.nameParts().asScala.toSeq ===
+        Seq(testCatalogName, testNamespace, sourceTableName))
+    }
+  }
+
+  test("metric view columns carry metric_view.type / metric_view.expr in 
column metadata") {
+    withTestCatalogTables {
+      val metricView = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = None,
+        select = metricViewColumns)
+      createMetricView(fullMetricViewName, metricView)
+
+      val cols = capturedViewInfo().columns()
+      assert(cols.length === metricViewColumns.length)
+
+      val byName = cols.map(c => c.name() -> c).toMap
+      def metadataOf(name: String): Metadata =
+        
Metadata.fromJson(Option(byName(name).metadataInJSON()).getOrElse("{}"))
+
+      val regionMeta = metadataOf("region")
+      assert(regionMeta.getString(Constants.COLUMN_TYPE_PROPERTY_KEY) === 
"dimension")
+      assert(regionMeta.getString(Constants.COLUMN_EXPR_PROPERTY_KEY) === 
"region")
+
+      val countMeta = metadataOf("count_sum")
+      assert(countMeta.getString(Constants.COLUMN_TYPE_PROPERTY_KEY) === 
"measure")
+      assert(countMeta.getString(Constants.COLUMN_EXPR_PROPERTY_KEY) === 
"sum(count)")
+    }
+  }
+
+  test("user-specified column names with comments preserve metric_view.* 
metadata") {
+    withTestCatalogTables {
+      val metricView = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = None,
+        select = metricViewColumns)
+      val yaml = MetricViewFactory.toYAML(metricView)
+      // Pins aliasPlan(retainMetadata = true): metric_view.* keys must 
survive a column
+      // rename with comments.
+      sql(
+        s"""CREATE VIEW $fullMetricViewName (reg COMMENT 'region alias', n 
COMMENT 'count')
+           |WITH METRICS
+           |LANGUAGE YAML
+           |AS
+           |$$$$
+           |$yaml
+           |$$$$""".stripMargin)
+
+      val cols = capturedViewInfo().columns()
+      val byName = cols.map(c => c.name() -> c).toMap
+      assert(byName.keySet === Set("reg", "n"))
+
+      def metadataOf(name: String): Metadata =
+        
Metadata.fromJson(Option(byName(name).metadataInJSON()).getOrElse("{}"))
+
+      val regMeta = metadataOf("reg")
+      assert(regMeta.getString(Constants.COLUMN_TYPE_PROPERTY_KEY) === 
"dimension")
+      assert(regMeta.getString(Constants.COLUMN_EXPR_PROPERTY_KEY) === 
"region")
+      // `CatalogV2Util.structTypeToV2Columns` peels "comment" off into 
`Column.comment()`
+      // rather than leaving it inside `metadataInJSON`; assert via the V2 
column accessor.
+      assert(byName("reg").comment() === "region alias")
+
+      val nMeta = metadataOf("n")
+      assert(nMeta.getString(Constants.COLUMN_TYPE_PROPERTY_KEY) === "measure")
+      assert(nMeta.getString(Constants.COLUMN_EXPR_PROPERTY_KEY) === 
"sum(count)")
+      assert(byName("n").comment() === "count")
+    }
+  }
+
+  test("CREATE OR REPLACE VIEW ... WITH METRICS replaces an existing v2 metric 
view") {
+    withTestCatalogTables {
+      val first = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = Some("count > 0"),
+        select = metricViewColumns)
+      createMetricView(fullMetricViewName, first)
+
+      // Replace with a new body (different WHERE clause).
+      val replacement = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = Some("count > 100"),
+        select = metricViewColumns)
+      val replacementYaml = MetricViewFactory.toYAML(replacement)
+      sql(
+        s"""CREATE OR REPLACE VIEW $fullMetricViewName
+           |WITH METRICS
+           |LANGUAGE YAML
+           |AS
+           |$$$$
+           |$replacementYaml
+           |$$$$""".stripMargin)
+
+      val finalInfo = capturedViewInfo()
+      // Assert on the distinguishing fields of the replacement, not on diff 
vs. the original.
+      // queryText keeps the surrounding `\n` from the SQL `$$ ... $$` 
markers; trim first.
+      assert(finalInfo.queryText().trim === replacementYaml.trim)
+      assert(finalInfo.properties().get(MetricView.PROP_WHERE) === "count > 
100")
+      val deps = finalInfo.viewDependencies()
+      assert(deps != null && deps.dependencies().size() === 1)
+      val tableDep = deps.dependencies().get(0).asInstanceOf[TableDependency]
+      assert(tableDep.nameParts().asScala.toSeq ===
+        Seq(testCatalogName, testNamespace, sourceTableName))
+    }
+  }
+
+  test("CREATE VIEW IF NOT EXISTS ... WITH METRICS is a no-op when the view 
exists") {
+    withTestCatalogTables {
+      val original = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = None,
+        select = metricViewColumns)
+      createMetricView(fullMetricViewName, original)
+      val originalYaml = capturedViewInfo().queryText()
+
+      // Now CREATE VIEW IF NOT EXISTS with a different YAML body. The catalog 
should not see
+      // the second create at all (V2ViewPreparation's `viewExists` 
short-circuit fires before
+      // `buildViewInfo`), so the captured ViewInfo retains the original body.
+      val replacement = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = Some("count > 999"),
+        select = metricViewColumns)
+      val replacementYaml = MetricViewFactory.toYAML(replacement)
+      sql(
+        s"""CREATE VIEW IF NOT EXISTS $fullMetricViewName
+           |WITH METRICS
+           |LANGUAGE YAML
+           |AS
+           |$$$$
+           |$replacementYaml
+           |$$$$""".stripMargin)
+
+      assert(capturedViewInfo().queryText().trim === originalYaml.trim,
+        "IF NOT EXISTS over an existing metric view should be a no-op.")
+    }
+  }
+
+  test("CREATE VIEW ... WITH METRICS over a v2 table at the ident throws " +
+      "EXPECT_VIEW_NOT_TABLE.NO_ALTERNATIVE") {
+    withTestCatalogTables {
+      // Pre-create a regular v2 table at the same ident the metric view will 
target. The
+      // catalog's `createView` call below should raise 
`ViewAlreadyExistsException`, which
+      // `CreateV2MetricViewExec` then decodes (via `tableExists`) into the 
precise cross-type
+      // collision error that `CreateV2ViewExec` emits.
+      sql(s"CREATE TABLE $fullMetricViewName (x INT) USING foo")
+
+      val mv = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = None,
+        select = metricViewColumns)
+      val yaml = MetricViewFactory.toYAML(mv)
+      val ex = intercept[AnalysisException] {
+        sql(
+          s"""CREATE VIEW $fullMetricViewName
+             |WITH METRICS
+             |LANGUAGE YAML
+             |AS
+             |$$$$
+             |$yaml
+             |$$$$""".stripMargin)
+      }
+      // SPARK-56655 added an analyzer-time pre-check for "ident already 
occupied by a table"
+      // before the v2 view-create exec runs, so the more specific
+      // `EXPECT_VIEW_NOT_TABLE.NO_ALTERNATIVE` decoded by 
`CreateV2MetricViewExec.run`'s catch
+      // block is no longer reachable when a *plain* table sits at the ident 
-- the analyzer
+      // raises `TABLE_OR_VIEW_ALREADY_EXISTS` first. Both errors carry the 
same actionable
+      // signal ("can't create a view here because something else already 
lives at this ident").
+      assert(ex.getCondition === "TABLE_OR_VIEW_ALREADY_EXISTS",
+        s"Expected TABLE_OR_VIEW_ALREADY_EXISTS, got ${ex.getCondition}: 
${ex.getMessage}")
+    }
+  }
+
+  test("CREATE VIEW IF NOT EXISTS ... WITH METRICS is a no-op when a v2 table 
sits at the " +
+      "ident") {
+    withTestCatalogTables {
+      sql(s"CREATE TABLE $fullMetricViewName (x INT) USING foo")
+      val mv = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = None,
+        select = metricViewColumns)
+      val yaml = MetricViewFactory.toYAML(mv)
+      // IF NOT EXISTS over a table is a no-op (v1 parity), not an error.
+      sql(
+        s"""CREATE VIEW IF NOT EXISTS $fullMetricViewName
+           |WITH METRICS
+           |LANGUAGE YAML
+           |AS
+           |$$$$
+           |$yaml
+           |$$$$""".stripMargin)
+      val ident = Identifier.of(Array(testNamespace), metricViewName)
+      assert(!MetricViewRecordingCatalog.capturedViews.containsKey(ident),
+        "IF NOT EXISTS over a v2 table should not register a view in the 
catalog.")
+    }
+  }
+
+  test("CREATE VIEW ... WITH METRICS on a non-ViewCatalog catalog fails with " 
+
+      "MISSING_CATALOG_ABILITY.VIEWS") {
+    val ex = intercept[AnalysisException] {
+      sql(
+        s"""CREATE VIEW 
${MetricViewV2CatalogSuite.noViewCatalogName}.default.mv
+           |WITH METRICS
+           |LANGUAGE YAML
+           |AS
+           |$$$$
+           |${MetricViewFactory.toYAML(MetricView(
+              "0.1",
+              AssetSource(fullSourceTableName),
+              where = None,
+              select = metricViewColumns))}
+           |$$$$""".stripMargin)
+    }
+    // SPARK-56655 added the `.VIEWS` subclass; the bare 
`MISSING_CATALOG_ABILITY` no longer
+    // surfaces directly for the missing-view-ability case.
+    assert(ex.getCondition === "MISSING_CATALOG_ABILITY.VIEWS")
+    assert(ex.getMessage.contains("VIEWS"))
+  }
+
+  test("CREATE VIEW ... WITH METRICS at a multi-level-namespace v2 target 
succeeds") {
+    val deepNamespace = Array("ns_a", "ns_b")
+    val deepMetricViewName = "mv_deep"
+    val fullDeepName =
+      s"$testCatalogName.${deepNamespace.mkString(".")}.$deepMetricViewName"
+    withTestCatalogTables {
+      // Pre-create the multi-level namespace + a source table inside it. The 
metric view
+      // *target* lives in the same multi-level namespace -- that's what 
exercises the
+      // `MetricViewHelper.analyzeMetricViewText` lift to multi-part 
nameParts. The pre-lift
+      // code path failed at `ident.asTableIdentifier` with 
`requiresSinglePartNamespaceError`.
+      sql(s"CREATE NAMESPACE IF NOT EXISTS 
$testCatalogName.${deepNamespace.head}")
+      sql(s"CREATE NAMESPACE IF NOT EXISTS " +
+        s"$testCatalogName.${deepNamespace.mkString(".")}")
+      try {
+        val mv = MetricView(
+          "0.1",
+          AssetSource(fullSourceTableName),
+          where = None,
+          select = metricViewColumns)
+        val yaml = MetricViewFactory.toYAML(mv)
+        sql(
+          s"""CREATE VIEW $fullDeepName
+             |WITH METRICS
+             |LANGUAGE YAML
+             |AS
+             |$$$$
+             |$yaml
+             |$$$$""".stripMargin)
+
+        val deepIdent = Identifier.of(deepNamespace, deepMetricViewName)
+        val info = MetricViewRecordingCatalog.capturedViews.get(deepIdent)
+        assert(info != null, s"Expected ViewInfo for $deepIdent to be 
captured")
+        assert(info.properties().get(TableCatalog.PROP_TABLE_TYPE)
+          === TableSummary.METRIC_VIEW_TABLE_TYPE)
+      } finally {
+        scala.util.Try(sql(s"DROP VIEW IF EXISTS $fullDeepName"))
+        sql(s"DROP NAMESPACE IF EXISTS " +
+          s"$testCatalogName.${deepNamespace.mkString(".")} CASCADE")
+        sql(s"DROP NAMESPACE IF EXISTS $testCatalogName.${deepNamespace.head} 
CASCADE")
+      }
+    }
+  }
+
+  // ============================================================
+  // Section 2: Dependency extraction
+  // ============================================================
+
+
+  test("dependency extraction: SQL source JOIN captures both tables") {
+    withTestCatalogTables {
+      val secondSource = s"$testCatalogName.$testNamespace.customers"
+      sql(
+        s"""CREATE TABLE $secondSource (id INT, name STRING)
+           |USING foo""".stripMargin)
+      try {
+        val joinSql =
+          s"SELECT c.name, t.count FROM $fullSourceTableName t " +
+            s"JOIN $secondSource c ON t.count = c.id"
+        val metricView = MetricView(
+          "0.1",
+          SQLSource(joinSql),
+          where = None,
+          select = Seq(
+            Column("name", DimensionExpression("name"), 0),
+            Column("count_sum", MeasureExpression("sum(count)"), 1)))
+        createMetricView(fullMetricViewName, metricView)
+
+        val deps = capturedViewInfo().viewDependencies()
+        assert(deps != null)
+        val depParts = deps.dependencies().asScala
+          .map(_.asInstanceOf[TableDependency].nameParts().asScala.toSeq).toSet
+        assert(depParts === Set(
+          Seq(testCatalogName, testNamespace, sourceTableName),
+          Seq(testCatalogName, testNamespace, "customers")),
+          s"Expected dependencies on both source tables, got $depParts")
+      } finally {
+        sql(s"DROP TABLE IF EXISTS $secondSource")
+      }
+    }
+  }
+
+  test("dependency extraction: SQL source subquery deduplicates same-table 
references") {
+    withTestCatalogTables {
+      val subquerySql =
+        s"SELECT * FROM $fullSourceTableName " +
+          s"WHERE count > (SELECT avg(count) FROM $fullSourceTableName)"
+      val metricView = MetricView(
+        "0.1",
+        SQLSource(subquerySql),
+        where = None,
+        select = metricViewColumns)
+      createMetricView(fullMetricViewName, metricView)
+
+      val deps = capturedViewInfo().viewDependencies()
+      assert(deps != null && deps.dependencies().size() === 1,
+        s"Expected 1 deduplicated dependency, got " +
+          s"${Option(deps).map(_.dependencies().size()).getOrElse(0)}")
+      val tableDep = deps.dependencies().get(0).asInstanceOf[TableDependency]
+      assert(tableDep.nameParts().asScala.toSeq ===
+        Seq(testCatalogName, testNamespace, sourceTableName))
+    }
+  }
+
+  test("dependency extraction: SQL source self-join deduplicates same-table 
references") {
+    withTestCatalogTables {
+      val selfJoinSql =
+        s"SELECT a.region AS a_region, a.count AS a_count " +
+          s"FROM $fullSourceTableName a JOIN $fullSourceTableName b " +
+          s"ON a.region = b.region"
+      val metricView = MetricView(
+        "0.1",
+        SQLSource(selfJoinSql),
+        where = None,
+        select = Seq(
+          Column("region", DimensionExpression("a_region"), 0),
+          Column("count_sum", MeasureExpression("sum(a_count)"), 1)))
+      createMetricView(fullMetricViewName, metricView)
+
+      val deps = capturedViewInfo().viewDependencies()
+      assert(deps != null && deps.dependencies().size() === 1,
+        s"Expected 1 deduplicated dependency for self-join, got " +
+          s"${Option(deps).map(_.dependencies().size()).getOrElse(0)}")
+      val tableDep = deps.dependencies().get(0).asInstanceOf[TableDependency]
+      assert(tableDep.nameParts().asScala.toSeq ===
+        Seq(testCatalogName, testNamespace, sourceTableName))
+    }
+  }
+
+  test("dependency extraction: V1 session-catalog source emits 3-part 
nameParts") {
+    val v1Source = "metric_view_v2_v1source"
+    spark.range(0, 5).toDF("v")
+      .write.mode("overwrite").saveAsTable(v1Source)
+    try {
+      withTestCatalogTables {
+        val mv = MetricView(
+          "0.1",
+          // SQL source resolves through the current (session) catalog; the 
resolved
+          // `LogicalRelation` carries a session-catalog `CatalogTable`.
+          SQLSource(s"SELECT v AS region, v AS count FROM $v1Source"),
+          where = None,
+          select = metricViewColumns)
+        createMetricView(fullMetricViewName, mv)
+
+        val deps = capturedViewInfo().viewDependencies()
+        assert(deps != null && deps.dependencies().size() === 1)
+        val parts =
+          
deps.dependencies().get(0).asInstanceOf[TableDependency].nameParts().asScala.toSeq
+        // `MetricViewHelper.qualifyV1` normalizes any 
`TableIdentifier.nameParts` shape
+        // (1, 2, or 3 parts depending on what the analyzer captured) to the 
stable
+        // `[spark_catalog, db, table]` shape so downstream consumers see 
deterministic
+        // arity per source kind.
+        assert(parts.length === 3,
+          s"V1 nameParts should normalize to exactly 3 parts, got 
${parts.length}: $parts")
+        assert(parts.head === "spark_catalog",
+          s"V1 nameParts head should be the session-catalog name, got $parts")
+        assert(parts.last === v1Source, s"Last part should be the table name, 
got $parts")
+      }
+    } finally {
+      sql(s"DROP TABLE IF EXISTS $v1Source")
+    }
+  }
+
+  test("dependency extraction: multi-level V2 namespace source emits N+2 
nameParts") {
+    val multiNamespace = Array("ns_a", "ns_b")
+    val multiTable = "events_deep"
+    val multiFull = 
s"$testCatalogName.${multiNamespace.mkString(".")}.$multiTable"
+    withTestCatalogTables {
+      // The InMemoryTableCatalog (TableViewCatalog mixin) supports 
multi-level namespaces.
+      sql(s"CREATE NAMESPACE IF NOT EXISTS 
$testCatalogName.${multiNamespace.head}")
+      sql(s"CREATE NAMESPACE IF NOT EXISTS " +
+        s"$testCatalogName.${multiNamespace.mkString(".")}")
+      sql(s"CREATE TABLE $multiFull (region STRING, count INT) USING foo")
+      try {
+        val mv = MetricView(
+          "0.1",
+          SQLSource(s"SELECT region, count FROM $multiFull"),
+          where = None,
+          select = metricViewColumns)
+        createMetricView(fullMetricViewName, mv)
+
+        val deps = capturedViewInfo().viewDependencies()
+        assert(deps != null && deps.dependencies().size() === 1)
+        val parts =
+          
deps.dependencies().get(0).asInstanceOf[TableDependency].nameParts().asScala.toSeq
+        assert(parts === Seq(testCatalogName, multiNamespace(0), 
multiNamespace(1), multiTable),
+          s"Multi-level nameParts should preserve every namespace component, 
got $parts")
+      } finally {
+        sql(s"DROP TABLE IF EXISTS $multiFull")
+        sql(s"DROP NAMESPACE IF EXISTS " +
+          s"$testCatalogName.${multiNamespace.mkString(".")} CASCADE")
+        sql(s"DROP NAMESPACE IF EXISTS $testCatalogName.${multiNamespace.head} 
CASCADE")
+      }
+    }
+  }
+
+  // ============================================================
+  // Section 3: SELECT cases
+  // ============================================================
+
+
+  test("SELECT measure(...) from a v2 metric view returns aggregated rows") {
+    withTestCatalogTables {
+      val mv = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = None,
+        select = metricViewColumns)
+      createMetricView(fullMetricViewName, mv)
+      // The fixture's `events` source has rows ("region_1", 1, 5.0), 
("region_2", 2, 10.0).
+      // The metric view aggregates by `region` summing `count`. Resolution 
flows through
+      // loadTableOrView -> MetadataTable(ViewInfo) -> 
V1Table.toCatalogTable(ViewInfo) ->
+      // CatalogTableType.METRIC_VIEW -> ResolveMetricView, which rewrites the 
view body
+      // into Aggregate(Seq(region), Seq(sum(count) AS count_sum)) over 
`events`. The
+      // `measure(...)` wrapper is required for measure columns -- selecting 
`count_sum`
+      // bare would fail (mirrors the v1 `MetricViewSuite` query syntax).
+      checkAnswer(
+        sql(s"SELECT region, measure(count_sum) FROM $fullMetricViewName " +
+          "GROUP BY region ORDER BY region"),
+        sql(s"SELECT region, sum(count) FROM $fullSourceTableName " +
+          "GROUP BY region ORDER BY region"))
+    }
+  }
+
+  test("SELECT measure(...) with a WHERE clause on a dimension") {
+    withTestCatalogTables {
+      val mv = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = None,
+        select = metricViewColumns)
+      createMetricView(fullMetricViewName, mv)
+      // Filter at the query layer (not on the metric view's own `where:`).
+      checkAnswer(
+        sql(s"SELECT measure(count_sum) FROM $fullMetricViewName " +
+          "WHERE region = 'region_2'"),
+        sql(s"SELECT sum(count) FROM $fullSourceTableName " +
+          "WHERE region = 'region_2'"))
+    }
+  }
+
+  test("SELECT against a v2 metric view honors the view's pre-defined where 
clause") {
+    withTestCatalogTables {
+      // Pre-define a filter on the metric view itself: only rows with count > 
1 should be
+      // visible to consumers (i.e. region_2 only).
+      val mv = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = Some("count > 1"),
+        select = metricViewColumns)
+      createMetricView(fullMetricViewName, mv)
+      checkAnswer(
+        sql(s"SELECT region, measure(count_sum) FROM $fullMetricViewName " +
+          "GROUP BY region ORDER BY region"),
+        sql(s"SELECT region, sum(count) FROM $fullSourceTableName " +
+          "WHERE count > 1 GROUP BY region ORDER BY region"))
+    }
+  }
+
+  test("SELECT from a v2 metric view supports multiple measures with different 
aggregations") {
+    withTestCatalogTables {
+      // Add a second measure (sum of price) so we exercise the multi-measure 
rewrite path.
+      val cols = Seq(
+        Column("region", DimensionExpression("region"), 0),
+        Column("count_sum", MeasureExpression("sum(count)"), 1),
+        Column("price_sum", MeasureExpression("sum(price)"), 2),
+        Column("price_max", MeasureExpression("max(price)"), 3))
+      val mv = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = None,
+        select = cols)
+      createMetricView(fullMetricViewName, mv)
+      checkAnswer(
+        sql(s"SELECT measure(count_sum), measure(price_sum), 
measure(price_max) " +
+          s"FROM $fullMetricViewName"),
+        sql(s"SELECT sum(count), sum(price), max(price) FROM 
$fullSourceTableName"))
+    }
+  }
+
+  test("SELECT from a v2 metric view supports ORDER BY and LIMIT on measures") 
{
+    withTestCatalogTables {
+      val mv = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = None,
+        select = metricViewColumns)
+      createMetricView(fullMetricViewName, mv)
+      checkAnswer(
+        sql(s"SELECT region, measure(count_sum) FROM $fullMetricViewName " +
+          "GROUP BY region ORDER BY 2 DESC LIMIT 1"),
+        sql(s"SELECT region, sum(count) FROM $fullSourceTableName " +
+          "GROUP BY region ORDER BY 2 DESC LIMIT 1"))
+    }
+  }
+
+  // ============================================================
+  // Section 4: DESCRIBE cases
+  // ============================================================
+
+
+  test("DESCRIBE TABLE EXTENDED on a v2 metric view round-trips through 
loadTableOrView") {
+    withTestCatalogTables {
+      val mv = MetricView(
+        "0.1",
+        AssetSource(fullSourceTableName),
+        where = None,
+        select = metricViewColumns)
+      val yaml = createMetricView(fullMetricViewName, mv)
+
+      // DESCRIBE TABLE EXTENDED resolves the ident through 
`Analyzer.lookupTableOrView`,
+      // which calls `TableViewCatalog.loadTableOrView` once and gets back a
+      // `MetadataTable(ViewInfo)`. The analyzer wraps it as a 
`ResolvedPersistentView` and
+      // `DataSourceV2Strategy` routes through SPARK-56655's 
`DescribeV2ViewExec`, which
+      // reads the typed `ViewInfo` directly and emits the standard "Type" / 
"View Text" /
+      // "View Current Catalog" / "View Schema Mode" / etc. rows. Pins 
`DescribeV2ViewExec`
+      // emits a "Type" row matching v1 `CatalogTable.toJsonLinkedHashMap` 
parity, so users
+      // can distinguish a plain VIEW from a sub-kind like METRIC_VIEW.

Review Comment:
   Sentence is broken: "Pins `DescribeV2ViewExec` emits a \"Type\" row matching 
v1 `CatalogTable.toJsonLinkedHashMap` parity ..." reads as if `Pins` is a verb 
taking `DescribeV2ViewExec` as its direct object, and "matching X parity" is 
awkward.
   
   ```suggestion
         // "View Current Catalog" / "View Schema Mode" / etc. rows. Pins that 
`DescribeV2ViewExec`
         // emits a "Type" row for parity with v1 
`CatalogTable.toJsonLinkedHashMap`, so users
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to