dtenedor commented on code in PR #52334:
URL: https://github.com/apache/spark/pull/52334#discussion_r2421338947


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/LiteralToSqlConverter.scala:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.{InternalRow}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedFunction
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.types._
+
+/**
+ * Utility for converting Catalyst literal expressions to their SQL string 
representation.
+ *
+ * This object provides a specialized implementation for converting Spark SQL 
literal
+ * expressions to their equivalent SQL text representation. It is used by the 
parameter
+ * substitution system for EXECUTE IMMEDIATE and other parameterized queries.
+ *
+ * Key features:
+ * - Handles all Spark SQL data types for literal values
+ * - Supports both Scala collections and Spark internal data structures
+ * - Proper SQL escaping and formatting
+ * - Optimized for literal expressions only
+ *
+ * Supported data types:
+ * - Primitives: String, Integer, Long, Float, Double, Boolean, Decimal
+ * - Temporal: Date, Timestamp, TimestampNTZ, Interval
+ * - Collections: Array, Map (including nested structures)
+ * - Special: Null values, Binary data
+ * - Complex: Nested arrays, maps of arrays, arrays of maps
+ *
+ * @example Basic usage:
+ * {{{
+ * val result1 = LiteralToSqlConverter.convert(Literal(42))
+ * // result1: "42"
+ *
+ * val result2 = LiteralToSqlConverter.convert(Literal("hello"))
+ * // result2: "'hello'"
+ *
+ * val arrayLit = Literal.create(Array(1, 2, 3), ArrayType(IntegerType))
+ * val result3 = LiteralToSqlConverter.convert(arrayLit)
+ * // result3: "ARRAY(1, 2, 3)"
+ * }}}
+ *
+ * @example Complex types:
+ * {{{
+ * val mapLit = Literal.create(Map("key" -> "value"), MapType(StringType, 
StringType))
+ * val result = LiteralToSqlConverter.convert(mapLit)
+ * // result: "MAP('key', 'value')"
+ * }}}
+ *
+ * @note This utility is thread-safe and can be used concurrently.
+ * @note Only supports Literal expressions - all parameter values must be 
pre-evaluated.
+ * @see [[ParameterHandler]] for the main parameter handling entry point
+ */
+object LiteralToSqlConverter {
+
+  /**
+   * Convert an expression to its SQL string representation.
+   *
+   * This method handles both simple literals and complex expressions that 
result from
+   * parameter evaluation. For complex types like arrays and maps, the 
expressions are
+   * evaluated to internal data structures that need to be converted back to 
SQL constructors.
+   *
+   * @param expr The expression to convert (typically a Literal, but may be 
other expressions
+   *             for complex types)
+   * @return SQL string representation of the expression value
+   */
+  def convert(expr: Expression): String = expr match {
+    case lit: Literal => convertLiteral(lit)
+
+    // Special handling for UnresolvedFunction expressions that don't 
naturally evaluate
+    // Only handle functions that are whitelisted in legacy mode but don't 
eval() naturally
+    case UnresolvedFunction(name, children, _, _, _, _, _) =>
+      val functionName = name.mkString(".")
+      functionName.toLowerCase(java.util.Locale.ROOT) match {
+        case "array" | "map" | "struct" | "map_from_arrays" | 
"map_from_entries" =>
+          // Convert whitelisted functions to SQL function call syntax
+          val childrenSql = children.map(convert).mkString(", ")
+          s"${functionName.toUpperCase(java.util.Locale.ROOT)}($childrenSql)"
+        case _ =>
+          // Non-whitelisted function - not supported in parameter substitution
+          throw QueryCompilationErrors.unsupportedParameterExpression(expr)
+      }
+
+    case _ =>
+      // For non-literal expressions, they should be resolved before reaching 
this converter
+      // If we get an unresolved expression, it indicates a problem in the 
calling code
+      if (!expr.resolved) {
+        throw SparkException.internalError(
+          s"LiteralToSqlConverter received unresolved expression: " +
+          s"${expr.getClass.getSimpleName}. All expressions should be resolved 
before " +
+          s"parameter conversion.")
+      }
+      if (expr.foldable) {

Review Comment:
   you can reverse the logic and throw the exception up front, skipping the `else` branch:
   
   ```
   if (!expr.foldable) {
     throw SparkException.internalError(
       s"LiteralToSqlConverter cannot convert non-foldable expression: " +
       s"${expr.getClass.getSimpleName}. All parameter values should be evaluable to " +
       s"literals before conversion.")
   }
   val value = expr.eval()
   val dataType = expr.dataType
   convertLiteral(Literal.create(value, dataType))
   ```
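   (That also matches the `!expr.resolved` guard just above, so the happy path stays un-nested.)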



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/LiteralToSqlConverter.scala:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.{InternalRow}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedFunction
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.types._
+
+/**
+ * Utility for converting Catalyst literal expressions to their SQL string 
representation.
+ *
+ * This object provides a specialized implementation for converting Spark SQL 
literal
+ * expressions to their equivalent SQL text representation. It is used by the 
parameter
+ * substitution system for EXECUTE IMMEDIATE and other parameterized queries.
+ *
+ * Key features:
+ * - Handles all Spark SQL data types for literal values
+ * - Supports both Scala collections and Spark internal data structures
+ * - Proper SQL escaping and formatting
+ * - Optimized for literal expressions only
+ *
+ * Supported data types:
+ * - Primitives: String, Integer, Long, Float, Double, Boolean, Decimal
+ * - Temporal: Date, Timestamp, TimestampNTZ, Interval
+ * - Collections: Array, Map (including nested structures)
+ * - Special: Null values, Binary data
+ * - Complex: Nested arrays, maps of arrays, arrays of maps
+ *
+ * @example Basic usage:
+ * {{{
+ * val result1 = LiteralToSqlConverter.convert(Literal(42))
+ * // result1: "42"
+ *
+ * val result2 = LiteralToSqlConverter.convert(Literal("hello"))
+ * // result2: "'hello'"
+ *
+ * val arrayLit = Literal.create(Array(1, 2, 3), ArrayType(IntegerType))
+ * val result3 = LiteralToSqlConverter.convert(arrayLit)
+ * // result3: "ARRAY(1, 2, 3)"
+ * }}}
+ *
+ * @example Complex types:
+ * {{{
+ * val mapLit = Literal.create(Map("key" -> "value"), MapType(StringType, 
StringType))
+ * val result = LiteralToSqlConverter.convert(mapLit)
+ * // result: "MAP('key', 'value')"
+ * }}}
+ *
+ * @note This utility is thread-safe and can be used concurrently.
+ * @note Only supports Literal expressions - all parameter values must be 
pre-evaluated.
+ * @see [[ParameterHandler]] for the main parameter handling entry point
+ */
+object LiteralToSqlConverter {
+
+  /**
+   * Convert an expression to its SQL string representation.
+   *
+   * This method handles both simple literals and complex expressions that 
result from
+   * parameter evaluation. For complex types like arrays and maps, the 
expressions are
+   * evaluated to internal data structures that need to be converted back to 
SQL constructors.
+   *
+   * @param expr The expression to convert (typically a Literal, but may be 
other expressions
+   *             for complex types)
+   * @return SQL string representation of the expression value
+   */
+  def convert(expr: Expression): String = expr match {
+    case lit: Literal => convertLiteral(lit)
+
+    // Special handling for UnresolvedFunction expressions that don't 
naturally evaluate
+    // Only handle functions that are whitelisted in legacy mode but don't 
eval() naturally
+    case UnresolvedFunction(name, children, _, _, _, _, _) =>
+      val functionName = name.mkString(".")
+      functionName.toLowerCase(java.util.Locale.ROOT) match {
+        case "array" | "map" | "struct" | "map_from_arrays" | 
"map_from_entries" =>
+          // Convert whitelisted functions to SQL function call syntax
+          val childrenSql = children.map(convert).mkString(", ")
+          s"${functionName.toUpperCase(java.util.Locale.ROOT)}($childrenSql)"
+        case _ =>
+          // Non-whitelisted function - not supported in parameter substitution
+          throw QueryCompilationErrors.unsupportedParameterExpression(expr)
+      }
+
+    case _ =>
+      // For non-literal expressions, they should be resolved before reaching 
this converter
+      // If we get an unresolved expression, it indicates a problem in the 
calling code
+      if (!expr.resolved) {
+        throw SparkException.internalError(
+          s"LiteralToSqlConverter received unresolved expression: " +
+          s"${expr.getClass.getSimpleName}. All expressions should be resolved 
before " +
+          s"parameter conversion.")
+      }
+      if (expr.foldable) {
+        val value = expr.eval()
+        val dataType = expr.dataType
+        convertLiteral(Literal.create(value, dataType))
+      } else {
+        throw SparkException.internalError(
+          s"LiteralToSqlConverter cannot convert non-foldable expression: " +
+          s"${expr.getClass.getSimpleName}. All parameter values should be 
evaluable to " +
+          s"literals before conversion.")
+      }
+  }
+
+  private def convertLiteral(lit: Literal): String = {
+    // For simple cases, delegate to the existing Literal.sql method
+    // which already has the correct logic for most data types
+    try {
+      lit.sql

Review Comment:
   There are too many levels of indentation below; we need to simplify it. I'd do the following:
   
   * Do the `lit.sql` attempt outside of this function, catch the `MatchError` there, and then put L129-L157 into the helper function instead, like `convertLiteralManualConversion`
   * Move the data type matches starting on L134 into the parent match, like
   ```
   case Literal(value, ArrayType(elementType, _)) =>
     convertArrayLiteral(value, elementType)
   ```
   * Flatten out the match on L152 in the same way, like
   ```
   case Literal(row: InternalRow, structType @ StructType(elementType, _)) =>
     val fieldValues = (0 until row.numFields).map { i =>
       ...
   case Literal(value, StructType(elementType, _)) =>
     s"ROW(${value.toString})"
   ```
   
   At the end it should just be one list of patterns against `lit` with no nested pattern matches.
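   Roughly, the flattened helper could end up looking like this (just a sketch of the shape under that suggestion; it reuses the existing `convertArrayLiteral`/`convertMapLiteral` helpers and the current string fallback, so nothing here is prescriptive):
   ```
   private def convertLiteralManualConversion(lit: Literal): String = lit match {
     case Literal(null, _) => "NULL"
     case Literal(value, ArrayType(elementType, _)) =>
       convertArrayLiteral(value, elementType)
     case Literal(value, MapType(keyType, valueType, _)) =>
       convertMapLiteral(value, keyType, valueType)
     case Literal(row: InternalRow, structType: StructType) =>
       val fieldValues = (0 until row.numFields).map { i =>
         // Convert each struct field back to SQL, preserving NULLs.
         if (row.isNullAt(i)) {
           "NULL"
         } else {
           val fieldType = structType.fields(i).dataType
           convert(Literal.create(row.get(i, fieldType), fieldType))
         }
       }
       s"ROW(${fieldValues.mkString(", ")})"
     case Literal(value, _: StructType) =>
       s"ROW(${value.toString})"
     case Literal(value, _) =>
       // Last-resort fallback: quote the value as a SQL string.
       s"'${value.toString.replace("'", "''")}'"
   }
   ```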
   



##########
sql/api/src/main/scala/org/apache/spark/sql/catalyst/parser/PositionMapper.scala:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.parser
+
+/**
+ * Case class representing a text substitution.
+ */
+case class Substitution(start: Int, end: Int, replacement: String)
+
+/**
+ * Represents a range mapping from substituted positions to original 
positions. This is used for
+ * efficient O(k) position mapping where k = number of substitutions.
+ *
+ * @param substitutedStart
+ *   Start position in substituted text (inclusive)
+ * @param substitutedEnd
+ *   End position in substituted text (exclusive)
+ * @param originalStart
+ *   Start position in original text
+ * @param offsetDelta
+ *   Offset difference between original and substituted positions
+ */
+case class PositionRange(
+    substitutedStart: Int,
+    substitutedEnd: Int,
+    originalStart: Int,
+    offsetDelta: Int)
+
+/**
+ * Maps positions between original SQL text and substituted SQL text using 
sparse ranges.
+ *
+ * This implementation uses O(k) space and O(log k) lookup time where k = 
number of substitutions,
+ * instead of the previous O(n) space where n = SQL text length.
+ *
+ * @param originalText
+ *   The original SQL text with parameter markers
+ * @param substitutedText
+ *   The SQL text after parameter substitution
+ * @param substitutions
+ *   List of substitutions that were applied
+ */
+class PositionMapper(
+    val originalText: String,
+    val substitutedText: String,
+    val substitutions: List[Substitution]) {
+
+  // Build sparse position ranges for efficient lookup
+  private val positionRanges = buildPositionRanges()
+
+  /**
+   * Map a position in the substituted text back to the original text. Uses 
binary search for
+   * O(log k) lookup time.
+   *
+   * @param substitutedPos
+   *   Position in the substituted text
+   * @return
+   *   Position in the original text, or the same position if no mapping exists
+   */
+  def mapToOriginal(substitutedPos: Int): Int = {
+    // Binary search for the range containing this position
+    positionRanges.find(range =>
+      substitutedPos >= range.substitutedStart && substitutedPos < 
range.substitutedEnd) match {
+      case Some(range) =>
+        // Position is within a substitution range
+        range.originalStart
+      case None =>
+        // Position is in an unmapped region - apply cumulative offset
+        val cumulativeOffset = positionRanges
+          .takeWhile(_.substitutedStart <= substitutedPos)
+          .map(_.offsetDelta)
+          .sum
+        substitutedPos + cumulativeOffset
+    }
+  }
+
+  /**
+   * Build sparse position ranges using functional approach. O(k) space 
complexity where k =
+   * number of substitutions.

Review Comment:
   can we possibly give a brief example here? It could help the reader understand :)
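   For instance, something along these lines could go in the scaladoc (positions are purely illustrative, assuming `offsetDelta` captures the length difference between a marker and its replacement):
   ```
   Original:    SELECT :param FROM t     (":param" occupies offsets [7, 13))
   Substituted: SELECT 42 FROM t         ("42" occupies offsets [7, 9))

   mapToOriginal(8)  == 7    // inside the replacement -> start of ":param"
   mapToOriginal(10) == 14   // the "F" of FROM, shifted by the 4-char length difference
   ```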



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/LiteralToSqlConverter.scala:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.{InternalRow}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedFunction
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.types._
+
+/**
+ * Utility for converting Catalyst literal expressions to their SQL string 
representation.
+ *
+ * This object provides a specialized implementation for converting Spark SQL 
literal
+ * expressions to their equivalent SQL text representation. It is used by the 
parameter
+ * substitution system for EXECUTE IMMEDIATE and other parameterized queries.
+ *
+ * Key features:
+ * - Handles all Spark SQL data types for literal values
+ * - Supports both Scala collections and Spark internal data structures
+ * - Proper SQL escaping and formatting
+ * - Optimized for literal expressions only
+ *
+ * Supported data types:
+ * - Primitives: String, Integer, Long, Float, Double, Boolean, Decimal
+ * - Temporal: Date, Timestamp, TimestampNTZ, Interval
+ * - Collections: Array, Map (including nested structures)
+ * - Special: Null values, Binary data
+ * - Complex: Nested arrays, maps of arrays, arrays of maps
+ *
+ * @example Basic usage:
+ * {{{
+ * val result1 = LiteralToSqlConverter.convert(Literal(42))
+ * // result1: "42"
+ *
+ * val result2 = LiteralToSqlConverter.convert(Literal("hello"))
+ * // result2: "'hello'"
+ *
+ * val arrayLit = Literal.create(Array(1, 2, 3), ArrayType(IntegerType))
+ * val result3 = LiteralToSqlConverter.convert(arrayLit)
+ * // result3: "ARRAY(1, 2, 3)"
+ * }}}
+ *
+ * @example Complex types:
+ * {{{
+ * val mapLit = Literal.create(Map("key" -> "value"), MapType(StringType, 
StringType))
+ * val result = LiteralToSqlConverter.convert(mapLit)
+ * // result: "MAP('key', 'value')"
+ * }}}
+ *
+ * @note This utility is thread-safe and can be used concurrently.
+ * @note Only supports Literal expressions - all parameter values must be 
pre-evaluated.
+ * @see [[ParameterHandler]] for the main parameter handling entry point
+ */
+object LiteralToSqlConverter {
+
+  /**
+   * Convert an expression to its SQL string representation.
+   *
+   * This method handles both simple literals and complex expressions that 
result from
+   * parameter evaluation. For complex types like arrays and maps, the 
expressions are
+   * evaluated to internal data structures that need to be converted back to 
SQL constructors.
+   *
+   * @param expr The expression to convert (typically a Literal, but may be 
other expressions
+   *             for complex types)
+   * @return SQL string representation of the expression value
+   */
+  def convert(expr: Expression): String = expr match {
+    case lit: Literal => convertLiteral(lit)
+
+    // Special handling for UnresolvedFunction expressions that don't 
naturally evaluate
+    // Only handle functions that are whitelisted in legacy mode but don't 
eval() naturally
+    case UnresolvedFunction(name, children, _, _, _, _, _) =>
+      val functionName = name.mkString(".")
+      functionName.toLowerCase(java.util.Locale.ROOT) match {
+        case "array" | "map" | "struct" | "map_from_arrays" | 
"map_from_entries" =>
+          // Convert whitelisted functions to SQL function call syntax
+          val childrenSql = children.map(convert).mkString(", ")
+          s"${functionName.toUpperCase(java.util.Locale.ROOT)}($childrenSql)"
+        case _ =>
+          // Non-whitelisted function - not supported in parameter substitution
+          throw QueryCompilationErrors.unsupportedParameterExpression(expr)
+      }
+
+    case _ =>
+      // For non-literal expressions, they should be resolved before reaching 
this converter
+      // If we get an unresolved expression, it indicates a problem in the 
calling code
+      if (!expr.resolved) {
+        throw SparkException.internalError(
+          s"LiteralToSqlConverter received unresolved expression: " +
+          s"${expr.getClass.getSimpleName}. All expressions should be resolved 
before " +
+          s"parameter conversion.")
+      }
+      if (expr.foldable) {
+        val value = expr.eval()
+        val dataType = expr.dataType
+        convertLiteral(Literal.create(value, dataType))
+      } else {
+        throw SparkException.internalError(
+          s"LiteralToSqlConverter cannot convert non-foldable expression: " +
+          s"${expr.getClass.getSimpleName}. All parameter values should be 
evaluable to " +
+          s"literals before conversion.")
+      }
+  }
+
+  private def convertLiteral(lit: Literal): String = {
+    // For simple cases, delegate to the existing Literal.sql method
+    // which already has the correct logic for most data types
+    try {
+      lit.sql
+    } catch {
+      case _: MatchError =>
+        // Fallback to manual conversion for cases where Literal.sql doesn't 
have
+        // a pattern match for the specific (value, dataType) combination
+        lit match {
+          case Literal(null, _) => "NULL"
+          case Literal(value, dataType) => dataType match {
+            case ArrayType(elementType, _) => convertArrayLiteral(value, 
elementType)
+            case MapType(keyType, valueType, _) => convertMapLiteral(value, 
keyType, valueType)
+            case _: StructType =>
+              // Struct literals (row values) - convert to ROW constructor
+              value match {
+                case row: InternalRow =>
+                  val structType = dataType.asInstanceOf[StructType]
+                  val fieldValues = (0 until row.numFields).map { i =>
+                    if (row.isNullAt(i)) {
+                      "NULL"
+                    } else {
+                      val fieldType = structType.fields(i).dataType
+                      val fieldValue = row.get(i, fieldType)
+                      val fieldLiteral = Literal.create(fieldValue, fieldType)
+                      convert(fieldLiteral)
+                    }
+                  }
+                  s"ROW(${fieldValues.mkString(", ")})"
+                case _ => s"ROW(${value.toString})"
+              }
+            case _ =>
+              // For any other unsupported type, fall back to string 
representation
+              s"'${value.toString.replace("'", "''")}'"
+          }
+        }
+    }
+  }
+
+  private def convertArrayLiteral(value: Any, elementType: DataType): String = 
{
+    if (value == null) "NULL"
+    else {

Review Comment:
   ```suggestion
       } else {
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/SubstituteParamsParser.scala:
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.parser
+
+import org.antlr.v4.runtime.{CharStreams, CommonTokenStream}
+import org.antlr.v4.runtime.atn.PredictionMode
+import org.antlr.v4.runtime.misc.ParseCancellationException
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.trees.SQLQueryContext
+import org.apache.spark.sql.internal.SQLConf
+
+
+/**
+ * A parameter substitution parser that replaces parameter markers in SQL text 
with their values.
+ * This parser finds parameter markers and substitutes them with provided 
values to produce
+ * a modified SQL string ready for execution.
+ */
+class SubstituteParamsParser extends Logging {
+
+  /**
+   * Substitute parameter markers in SQL text with provided values.
+   * Always uses compoundOrSingleStatement parsing which can handle all SQL 
constructs.
+   *
+   * @param sqlText          The original SQL text containing parameter markers
+   * @param namedParams      Map of named parameter values (paramName -> value)
+   * @param positionalParams List of positional parameter values in order
+   * @return A tuple of (modified SQL string with parameters substituted,
+   *         number of consumed positional parameters)
+   */
+  def substitute(
+      sqlText: String,
+      namedParams: Map[String, String] = Map.empty,
+      positionalParams: List[String] = List.empty): (String, Int, 
PositionMapper) = {
+
+    // Quick pre-check: if there are no parameter markers in the text, skip 
parsing entirely
+    if (!sqlText.contains("?") && !sqlText.contains(":")) {
+      return (sqlText, 0, PositionMapper.identity(sqlText))
+    }
+
+    val lexer = new SqlBaseLexer(new 
UpperCaseCharStream(CharStreams.fromString(sqlText)))
+    lexer.removeErrorListeners()
+    lexer.addErrorListener(ParseErrorListener)
+
+    val tokenStream = new CommonTokenStream(lexer)
+    val parser = new SqlBaseParser(tokenStream)
+    // Match main parser configuration for consistent error messages
+    parser.addParseListener(PostProcessor)
+    parser.addParseListener(UnclosedCommentProcessor(sqlText, tokenStream))
+    parser.removeErrorListeners()
+    parser.addErrorListener(ParseErrorListener)
+    parser.legacy_setops_precedence_enabled = 
SQLConf.get.setOpsPrecedenceEnforced
+    parser.legacy_exponent_literal_as_decimal_enabled = 
SQLConf.get.exponentLiteralAsDecimalEnabled
+    parser.SQL_standard_keyword_behavior = SQLConf.get.enforceReservedKeywords
+    parser.double_quoted_identifiers = SQLConf.get.doubleQuotedIdentifiers
+    parser.parameter_substitution_enabled = 
!SQLConf.get.legacyParameterSubstitutionConstantsOnly
+
+    val astBuilder = new SubstituteParmsAstBuilder()
+
+    // Use the same two-stage parsing strategy as the main parser for 
consistent error messages
+    val ctx = try {
+      // First attempt: SLL mode with bail error strategy
+      parser.setErrorHandler(new SparkParserBailErrorStrategy())
+      parser.getInterpreter.setPredictionMode(PredictionMode.SLL)
+      parser.compoundOrSingleStatement()
+    } catch {
+      case e: ParseCancellationException =>
+        // Second attempt: LL mode with full error strategy
+        tokenStream.seek(0) // rewind input stream
+        parser.reset()
+        parser.setErrorHandler(new SparkParserErrorStrategy())
+        parser.getInterpreter.setPredictionMode(PredictionMode.LL)
+        parser.compoundOrSingleStatement()
+    }
+    val parameterLocations = astBuilder.extractParameterLocations(ctx)
+
+    // Substitute parameters in the original text
+    val (substitutedSql, appliedSubstitutions) = 
substituteAtLocations(sqlText, parameterLocations,
+      namedParams, positionalParams)
+    val consumedPositionalParams = 
parameterLocations.positionalParameterLocations.length
+
+    // Create position mapper for error context translation
+    val positionMapper = PositionMapper(sqlText, substitutedSql, 
appliedSubstitutions)
+
+    (substitutedSql, consumedPositionalParams, positionMapper)
+  }
+
+  /**
+   * Detects parameter markers in SQL text without performing substitution.
+   * Always uses compoundOrSingleStatement parsing which can handle all SQL 
constructs.
+   *
+   * @param sqlText The original SQL text to analyze
+   * @return A tuple of (hasPositionalParameters, hasNamedParameters)
+   */
+  def detectParameters(sqlText: String): (Boolean, Boolean) = {
+    // Quick pre-check: if there are no parameter markers in the text, skip 
parsing entirely
+    if (!sqlText.contains("?") && !sqlText.contains(":")) {
+      return (false, false)
+    }
+
+    val lexer = new SqlBaseLexer(new 
UpperCaseCharStream(CharStreams.fromString(sqlText)))
+    lexer.removeErrorListeners()
+    lexer.addErrorListener(ParseErrorListener)
+
+    val tokenStream = new CommonTokenStream(lexer)
+    val parser = new SqlBaseParser(tokenStream)
+    // Match main parser configuration for consistent error messages

Review Comment:
   same here, can we dedup it
   



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/SubstituteParamsParser.scala:
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.parser
+
+import org.antlr.v4.runtime.{CharStreams, CommonTokenStream}
+import org.antlr.v4.runtime.atn.PredictionMode
+import org.antlr.v4.runtime.misc.ParseCancellationException
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.trees.SQLQueryContext
+import org.apache.spark.sql.internal.SQLConf
+
+
+/**
+ * A parameter substitution parser that replaces parameter markers in SQL text 
with their values.
+ * This parser finds parameter markers and substitutes them with provided 
values to produce
+ * a modified SQL string ready for execution.
+ */
+class SubstituteParamsParser extends Logging {
+
+  /**
+   * Substitute parameter markers in SQL text with provided values.
+   * Always uses compoundOrSingleStatement parsing which can handle all SQL 
constructs.
+   *
+   * @param sqlText          The original SQL text containing parameter markers
+   * @param namedParams      Map of named parameter values (paramName -> value)
+   * @param positionalParams List of positional parameter values in order
+   * @return A tuple of (modified SQL string with parameters substituted,

Review Comment:
   fix indentation to start at the same column as the params above?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/LiteralToSqlConverter.scala:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.{InternalRow}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedFunction
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.types._
+
+/**
+ * Utility for converting Catalyst literal expressions to their SQL string 
representation.
+ *
+ * This object provides a specialized implementation for converting Spark SQL 
literal
+ * expressions to their equivalent SQL text representation. It is used by the 
parameter
+ * substitution system for EXECUTE IMMEDIATE and other parameterized queries.
+ *
+ * Key features:
+ * - Handles all Spark SQL data types for literal values
+ * - Supports both Scala collections and Spark internal data structures
+ * - Proper SQL escaping and formatting
+ * - Optimized for literal expressions only
+ *
+ * Supported data types:
+ * - Primitives: String, Integer, Long, Float, Double, Boolean, Decimal
+ * - Temporal: Date, Timestamp, TimestampNTZ, Interval
+ * - Collections: Array, Map (including nested structures)
+ * - Special: Null values, Binary data
+ * - Complex: Nested arrays, maps of arrays, arrays of maps
+ *
+ * @example Basic usage:
+ * {{{
+ * val result1 = LiteralToSqlConverter.convert(Literal(42))
+ * // result1: "42"
+ *
+ * val result2 = LiteralToSqlConverter.convert(Literal("hello"))
+ * // result2: "'hello'"
+ *
+ * val arrayLit = Literal.create(Array(1, 2, 3), ArrayType(IntegerType))
+ * val result3 = LiteralToSqlConverter.convert(arrayLit)
+ * // result3: "ARRAY(1, 2, 3)"
+ * }}}
+ *
+ * @example Complex types:
+ * {{{
+ * val mapLit = Literal.create(Map("key" -> "value"), MapType(StringType, 
StringType))
+ * val result = LiteralToSqlConverter.convert(mapLit)
+ * // result: "MAP('key', 'value')"
+ * }}}
+ *
+ * @note This utility is thread-safe and can be used concurrently.
+ * @note Only supports Literal expressions - all parameter values must be 
pre-evaluated.
+ * @see [[ParameterHandler]] for the main parameter handling entry point
+ */
+object LiteralToSqlConverter {
+
+  /**
+   * Convert an expression to its SQL string representation.
+   *
+   * This method handles both simple literals and complex expressions that 
result from
+   * parameter evaluation. For complex types like arrays and maps, the 
expressions are
+   * evaluated to internal data structures that need to be converted back to 
SQL constructors.
+   *
+   * @param expr The expression to convert (typically a Literal, but may be 
other expressions
+   *             for complex types)
+   * @return SQL string representation of the expression value
+   */
+  def convert(expr: Expression): String = expr match {
+    case lit: Literal => convertLiteral(lit)
+
+    // Special handling for UnresolvedFunction expressions that don't 
naturally evaluate
+    // Only handle functions that are whitelisted in legacy mode but don't 
eval() naturally
+    case UnresolvedFunction(name, children, _, _, _, _, _) =>
+      val functionName = name.mkString(".")
+      functionName.toLowerCase(java.util.Locale.ROOT) match {
+        case "array" | "map" | "struct" | "map_from_arrays" | 
"map_from_entries" =>
+          // Convert whitelisted functions to SQL function call syntax
+          val childrenSql = children.map(convert).mkString(", ")
+          s"${functionName.toUpperCase(java.util.Locale.ROOT)}($childrenSql)"
+        case _ =>
+          // Non-whitelisted function - not supported in parameter substitution
+          throw QueryCompilationErrors.unsupportedParameterExpression(expr)
+      }
+
+    case _ =>
+      // For non-literal expressions, they should be resolved before reaching 
this converter
+      // If we get an unresolved expression, it indicates a problem in the 
calling code
+      if (!expr.resolved) {
+        throw SparkException.internalError(
+          s"LiteralToSqlConverter received unresolved expression: " +
+          s"${expr.getClass.getSimpleName}. All expressions should be resolved 
before " +
+          s"parameter conversion.")
+      }
+      if (expr.foldable) {
+        val value = expr.eval()
+        val dataType = expr.dataType
+        convertLiteral(Literal.create(value, dataType))
+      } else {
+        throw SparkException.internalError(
+          s"LiteralToSqlConverter cannot convert non-foldable expression: " +
+          s"${expr.getClass.getSimpleName}. All parameter values should be 
evaluable to " +
+          s"literals before conversion.")
+      }
+  }
+
+  private def convertLiteral(lit: Literal): String = {
+    // For simple cases, delegate to the existing Literal.sql method
+    // which already has the correct logic for most data types
+    try {
+      lit.sql
+    } catch {
+      case _: MatchError =>
+        // Fallback to manual conversion for cases where Literal.sql doesn't 
have
+        // a pattern match for the specific (value, dataType) combination
+        lit match {
+          case Literal(null, _) => "NULL"
+          case Literal(value, dataType) => dataType match {
+            case ArrayType(elementType, _) => convertArrayLiteral(value, 
elementType)
+            case MapType(keyType, valueType, _) => convertMapLiteral(value, 
keyType, valueType)
+            case _: StructType =>
+              // Struct literals (row values) - convert to ROW constructor
+              value match {
+                case row: InternalRow =>
+                  val structType = dataType.asInstanceOf[StructType]
+                  val fieldValues = (0 until row.numFields).map { i =>
+                    if (row.isNullAt(i)) {
+                      "NULL"
+                    } else {
+                      val fieldType = structType.fields(i).dataType
+                      val fieldValue = row.get(i, fieldType)
+                      val fieldLiteral = Literal.create(fieldValue, fieldType)
+                      convert(fieldLiteral)
+                    }
+                  }
+                  s"ROW(${fieldValues.mkString(", ")})"
+                case _ => s"ROW(${value.toString})"
+              }
+            case _ =>
+              // For any other unsupported type, fall back to string 
representation
+              s"'${value.toString.replace("'", "''")}'"
+          }
+        }
+    }
+  }
+
+  private def convertArrayLiteral(value: Any, elementType: DataType): String = 
{
+    if (value == null) "NULL"

Review Comment:
   ```suggestion
       if (value == null) {
         "NULL"
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/LiteralToSqlConverter.scala:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.{InternalRow}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedFunction
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.types._
+
+/**
+ * Utility for converting Catalyst literal expressions to their SQL string 
representation.
+ *
+ * This object provides a specialized implementation for converting Spark SQL 
literal
+ * expressions to their equivalent SQL text representation. It is used by the 
parameter
+ * substitution system for EXECUTE IMMEDIATE and other parameterized queries.
+ *
+ * Key features:
+ * - Handles all Spark SQL data types for literal values
+ * - Supports both Scala collections and Spark internal data structures
+ * - Proper SQL escaping and formatting
+ * - Optimized for literal expressions only
+ *
+ * Supported data types:
+ * - Primitives: String, Integer, Long, Float, Double, Boolean, Decimal
+ * - Temporal: Date, Timestamp, TimestampNTZ, Interval
+ * - Collections: Array, Map (including nested structures)
+ * - Special: Null values, Binary data
+ * - Complex: Nested arrays, maps of arrays, arrays of maps
+ *
+ * @example Basic usage:
+ * {{{
+ * val result1 = LiteralToSqlConverter.convert(Literal(42))
+ * // result1: "42"
+ *
+ * val result2 = LiteralToSqlConverter.convert(Literal("hello"))
+ * // result2: "'hello'"
+ *
+ * val arrayLit = Literal.create(Array(1, 2, 3), ArrayType(IntegerType))
+ * val result3 = LiteralToSqlConverter.convert(arrayLit)
+ * // result3: "ARRAY(1, 2, 3)"
+ * }}}
+ *
+ * @example Complex types:
+ * {{{
+ * val mapLit = Literal.create(Map("key" -> "value"), MapType(StringType, 
StringType))
+ * val result = LiteralToSqlConverter.convert(mapLit)
+ * // result: "MAP('key', 'value')"
+ * }}}
+ *
+ * @note This utility is thread-safe and can be used concurrently.
+ * @note Only supports Literal expressions - all parameter values must be 
pre-evaluated.
+ * @see [[ParameterHandler]] for the main parameter handling entry point
+ */
+object LiteralToSqlConverter {
+
+  /**
+   * Convert an expression to its SQL string representation.
+   *
+   * This method handles both simple literals and complex expressions that 
result from
+   * parameter evaluation. For complex types like arrays and maps, the 
expressions are
+   * evaluated to internal data structures that need to be converted back to 
SQL constructors.
+   *
+   * @param expr The expression to convert (typically a Literal, but may be 
other expressions
+   *             for complex types)
+   * @return SQL string representation of the expression value
+   */
+  def convert(expr: Expression): String = expr match {
+    case lit: Literal => convertLiteral(lit)
+
+    // Special handling for UnresolvedFunction expressions that don't 
naturally evaluate
+    // Only handle functions that are whitelisted in legacy mode but don't 
eval() naturally
+    case UnresolvedFunction(name, children, _, _, _, _, _) =>
+      val functionName = name.mkString(".")
+      functionName.toLowerCase(java.util.Locale.ROOT) match {
+        case "array" | "map" | "struct" | "map_from_arrays" | 
"map_from_entries" =>
+          // Convert whitelisted functions to SQL function call syntax
+          val childrenSql = children.map(convert).mkString(", ")
+          s"${functionName.toUpperCase(java.util.Locale.ROOT)}($childrenSql)"
+        case _ =>
+          // Non-whitelisted function - not supported in parameter substitution
+          throw QueryCompilationErrors.unsupportedParameterExpression(expr)
+      }
+
+    case _ =>
+      // For non-literal expressions, they should be resolved before reaching 
this converter
+      // If we get an unresolved expression, it indicates a problem in the 
calling code
+      if (!expr.resolved) {
+        throw SparkException.internalError(
+          s"LiteralToSqlConverter received unresolved expression: " +
+          s"${expr.getClass.getSimpleName}. All expressions should be resolved 
before " +
+          s"parameter conversion.")
+      }
+      if (expr.foldable) {
+        val value = expr.eval()
+        val dataType = expr.dataType
+        convertLiteral(Literal.create(value, dataType))
+      } else {
+        throw SparkException.internalError(
+          s"LiteralToSqlConverter cannot convert non-foldable expression: " +
+          s"${expr.getClass.getSimpleName}. All parameter values should be 
evaluable to " +
+          s"literals before conversion.")
+      }
+  }
+
+  private def convertLiteral(lit: Literal): String = {
+    // For simple cases, delegate to the existing Literal.sql method
+    // which already has the correct logic for most data types
+    try {
+      lit.sql
+    } catch {
+      case _: MatchError =>
+        // Fallback to manual conversion for cases where Literal.sql doesn't 
have
+        // a pattern match for the specific (value, dataType) combination
+        lit match {
+          case Literal(null, _) => "NULL"
+          case Literal(value, dataType) => dataType match {
+            case ArrayType(elementType, _) => convertArrayLiteral(value, 
elementType)
+            case MapType(keyType, valueType, _) => convertMapLiteral(value, 
keyType, valueType)
+            case _: StructType =>
+              // Struct literals (row values) - convert to ROW constructor
+              value match {
+                case row: InternalRow =>
+                  val structType = dataType.asInstanceOf[StructType]
+                  val fieldValues = (0 until row.numFields).map { i =>
+                    if (row.isNullAt(i)) {
+                      "NULL"
+                    } else {
+                      val fieldType = structType.fields(i).dataType
+                      val fieldValue = row.get(i, fieldType)
+                      val fieldLiteral = Literal.create(fieldValue, fieldType)
+                      convert(fieldLiteral)
+                    }
+                  }
+                  s"ROW(${fieldValues.mkString(", ")})"
+                case _ => s"ROW(${value.toString})"
+              }
+            case _ =>
+              // For any other unsupported type, fall back to string 
representation
+              s"'${value.toString.replace("'", "''")}'"
+          }
+        }
+    }
+  }
+
+  private def convertArrayLiteral(value: Any, elementType: DataType): String = 
{
+    if (value == null) "NULL"
+    else {
+      // Handle both Scala collections and Spark's GenericArrayData
+      val arraySeq = value match {
+        case gad: GenericArrayData =>
+          gad.array.toSeq
+        case seq: scala.collection.Seq[Any] =>
+          seq
+        case arr: Array[Any] =>
+          arr.toSeq
+        case other =>
+          // Fallback: try to convert to array and then to sequence
+          Array(other).toSeq
+      }
+      val elementStrs = arraySeq.map { elem =>
+        convert(Literal.create(elem, elementType))
+      }
+      s"ARRAY(${elementStrs.mkString(", ")})"
+    }
+  }
+
+  private def convertMapLiteral(value: Any, keyType: DataType, valueType: 
DataType): String = {
+    if (value == null) "NULL"
+    else {

Review Comment:
   ```suggestion
       } else {
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/SubstituteParamsParser.scala:
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.parser
+
+import org.antlr.v4.runtime.{CharStreams, CommonTokenStream}
+import org.antlr.v4.runtime.atn.PredictionMode
+import org.antlr.v4.runtime.misc.ParseCancellationException
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.trees.SQLQueryContext
+import org.apache.spark.sql.internal.SQLConf
+
+
+/**
+ * A parameter substitution parser that replaces parameter markers in SQL text 
with their values.
+ * This parser finds parameter markers and substitutes them with provided 
values to produce
+ * a modified SQL string ready for execution.
+ */
+class SubstituteParamsParser extends Logging {
+
+  /**
+   * Substitute parameter markers in SQL text with provided values.
+   * Always uses compoundOrSingleStatement parsing which can handle all SQL 
constructs.
+   *
+   * @param sqlText          The original SQL text containing parameter markers
+   * @param namedParams      Map of named parameter values (paramName -> value)
+   * @param positionalParams List of positional parameter values in order
+   * @return A tuple of (modified SQL string with parameters substituted,
+   *         number of consumed positional parameters)
+   */
+  def substitute(
+      sqlText: String,
+      namedParams: Map[String, String] = Map.empty,
+      positionalParams: List[String] = List.empty): (String, Int, 
PositionMapper) = {
+
+    // Quick pre-check: if there are no parameter markers in the text, skip 
parsing entirely
+    if (!sqlText.contains("?") && !sqlText.contains(":")) {
+      return (sqlText, 0, PositionMapper.identity(sqlText))
+    }
+
+    val lexer = new SqlBaseLexer(new 
UpperCaseCharStream(CharStreams.fromString(sqlText)))
+    lexer.removeErrorListeners()
+    lexer.addErrorListener(ParseErrorListener)
+
+    val tokenStream = new CommonTokenStream(lexer)
+    val parser = new SqlBaseParser(tokenStream)
+    // Match main parser configuration for consistent error messages
+    parser.addParseListener(PostProcessor)
+    parser.addParseListener(UnclosedCommentProcessor(sqlText, tokenStream))

Review Comment:
   Is this code copy/pasted from [1]? If so, can we dedup it into a refactored shared helper from all locations?
   
   [1] https://github.com/apache/spark/blob/6eb4d3c9d38f6849b0acfcffdbadce03c8f49ac6/sql/api/src/main/scala/org/apache/spark/sql/catalyst/parser/parsers.scala#L70
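   For reference, a shared helper could look roughly like this (the name and placement are illustrative; it only mirrors the setup already copied in this file, and callers would still set site-specific flags such as `parameter_substitution_enabled` afterwards):
   ```
   // Hypothetical shared helper; name and location are placeholders.
   private[parser] def buildConfiguredSqlParser(
       sqlText: String): (SqlBaseParser, CommonTokenStream) = {
     val lexer = new SqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(sqlText)))
     lexer.removeErrorListeners()
     lexer.addErrorListener(ParseErrorListener)

     val tokenStream = new CommonTokenStream(lexer)
     val parser = new SqlBaseParser(tokenStream)
     parser.addParseListener(PostProcessor)
     parser.addParseListener(UnclosedCommentProcessor(sqlText, tokenStream))
     parser.removeErrorListeners()
     parser.addErrorListener(ParseErrorListener)
     parser.legacy_setops_precedence_enabled = SQLConf.get.setOpsPrecedenceEnforced
     parser.legacy_exponent_literal_as_decimal_enabled =
       SQLConf.get.exponentLiteralAsDecimalEnabled
     parser.SQL_standard_keyword_behavior = SQLConf.get.enforceReservedKeywords
     parser.double_quoted_identifiers = SQLConf.get.doubleQuotedIdentifiers
     (parser, tokenStream)
   }
   ```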



##########
sql/api/src/main/scala/org/apache/spark/sql/catalyst/trees/origin.scala:
##########
@@ -19,9 +19,20 @@ package org.apache.spark.sql.catalyst.trees
 import java.util.regex.Pattern
 
 import org.apache.spark.QueryContext
+import org.apache.spark.sql.catalyst.parser.PositionMapper
 import org.apache.spark.sql.internal.SqlApiConf
 import org.apache.spark.util.ArrayImplicits._
 
+/**
+ * Information needed for parameter substitution position mapping. Stored 
directly in Origin to
+ * avoid callback complexity.

Review Comment:
   👍 👍 👍 👍 👍 👍 👍 👍 👍 



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/SubstituteParamsParser.scala:
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.parser
+
+import org.antlr.v4.runtime.{CharStreams, CommonTokenStream}
+import org.antlr.v4.runtime.atn.PredictionMode
+import org.antlr.v4.runtime.misc.ParseCancellationException
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.trees.SQLQueryContext
+import org.apache.spark.sql.internal.SQLConf
+
+
+/**
+ * A parameter substitution parser that replaces parameter markers in SQL text 
with their values.
+ * This parser finds parameter markers and substitutes them with provided 
values to produce
+ * a modified SQL string ready for execution.
+ */
+class SubstituteParamsParser extends Logging {
+
+  /**
+   * Substitute parameter markers in SQL text with provided values.
+   * Always uses compoundOrSingleStatement parsing which can handle all SQL 
constructs.
+   *
+   * @param sqlText          The original SQL text containing parameter markers
+   * @param namedParams      Map of named parameter values (paramName -> value)
+   * @param positionalParams List of positional parameter values in order
+   * @return A tuple of (modified SQL string with parameters substituted,
+   *         number of consumed positional parameters)
+   */
+  def substitute(
+      sqlText: String,
+      namedParams: Map[String, String] = Map.empty,
+      positionalParams: List[String] = List.empty): (String, Int, 
PositionMapper) = {
+
+    // Quick pre-check: if there are no parameter markers in the text, skip 
parsing entirely
+    if (!sqlText.contains("?") && !sqlText.contains(":")) {
+      return (sqlText, 0, PositionMapper.identity(sqlText))
+    }
+
+    val lexer = new SqlBaseLexer(new 
UpperCaseCharStream(CharStreams.fromString(sqlText)))
+    lexer.removeErrorListeners()
+    lexer.addErrorListener(ParseErrorListener)
+
+    val tokenStream = new CommonTokenStream(lexer)
+    val parser = new SqlBaseParser(tokenStream)
+    // Match main parser configuration for consistent error messages
+    parser.addParseListener(PostProcessor)
+    parser.addParseListener(UnclosedCommentProcessor(sqlText, tokenStream))
+    parser.removeErrorListeners()
+    parser.addErrorListener(ParseErrorListener)
+    parser.legacy_setops_precedence_enabled = 
SQLConf.get.setOpsPrecedenceEnforced
+    parser.legacy_exponent_literal_as_decimal_enabled = 
SQLConf.get.exponentLiteralAsDecimalEnabled
+    parser.SQL_standard_keyword_behavior = SQLConf.get.enforceReservedKeywords
+    parser.double_quoted_identifiers = SQLConf.get.doubleQuotedIdentifiers
+    parser.parameter_substitution_enabled = 
!SQLConf.get.legacyParameterSubstitutionConstantsOnly
+
+    val astBuilder = new SubstituteParmsAstBuilder()
+
+    // Use the same two-stage parsing strategy as the main parser for 
consistent error messages

Review Comment:
   super super nit: can we add punctuation at the end of all comments in this PR to match elsewhere in the codebase? 🙏 🙏



##########
sql/api/src/main/scala/org/apache/spark/sql/catalyst/parser/PositionMapper.scala:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.parser
+
+/**
+ * Case class representing a text substitution.
+ */
+case class Substitution(start: Int, end: Int, replacement: String)
+
+/**
+ * Represents a range mapping from substituted positions to original 
positions. This is used for
+ * efficient O(k) position mapping where k = number of substitutions.
+ *
+ * @param substitutedStart
+ *   Start position in substituted text (inclusive)
+ * @param substitutedEnd
+ *   End position in substituted text (exclusive)
+ * @param originalStart
+ *   Start position in original text
+ * @param offsetDelta
+ *   Offset difference between original and substituted positions
+ */
+case class PositionRange(
+    substitutedStart: Int,
+    substitutedEnd: Int,
+    originalStart: Int,
+    offsetDelta: Int)
+
+/**
+ * Maps positions between original SQL text and substituted SQL text using 
sparse ranges.
+ *
+ * This implementation uses O(k) space, where k is the number of substitutions, instead of
+ * O(n) space where n is the SQL text length. Lookups scan the sparse ranges and are O(k)
+ * in the worst case.
+ *
+ * @param originalText
+ *   The original SQL text with parameter markers
+ * @param substitutedText
+ *   The SQL text after parameter substitution
+ * @param substitutions
+ *   List of substitutions that were applied
+ */
+class PositionMapper(
+    val originalText: String,
+    val substitutedText: String,
+    val substitutions: List[Substitution]) {
+
+  // Build sparse position ranges for efficient lookup
+  private val positionRanges = buildPositionRanges()
+
+  /**
+   * Map a position in the substituted text back to the original text by scanning the
+   * sparse ranges, which is O(k) in the number of substitutions.
+   *
+   * @param substitutedPos
+   *   Position in the substituted text
+   * @return
+   *   Position in the original text, or the same position if no mapping exists
+   */
+  def mapToOriginal(substitutedPos: Int): Int = {
+    // Linear scan for the range containing this position.
+    positionRanges.find(range =>
+      substitutedPos >= range.substitutedStart && substitutedPos < range.substitutedEnd) match {
+      case Some(range) =>
+        // Position is within a substitution range
+        range.originalStart
+      case None =>
+        // Position is in an unmapped region - apply cumulative offset
+        val cumulativeOffset = positionRanges
+          .takeWhile(_.substitutedStart <= substitutedPos)
+          .map(_.offsetDelta)
+          .sum
+        substitutedPos + cumulativeOffset
+    }
+  }
+
+  /**
+   * Build sparse position ranges using a functional approach. O(k) space complexity,
+   * where k = number of substitutions.
+   */
+  private def buildPositionRanges(): List[PositionRange] = {
+    if (substitutions.isEmpty) return List.empty

Review Comment:
   ```suggestion
       if (substitutions.isEmpty) {
         return List.empty
       }
   ```
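
   Aside from the brace style, here is a minimal sketch of how `PositionMapper` might be exercised. It assumes `Substitution` offsets refer to positions in the original text; since `buildPositionRanges` is not fully visible in this hunk, the mapped values in the comments are illustrative only.

   ```scala
   // Illustrative only: map an offset in the substituted SQL back to the original SQL.
   // Original:    "SELECT :id FROM t"   (":id" occupies offsets 7..10, end exclusive)
   // Substituted: "SELECT 42 FROM t"    ("42"  occupies offsets 7..9,  end exclusive)
   val subs = List(Substitution(start = 7, end = 10, replacement = "42"))
   val mapper = new PositionMapper("SELECT :id FROM t", "SELECT 42 FROM t", subs)
   mapper.mapToOriginal(7)   // inside the replacement: expected to map to the start of ":id" (7)
   mapper.mapToOriginal(10)  // "FROM" in the substituted text: expected to shift by the +1 length delta
   ```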



##########
sql/api/src/main/scala/org/apache/spark/sql/catalyst/parser/SubstituteParmsAstBuilder.scala:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.parser
+
+import org.antlr.v4.runtime.tree.{ParseTree, RuleNode, TerminalNode}
+
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
+import org.apache.spark.sql.catalyst.util.SparkParserUtils.withOrigin
+
+/**
+ * AST builder for extracting parameter markers and their locations from SQL 
parse trees. This
+ * builder traverses the parse tree and collects parameter information for 
substitution.
+ */
+class SubstituteParmsAstBuilder extends SqlBaseParserBaseVisitor[AnyRef] {
+
+  private val namedParams = scala.collection.mutable.Set[String]()

Review Comment:
   let's `import scala.collection.mutable` up top so you can just write `mutable.Set`, `mutable.ListBuffer`, etc. Then these can all be one-liners.
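
   One way the declarations could look after that change (a sketch only; the field names and types are taken from the diff):

   ```scala
   import scala.collection.mutable

   // The same fields as above, each fitting on one line against the single import.
   private val namedParams = mutable.Set[String]()
   private val positionalParams = mutable.ListBuffer[Int]()
   private val namedParamLocations = mutable.Map[String, mutable.ListBuffer[ParameterLocation]]()
   private val positionalParamLocations = mutable.ListBuffer[ParameterLocation]()
   ```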



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/LiteralToSqlConverter.scala:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.{InternalRow}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedFunction
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.types._
+
+/**
+ * Utility for converting Catalyst literal expressions to their SQL string 
representation.
+ *
+ * This object provides a specialized implementation for converting Spark SQL 
literal
+ * expressions to their equivalent SQL text representation. It is used by the 
parameter
+ * substitution system for EXECUTE IMMEDIATE and other parameterized queries.
+ *
+ * Key features:
+ * - Handles all Spark SQL data types for literal values
+ * - Supports both Scala collections and Spark internal data structures
+ * - Proper SQL escaping and formatting
+ * - Optimized for literal expressions only
+ *
+ * Supported data types:
+ * - Primitives: String, Integer, Long, Float, Double, Boolean, Decimal
+ * - Temporal: Date, Timestamp, TimestampNTZ, Interval
+ * - Collections: Array, Map (including nested structures)
+ * - Special: Null values, Binary data
+ * - Complex: Nested arrays, maps of arrays, arrays of maps
+ *
+ * @example Basic usage:
+ * {{{
+ * val result1 = LiteralToSqlConverter.convert(Literal(42))
+ * // result1: "42"
+ *
+ * val result2 = LiteralToSqlConverter.convert(Literal("hello"))
+ * // result2: "'hello'"
+ *
+ * val arrayLit = Literal.create(Array(1, 2, 3), ArrayType(IntegerType))
+ * val result3 = LiteralToSqlConverter.convert(arrayLit)
+ * // result3: "ARRAY(1, 2, 3)"
+ * }}}
+ *
+ * @example Complex types:
+ * {{{
+ * val mapLit = Literal.create(Map("key" -> "value"), MapType(StringType, 
StringType))
+ * val result = LiteralToSqlConverter.convert(mapLit)
+ * // result: "MAP('key', 'value')"
+ * }}}
+ *
+ * @note This utility is thread-safe and can be used concurrently.
+ * @note Only supports Literal expressions - all parameter values must be 
pre-evaluated.
+ * @see [[ParameterHandler]] for the main parameter handling entry point
+ */
+object LiteralToSqlConverter {
+
+  /**
+   * Convert an expression to its SQL string representation.
+   *
+   * This method handles both simple literals and complex expressions that 
result from
+   * parameter evaluation. For complex types like arrays and maps, the 
expressions are
+   * evaluated to internal data structures that need to be converted back to 
SQL constructors.
+   *
+   * @param expr The expression to convert (typically a Literal, but may be 
other expressions
+   *             for complex types)
+   * @return SQL string representation of the expression value
+   */
+  def convert(expr: Expression): String = expr match {
+    case lit: Literal => convertLiteral(lit)
+
+    // Special handling for UnresolvedFunction expressions that don't 
naturally evaluate
+    // Only handle functions that are whitelisted in legacy mode but don't 
eval() naturally
+    case UnresolvedFunction(name, children, _, _, _, _, _) =>
+      val functionName = name.mkString(".")
+      functionName.toLowerCase(java.util.Locale.ROOT) match {
+        case "array" | "map" | "struct" | "map_from_arrays" | 
"map_from_entries" =>
+          // Convert whitelisted functions to SQL function call syntax
+          val childrenSql = children.map(convert).mkString(", ")
+          s"${functionName.toUpperCase(java.util.Locale.ROOT)}($childrenSql)"
+        case _ =>
+          // Non-whitelisted function - not supported in parameter substitution
+          throw QueryCompilationErrors.unsupportedParameterExpression(expr)
+      }
+
+    case _ =>
+      // For non-literal expressions, they should be resolved before reaching 
this converter
+      // If we get an unresolved expression, it indicates a problem in the 
calling code
+      if (!expr.resolved) {
+        throw SparkException.internalError(
+          s"LiteralToSqlConverter received unresolved expression: " +
+          s"${expr.getClass.getSimpleName}. All expressions should be resolved 
before " +
+          s"parameter conversion.")
+      }
+      if (expr.foldable) {
+        val value = expr.eval()
+        val dataType = expr.dataType
+        convertLiteral(Literal.create(value, dataType))
+      } else {
+        throw SparkException.internalError(
+          s"LiteralToSqlConverter cannot convert non-foldable expression: " +
+          s"${expr.getClass.getSimpleName}. All parameter values should be 
evaluable to " +
+          s"literals before conversion.")
+      }
+  }
+
+  private def convertLiteral(lit: Literal): String = {
+    // For simple cases, delegate to the existing Literal.sql method
+    // which already has the correct logic for most data types
+    try {
+      lit.sql
+    } catch {
+      case _: MatchError =>
+        // Fallback to manual conversion for cases where Literal.sql doesn't 
have
+        // a pattern match for the specific (value, dataType) combination
+        lit match {
+          case Literal(null, _) => "NULL"
+          case Literal(value, dataType) => dataType match {
+            case ArrayType(elementType, _) => convertArrayLiteral(value, elementType)
+            case MapType(keyType, valueType, _) => convertMapLiteral(value, keyType, valueType)
+            case _: StructType =>
+              // Struct literals (row values) - convert to ROW constructor
+              value match {
+                case row: InternalRow =>
+                  val structType = dataType.asInstanceOf[StructType]
+                  val fieldValues = (0 until row.numFields).map { i =>
+                    if (row.isNullAt(i)) {
+                      "NULL"
+                    } else {
+                      val fieldType = structType.fields(i).dataType
+                      val fieldValue = row.get(i, fieldType)
+                      val fieldLiteral = Literal.create(fieldValue, fieldType)
+                      convert(fieldLiteral)
+                    }
+                  }
+                  s"ROW(${fieldValues.mkString(", ")})"
+                case _ => s"ROW(${value.toString})"
+              }
+            case _ =>
+              // For any other unsupported type, fall back to string 
representation
+              s"'${value.toString.replace("'", "''")}'"
+          }
+        }
+    }
+  }
+
+  private def convertArrayLiteral(value: Any, elementType: DataType): String = {
+    if (value == null) "NULL"
+    else {
+      // Handle both Scala collections and Spark's GenericArrayData
+      val arraySeq = value match {
+        case gad: GenericArrayData =>
+          gad.array.toSeq
+        case seq: scala.collection.Seq[Any] =>
+          seq
+        case arr: Array[Any] =>
+          arr.toSeq
+        case other =>
+          // Fallback: try to convert to array and then to sequence
+          Array(other).toSeq
+      }
+      val elementStrs = arraySeq.map { elem =>
+        convert(Literal.create(elem, elementType))
+      }
+      s"ARRAY(${elementStrs.mkString(", ")})"
+    }
+  }
+
+  private def convertMapLiteral(value: Any, keyType: DataType, valueType: DataType): String = {
+    if (value == null) "NULL"

Review Comment:
   ```suggestion
       if (value == null) {
         "NULL"
   ```
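
   Beyond the brace nit, a small illustration of the null and nested-collection handling in this converter. Whether `Literal.sql` or the `MatchError` fallback renders the text depends on the Spark version, but the documented output shape is the same either way:

   ```scala
   import org.apache.spark.sql.catalyst.expressions.Literal
   import org.apache.spark.sql.types.{ArrayType, IntegerType, MapType, StringType}

   // A NULL collection value short-circuits to the SQL NULL keyword.
   LiteralToSqlConverter.convert(Literal.create(null, MapType(StringType, StringType)))
   // expected: "NULL"

   // Nested arrays are rendered recursively with the ARRAY(...) constructor.
   LiteralToSqlConverter.convert(
     Literal.create(Seq(Seq(1, 2), Seq(3)), ArrayType(ArrayType(IntegerType))))
   // expected shape: "ARRAY(ARRAY(1, 2), ARRAY(3))"
   ```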



##########
sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkParserUtils.scala:
##########
@@ -187,25 +187,108 @@ trait SparkParserUtils {
    * Register the origin of the context. Any TreeNode created in the closure 
will be assigned the
    * registered origin. This method restores the previously set origin after 
completion of the
    * closure.
+   *
+   * This method is parameter substitution-aware. If parameter substitution 
occurred before
+   * parsing, it will automatically adjust the positions and SQL text to refer 
to the original SQL
+   * (before substitution) instead of the substituted SQL.
    */
   def withOrigin[T](ctx: ParserRuleContext, sqlText: Option[String] = None)(f: => T): T = {
     val current = CurrentOrigin.get
     val text = sqlText.orElse(current.sqlText)
+
     if (text.isEmpty) {
       CurrentOrigin.set(position(ctx.getStart))
     } else {
-      CurrentOrigin.set(
-        positionAndText(
-          ctx.getStart,
-          ctx.getStop,
-          text.get,
-          current.objectType,
-          current.objectName))
+      // Check if parameter substitution occurred and adjust origin 
accordingly.
+      val adjustedOrigin = adjustOriginForParameterSubstitution(
+        ctx.getStart,
+        ctx.getStop,
+        text.get,
+        current.objectType,
+        current.objectName)
+
+      // Preserve any existing substitution info when setting the new origin.
+      val finalOrigin = if (current.parameterSubstitutionInfo.isDefined) {
+        adjustedOrigin.copy(parameterSubstitutionInfo = current.parameterSubstitutionInfo)
+      } else {
+        adjustedOrigin
+      }
+
+      CurrentOrigin.set(finalOrigin)
     }
     try {
       f
     } finally {
-      CurrentOrigin.set(current)
+      // When restoring origin, only preserve substitution info if it was 
already present
+      // in the original context. This prevents contamination across unrelated 
parsing operations.
+      val originToRestore = if (current.parameterSubstitutionInfo.isDefined) {
+        // Original context had substitution info - preserve it.
+        current
+      } else {
+        // Original context had no substitution info - don't add any.
+        current

Review Comment:
   this is the same as the other branch of the if statement :) We can just delete L222-230 and go back to `CurrentOrigin.set(current)`. You can keep the comments if desired.
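
   A sketch of what the end of the method looks like after that simplification (the try body is unchanged from the diff):

   ```scala
   try {
     f
   } finally {
     // Restoring `current` is sufficient: it already carries any substitution info that was
     // present before this call, and adds none otherwise.
     CurrentOrigin.set(current)
   }
   ```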



##########
sql/api/src/main/scala/org/apache/spark/sql/catalyst/parser/SubstituteParmsAstBuilder.scala:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.parser
+
+import org.antlr.v4.runtime.tree.{ParseTree, RuleNode, TerminalNode}
+
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
+import org.apache.spark.sql.catalyst.util.SparkParserUtils.withOrigin
+
+/**
+ * AST builder for extracting parameter markers and their locations from SQL 
parse trees. This
+ * builder traverses the parse tree and collects parameter information for 
substitution.
+ */
+class SubstituteParmsAstBuilder extends SqlBaseParserBaseVisitor[AnyRef] {
+
+  private val namedParams = scala.collection.mutable.Set[String]()
+  private val positionalParams = scala.collection.mutable.ListBuffer[Int]()
+  private val namedParamLocations =
+    scala.collection.mutable.Map[String, scala.collection.mutable.ListBuffer[ParameterLocation]]()
+  private val positionalParamLocations = scala.collection.mutable.ListBuffer[ParameterLocation]()
+
+  /**
+   * Extract parameter location information from a parse context. This method 
traverses the parse
+   * tree and collects parameter locations for substitution.
+   */
+  def extractParameterLocations(ctx: ParseTree): ParameterLocationInfo = {
+    // Clear previous state
+    namedParams.clear()
+    positionalParams.clear()
+    namedParamLocations.clear()
+    positionalParamLocations.clear()
+
+    // Visit the context to collect parameters and their locations
+    visit(ctx)
+
+    ParameterLocationInfo(
+      namedParamLocations.view.mapValues(_.toList).toMap,
+      positionalParamLocations.toList)
+  }
+
+  /**
+   * Collect information about a named parameter in a literal context. Note: 
The return value is
+   * not used; this method operates via side effects.
+   */
+  override def visitNamedParameterLiteral(ctx: NamedParameterLiteralContext): AnyRef =
+    withOrigin(ctx) {
+      val paramName = ctx.namedParameterMarker().identifier().getText
+      namedParams += paramName
+
+      // Calculate the location of the entire parameter (including the colon)
+      val startIndex = ctx.getStart.getStartIndex
+      val stopIndex = ctx.getStop.getStopIndex + 1
+      val locations = namedParamLocations.getOrElseUpdate(
+        paramName,
+        scala.collection.mutable.ListBuffer[ParameterLocation]())
+      locations += ParameterLocation(startIndex, stopIndex)
+
+      null // Return value not used
+    }
+
+  /**
+   * Collect information about a positional parameter in a literal context. 
Note: The return value
+   * is not used; this method operates via side effects.
+   */
+  override def visitPosParameterLiteral(ctx: PosParameterLiteralContext): AnyRef =
+    withOrigin(ctx) {
+      val startIndex = ctx.QUESTION().getSymbol.getStartIndex
+      positionalParams += startIndex
+
+      // Question mark is single character, so stopIndex = startIndex + 1
+      val stopIndex = startIndex + 1
+      positionalParamLocations += ParameterLocation(startIndex, stopIndex)
+
+      null // Return value not used
+    }
+
+  /**
+   * Collect information about named parameter markers. This handles the 
namedParameterMarker case
+   * in the shared parameterMarker grammar rule. Note: The return value is not 
used; this method
+   * operates via side effects.
+   */
+  override def visitNamedParameterMarkerRule(ctx: NamedParameterMarkerRuleContext): AnyRef =
+    withOrigin(ctx) {
+      val paramName = ctx.namedParameterMarker().identifier().getText
+      namedParams += paramName
+
+      // Calculate the location of the entire parameter (including the colon)
+      val startIndex = ctx.getStart.getStartIndex
+      val stopIndex = ctx.getStop.getStopIndex + 1
+      val locations = namedParamLocations.getOrElseUpdate(
+        paramName,
+        scala.collection.mutable.ListBuffer[ParameterLocation]())
+      locations += ParameterLocation(startIndex, stopIndex)
+
+      null // Return value not used
+    }
+
+  /**
+   * Collect information about positional parameter markers. This handles the 
QUESTION case in the
+   * shared parameterMarker grammar rule. Note: The return value is not used; 
this method operates
+   * via side effects.
+   */
+  override def visitPositionalParameterMarkerRule(
+      ctx: PositionalParameterMarkerRuleContext): AnyRef =
+    withOrigin(ctx) {
+      val paramIndex = positionalParams.size
+      positionalParams += paramIndex
+
+      // Parameter marker is single character, so stopIndex = startIndex + 1
+      val startIndex = ctx.getStart.getStartIndex
+      val stopIndex = startIndex + 1
+      positionalParamLocations += ParameterLocation(startIndex, stopIndex)
+
+      null // Return value not used
+    }
+
+  /**
+   * Override visit to ensure we traverse all children to find parameters.
+   */
+  override def visit(tree: ParseTree): AnyRef = {
+    if (tree == null) return null
+
+    // Check if this is a parameter literal
+    tree match {
+      case ctx: NamedParameterLiteralContext =>
+        visitNamedParameterLiteral(ctx)
+      case ctx: PosParameterLiteralContext =>
+        visitPosParameterLiteral(ctx)
+      case ctx: ParameterStringValueContext =>
+        // Handle parameter markers in string contexts
+        visit(ctx.parameterMarker())
+      case ctx: ParameterIntegerValueContext =>
+        // Handle parameter markers in integer contexts
+        visit(ctx.parameterMarker())
+      case ctx: NamedParameterMarkerRuleContext =>
+        visitNamedParameterMarkerRule(ctx)
+      case ctx: PositionalParameterMarkerRuleContext =>
+        visitPositionalParameterMarkerRule(ctx)
+      case ctx: StringLiteralInContextContext =>
+        // For string literals in context, continue traversing to find any 
nested parameters
+        visitChildren(ctx)
+      case ruleNode: RuleNode =>
+        // Continue traversing children for rule nodes
+        visitChildren(ruleNode)
+      case _ =>
+        // For other types (like terminal nodes), don't traverse children
+        null
+    }
+  }
+
+  /**
+   * Visit all children of a node to find parameters.
+   */
+  override def visitChildren(node: RuleNode): AnyRef = {
+    if (node == null) return null

Review Comment:
   ```suggestion
       if (node == null) {
         return null
       }
   ```
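
   Stepping back from the style nit, a hypothetical end-to-end use of this builder, assuming a parse tree produced by the lexer/parser configuration shown in `SubstituteParamsParser` above; `parseTree` is a stand-in and the expected contents are illustrative:

   ```scala
   // Illustrative only: collect marker locations for "SELECT :id FROM t WHERE flag = ?".
   // `parseTree` stands in for the tree produced by the configured ANTLR parser; it is not built here.
   val builder = new SubstituteParmsAstBuilder()
   val info = builder.extractParameterLocations(parseTree)
   // Expected contents: one named location for "id" covering the ":id" span, and one positional
   // location covering the trailing "?" character.
   ```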


