dongjoon-hyun commented on code in PR #49389:
URL: https://github.com/apache/spark/pull/49389#discussion_r1904963191
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SQLFunction.scala:
##########
@@ -88,19 +92,141 @@ case class SQLFunction(
(parsedExpression, parsedQuery)
}
}
+
+ /** Get scalar function return data type. */
+ def getScalarFuncReturnType: DataType = returnType match {
+ case Left(dataType) => dataType
+ case Right(_) =>
+ throw SparkException.internalError(
+ "This function is a table function, not a scalar function.")
+ }
+
+ /** Get table function return columns. */
+ def getTableFuncReturnCols: StructType = returnType match {
+ case Left(_) =>
+ throw SparkException.internalError(
+ "This function is a scalar function, not a table function.")
+ case Right(columns) => columns
+ }
+
+ /**
+ * Convert the SQL function to a [[CatalogFunction]].
+ */
+ def toCatalogFunction: CatalogFunction = {
+ val props = sqlFunctionToProps ++ properties
+ CatalogFunction(
+ identifier = name,
+ className = SQL_FUNCTION_PREFIX,
+ resources = propertiesToFunctionResources(props, name))
+ }
+
+ /**
+ * Convert the SQL function to an [[ExpressionInfo]].
+ */
+ def toExpressionInfo: ExpressionInfo = {
+ val props = sqlFunctionToProps ++ functionMetadataToProps ++ properties
+ val usage = mapper.writeValueAsString(props)
+ new ExpressionInfo(
+ SQL_FUNCTION_PREFIX,
+ name.database.orNull,
+ name.funcName,
+ usage,
+ "",
+ "",
+ "",
+ "",
+ "",
+ "",
+ "sql_udf")
+ }
+
+ /**
+ * Convert the SQL function fields into properties.
+ */
+ private def sqlFunctionToProps: Map[String, String] = {
+ val props = new mutable.HashMap[String, String]
+ val inputParamText = inputParam.map(_.fields.map(_.toDDL).mkString(", "))
+ inputParamText.foreach(props.put(INPUT_PARAM, _))
+ val returnTypeText = returnType match {
+ case Left(dataType) => dataType.sql
+ case Right(columns) => columns.toDDL
+ }
+ props.put(RETURN_TYPE, returnTypeText)
+ exprText.foreach(props.put(EXPRESSION, _))
+ queryText.foreach(props.put(QUERY, _))
+ comment.foreach(props.put(COMMENT, _))
+ deterministic.foreach(d => props.put(DETERMINISTIC, d.toString))
+ containsSQL.foreach(x => props.put(CONTAINS_SQL, x.toString))
+ props.put(IS_TABLE_FUNC, isTableFunc.toString)
+ props.toMap
+ }
+
+ private def functionMetadataToProps: Map[String, String] = {
+ val props = new mutable.HashMap[String, String]
+ owner.foreach(props.put(OWNER, _))
+ props.put(CREATE_TIME, createTimeMs.toString)
+ props.toMap
+ }
}
object SQLFunction {
private val SQL_FUNCTION_PREFIX = "sqlFunction."
+ private val INPUT_PARAM: String = SQL_FUNCTION_PREFIX + "inputParam"
+ private val RETURN_TYPE: String = SQL_FUNCTION_PREFIX + "returnType"
+ private val EXPRESSION: String = SQL_FUNCTION_PREFIX + "expression"
+ private val QUERY: String = SQL_FUNCTION_PREFIX + "query"
+ private val COMMENT: String = SQL_FUNCTION_PREFIX + "comment"
+ private val DETERMINISTIC: String = SQL_FUNCTION_PREFIX + "deterministic"
+ private val CONTAINS_SQL: String = SQL_FUNCTION_PREFIX + "containsSQL"
+ private val IS_TABLE_FUNC: String = SQL_FUNCTION_PREFIX + "isTableFunc"
+ private val OWNER: String = SQL_FUNCTION_PREFIX + "owner"
+ private val CREATE_TIME: String = SQL_FUNCTION_PREFIX + "createTime"
+
private val FUNCTION_CATALOG_AND_NAMESPACE = "catalogAndNamespace.numParts"
private val FUNCTION_CATALOG_AND_NAMESPACE_PART_PREFIX =
"catalogAndNamespace.part."
private val FUNCTION_REFERRED_TEMP_VIEW_NAMES = "referredTempViewNames"
private val FUNCTION_REFERRED_TEMP_FUNCTION_NAMES =
"referredTempFunctionsNames"
private val FUNCTION_REFERRED_TEMP_VARIABLE_NAMES =
"referredTempVariableNames"
+ /**
+ * Convert a [[CatalogFunction]] into a SQL function.
+ */
+ def fromCatalogFunction(function: CatalogFunction, parser: ParserInterface): SQLFunction = {
+ try {
+ val parts = function.resources.collect { case FunctionResource(FileResource, uri) =>
+ val index = uri.substring(0, INDEX_LENGTH).toInt
+ val body = uri.substring(INDEX_LENGTH)
+ index -> body
+ }
+ val blob = parts.sortBy(_._1).map(_._2).mkString
+ val props = mapper.readValue(blob, classOf[Map[String, String]])
+ val isTableFunc = props(IS_TABLE_FUNC).toBoolean
+ val returnType = parseReturnTypeText(props(RETURN_TYPE), isTableFunc, parser)
+ SQLFunction(
+ name = function.identifier,
+ inputParam = props.get(INPUT_PARAM).map(parseTableSchema(_, parser)),
+ returnType = returnType.get,
+ exprText = props.get(EXPRESSION),
+ queryText = props.get(QUERY),
+ comment = props.get(COMMENT),
+ deterministic = props.get(DETERMINISTIC).map(_.toBoolean),
+ containsSQL = props.get(CONTAINS_SQL).map(_.toBoolean),
+ isTableFunc = isTableFunc,
+ props.filterNot(_._1.startsWith(SQL_FUNCTION_PREFIX)))
+ } catch {
+ case e: Exception =>
Review Comment:
Got it.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SQLFunction.scala:
##########
@@ -88,19 +92,141 @@ case class SQLFunction(
(parsedExpression, parsedQuery)
}
}
+
+ /** Get scalar function return data type. */
+ def getScalarFuncReturnType: DataType = returnType match {
+ case Left(dataType) => dataType
+ case Right(_) =>
+ throw SparkException.internalError(
+ "This function is a table function, not a scalar function.")
+ }
+
+ /** Get table function return columns. */
+ def getTableFuncReturnCols: StructType = returnType match {
+ case Left(_) =>
+ throw SparkException.internalError(
+ "This function is a scalar function, not a table function.")
+ case Right(columns) => columns
+ }
+
+ /**
+ * Convert the SQL function to a [[CatalogFunction]].
+ */
+ def toCatalogFunction: CatalogFunction = {
+ val props = sqlFunctionToProps ++ properties
+ CatalogFunction(
+ identifier = name,
+ className = SQL_FUNCTION_PREFIX,
+ resources = propertiesToFunctionResources(props, name))
+ }
+
+ /**
+ * Convert the SQL function to an [[ExpressionInfo]].
+ */
+ def toExpressionInfo: ExpressionInfo = {
+ val props = sqlFunctionToProps ++ functionMetadataToProps ++ properties
+ val usage = mapper.writeValueAsString(props)
+ new ExpressionInfo(
+ SQL_FUNCTION_PREFIX,
+ name.database.orNull,
+ name.funcName,
+ usage,
+ "",
+ "",
+ "",
+ "",
+ "",
+ "",
+ "sql_udf")
+ }
+
+ /**
+ * Convert the SQL function fields into properties.
+ */
+ private def sqlFunctionToProps: Map[String, String] = {
+ val props = new mutable.HashMap[String, String]
+ val inputParamText = inputParam.map(_.fields.map(_.toDDL).mkString(", "))
+ inputParamText.foreach(props.put(INPUT_PARAM, _))
+ val returnTypeText = returnType match {
+ case Left(dataType) => dataType.sql
+ case Right(columns) => columns.toDDL
+ }
+ props.put(RETURN_TYPE, returnTypeText)
+ exprText.foreach(props.put(EXPRESSION, _))
+ queryText.foreach(props.put(QUERY, _))
+ comment.foreach(props.put(COMMENT, _))
+ deterministic.foreach(d => props.put(DETERMINISTIC, d.toString))
+ containsSQL.foreach(x => props.put(CONTAINS_SQL, x.toString))
+ props.put(IS_TABLE_FUNC, isTableFunc.toString)
+ props.toMap
+ }
+
+ private def functionMetadataToProps: Map[String, String] = {
+ val props = new mutable.HashMap[String, String]
+ owner.foreach(props.put(OWNER, _))
+ props.put(CREATE_TIME, createTimeMs.toString)
+ props.toMap
+ }
}
object SQLFunction {
private val SQL_FUNCTION_PREFIX = "sqlFunction."
+ private val INPUT_PARAM: String = SQL_FUNCTION_PREFIX + "inputParam"
+ private val RETURN_TYPE: String = SQL_FUNCTION_PREFIX + "returnType"
+ private val EXPRESSION: String = SQL_FUNCTION_PREFIX + "expression"
+ private val QUERY: String = SQL_FUNCTION_PREFIX + "query"
+ private val COMMENT: String = SQL_FUNCTION_PREFIX + "comment"
+ private val DETERMINISTIC: String = SQL_FUNCTION_PREFIX + "deterministic"
+ private val CONTAINS_SQL: String = SQL_FUNCTION_PREFIX + "containsSQL"
+ private val IS_TABLE_FUNC: String = SQL_FUNCTION_PREFIX + "isTableFunc"
+ private val OWNER: String = SQL_FUNCTION_PREFIX + "owner"
+ private val CREATE_TIME: String = SQL_FUNCTION_PREFIX + "createTime"
+
private val FUNCTION_CATALOG_AND_NAMESPACE = "catalogAndNamespace.numParts"
private val FUNCTION_CATALOG_AND_NAMESPACE_PART_PREFIX =
"catalogAndNamespace.part."
private val FUNCTION_REFERRED_TEMP_VIEW_NAMES = "referredTempViewNames"
private val FUNCTION_REFERRED_TEMP_FUNCTION_NAMES =
"referredTempFunctionsNames"
private val FUNCTION_REFERRED_TEMP_VARIABLE_NAMES =
"referredTempVariableNames"
+ /**
+ * Convert a [[CatalogFunction]] into a SQL function.
+ */
+ def fromCatalogFunction(function: CatalogFunction, parser: ParserInterface): SQLFunction = {
+ try {
+ val parts = function.resources.collect { case FunctionResource(FileResource, uri) =>
+ val index = uri.substring(0, INDEX_LENGTH).toInt
+ val body = uri.substring(INDEX_LENGTH)
+ index -> body
+ }
+ val blob = parts.sortBy(_._1).map(_._2).mkString
+ val props = mapper.readValue(blob, classOf[Map[String, String]])
+ val isTableFunc = props(IS_TABLE_FUNC).toBoolean
+ val returnType = parseReturnTypeText(props(RETURN_TYPE), isTableFunc, parser)
+ SQLFunction(
+ name = function.identifier,
+ inputParam = props.get(INPUT_PARAM).map(parseTableSchema(_, parser)),
+ returnType = returnType.get,
+ exprText = props.get(EXPRESSION),
+ queryText = props.get(QUERY),
+ comment = props.get(COMMENT),
+ deterministic = props.get(DETERMINISTIC).map(_.toBoolean),
+ containsSQL = props.get(CONTAINS_SQL).map(_.toBoolean),
+ isTableFunc = isTableFunc,
+ props.filterNot(_._1.startsWith(SQL_FUNCTION_PREFIX)))
+ } catch {
+ case e: Exception =>
Review Comment:
Got it. Thank you.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]