allisonwang-db commented on a change in pull request #31052:
URL: https://github.com/apache/spark/pull/31052#discussion_r553653569



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala
##########
@@ -662,4 +662,62 @@ object QueryCompilationErrors {
   def cannotReadCorruptedTablePropertyError(key: String, details: String = 
""): Throwable = {
     new AnalysisException(s"Cannot read table property '$key' as it's 
corrupted.$details")
   }
+
+  def invalidSchemaStringError(exp: Expression): Throwable = {
+    new AnalysisException(s"The expression '${exp.sql}' is not a valid schema 
string.")
+  }
+
+  def schemaMustBeFoldableError(exp: Expression): Throwable = {

Review comment:
       schemaNotFoldableError

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala
##########
@@ -662,4 +662,62 @@ object QueryCompilationErrors {
   def cannotReadCorruptedTablePropertyError(key: String, details: String = 
""): Throwable = {
     new AnalysisException(s"Cannot read table property '$key' as it's 
corrupted.$details")
   }
+
+  def invalidSchemaStringError(exp: Expression): Throwable = {
+    new AnalysisException(s"The expression '${exp.sql}' is not a valid schema 
string.")
+  }
+
+  def schemaMustBeFoldableError(exp: Expression): Throwable = {
+    new AnalysisException(
+      "Schema should be specified in DDL format as a string literal or output 
of " +
+        s"the schema_of_json/schema_of_csv functions instead of ${exp.sql}")
+  }
+
+  def schemaIsNotStructTypeError(dataType: DataType): Throwable = {
+    new AnalysisException(s"Schema should be struct type but got 
${dataType.sql}.")
+  }
+
+  def keyValueInMapNotExpectedError(m: CreateMap): Throwable = {
+    new AnalysisException(
+      s"A type of keys and values in map() must be string, but got 
${m.dataType.catalogString}")
+  }
+
+  def nonMapFunctionNotAllowedError(): Throwable = {
+    new AnalysisException("Must use a map() function for options")
+  }
+
+  def fieldTypeForCorruptRecordNotAllowedError(): Throwable = {

Review comment:
       invalidFieldTypeForCorruptRecordError

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala
##########
@@ -662,4 +662,62 @@ object QueryCompilationErrors {
   def cannotReadCorruptedTablePropertyError(key: String, details: String = 
""): Throwable = {
     new AnalysisException(s"Cannot read table property '$key' as it's 
corrupted.$details")
   }
+
+  def invalidSchemaStringError(exp: Expression): Throwable = {
+    new AnalysisException(s"The expression '${exp.sql}' is not a valid schema 
string.")
+  }
+
+  def schemaMustBeFoldableError(exp: Expression): Throwable = {
+    new AnalysisException(
+      "Schema should be specified in DDL format as a string literal or output 
of " +
+        s"the schema_of_json/schema_of_csv functions instead of ${exp.sql}")
+  }
+
+  def schemaIsNotStructTypeError(dataType: DataType): Throwable = {
+    new AnalysisException(s"Schema should be struct type but got 
${dataType.sql}.")
+  }
+
+  def keyValueInMapNotExpectedError(m: CreateMap): Throwable = {

Review comment:
       keyValueInMapNotStringError(dataType: DataType)

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala
##########
@@ -662,4 +662,62 @@ object QueryCompilationErrors {
   def cannotReadCorruptedTablePropertyError(key: String, details: String = 
""): Throwable = {
     new AnalysisException(s"Cannot read table property '$key' as it's 
corrupted.$details")
   }
+
+  def invalidSchemaStringError(exp: Expression): Throwable = {
+    new AnalysisException(s"The expression '${exp.sql}' is not a valid schema 
string.")
+  }
+
+  def schemaMustBeFoldableError(exp: Expression): Throwable = {
+    new AnalysisException(
+      "Schema should be specified in DDL format as a string literal or output 
of " +
+        s"the schema_of_json/schema_of_csv functions instead of ${exp.sql}")
+  }
+
+  def schemaIsNotStructTypeError(dataType: DataType): Throwable = {
+    new AnalysisException(s"Schema should be struct type but got 
${dataType.sql}.")
+  }
+
+  def keyValueInMapNotExpectedError(m: CreateMap): Throwable = {
+    new AnalysisException(
+      s"A type of keys and values in map() must be string, but got 
${m.dataType.catalogString}")
+  }
+
+  def nonMapFunctionNotAllowedError(): Throwable = {
+    new AnalysisException("Must use a map() function for options")
+  }
+
+  def fieldTypeForCorruptRecordNotAllowedError(): Throwable = {
+    new AnalysisException("The field for corrupt records must be string type 
and nullable")
+  }
+
+  def dataTypeUnsupportedByClassError(x: DataType, className: String): 
Throwable = {
+    new AnalysisException(s"DataType '$x' is not supported by $className.")
+  }
+
+  def parseModeUnsupportedError(funcName: String, mode: ParseMode): Throwable 
= {
+    new AnalysisException(s"$funcName() doesn't support the ${mode.name} mode. 
" +
+      s"Acceptable modes are ${PermissiveMode.name} and ${FailFastMode.name}.")
+  }
+
+  def unfoldableFieldUnsupportedError(): Throwable = {
+    new AnalysisException("The field parameter needs to be a foldable string 
value.")
+  }
+
+  def literalOfTypeUnsupportedForSourceTypeError(field: String, source: 
Expression): Throwable = {

Review comment:
       literalTypeUnsupportedForSourceTypeError

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala
##########
@@ -662,4 +662,62 @@ object QueryCompilationErrors {
   def cannotReadCorruptedTablePropertyError(key: String, details: String = 
""): Throwable = {
     new AnalysisException(s"Cannot read table property '$key' as it's 
corrupted.$details")
   }
+
+  def invalidSchemaStringError(exp: Expression): Throwable = {
+    new AnalysisException(s"The expression '${exp.sql}' is not a valid schema 
string.")
+  }
+
+  def schemaMustBeFoldableError(exp: Expression): Throwable = {
+    new AnalysisException(
+      "Schema should be specified in DDL format as a string literal or output 
of " +
+        s"the schema_of_json/schema_of_csv functions instead of ${exp.sql}")
+  }
+
+  def schemaIsNotStructTypeError(dataType: DataType): Throwable = {
+    new AnalysisException(s"Schema should be struct type but got 
${dataType.sql}.")
+  }
+
+  def keyValueInMapNotExpectedError(m: CreateMap): Throwable = {
+    new AnalysisException(
+      s"A type of keys and values in map() must be string, but got 
${m.dataType.catalogString}")
+  }
+
+  def nonMapFunctionNotAllowedError(): Throwable = {
+    new AnalysisException("Must use a map() function for options")
+  }
+
+  def fieldTypeForCorruptRecordNotAllowedError(): Throwable = {
+    new AnalysisException("The field for corrupt records must be string type 
and nullable")
+  }
+
+  def dataTypeUnsupportedByClassError(x: DataType, className: String): 
Throwable = {
+    new AnalysisException(s"DataType '$x' is not supported by $className.")
+  }
+
+  def parseModeUnsupportedError(funcName: String, mode: ParseMode): Throwable 
= {
+    new AnalysisException(s"$funcName() doesn't support the ${mode.name} mode. 
" +
+      s"Acceptable modes are ${PermissiveMode.name} and ${FailFastMode.name}.")
+  }
+
+  def unfoldableFieldUnsupportedError(): Throwable = {
+    new AnalysisException("The field parameter needs to be a foldable string 
value.")
+  }
+
+  def literalOfTypeUnsupportedForSourceTypeError(field: String, source: 
Expression): Throwable = {
+    new AnalysisException(s"Literals of type '$field' are currently not 
supported " +
+      s"for the ${source.dataType.catalogString} type.")
+  }
+
+  def componentTypeUnsupportedInArrayError(clz: Class[_]): Throwable = {
+    new AnalysisException(s"Unsupported component type $clz in arrays")
+  }
+
+  def invalidArgumentNumberForDecodeError(params: Seq[Expression]): Throwable 
= {
+    new AnalysisException("Invalid number of arguments for function decode. " +
+      s"Expected: 2; Found: ${params.length}")
+  }
+
+  def secondArgumentNotExpectedDoubleLiteralError(): Throwable = {

Review comment:
       secondArgumentNotDoubleLiteralError

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/QueryExecutionErrors.scala
##########
@@ -51,18 +58,209 @@ object QueryExecutionErrors {
       s"[BUG] logical plan should not have output of char/varchar type: $leaf")
   }
 
-  def cannotEvaluateGeneratorError(generator: UnresolvedGenerator): Throwable 
= {
-    new UnsupportedOperationException(s"Cannot evaluate expression: 
$generator")
+  def cannotEvaluateExpressionError(expression: Expression): Throwable = {
+    new UnsupportedOperationException(s"Cannot evaluate expression: 
$expression")
   }
 
-  def cannotGenerateCodeForGeneratorError(generator: UnresolvedGenerator): 
Throwable = {
-    new UnsupportedOperationException(s"Cannot generate code for expression: 
$generator")
+  def cannotGenerateCodeForExpressionError(expression: Expression): Throwable 
= {
+    new UnsupportedOperationException(s"Cannot generate code for expression: 
$expression")
   }
 
   def cannotTerminateGeneratorError(generator: UnresolvedGenerator): Throwable 
= {
     new UnsupportedOperationException(s"Cannot terminate expression: 
$generator")
   }
 
+  def castDownLongCauseOverflowError(t: Any, targetType: String): Throwable = {
+    new ArithmeticException(s"Casting $t to $targetType causes overflow")
+  }
+
+  def cannotRepresentDecimalError(value: Decimal, decimalType: DecimalType): 
Throwable = {

Review comment:
       From the context, it seems the error comes from being unable to change 
the decimal precision. Maybe `cannotChangeDecimalPrecisionError`

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/QueryExecutionErrors.scala
##########
@@ -51,18 +58,209 @@ object QueryExecutionErrors {
       s"[BUG] logical plan should not have output of char/varchar type: $leaf")
   }
 
-  def cannotEvaluateGeneratorError(generator: UnresolvedGenerator): Throwable 
= {
-    new UnsupportedOperationException(s"Cannot evaluate expression: 
$generator")
+  def cannotEvaluateExpressionError(expression: Expression): Throwable = {
+    new UnsupportedOperationException(s"Cannot evaluate expression: 
$expression")
   }
 
-  def cannotGenerateCodeForGeneratorError(generator: UnresolvedGenerator): 
Throwable = {
-    new UnsupportedOperationException(s"Cannot generate code for expression: 
$generator")
+  def cannotGenerateCodeForExpressionError(expression: Expression): Throwable 
= {

Review comment:
       Nice!

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/QueryExecutionErrors.scala
##########
@@ -51,18 +58,209 @@ object QueryExecutionErrors {
       s"[BUG] logical plan should not have output of char/varchar type: $leaf")
   }
 
-  def cannotEvaluateGeneratorError(generator: UnresolvedGenerator): Throwable 
= {
-    new UnsupportedOperationException(s"Cannot evaluate expression: 
$generator")
+  def cannotEvaluateExpressionError(expression: Expression): Throwable = {
+    new UnsupportedOperationException(s"Cannot evaluate expression: 
$expression")
   }
 
-  def cannotGenerateCodeForGeneratorError(generator: UnresolvedGenerator): 
Throwable = {
-    new UnsupportedOperationException(s"Cannot generate code for expression: 
$generator")
+  def cannotGenerateCodeForExpressionError(expression: Expression): Throwable 
= {
+    new UnsupportedOperationException(s"Cannot generate code for expression: 
$expression")
   }
 
   def cannotTerminateGeneratorError(generator: UnresolvedGenerator): Throwable 
= {
     new UnsupportedOperationException(s"Cannot terminate expression: 
$generator")
   }
 
+  def castDownLongCauseOverflowError(t: Any, targetType: String): Throwable = {
+    new ArithmeticException(s"Casting $t to $targetType causes overflow")
+  }
+
+  def cannotRepresentDecimalError(value: Decimal, decimalType: DecimalType): 
Throwable = {
+    new ArithmeticException(s"${value.toDebugString} cannot be represented as 
" +
+      s"Decimal(${decimalType.precision}, ${decimalType.scale}).")
+  }
+
+  def invalidInputSyntaxError(s: UTF8String): Throwable = {

Review comment:
       invalidInputSyntaxForNumericError

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/QueryExecutionErrors.scala
##########
@@ -51,18 +58,209 @@ object QueryExecutionErrors {
       s"[BUG] logical plan should not have output of char/varchar type: $leaf")
   }
 
-  def cannotEvaluateGeneratorError(generator: UnresolvedGenerator): Throwable 
= {
-    new UnsupportedOperationException(s"Cannot evaluate expression: 
$generator")
+  def cannotEvaluateExpressionError(expression: Expression): Throwable = {
+    new UnsupportedOperationException(s"Cannot evaluate expression: 
$expression")
   }
 
-  def cannotGenerateCodeForGeneratorError(generator: UnresolvedGenerator): 
Throwable = {
-    new UnsupportedOperationException(s"Cannot generate code for expression: 
$generator")
+  def cannotGenerateCodeForExpressionError(expression: Expression): Throwable 
= {
+    new UnsupportedOperationException(s"Cannot generate code for expression: 
$expression")
   }
 
   def cannotTerminateGeneratorError(generator: UnresolvedGenerator): Throwable 
= {
     new UnsupportedOperationException(s"Cannot terminate expression: 
$generator")
   }
 
+  def castDownLongCauseOverflowError(t: Any, targetType: String): Throwable = {

Review comment:
       Since t's type is `Any` here, we can maybe name this as 
`castingCauseOverflowError`

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/QueryExecutionErrors.scala
##########
@@ -51,18 +58,209 @@ object QueryExecutionErrors {
       s"[BUG] logical plan should not have output of char/varchar type: $leaf")
   }
 
-  def cannotEvaluateGeneratorError(generator: UnresolvedGenerator): Throwable 
= {
-    new UnsupportedOperationException(s"Cannot evaluate expression: 
$generator")
+  def cannotEvaluateExpressionError(expression: Expression): Throwable = {
+    new UnsupportedOperationException(s"Cannot evaluate expression: 
$expression")
   }
 
-  def cannotGenerateCodeForGeneratorError(generator: UnresolvedGenerator): 
Throwable = {
-    new UnsupportedOperationException(s"Cannot generate code for expression: 
$generator")
+  def cannotGenerateCodeForExpressionError(expression: Expression): Throwable 
= {
+    new UnsupportedOperationException(s"Cannot generate code for expression: 
$expression")
   }
 
   def cannotTerminateGeneratorError(generator: UnresolvedGenerator): Throwable 
= {
     new UnsupportedOperationException(s"Cannot terminate expression: 
$generator")
   }
 
+  def castDownLongCauseOverflowError(t: Any, targetType: String): Throwable = {
+    new ArithmeticException(s"Casting $t to $targetType causes overflow")
+  }
+
+  def cannotRepresentDecimalError(value: Decimal, decimalType: DecimalType): 
Throwable = {
+    new ArithmeticException(s"${value.toDebugString} cannot be represented as 
" +
+      s"Decimal(${decimalType.precision}, ${decimalType.scale}).")
+  }
+
+  def invalidInputSyntaxError(s: UTF8String): Throwable = {
+    new NumberFormatException(s"invalid input syntax for type numeric: $s")
+  }
+
+  def cannotCastFromNullTypeDirectlyError(to: DataType): Throwable = {

Review comment:
       cannotCastFromNullTypeError

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/QueryExecutionErrors.scala
##########
@@ -51,18 +58,209 @@ object QueryExecutionErrors {
       s"[BUG] logical plan should not have output of char/varchar type: $leaf")
   }
 
-  def cannotEvaluateGeneratorError(generator: UnresolvedGenerator): Throwable 
= {
-    new UnsupportedOperationException(s"Cannot evaluate expression: 
$generator")
+  def cannotEvaluateExpressionError(expression: Expression): Throwable = {
+    new UnsupportedOperationException(s"Cannot evaluate expression: 
$expression")
   }
 
-  def cannotGenerateCodeForGeneratorError(generator: UnresolvedGenerator): 
Throwable = {
-    new UnsupportedOperationException(s"Cannot generate code for expression: 
$generator")
+  def cannotGenerateCodeForExpressionError(expression: Expression): Throwable 
= {
+    new UnsupportedOperationException(s"Cannot generate code for expression: 
$expression")
   }
 
   def cannotTerminateGeneratorError(generator: UnresolvedGenerator): Throwable 
= {
     new UnsupportedOperationException(s"Cannot terminate expression: 
$generator")
   }
 
+  def castDownLongCauseOverflowError(t: Any, targetType: String): Throwable = {
+    new ArithmeticException(s"Casting $t to $targetType causes overflow")
+  }
+
+  def cannotRepresentDecimalError(value: Decimal, decimalType: DecimalType): 
Throwable = {
+    new ArithmeticException(s"${value.toDebugString} cannot be represented as 
" +
+      s"Decimal(${decimalType.precision}, ${decimalType.scale}).")
+  }
+
+  def invalidInputSyntaxError(s: UTF8String): Throwable = {
+    new NumberFormatException(s"invalid input syntax for type numeric: $s")
+  }
+
+  def cannotCastFromNullTypeDirectlyError(to: DataType): Throwable = {
+    new SparkException(s"should not directly cast from NullType to $to.")
+  }
+
+  def cannotCastError(from: DataType, to: DataType): Throwable = {
+    new SparkException(s"Cannot cast $from to $to.")
+  }
+
+  def cannotParseDecimalError(): Throwable = {
+    new IllegalArgumentException("Cannot parse any decimal")
+  }
+
+  def simpleStringWithNodeIdUnsupportedError(nodeName: String): Throwable = {
+    new UnsupportedOperationException(s"$nodeName does not implement 
simpleStringWithNodeId")
+  }
+
+  def evaluateUnevaluableAggregateUnsupportedError(
+      methodName: String, unEvaluable: UnevaluableAggregate): Throwable = {
+    new UnsupportedOperationException(s"Cannot evaluate $methodName: 
$unEvaluable")
+  }
+
+  def dataTypeUnsupportedError(dt: DataType): Throwable = {
+    new SparkException(s"Unsupported data type $dt")
+  }
+
+  def udfErrorMessage(funcCls: String, inputTypes: String, outputType: 
String): String = {
+    s"Failed to execute user defined function ($funcCls: ($inputTypes) => 
$outputType)"
+  }
+
+  def failedExecuteUserDefinedFunctionError(
+      funcCls: String, inputTypes: String, outputType: String, e: Exception): 
Throwable = {
+    new SparkException(udfErrorMessage(funcCls, inputTypes, outputType), e)
+  }

Review comment:
       ```scala
   def failedToExecuteUserDefinedFunctionError(msg: String, e: Exception): 
Throwable = {
   ...
   }
   ```

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
##########
@@ -1178,19 +1179,19 @@ case class ScalaUDF(
 
   private[this] val resultConverter = catalystConverter
 
-  lazy val udfErrorMessage = {
-    val funcCls = Utils.getSimpleName(function.getClass)
-    val inputTypes = children.map(_.dataType.catalogString).mkString(", ")
-    val outputType = dataType.catalogString
-    s"Failed to execute user defined function($funcCls: ($inputTypes) => 
$outputType)"
+  lazy val (funcCls, inputTypesString, outputType) = {
+    (Utils.getSimpleName(function.getClass),
+      children.map(_.dataType.catalogString).mkString(", "),
+      dataType.catalogString)
   }
 
   override def eval(input: InternalRow): Any = {
     val result = try {
       f(input)
     } catch {
       case e: Exception =>
-        throw new SparkException(udfErrorMessage, e)
+        throw QueryExecutionErrors.failedExecuteUserDefinedFunctionError(
+          funcCls, inputTypesString, outputType, e)

Review comment:
       We don't need to change the `udfErrorMessage` here. We can just directly 
pass the message itself.

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/QueryExecutionErrors.scala
##########
@@ -51,18 +58,209 @@ object QueryExecutionErrors {
       s"[BUG] logical plan should not have output of char/varchar type: $leaf")
   }
 
-  def cannotEvaluateGeneratorError(generator: UnresolvedGenerator): Throwable 
= {
-    new UnsupportedOperationException(s"Cannot evaluate expression: 
$generator")
+  def cannotEvaluateExpressionError(expression: Expression): Throwable = {
+    new UnsupportedOperationException(s"Cannot evaluate expression: 
$expression")
   }
 
-  def cannotGenerateCodeForGeneratorError(generator: UnresolvedGenerator): 
Throwable = {
-    new UnsupportedOperationException(s"Cannot generate code for expression: 
$generator")
+  def cannotGenerateCodeForExpressionError(expression: Expression): Throwable 
= {
+    new UnsupportedOperationException(s"Cannot generate code for expression: 
$expression")
   }
 
   def cannotTerminateGeneratorError(generator: UnresolvedGenerator): Throwable 
= {
     new UnsupportedOperationException(s"Cannot terminate expression: 
$generator")
   }
 
+  def castDownLongCauseOverflowError(t: Any, targetType: String): Throwable = {
+    new ArithmeticException(s"Casting $t to $targetType causes overflow")
+  }
+
+  def cannotRepresentDecimalError(value: Decimal, decimalType: DecimalType): 
Throwable = {
+    new ArithmeticException(s"${value.toDebugString} cannot be represented as 
" +
+      s"Decimal(${decimalType.precision}, ${decimalType.scale}).")
+  }
+
+  def invalidInputSyntaxError(s: UTF8String): Throwable = {
+    new NumberFormatException(s"invalid input syntax for type numeric: $s")
+  }
+
+  def cannotCastFromNullTypeDirectlyError(to: DataType): Throwable = {
+    new SparkException(s"should not directly cast from NullType to $to.")
+  }
+
+  def cannotCastError(from: DataType, to: DataType): Throwable = {
+    new SparkException(s"Cannot cast $from to $to.")
+  }
+
+  def cannotParseDecimalError(): Throwable = {
+    new IllegalArgumentException("Cannot parse any decimal")
+  }
+
+  def simpleStringWithNodeIdUnsupportedError(nodeName: String): Throwable = {
+    new UnsupportedOperationException(s"$nodeName does not implement 
simpleStringWithNodeId")
+  }
+
+  def evaluateUnevaluableAggregateUnsupportedError(
+      methodName: String, unEvaluable: UnevaluableAggregate): Throwable = {
+    new UnsupportedOperationException(s"Cannot evaluate $methodName: 
$unEvaluable")
+  }
+
+  def dataTypeUnsupportedError(dt: DataType): Throwable = {
+    new SparkException(s"Unsupported data type $dt")
+  }
+
+  def udfErrorMessage(funcCls: String, inputTypes: String, outputType: 
String): String = {
+    s"Failed to execute user defined function ($funcCls: ($inputTypes) => 
$outputType)"
+  }
+
+  def failedExecuteUserDefinedFunctionError(
+      funcCls: String, inputTypes: String, outputType: String, e: Exception): 
Throwable = {
+    new SparkException(udfErrorMessage(funcCls, inputTypes, outputType), e)
+  }
+
+  def divideByZeroError(): Throwable = {
+    new ArithmeticException("divide by zero")
+  }
+
+  def dataTypeUnsupportedByExtractValueError(
+      dataType: DataType, extraction: Expression, child: Expression): 
Throwable = {
+    val errorMsg = dataType match {
+      case StructType(_) =>
+        s"Field name should be String Literal, but it's $extraction"
+      case other =>
+        s"Can't extract value from $child: need struct type but got 
${other.catalogString}"
+    }
+    new AnalysisException(errorMsg)
+  }
+
+  def invalidArrayIndexError(index: Int, numElements: Int): Throwable = {
+    new ArrayIndexOutOfBoundsException(s"Invalid index: $index, numElements: 
$numElements")
+  }
+
+  def mapKeyNotExistsError(key: Any): Throwable = {

Review comment:
       mapKeyNotExistError

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/QueryExecutionErrors.scala
##########
@@ -51,18 +58,209 @@ object QueryExecutionErrors {
       s"[BUG] logical plan should not have output of char/varchar type: $leaf")
   }
 
-  def cannotEvaluateGeneratorError(generator: UnresolvedGenerator): Throwable 
= {
-    new UnsupportedOperationException(s"Cannot evaluate expression: 
$generator")
+  def cannotEvaluateExpressionError(expression: Expression): Throwable = {
+    new UnsupportedOperationException(s"Cannot evaluate expression: 
$expression")
   }
 
-  def cannotGenerateCodeForGeneratorError(generator: UnresolvedGenerator): 
Throwable = {
-    new UnsupportedOperationException(s"Cannot generate code for expression: 
$generator")
+  def cannotGenerateCodeForExpressionError(expression: Expression): Throwable 
= {
+    new UnsupportedOperationException(s"Cannot generate code for expression: 
$expression")
   }
 
   def cannotTerminateGeneratorError(generator: UnresolvedGenerator): Throwable 
= {
     new UnsupportedOperationException(s"Cannot terminate expression: 
$generator")
   }
 
+  def castDownLongCauseOverflowError(t: Any, targetType: String): Throwable = {
+    new ArithmeticException(s"Casting $t to $targetType causes overflow")
+  }
+
+  def cannotRepresentDecimalError(value: Decimal, decimalType: DecimalType): 
Throwable = {
+    new ArithmeticException(s"${value.toDebugString} cannot be represented as 
" +
+      s"Decimal(${decimalType.precision}, ${decimalType.scale}).")
+  }
+
+  def invalidInputSyntaxError(s: UTF8String): Throwable = {
+    new NumberFormatException(s"invalid input syntax for type numeric: $s")
+  }
+
+  def cannotCastFromNullTypeDirectlyError(to: DataType): Throwable = {
+    new SparkException(s"should not directly cast from NullType to $to.")
+  }
+
+  def cannotCastError(from: DataType, to: DataType): Throwable = {
+    new SparkException(s"Cannot cast $from to $to.")
+  }
+
+  def cannotParseDecimalError(): Throwable = {
+    new IllegalArgumentException("Cannot parse any decimal")
+  }
+
+  def simpleStringWithNodeIdUnsupportedError(nodeName: String): Throwable = {
+    new UnsupportedOperationException(s"$nodeName does not implement 
simpleStringWithNodeId")
+  }
+
+  def evaluateUnevaluableAggregateUnsupportedError(
+      methodName: String, unEvaluable: UnevaluableAggregate): Throwable = {
+    new UnsupportedOperationException(s"Cannot evaluate $methodName: 
$unEvaluable")
+  }
+
+  def dataTypeUnsupportedError(dt: DataType): Throwable = {
+    new SparkException(s"Unsupported data type $dt")
+  }
+
+  def udfErrorMessage(funcCls: String, inputTypes: String, outputType: 
String): String = {
+    s"Failed to execute user defined function ($funcCls: ($inputTypes) => 
$outputType)"
+  }
+
+  def failedExecuteUserDefinedFunctionError(
+      funcCls: String, inputTypes: String, outputType: String, e: Exception): 
Throwable = {
+    new SparkException(udfErrorMessage(funcCls, inputTypes, outputType), e)
+  }
+
+  def divideByZeroError(): Throwable = {
+    new ArithmeticException("divide by zero")
+  }
+
+  def dataTypeUnsupportedByExtractValueError(
+      dataType: DataType, extraction: Expression, child: Expression): 
Throwable = {
+    val errorMsg = dataType match {
+      case StructType(_) =>
+        s"Field name should be String Literal, but it's $extraction"
+      case other =>
+        s"Can't extract value from $child: need struct type but got 
${other.catalogString}"
+    }
+    new AnalysisException(errorMsg)

Review comment:
       This should be moved to QueryCompilationError?

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/QueryExecutionErrors.scala
##########
@@ -51,18 +58,209 @@ object QueryExecutionErrors {
       s"[BUG] logical plan should not have output of char/varchar type: $leaf")
   }
 
-  def cannotEvaluateGeneratorError(generator: UnresolvedGenerator): Throwable 
= {
-    new UnsupportedOperationException(s"Cannot evaluate expression: 
$generator")
+  def cannotEvaluateExpressionError(expression: Expression): Throwable = {
+    new UnsupportedOperationException(s"Cannot evaluate expression: 
$expression")
   }
 
-  def cannotGenerateCodeForGeneratorError(generator: UnresolvedGenerator): 
Throwable = {
-    new UnsupportedOperationException(s"Cannot generate code for expression: 
$generator")
+  def cannotGenerateCodeForExpressionError(expression: Expression): Throwable 
= {
+    new UnsupportedOperationException(s"Cannot generate code for expression: 
$expression")
   }
 
   def cannotTerminateGeneratorError(generator: UnresolvedGenerator): Throwable 
= {
     new UnsupportedOperationException(s"Cannot terminate expression: 
$generator")
   }
 
+  def castDownLongCauseOverflowError(t: Any, targetType: String): Throwable = {
+    new ArithmeticException(s"Casting $t to $targetType causes overflow")
+  }
+
+  def cannotRepresentDecimalError(value: Decimal, decimalType: DecimalType): 
Throwable = {
+    new ArithmeticException(s"${value.toDebugString} cannot be represented as 
" +
+      s"Decimal(${decimalType.precision}, ${decimalType.scale}).")
+  }
+
+  def invalidInputSyntaxError(s: UTF8String): Throwable = {
+    new NumberFormatException(s"invalid input syntax for type numeric: $s")
+  }
+
+  def cannotCastFromNullTypeDirectlyError(to: DataType): Throwable = {
+    new SparkException(s"should not directly cast from NullType to $to.")
+  }
+
+  def cannotCastError(from: DataType, to: DataType): Throwable = {
+    new SparkException(s"Cannot cast $from to $to.")
+  }
+
+  def cannotParseDecimalError(): Throwable = {
+    new IllegalArgumentException("Cannot parse any decimal")
+  }
+
+  def simpleStringWithNodeIdUnsupportedError(nodeName: String): Throwable = {
+    new UnsupportedOperationException(s"$nodeName does not implement 
simpleStringWithNodeId")
+  }
+
+  def evaluateUnevaluableAggregateUnsupportedError(
+      methodName: String, unEvaluable: UnevaluableAggregate): Throwable = {
+    new UnsupportedOperationException(s"Cannot evaluate $methodName: 
$unEvaluable")
+  }
+
+  def dataTypeUnsupportedError(dt: DataType): Throwable = {
+    new SparkException(s"Unsupported data type $dt")
+  }
+
+  def udfErrorMessage(funcCls: String, inputTypes: String, outputType: 
String): String = {
+    s"Failed to execute user defined function ($funcCls: ($inputTypes) => 
$outputType)"
+  }
+
+  def failedExecuteUserDefinedFunctionError(
+      funcCls: String, inputTypes: String, outputType: String, e: Exception): 
Throwable = {
+    new SparkException(udfErrorMessage(funcCls, inputTypes, outputType), e)
+  }
+
+  def divideByZeroError(): Throwable = {
+    new ArithmeticException("divide by zero")
+  }
+
+  def dataTypeUnsupportedByExtractValueError(
+      dataType: DataType, extraction: Expression, child: Expression): 
Throwable = {
+    val errorMsg = dataType match {
+      case StructType(_) =>
+        s"Field name should be String Literal, but it's $extraction"
+      case other =>
+        s"Can't extract value from $child: need struct type but got 
${other.catalogString}"
+    }
+    new AnalysisException(errorMsg)
+  }
+
+  def invalidArrayIndexError(index: Int, numElements: Int): Throwable = {
+    new ArrayIndexOutOfBoundsException(s"Invalid index: $index, numElements: 
$numElements")
+  }
+
+  def mapKeyNotExistsError(key: Any): Throwable = {
+    new NoSuchElementException(s"Key $key does not exist.")
+  }
+
+  def rowFromCSVParserNotExpectedError(): Throwable = {
+    new IllegalArgumentException("Expected one row from CSV parser.")
+  }
+
+  def inputTypeUnsupportedError(dataType: DataType): Throwable = {
+    new IllegalArgumentException(s"Unsupported input type 
${dataType.catalogString}")
+  }
+
+  def invalidFractionOfSecondError(): Throwable = {
+    new DateTimeException("The fraction of sec must be zero. Valid range is 
[0, 60].")
+  }
+
+  def overflowInSumOfDecimalError(): Throwable = {
+    new ArithmeticException("Overflow in sum of decimals.")
+  }
+
+  def mapSizeExceedArraySizeWhenZipMapError(size: Int): Throwable = {
+    new RuntimeException(s"Unsuccessful try to zip maps with $size " +
+      "unique keys due to exceeding the array size limit " +
+      s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
+  }
+
+  def copyNullFieldNotAllowedError(): Throwable = {
+    new IllegalStateException("Do not attempt to copy a null field")
+  }
+
+  def literalTypeUnsupportedError(v: Any): Throwable = {
+    new RuntimeException(s"Unsupported literal type ${v.getClass} $v")
+  }
+
+  def noDefaultForDataTypeError(dataType: DataType): Throwable = {
+    new RuntimeException(s"no default for type $dataType")
+  }
+
+  def doGenCodeOfAliasShouldNotBeCalledError(): Throwable = {
+    new IllegalStateException("Alias.doGenCode should not be called.")
+  }
+
+  def orderedOperationUnsupportedByDataTypeError(dataType: DataType): 
Throwable = {
+    new IllegalArgumentException(s"Type $dataType does not support ordered 
operations")
+  }
+
+  def indexOfMatchGroupLessThanZeroNotAllowedError(): Throwable = {
+    new IllegalArgumentException("The specified group index cannot be less 
than zero")
+  }
+
+  def groupIndexExceedMatchGroupCountNotAllowedError(
+      groupCount: Int, groupIndex: Int): Throwable = {
+    new IllegalArgumentException(
+      s"Regex group count is $groupCount, but the specified group index is 
$groupIndex")
+  }
+
+  def invalidUrlError(url: UTF8String, e: URISyntaxException): Throwable = {
+    new IllegalArgumentException(s"Find an invaild url string 
${url.toString}", e)
+  }
+
+  def dataTypeUnsupportedError(): Throwable = {

Review comment:
       I think it is better to call this `unsupportedOperationError(msg: 
String)`, because the issue is not that a certain data type is unsupported, but 
that the operation of invoking the `dataType` method is unsupported.

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/QueryExecutionErrors.scala
##########
@@ -51,18 +58,209 @@ object QueryExecutionErrors {
       s"[BUG] logical plan should not have output of char/varchar type: $leaf")
   }
 
-  def cannotEvaluateGeneratorError(generator: UnresolvedGenerator): Throwable 
= {
-    new UnsupportedOperationException(s"Cannot evaluate expression: 
$generator")
+  def cannotEvaluateExpressionError(expression: Expression): Throwable = {
+    new UnsupportedOperationException(s"Cannot evaluate expression: 
$expression")
   }
 
-  def cannotGenerateCodeForGeneratorError(generator: UnresolvedGenerator): 
Throwable = {
-    new UnsupportedOperationException(s"Cannot generate code for expression: 
$generator")
+  def cannotGenerateCodeForExpressionError(expression: Expression): Throwable 
= {
+    new UnsupportedOperationException(s"Cannot generate code for expression: 
$expression")
   }
 
   def cannotTerminateGeneratorError(generator: UnresolvedGenerator): Throwable 
= {
     new UnsupportedOperationException(s"Cannot terminate expression: 
$generator")
   }
 
+  def castDownLongCauseOverflowError(t: Any, targetType: String): Throwable = {
+    new ArithmeticException(s"Casting $t to $targetType causes overflow")
+  }
+
+  def cannotRepresentDecimalError(value: Decimal, decimalType: DecimalType): 
Throwable = {
+    new ArithmeticException(s"${value.toDebugString} cannot be represented as 
" +
+      s"Decimal(${decimalType.precision}, ${decimalType.scale}).")
+  }
+
+  def invalidInputSyntaxError(s: UTF8String): Throwable = {
+    new NumberFormatException(s"invalid input syntax for type numeric: $s")
+  }
+
+  def cannotCastFromNullTypeDirectlyError(to: DataType): Throwable = {
+    new SparkException(s"should not directly cast from NullType to $to.")
+  }
+
+  def cannotCastError(from: DataType, to: DataType): Throwable = {
+    new SparkException(s"Cannot cast $from to $to.")
+  }
+
+  def cannotParseDecimalError(): Throwable = {
+    new IllegalArgumentException("Cannot parse any decimal")
+  }
+
+  def simpleStringWithNodeIdUnsupportedError(nodeName: String): Throwable = {
+    new UnsupportedOperationException(s"$nodeName does not implement 
simpleStringWithNodeId")
+  }
+
+  def evaluateUnevaluableAggregateUnsupportedError(
+      methodName: String, unEvaluable: UnevaluableAggregate): Throwable = {
+    new UnsupportedOperationException(s"Cannot evaluate $methodName: 
$unEvaluable")
+  }
+
+  def dataTypeUnsupportedError(dt: DataType): Throwable = {
+    new SparkException(s"Unsupported data type $dt")
+  }
+
+  def udfErrorMessage(funcCls: String, inputTypes: String, outputType: 
String): String = {
+    s"Failed to execute user defined function ($funcCls: ($inputTypes) => 
$outputType)"
+  }
+
+  def failedExecuteUserDefinedFunctionError(
+      funcCls: String, inputTypes: String, outputType: String, e: Exception): 
Throwable = {
+    new SparkException(udfErrorMessage(funcCls, inputTypes, outputType), e)
+  }
+
+  def divideByZeroError(): Throwable = {
+    new ArithmeticException("divide by zero")
+  }
+
+  def dataTypeUnsupportedByExtractValueError(
+      dataType: DataType, extraction: Expression, child: Expression): 
Throwable = {
+    val errorMsg = dataType match {
+      case StructType(_) =>
+        s"Field name should be String Literal, but it's $extraction"
+      case other =>
+        s"Can't extract value from $child: need struct type but got 
${other.catalogString}"
+    }
+    new AnalysisException(errorMsg)
+  }
+
+  def invalidArrayIndexError(index: Int, numElements: Int): Throwable = {
+    new ArrayIndexOutOfBoundsException(s"Invalid index: $index, numElements: 
$numElements")
+  }
+
+  def mapKeyNotExistsError(key: Any): Throwable = {
+    new NoSuchElementException(s"Key $key does not exist.")
+  }
+
+  def rowFromCSVParserNotExpectedError(): Throwable = {
+    new IllegalArgumentException("Expected one row from CSV parser.")
+  }
+
+  def inputTypeUnsupportedError(dataType: DataType): Throwable = {
+    new IllegalArgumentException(s"Unsupported input type 
${dataType.catalogString}")
+  }
+
+  def invalidFractionOfSecondError(): Throwable = {
+    new DateTimeException("The fraction of sec must be zero. Valid range is 
[0, 60].")
+  }
+
+  def overflowInSumOfDecimalError(): Throwable = {
+    new ArithmeticException("Overflow in sum of decimals.")
+  }
+
+  def mapSizeExceedArraySizeWhenZipMapError(size: Int): Throwable = {
+    new RuntimeException(s"Unsuccessful try to zip maps with $size " +
+      "unique keys due to exceeding the array size limit " +
+      s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
+  }
+
+  def copyNullFieldNotAllowedError(): Throwable = {
+    new IllegalStateException("Do not attempt to copy a null field")
+  }
+
+  def literalTypeUnsupportedError(v: Any): Throwable = {
+    new RuntimeException(s"Unsupported literal type ${v.getClass} $v")
+  }
+
+  def noDefaultForDataTypeError(dataType: DataType): Throwable = {
+    new RuntimeException(s"no default for type $dataType")
+  }
+
+  def doGenCodeOfAliasShouldNotBeCalledError(): Throwable = {
+    new IllegalStateException("Alias.doGenCode should not be called.")
+  }
+
+  def orderedOperationUnsupportedByDataTypeError(dataType: DataType): 
Throwable = {
+    new IllegalArgumentException(s"Type $dataType does not support ordered 
operations")
+  }
+
+  def indexOfMatchGroupLessThanZeroNotAllowedError(): Throwable = {

Review comment:
       regexGroupIndexLessThanZeroError

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/QueryExecutionErrors.scala
##########
@@ -51,18 +58,209 @@ object QueryExecutionErrors {
       s"[BUG] logical plan should not have output of char/varchar type: $leaf")
   }
 
-  def cannotEvaluateGeneratorError(generator: UnresolvedGenerator): Throwable 
= {
-    new UnsupportedOperationException(s"Cannot evaluate expression: 
$generator")
+  def cannotEvaluateExpressionError(expression: Expression): Throwable = {
+    new UnsupportedOperationException(s"Cannot evaluate expression: 
$expression")
   }
 
-  def cannotGenerateCodeForGeneratorError(generator: UnresolvedGenerator): 
Throwable = {
-    new UnsupportedOperationException(s"Cannot generate code for expression: 
$generator")
+  def cannotGenerateCodeForExpressionError(expression: Expression): Throwable 
= {
+    new UnsupportedOperationException(s"Cannot generate code for expression: 
$expression")
   }
 
   def cannotTerminateGeneratorError(generator: UnresolvedGenerator): Throwable 
= {
     new UnsupportedOperationException(s"Cannot terminate expression: 
$generator")
   }
 
+  def castDownLongCauseOverflowError(t: Any, targetType: String): Throwable = {
+    new ArithmeticException(s"Casting $t to $targetType causes overflow")
+  }
+
+  def cannotRepresentDecimalError(value: Decimal, decimalType: DecimalType): 
Throwable = {
+    new ArithmeticException(s"${value.toDebugString} cannot be represented as 
" +
+      s"Decimal(${decimalType.precision}, ${decimalType.scale}).")
+  }
+
+  def invalidInputSyntaxError(s: UTF8String): Throwable = {
+    new NumberFormatException(s"invalid input syntax for type numeric: $s")
+  }
+
+  def cannotCastFromNullTypeDirectlyError(to: DataType): Throwable = {
+    new SparkException(s"should not directly cast from NullType to $to.")
+  }
+
+  def cannotCastError(from: DataType, to: DataType): Throwable = {
+    new SparkException(s"Cannot cast $from to $to.")
+  }
+
+  def cannotParseDecimalError(): Throwable = {
+    new IllegalArgumentException("Cannot parse any decimal")
+  }
+
+  def simpleStringWithNodeIdUnsupportedError(nodeName: String): Throwable = {
+    new UnsupportedOperationException(s"$nodeName does not implement 
simpleStringWithNodeId")
+  }
+
+  def evaluateUnevaluableAggregateUnsupportedError(
+      methodName: String, unEvaluable: UnevaluableAggregate): Throwable = {
+    new UnsupportedOperationException(s"Cannot evaluate $methodName: 
$unEvaluable")
+  }
+
+  def dataTypeUnsupportedError(dt: DataType): Throwable = {
+    new SparkException(s"Unsupported data type $dt")
+  }
+
+  def udfErrorMessage(funcCls: String, inputTypes: String, outputType: 
String): String = {
+    s"Failed to execute user defined function ($funcCls: ($inputTypes) => 
$outputType)"
+  }
+
+  def failedExecuteUserDefinedFunctionError(
+      funcCls: String, inputTypes: String, outputType: String, e: Exception): 
Throwable = {
+    new SparkException(udfErrorMessage(funcCls, inputTypes, outputType), e)
+  }
+
+  def divideByZeroError(): Throwable = {
+    new ArithmeticException("divide by zero")
+  }
+
+  def dataTypeUnsupportedByExtractValueError(
+      dataType: DataType, extraction: Expression, child: Expression): 
Throwable = {
+    val errorMsg = dataType match {
+      case StructType(_) =>
+        s"Field name should be String Literal, but it's $extraction"
+      case other =>
+        s"Can't extract value from $child: need struct type but got 
${other.catalogString}"
+    }
+    new AnalysisException(errorMsg)
+  }
+
+  def invalidArrayIndexError(index: Int, numElements: Int): Throwable = {
+    new ArrayIndexOutOfBoundsException(s"Invalid index: $index, numElements: 
$numElements")
+  }
+
+  def mapKeyNotExistsError(key: Any): Throwable = {
+    new NoSuchElementException(s"Key $key does not exist.")
+  }
+
+  def rowFromCSVParserNotExpectedError(): Throwable = {
+    new IllegalArgumentException("Expected one row from CSV parser.")
+  }
+
+  def inputTypeUnsupportedError(dataType: DataType): Throwable = {
+    new IllegalArgumentException(s"Unsupported input type 
${dataType.catalogString}")
+  }
+
+  def invalidFractionOfSecondError(): Throwable = {
+    new DateTimeException("The fraction of sec must be zero. Valid range is 
[0, 60].")
+  }
+
+  def overflowInSumOfDecimalError(): Throwable = {
+    new ArithmeticException("Overflow in sum of decimals.")
+  }
+
+  def mapSizeExceedArraySizeWhenZipMapError(size: Int): Throwable = {
+    new RuntimeException(s"Unsuccessful try to zip maps with $size " +
+      "unique keys due to exceeding the array size limit " +
+      s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
+  }
+
+  def copyNullFieldNotAllowedError(): Throwable = {
+    new IllegalStateException("Do not attempt to copy a null field")
+  }
+
+  def literalTypeUnsupportedError(v: Any): Throwable = {
+    new RuntimeException(s"Unsupported literal type ${v.getClass} $v")
+  }
+
+  def noDefaultForDataTypeError(dataType: DataType): Throwable = {
+    new RuntimeException(s"no default for type $dataType")
+  }
+
+  def doGenCodeOfAliasShouldNotBeCalledError(): Throwable = {
+    new IllegalStateException("Alias.doGenCode should not be called.")
+  }
+
+  def orderedOperationUnsupportedByDataTypeError(dataType: DataType): 
Throwable = {
+    new IllegalArgumentException(s"Type $dataType does not support ordered 
operations")
+  }
+
+  def indexOfMatchGroupLessThanZeroNotAllowedError(): Throwable = {
+    new IllegalArgumentException("The specified group index cannot be less 
than zero")
+  }
+
+  def groupIndexExceedMatchGroupCountNotAllowedError(
+      groupCount: Int, groupIndex: Int): Throwable = {
+    new IllegalArgumentException(
+      s"Regex group count is $groupCount, but the specified group index is 
$groupIndex")
+  }
+
+  def invalidUrlError(url: UTF8String, e: URISyntaxException): Throwable = {
+    new IllegalArgumentException(s"Find an invaild url string 
${url.toString}", e)
+  }
+
+  def dataTypeUnsupportedError(): Throwable = {
+    new UnsupportedOperationException("dataType")
+  }
+
+  def mergeUnsupportedByWindowFunctionError(): Throwable = {
+    new UnsupportedOperationException("Window Functions do not support 
merging.")
+  }
+
+  def dataTypeUnexpectedError(dataType: DataType): Throwable = {
+    new UnsupportedOperationException(s"Unexpected data type 
${dataType.catalogString}")
+  }
+
+  def negativeValueUnexpectedError(frequencyExpression : Expression): 
Throwable = {
+    new SparkException(s"Negative values found in ${frequencyExpression.sql}")
+  }
+
+  def addNewFunctionMismatchedWithFunctionError(funcName: String): Throwable = 
{
+    new IllegalArgumentException(s"$funcName is not matched at addNewFunction")
+  }
+
+  def cannotGenerateCodeForUnComparableTypeError(

Review comment:
       cannotGenerateCodeForUncomparableTypeError

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/QueryExecutionErrors.scala
##########
@@ -51,18 +58,209 @@ object QueryExecutionErrors {
       s"[BUG] logical plan should not have output of char/varchar type: $leaf")
   }
 
-  def cannotEvaluateGeneratorError(generator: UnresolvedGenerator): Throwable 
= {
-    new UnsupportedOperationException(s"Cannot evaluate expression: 
$generator")
+  def cannotEvaluateExpressionError(expression: Expression): Throwable = {
+    new UnsupportedOperationException(s"Cannot evaluate expression: 
$expression")
   }
 
-  def cannotGenerateCodeForGeneratorError(generator: UnresolvedGenerator): 
Throwable = {
-    new UnsupportedOperationException(s"Cannot generate code for expression: 
$generator")
+  def cannotGenerateCodeForExpressionError(expression: Expression): Throwable 
= {
+    new UnsupportedOperationException(s"Cannot generate code for expression: 
$expression")
   }
 
   def cannotTerminateGeneratorError(generator: UnresolvedGenerator): Throwable 
= {
     new UnsupportedOperationException(s"Cannot terminate expression: 
$generator")
   }
 
+  def castDownLongCauseOverflowError(t: Any, targetType: String): Throwable = {
+    new ArithmeticException(s"Casting $t to $targetType causes overflow")
+  }
+
+  def cannotRepresentDecimalError(value: Decimal, decimalType: DecimalType): 
Throwable = {
+    new ArithmeticException(s"${value.toDebugString} cannot be represented as 
" +
+      s"Decimal(${decimalType.precision}, ${decimalType.scale}).")
+  }
+
+  def invalidInputSyntaxError(s: UTF8String): Throwable = {
+    new NumberFormatException(s"invalid input syntax for type numeric: $s")
+  }
+
+  def cannotCastFromNullTypeDirectlyError(to: DataType): Throwable = {
+    new SparkException(s"should not directly cast from NullType to $to.")
+  }
+
+  def cannotCastError(from: DataType, to: DataType): Throwable = {
+    new SparkException(s"Cannot cast $from to $to.")
+  }
+
+  def cannotParseDecimalError(): Throwable = {
+    new IllegalArgumentException("Cannot parse any decimal")
+  }
+
+  def simpleStringWithNodeIdUnsupportedError(nodeName: String): Throwable = {
+    new UnsupportedOperationException(s"$nodeName does not implement 
simpleStringWithNodeId")
+  }
+
+  def evaluateUnevaluableAggregateUnsupportedError(
+      methodName: String, unEvaluable: UnevaluableAggregate): Throwable = {
+    new UnsupportedOperationException(s"Cannot evaluate $methodName: 
$unEvaluable")
+  }
+
+  def dataTypeUnsupportedError(dt: DataType): Throwable = {
+    new SparkException(s"Unsupported data type $dt")
+  }
+
+  def udfErrorMessage(funcCls: String, inputTypes: String, outputType: 
String): String = {
+    s"Failed to execute user defined function ($funcCls: ($inputTypes) => 
$outputType)"
+  }
+
+  def failedExecuteUserDefinedFunctionError(
+      funcCls: String, inputTypes: String, outputType: String, e: Exception): 
Throwable = {
+    new SparkException(udfErrorMessage(funcCls, inputTypes, outputType), e)
+  }
+
+  def divideByZeroError(): Throwable = {
+    new ArithmeticException("divide by zero")
+  }
+
+  def dataTypeUnsupportedByExtractValueError(
+      dataType: DataType, extraction: Expression, child: Expression): 
Throwable = {
+    val errorMsg = dataType match {
+      case StructType(_) =>
+        s"Field name should be String Literal, but it's $extraction"
+      case other =>
+        s"Can't extract value from $child: need struct type but got 
${other.catalogString}"
+    }
+    new AnalysisException(errorMsg)
+  }
+
+  def invalidArrayIndexError(index: Int, numElements: Int): Throwable = {
+    new ArrayIndexOutOfBoundsException(s"Invalid index: $index, numElements: 
$numElements")
+  }
+
+  def mapKeyNotExistsError(key: Any): Throwable = {
+    new NoSuchElementException(s"Key $key does not exist.")
+  }
+
+  def rowFromCSVParserNotExpectedError(): Throwable = {
+    new IllegalArgumentException("Expected one row from CSV parser.")
+  }
+
+  def inputTypeUnsupportedError(dataType: DataType): Throwable = {
+    new IllegalArgumentException(s"Unsupported input type 
${dataType.catalogString}")
+  }
+
+  def invalidFractionOfSecondError(): Throwable = {
+    new DateTimeException("The fraction of sec must be zero. Valid range is 
[0, 60].")
+  }
+
+  def overflowInSumOfDecimalError(): Throwable = {
+    new ArithmeticException("Overflow in sum of decimals.")
+  }
+
+  def mapSizeExceedArraySizeWhenZipMapError(size: Int): Throwable = {
+    new RuntimeException(s"Unsuccessful try to zip maps with $size " +
+      "unique keys due to exceeding the array size limit " +
+      s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
+  }
+
+  def copyNullFieldNotAllowedError(): Throwable = {
+    new IllegalStateException("Do not attempt to copy a null field")
+  }
+
+  def literalTypeUnsupportedError(v: Any): Throwable = {
+    new RuntimeException(s"Unsupported literal type ${v.getClass} $v")
+  }
+
+  def noDefaultForDataTypeError(dataType: DataType): Throwable = {
+    new RuntimeException(s"no default for type $dataType")
+  }
+
+  def doGenCodeOfAliasShouldNotBeCalledError(): Throwable = {
+    new IllegalStateException("Alias.doGenCode should not be called.")
+  }
+
+  def orderedOperationUnsupportedByDataTypeError(dataType: DataType): 
Throwable = {
+    new IllegalArgumentException(s"Type $dataType does not support ordered 
operations")
+  }
+
+  def indexOfMatchGroupLessThanZeroNotAllowedError(): Throwable = {
+    new IllegalArgumentException("The specified group index cannot be less 
than zero")
+  }
+
+  def groupIndexExceedMatchGroupCountNotAllowedError(

Review comment:
       regexGroupIndexExceedGroupCountError

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/QueryExecutionErrors.scala
##########
@@ -51,18 +58,209 @@ object QueryExecutionErrors {
       s"[BUG] logical plan should not have output of char/varchar type: $leaf")
   }
 
-  def cannotEvaluateGeneratorError(generator: UnresolvedGenerator): Throwable 
= {
-    new UnsupportedOperationException(s"Cannot evaluate expression: 
$generator")
+  def cannotEvaluateExpressionError(expression: Expression): Throwable = {
+    new UnsupportedOperationException(s"Cannot evaluate expression: 
$expression")
   }
 
-  def cannotGenerateCodeForGeneratorError(generator: UnresolvedGenerator): 
Throwable = {
-    new UnsupportedOperationException(s"Cannot generate code for expression: 
$generator")
+  def cannotGenerateCodeForExpressionError(expression: Expression): Throwable 
= {
+    new UnsupportedOperationException(s"Cannot generate code for expression: 
$expression")
   }
 
   def cannotTerminateGeneratorError(generator: UnresolvedGenerator): Throwable 
= {
     new UnsupportedOperationException(s"Cannot terminate expression: 
$generator")
   }
 
+  def castDownLongCauseOverflowError(t: Any, targetType: String): Throwable = {
+    new ArithmeticException(s"Casting $t to $targetType causes overflow")
+  }
+
+  def cannotRepresentDecimalError(value: Decimal, decimalType: DecimalType): 
Throwable = {
+    new ArithmeticException(s"${value.toDebugString} cannot be represented as 
" +
+      s"Decimal(${decimalType.precision}, ${decimalType.scale}).")
+  }
+
+  def invalidInputSyntaxError(s: UTF8String): Throwable = {
+    new NumberFormatException(s"invalid input syntax for type numeric: $s")
+  }
+
+  def cannotCastFromNullTypeDirectlyError(to: DataType): Throwable = {
+    new SparkException(s"should not directly cast from NullType to $to.")
+  }
+
+  def cannotCastError(from: DataType, to: DataType): Throwable = {
+    new SparkException(s"Cannot cast $from to $to.")
+  }
+
+  def cannotParseDecimalError(): Throwable = {
+    new IllegalArgumentException("Cannot parse any decimal")
+  }
+
+  def simpleStringWithNodeIdUnsupportedError(nodeName: String): Throwable = {
+    new UnsupportedOperationException(s"$nodeName does not implement 
simpleStringWithNodeId")
+  }
+
+  def evaluateUnevaluableAggregateUnsupportedError(
+      methodName: String, unEvaluable: UnevaluableAggregate): Throwable = {
+    new UnsupportedOperationException(s"Cannot evaluate $methodName: 
$unEvaluable")
+  }
+
+  def dataTypeUnsupportedError(dt: DataType): Throwable = {
+    new SparkException(s"Unsupported data type $dt")
+  }
+
+  def udfErrorMessage(funcCls: String, inputTypes: String, outputType: 
String): String = {
+    s"Failed to execute user defined function ($funcCls: ($inputTypes) => 
$outputType)"
+  }
+
+  def failedExecuteUserDefinedFunctionError(
+      funcCls: String, inputTypes: String, outputType: String, e: Exception): 
Throwable = {
+    new SparkException(udfErrorMessage(funcCls, inputTypes, outputType), e)
+  }
+
+  def divideByZeroError(): Throwable = {
+    new ArithmeticException("divide by zero")
+  }
+
+  def dataTypeUnsupportedByExtractValueError(
+      dataType: DataType, extraction: Expression, child: Expression): 
Throwable = {
+    val errorMsg = dataType match {
+      case StructType(_) =>
+        s"Field name should be String Literal, but it's $extraction"
+      case other =>
+        s"Can't extract value from $child: need struct type but got 
${other.catalogString}"
+    }
+    new AnalysisException(errorMsg)
+  }
+
+  def invalidArrayIndexError(index: Int, numElements: Int): Throwable = {
+    new ArrayIndexOutOfBoundsException(s"Invalid index: $index, numElements: 
$numElements")
+  }
+
+  def mapKeyNotExistsError(key: Any): Throwable = {
+    new NoSuchElementException(s"Key $key does not exist.")
+  }
+
+  def rowFromCSVParserNotExpectedError(): Throwable = {
+    new IllegalArgumentException("Expected one row from CSV parser.")
+  }
+
+  def inputTypeUnsupportedError(dataType: DataType): Throwable = {
+    new IllegalArgumentException(s"Unsupported input type 
${dataType.catalogString}")
+  }
+
+  def invalidFractionOfSecondError(): Throwable = {
+    new DateTimeException("The fraction of sec must be zero. Valid range is 
[0, 60].")
+  }
+
+  def overflowInSumOfDecimalError(): Throwable = {
+    new ArithmeticException("Overflow in sum of decimals.")
+  }
+
+  def mapSizeExceedArraySizeWhenZipMapError(size: Int): Throwable = {
+    new RuntimeException(s"Unsuccessful try to zip maps with $size " +
+      "unique keys due to exceeding the array size limit " +
+      s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
+  }
+
+  def copyNullFieldNotAllowedError(): Throwable = {
+    new IllegalStateException("Do not attempt to copy a null field")
+  }
+
+  def literalTypeUnsupportedError(v: Any): Throwable = {
+    new RuntimeException(s"Unsupported literal type ${v.getClass} $v")
+  }
+
+  def noDefaultForDataTypeError(dataType: DataType): Throwable = {
+    new RuntimeException(s"no default for type $dataType")
+  }
+
+  def doGenCodeOfAliasShouldNotBeCalledError(): Throwable = {
+    new IllegalStateException("Alias.doGenCode should not be called.")
+  }
+
+  def orderedOperationUnsupportedByDataTypeError(dataType: DataType): 
Throwable = {
+    new IllegalArgumentException(s"Type $dataType does not support ordered 
operations")
+  }
+
+  def indexOfMatchGroupLessThanZeroNotAllowedError(): Throwable = {
+    new IllegalArgumentException("The specified group index cannot be less 
than zero")
+  }
+
+  def groupIndexExceedMatchGroupCountNotAllowedError(
+      groupCount: Int, groupIndex: Int): Throwable = {
+    new IllegalArgumentException(
+      s"Regex group count is $groupCount, but the specified group index is 
$groupIndex")
+  }
+
+  def invalidUrlError(url: UTF8String, e: URISyntaxException): Throwable = {
+    new IllegalArgumentException(s"Find an invaild url string 
${url.toString}", e)
+  }
+
+  def dataTypeUnsupportedError(): Throwable = {
+    new UnsupportedOperationException("dataType")
+  }
+
+  def mergeUnsupportedByWindowFunctionError(): Throwable = {
+    new UnsupportedOperationException("Window Functions do not support 
merging.")
+  }
+
+  def dataTypeUnexpectedError(dataType: DataType): Throwable = {
+    new UnsupportedOperationException(s"Unexpected data type 
${dataType.catalogString}")
+  }
+
+  def negativeValueUnexpectedError(frequencyExpression : Expression): 
Throwable = {
+    new SparkException(s"Negative values found in ${frequencyExpression.sql}")
+  }
+
+  def addNewFunctionMismatchedWithFunctionError(funcName: String): Throwable = 
{
+    new IllegalArgumentException(s"$funcName is not matched at addNewFunction")
+  }
+
+  def cannotGenerateCodeForUnComparableTypeError(
+      codeType: String, dataType: DataType): Throwable = {
+    new IllegalArgumentException(
+      s"cannot generate $codeType code for un-comparable type: 
${dataType.catalogString}")
+  }
+
+  def cannotGenerateCodeForUnsupportedTypeError(dataType: DataType): Throwable 
= {
+    new IllegalArgumentException(s"cannot generate code for unsupported type: 
$dataType")
+  }
+
+  def cannotInterpolateClassIntoCodeBlockError(arg: Any): Throwable = {
+    new IllegalArgumentException(
+      s"Can not interpolate ${arg.getClass.getName} into code block.")
+  }
+
+  def customCollectionClsCannotResolvedError(): Throwable = {
+    new UnsupportedOperationException("not resolved")
+  }
+
+  def classUnsupportedByMapObjectError(cls: Class[_]): Throwable = {
+    new RuntimeException(s"class `${cls.getName}` is not supported by 
`MapObjects` as " +
+      "resulting collection.")
+  }
+
+  def nullAsMapKeyNotAllowedError(): Throwable = {
+    new RuntimeException("Cannot use null as map key!")
+  }
+
+  def methodNotDeclaredError(name: String): Throwable = {
+    new NoSuchMethodException(s"""A method named "$name" is not declared """ +
+      "in any enclosing class nor any supertype")
+  }
+
+  def inputExternalRowCannotBeNullError(): Throwable = {
+    new RuntimeException("The input external row cannot be null.")
+  }
+
+  def fieldOfInputRowCannotBeNullMsg(index: Int, fieldName: String): String = {
+    s"The ${index}th field '$fieldName' of input row cannot be null."
+  }
+
+  def fieldOfInputRowCannotBeNullError(index: Int, fieldName: String): 
Throwable = {

Review comment:
       fieldCannotBeNullError

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/QueryExecutionErrors.scala
##########
@@ -51,18 +58,209 @@ object QueryExecutionErrors {
       s"[BUG] logical plan should not have output of char/varchar type: $leaf")
   }
 
-  def cannotEvaluateGeneratorError(generator: UnresolvedGenerator): Throwable 
= {
-    new UnsupportedOperationException(s"Cannot evaluate expression: 
$generator")
+  def cannotEvaluateExpressionError(expression: Expression): Throwable = {
+    new UnsupportedOperationException(s"Cannot evaluate expression: 
$expression")
   }
 
-  def cannotGenerateCodeForGeneratorError(generator: UnresolvedGenerator): 
Throwable = {
-    new UnsupportedOperationException(s"Cannot generate code for expression: 
$generator")
+  def cannotGenerateCodeForExpressionError(expression: Expression): Throwable 
= {
+    new UnsupportedOperationException(s"Cannot generate code for expression: 
$expression")
   }
 
   def cannotTerminateGeneratorError(generator: UnresolvedGenerator): Throwable 
= {
     new UnsupportedOperationException(s"Cannot terminate expression: 
$generator")
   }
 
+  def castDownLongCauseOverflowError(t: Any, targetType: String): Throwable = {
+    new ArithmeticException(s"Casting $t to $targetType causes overflow")
+  }
+
+  def cannotRepresentDecimalError(value: Decimal, decimalType: DecimalType): 
Throwable = {
+    new ArithmeticException(s"${value.toDebugString} cannot be represented as 
" +
+      s"Decimal(${decimalType.precision}, ${decimalType.scale}).")
+  }
+
+  def invalidInputSyntaxError(s: UTF8String): Throwable = {
+    new NumberFormatException(s"invalid input syntax for type numeric: $s")
+  }
+
+  def cannotCastFromNullTypeDirectlyError(to: DataType): Throwable = {
+    new SparkException(s"should not directly cast from NullType to $to.")
+  }
+
+  def cannotCastError(from: DataType, to: DataType): Throwable = {
+    new SparkException(s"Cannot cast $from to $to.")
+  }
+
+  def cannotParseDecimalError(): Throwable = {
+    new IllegalArgumentException("Cannot parse any decimal")
+  }
+
+  def simpleStringWithNodeIdUnsupportedError(nodeName: String): Throwable = {
+    new UnsupportedOperationException(s"$nodeName does not implement 
simpleStringWithNodeId")
+  }
+
+  def evaluateUnevaluableAggregateUnsupportedError(
+      methodName: String, unEvaluable: UnevaluableAggregate): Throwable = {
+    new UnsupportedOperationException(s"Cannot evaluate $methodName: 
$unEvaluable")
+  }
+
+  def dataTypeUnsupportedError(dt: DataType): Throwable = {
+    new SparkException(s"Unsupported data type $dt")
+  }
+
+  def udfErrorMessage(funcCls: String, inputTypes: String, outputType: 
String): String = {
+    s"Failed to execute user defined function ($funcCls: ($inputTypes) => 
$outputType)"
+  }
+
+  def failedExecuteUserDefinedFunctionError(
+      funcCls: String, inputTypes: String, outputType: String, e: Exception): 
Throwable = {
+    new SparkException(udfErrorMessage(funcCls, inputTypes, outputType), e)
+  }
+
+  def divideByZeroError(): Throwable = {
+    new ArithmeticException("divide by zero")
+  }
+
+  def dataTypeUnsupportedByExtractValueError(
+      dataType: DataType, extraction: Expression, child: Expression): 
Throwable = {
+    val errorMsg = dataType match {
+      case StructType(_) =>
+        s"Field name should be String Literal, but it's $extraction"
+      case other =>
+        s"Can't extract value from $child: need struct type but got 
${other.catalogString}"
+    }
+    new AnalysisException(errorMsg)
+  }
+
+  def invalidArrayIndexError(index: Int, numElements: Int): Throwable = {
+    new ArrayIndexOutOfBoundsException(s"Invalid index: $index, numElements: 
$numElements")
+  }
+
+  def mapKeyNotExistsError(key: Any): Throwable = {
+    new NoSuchElementException(s"Key $key does not exist.")
+  }
+
+  def rowFromCSVParserNotExpectedError(): Throwable = {
+    new IllegalArgumentException("Expected one row from CSV parser.")
+  }
+
+  def inputTypeUnsupportedError(dataType: DataType): Throwable = {
+    new IllegalArgumentException(s"Unsupported input type 
${dataType.catalogString}")
+  }
+
+  def invalidFractionOfSecondError(): Throwable = {
+    new DateTimeException("The fraction of sec must be zero. Valid range is 
[0, 60].")
+  }
+
+  def overflowInSumOfDecimalError(): Throwable = {
+    new ArithmeticException("Overflow in sum of decimals.")
+  }
+
+  def mapSizeExceedArraySizeWhenZipMapError(size: Int): Throwable = {
+    new RuntimeException(s"Unsuccessful try to zip maps with $size " +
+      "unique keys due to exceeding the array size limit " +
+      s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
+  }
+
+  def copyNullFieldNotAllowedError(): Throwable = {
+    new IllegalStateException("Do not attempt to copy a null field")
+  }
+
+  def literalTypeUnsupportedError(v: Any): Throwable = {
+    new RuntimeException(s"Unsupported literal type ${v.getClass} $v")
+  }
+
+  def noDefaultForDataTypeError(dataType: DataType): Throwable = {
+    new RuntimeException(s"no default for type $dataType")
+  }
+
+  def doGenCodeOfAliasShouldNotBeCalledError(): Throwable = {
+    new IllegalStateException("Alias.doGenCode should not be called.")
+  }
+
+  def orderedOperationUnsupportedByDataTypeError(dataType: DataType): 
Throwable = {
+    new IllegalArgumentException(s"Type $dataType does not support ordered 
operations")
+  }
+
+  def indexOfMatchGroupLessThanZeroNotAllowedError(): Throwable = {
+    new IllegalArgumentException("The specified group index cannot be less 
than zero")
+  }
+
+  def groupIndexExceedMatchGroupCountNotAllowedError(
+      groupCount: Int, groupIndex: Int): Throwable = {
+    new IllegalArgumentException(
+      s"Regex group count is $groupCount, but the specified group index is 
$groupIndex")
+  }
+
+  def invalidUrlError(url: UTF8String, e: URISyntaxException): Throwable = {
+    new IllegalArgumentException(s"Find an invaild url string 
${url.toString}", e)
+  }
+
+  def dataTypeUnsupportedError(): Throwable = {
+    new UnsupportedOperationException("dataType")
+  }
+
+  def mergeUnsupportedByWindowFunctionError(): Throwable = {
+    new UnsupportedOperationException("Window Functions do not support 
merging.")
+  }
+
+  def dataTypeUnexpectedError(dataType: DataType): Throwable = {
+    new UnsupportedOperationException(s"Unexpected data type 
${dataType.catalogString}")
+  }
+
+  def negativeValueUnexpectedError(frequencyExpression : Expression): 
Throwable = {
+    new SparkException(s"Negative values found in ${frequencyExpression.sql}")
+  }
+
+  def addNewFunctionMismatchedWithFunctionError(funcName: String): Throwable = 
{
+    new IllegalArgumentException(s"$funcName is not matched at addNewFunction")
+  }
+
+  def cannotGenerateCodeForUnComparableTypeError(
+      codeType: String, dataType: DataType): Throwable = {
+    new IllegalArgumentException(
+      s"cannot generate $codeType code for un-comparable type: 
${dataType.catalogString}")
+  }
+
+  def cannotGenerateCodeForUnsupportedTypeError(dataType: DataType): Throwable 
= {
+    new IllegalArgumentException(s"cannot generate code for unsupported type: 
$dataType")
+  }
+
+  def cannotInterpolateClassIntoCodeBlockError(arg: Any): Throwable = {
+    new IllegalArgumentException(
+      s"Can not interpolate ${arg.getClass.getName} into code block.")
+  }
+
+  def customCollectionClsCannotResolvedError(): Throwable = {

Review comment:
       customCollectionClsNotResolvedError

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/QueryExecutionErrors.scala
##########
@@ -51,18 +58,209 @@ object QueryExecutionErrors {
       s"[BUG] logical plan should not have output of char/varchar type: $leaf")
   }
 
-  def cannotEvaluateGeneratorError(generator: UnresolvedGenerator): Throwable 
= {
-    new UnsupportedOperationException(s"Cannot evaluate expression: 
$generator")
+  def cannotEvaluateExpressionError(expression: Expression): Throwable = {
+    new UnsupportedOperationException(s"Cannot evaluate expression: 
$expression")
   }
 
-  def cannotGenerateCodeForGeneratorError(generator: UnresolvedGenerator): 
Throwable = {
-    new UnsupportedOperationException(s"Cannot generate code for expression: 
$generator")
+  def cannotGenerateCodeForExpressionError(expression: Expression): Throwable 
= {
+    new UnsupportedOperationException(s"Cannot generate code for expression: 
$expression")
   }
 
   def cannotTerminateGeneratorError(generator: UnresolvedGenerator): Throwable 
= {
     new UnsupportedOperationException(s"Cannot terminate expression: 
$generator")
   }
 
+  def castDownLongCauseOverflowError(t: Any, targetType: String): Throwable = {
+    new ArithmeticException(s"Casting $t to $targetType causes overflow")
+  }
+
+  def cannotRepresentDecimalError(value: Decimal, decimalType: DecimalType): 
Throwable = {
+    new ArithmeticException(s"${value.toDebugString} cannot be represented as 
" +
+      s"Decimal(${decimalType.precision}, ${decimalType.scale}).")
+  }
+
+  def invalidInputSyntaxError(s: UTF8String): Throwable = {
+    new NumberFormatException(s"invalid input syntax for type numeric: $s")
+  }
+
+  def cannotCastFromNullTypeDirectlyError(to: DataType): Throwable = {
+    new SparkException(s"should not directly cast from NullType to $to.")
+  }
+
+  def cannotCastError(from: DataType, to: DataType): Throwable = {
+    new SparkException(s"Cannot cast $from to $to.")
+  }
+
+  def cannotParseDecimalError(): Throwable = {
+    new IllegalArgumentException("Cannot parse any decimal")
+  }
+
+  def simpleStringWithNodeIdUnsupportedError(nodeName: String): Throwable = {
+    new UnsupportedOperationException(s"$nodeName does not implement 
simpleStringWithNodeId")
+  }
+
+  def evaluateUnevaluableAggregateUnsupportedError(
+      methodName: String, unEvaluable: UnevaluableAggregate): Throwable = {
+    new UnsupportedOperationException(s"Cannot evaluate $methodName: 
$unEvaluable")
+  }
+
+  def dataTypeUnsupportedError(dt: DataType): Throwable = {
+    new SparkException(s"Unsupported data type $dt")
+  }
+
+  def udfErrorMessage(funcCls: String, inputTypes: String, outputType: 
String): String = {
+    s"Failed to execute user defined function ($funcCls: ($inputTypes) => 
$outputType)"
+  }
+
+  def failedExecuteUserDefinedFunctionError(
+      funcCls: String, inputTypes: String, outputType: String, e: Exception): 
Throwable = {
+    new SparkException(udfErrorMessage(funcCls, inputTypes, outputType), e)
+  }
+
+  def divideByZeroError(): Throwable = {
+    new ArithmeticException("divide by zero")
+  }
+
+  def dataTypeUnsupportedByExtractValueError(
+      dataType: DataType, extraction: Expression, child: Expression): 
Throwable = {
+    val errorMsg = dataType match {
+      case StructType(_) =>
+        s"Field name should be String Literal, but it's $extraction"
+      case other =>
+        s"Can't extract value from $child: need struct type but got 
${other.catalogString}"
+    }
+    new AnalysisException(errorMsg)
+  }
+
+  def invalidArrayIndexError(index: Int, numElements: Int): Throwable = {
+    new ArrayIndexOutOfBoundsException(s"Invalid index: $index, numElements: 
$numElements")
+  }
+
+  def mapKeyNotExistsError(key: Any): Throwable = {
+    new NoSuchElementException(s"Key $key does not exist.")
+  }
+
+  def rowFromCSVParserNotExpectedError(): Throwable = {
+    new IllegalArgumentException("Expected one row from CSV parser.")
+  }
+
+  def inputTypeUnsupportedError(dataType: DataType): Throwable = {
+    new IllegalArgumentException(s"Unsupported input type 
${dataType.catalogString}")
+  }
+
+  def invalidFractionOfSecondError(): Throwable = {
+    new DateTimeException("The fraction of sec must be zero. Valid range is 
[0, 60].")
+  }
+
+  def overflowInSumOfDecimalError(): Throwable = {
+    new ArithmeticException("Overflow in sum of decimals.")
+  }
+
+  def mapSizeExceedArraySizeWhenZipMapError(size: Int): Throwable = {
+    new RuntimeException(s"Unsuccessful try to zip maps with $size " +
+      "unique keys due to exceeding the array size limit " +
+      s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
+  }
+
+  def copyNullFieldNotAllowedError(): Throwable = {
+    new IllegalStateException("Do not attempt to copy a null field")
+  }
+
+  def literalTypeUnsupportedError(v: Any): Throwable = {
+    new RuntimeException(s"Unsupported literal type ${v.getClass} $v")
+  }
+
+  def noDefaultForDataTypeError(dataType: DataType): Throwable = {
+    new RuntimeException(s"no default for type $dataType")
+  }
+
+  def doGenCodeOfAliasShouldNotBeCalledError(): Throwable = {
+    new IllegalStateException("Alias.doGenCode should not be called.")
+  }
+
+  def orderedOperationUnsupportedByDataTypeError(dataType: DataType): 
Throwable = {
+    new IllegalArgumentException(s"Type $dataType does not support ordered 
operations")
+  }
+
+  def indexOfMatchGroupLessThanZeroNotAllowedError(): Throwable = {
+    new IllegalArgumentException("The specified group index cannot be less 
than zero")
+  }
+
+  def groupIndexExceedMatchGroupCountNotAllowedError(
+      groupCount: Int, groupIndex: Int): Throwable = {
+    new IllegalArgumentException(
+      s"Regex group count is $groupCount, but the specified group index is 
$groupIndex")
+  }
+
+  def invalidUrlError(url: UTF8String, e: URISyntaxException): Throwable = {
+    new IllegalArgumentException(s"Find an invalid url string 
${url.toString}", e)
+  }
+
+  def dataTypeUnsupportedError(): Throwable = {
+    new UnsupportedOperationException("dataType")
+  }
+
+  def mergeUnsupportedByWindowFunctionError(): Throwable = {
+    new UnsupportedOperationException("Window Functions do not support 
merging.")
+  }
+
+  def dataTypeUnexpectedError(dataType: DataType): Throwable = {
+    new UnsupportedOperationException(s"Unexpected data type 
${dataType.catalogString}")
+  }
+
+  def negativeValueUnexpectedError(frequencyExpression : Expression): 
Throwable = {
+    new SparkException(s"Negative values found in ${frequencyExpression.sql}")
+  }
+
+  def addNewFunctionMismatchedWithFunctionError(funcName: String): Throwable = 
{
+    new IllegalArgumentException(s"$funcName is not matched at addNewFunction")
+  }
+
+  def cannotGenerateCodeForUnComparableTypeError(
+      codeType: String, dataType: DataType): Throwable = {
+    new IllegalArgumentException(
+      s"cannot generate $codeType code for un-comparable type: 
${dataType.catalogString}")
+  }
+
+  def cannotGenerateCodeForUnsupportedTypeError(dataType: DataType): Throwable 
= {
+    new IllegalArgumentException(s"cannot generate code for unsupported type: 
$dataType")
+  }
+
+  def cannotInterpolateClassIntoCodeBlockError(arg: Any): Throwable = {
+    new IllegalArgumentException(
+      s"Can not interpolate ${arg.getClass.getName} into code block.")
+  }
+
+  def customCollectionClsCannotResolvedError(): Throwable = {
+    new UnsupportedOperationException("not resolved")
+  }
+
+  def classUnsupportedByMapObjectError(cls: Class[_]): Throwable = {

Review comment:
       classUnsupportedByMapObjectsError

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/QueryExecutionErrors.scala
##########
@@ -51,18 +58,209 @@ object QueryExecutionErrors {
       s"[BUG] logical plan should not have output of char/varchar type: $leaf")
   }
 
-  def cannotEvaluateGeneratorError(generator: UnresolvedGenerator): Throwable 
= {
-    new UnsupportedOperationException(s"Cannot evaluate expression: 
$generator")
+  def cannotEvaluateExpressionError(expression: Expression): Throwable = {
+    new UnsupportedOperationException(s"Cannot evaluate expression: 
$expression")
   }
 
-  def cannotGenerateCodeForGeneratorError(generator: UnresolvedGenerator): 
Throwable = {
-    new UnsupportedOperationException(s"Cannot generate code for expression: 
$generator")
+  def cannotGenerateCodeForExpressionError(expression: Expression): Throwable 
= {
+    new UnsupportedOperationException(s"Cannot generate code for expression: 
$expression")
   }
 
   def cannotTerminateGeneratorError(generator: UnresolvedGenerator): Throwable 
= {
     new UnsupportedOperationException(s"Cannot terminate expression: 
$generator")
   }
 
+  def castDownLongCauseOverflowError(t: Any, targetType: String): Throwable = {
+    new ArithmeticException(s"Casting $t to $targetType causes overflow")
+  }
+
+  def cannotRepresentDecimalError(value: Decimal, decimalType: DecimalType): 
Throwable = {
+    new ArithmeticException(s"${value.toDebugString} cannot be represented as 
" +
+      s"Decimal(${decimalType.precision}, ${decimalType.scale}).")
+  }
+
+  def invalidInputSyntaxError(s: UTF8String): Throwable = {
+    new NumberFormatException(s"invalid input syntax for type numeric: $s")
+  }
+
+  def cannotCastFromNullTypeDirectlyError(to: DataType): Throwable = {
+    new SparkException(s"should not directly cast from NullType to $to.")
+  }
+
+  def cannotCastError(from: DataType, to: DataType): Throwable = {
+    new SparkException(s"Cannot cast $from to $to.")
+  }
+
+  def cannotParseDecimalError(): Throwable = {
+    new IllegalArgumentException("Cannot parse any decimal")
+  }
+
+  def simpleStringWithNodeIdUnsupportedError(nodeName: String): Throwable = {
+    new UnsupportedOperationException(s"$nodeName does not implement 
simpleStringWithNodeId")
+  }
+
+  def evaluateUnevaluableAggregateUnsupportedError(
+      methodName: String, unEvaluable: UnevaluableAggregate): Throwable = {
+    new UnsupportedOperationException(s"Cannot evaluate $methodName: 
$unEvaluable")
+  }
+
+  def dataTypeUnsupportedError(dt: DataType): Throwable = {
+    new SparkException(s"Unsupported data type $dt")
+  }
+
+  def udfErrorMessage(funcCls: String, inputTypes: String, outputType: 
String): String = {
+    s"Failed to execute user defined function ($funcCls: ($inputTypes) => 
$outputType)"
+  }
+
+  def failedExecuteUserDefinedFunctionError(
+      funcCls: String, inputTypes: String, outputType: String, e: Exception): 
Throwable = {
+    new SparkException(udfErrorMessage(funcCls, inputTypes, outputType), e)
+  }
+
+  def divideByZeroError(): Throwable = {
+    new ArithmeticException("divide by zero")
+  }
+
+  def dataTypeUnsupportedByExtractValueError(
+      dataType: DataType, extraction: Expression, child: Expression): 
Throwable = {
+    val errorMsg = dataType match {
+      case StructType(_) =>
+        s"Field name should be String Literal, but it's $extraction"
+      case other =>
+        s"Can't extract value from $child: need struct type but got 
${other.catalogString}"
+    }
+    new AnalysisException(errorMsg)
+  }
+
+  def invalidArrayIndexError(index: Int, numElements: Int): Throwable = {
+    new ArrayIndexOutOfBoundsException(s"Invalid index: $index, numElements: 
$numElements")
+  }
+
+  def mapKeyNotExistsError(key: Any): Throwable = {
+    new NoSuchElementException(s"Key $key does not exist.")
+  }
+
+  def rowFromCSVParserNotExpectedError(): Throwable = {
+    new IllegalArgumentException("Expected one row from CSV parser.")
+  }
+
+  def inputTypeUnsupportedError(dataType: DataType): Throwable = {
+    new IllegalArgumentException(s"Unsupported input type 
${dataType.catalogString}")
+  }
+
+  def invalidFractionOfSecondError(): Throwable = {
+    new DateTimeException("The fraction of sec must be zero. Valid range is 
[0, 60].")
+  }
+
+  def overflowInSumOfDecimalError(): Throwable = {
+    new ArithmeticException("Overflow in sum of decimals.")
+  }
+
+  def mapSizeExceedArraySizeWhenZipMapError(size: Int): Throwable = {
+    new RuntimeException(s"Unsuccessful try to zip maps with $size " +
+      "unique keys due to exceeding the array size limit " +
+      s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
+  }
+
+  def copyNullFieldNotAllowedError(): Throwable = {
+    new IllegalStateException("Do not attempt to copy a null field")
+  }
+
+  def literalTypeUnsupportedError(v: Any): Throwable = {
+    new RuntimeException(s"Unsupported literal type ${v.getClass} $v")
+  }
+
+  def noDefaultForDataTypeError(dataType: DataType): Throwable = {
+    new RuntimeException(s"no default for type $dataType")
+  }
+
+  def doGenCodeOfAliasShouldNotBeCalledError(): Throwable = {
+    new IllegalStateException("Alias.doGenCode should not be called.")
+  }
+
+  def orderedOperationUnsupportedByDataTypeError(dataType: DataType): 
Throwable = {
+    new IllegalArgumentException(s"Type $dataType does not support ordered 
operations")
+  }
+
+  def indexOfMatchGroupLessThanZeroNotAllowedError(): Throwable = {
+    new IllegalArgumentException("The specified group index cannot be less 
than zero")
+  }
+
+  def groupIndexExceedMatchGroupCountNotAllowedError(
+      groupCount: Int, groupIndex: Int): Throwable = {
+    new IllegalArgumentException(
+      s"Regex group count is $groupCount, but the specified group index is 
$groupIndex")
+  }
+
+  def invalidUrlError(url: UTF8String, e: URISyntaxException): Throwable = {
+    new IllegalArgumentException(s"Find an invalid url string 
${url.toString}", e)
+  }
+
+  def dataTypeUnsupportedError(): Throwable = {
+    new UnsupportedOperationException("dataType")
+  }
+
+  def mergeUnsupportedByWindowFunctionError(): Throwable = {
+    new UnsupportedOperationException("Window Functions do not support 
merging.")
+  }
+
+  def dataTypeUnexpectedError(dataType: DataType): Throwable = {
+    new UnsupportedOperationException(s"Unexpected data type 
${dataType.catalogString}")
+  }
+
+  def negativeValueUnexpectedError(frequencyExpression : Expression): 
Throwable = {
+    new SparkException(s"Negative values found in ${frequencyExpression.sql}")
+  }
+
+  def addNewFunctionMismatchedWithFunctionError(funcName: String): Throwable = 
{
+    new IllegalArgumentException(s"$funcName is not matched at addNewFunction")
+  }
+
+  def cannotGenerateCodeForUnComparableTypeError(
+      codeType: String, dataType: DataType): Throwable = {
+    new IllegalArgumentException(
+      s"cannot generate $codeType code for un-comparable type: 
${dataType.catalogString}")
+  }
+
+  def cannotGenerateCodeForUnsupportedTypeError(dataType: DataType): Throwable 
= {
+    new IllegalArgumentException(s"cannot generate code for unsupported type: 
$dataType")
+  }
+
+  def cannotInterpolateClassIntoCodeBlockError(arg: Any): Throwable = {
+    new IllegalArgumentException(
+      s"Can not interpolate ${arg.getClass.getName} into code block.")
+  }
+
+  def customCollectionClsCannotResolvedError(): Throwable = {
+    new UnsupportedOperationException("not resolved")
+  }
+
+  def classUnsupportedByMapObjectError(cls: Class[_]): Throwable = {
+    new RuntimeException(s"class `${cls.getName}` is not supported by 
`MapObjects` as " +
+      "resulting collection.")
+  }
+
+  def nullAsMapKeyNotAllowedError(): Throwable = {
+    new RuntimeException("Cannot use null as map key!")
+  }
+
+  def methodNotDeclaredError(name: String): Throwable = {
+    new NoSuchMethodException(s"""A method named "$name" is not declared """ +
+      "in any enclosing class nor any supertype")
+  }
+
+  def inputExternalRowCannotBeNullError(): Throwable = {
+    new RuntimeException("The input external row cannot be null.")
+  }
+
+  def fieldOfInputRowCannotBeNullMsg(index: Int, fieldName: String): String = {

Review comment:
       We can remove this and directly use the error message inside the 
function below.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to