[GitHub] [spark] cloud-fan commented on a change in pull request #28840: [SPARK-31999][SQL] Add REFRESH FUNCTION command
cloud-fan commented on a change in pull request #28840: URL: https://github.com/apache/spark/pull/28840#discussion_r471912000 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala ## @@ -236,6 +236,45 @@ case class ShowFunctionsCommand( } } + +/** + * A command for users to refresh the persistent function. + * The syntax of using this command in SQL is: + * {{{ + *REFRESH FUNCTION functionName + * }}} + */ +case class RefreshFunctionCommand( +databaseName: Option[String], +functionName: String) + extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val catalog = sparkSession.sessionState.catalog +if (FunctionRegistry.builtin.functionExists(FunctionIdentifier(functionName))) { Review comment: how about ``` CREATE FUNCTION rand AS 'xxx'; DESC FUNCTION default.rand; ``` I think this is similar to table and temp views. Spark will try to look up temp view first, so if the name conflicts, temp view is preferred. But users can still use a qualified table name to read the table explicitly. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
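The name-resolution behaviour described in the comment above can be sketched with a toy model. Everything here (`FuncId`, `sessionFunctions`, `persistentFunctions`, `lookup`) is hypothetical and is not Spark's `SessionCatalog` API; it only illustrates the idea that an unqualified name prefers the session-local (built-in or temporary) entry, while a qualified name still reaches the persistent function.

```scala
// Toy model only: these names are hypothetical, not Spark's SessionCatalog API.
object FunctionLookupSketch {
  // Hypothetical identifier: a function name plus an optional database qualifier.
  final case class FuncId(name: String, database: Option[String] = None)

  // Session-local entries (built-in and temporary functions), keyed by bare name.
  val sessionFunctions: Map[String, String] = Map("rand" -> "builtin rand")

  // Persistent functions, keyed by (database, name), as a metastore would hold them.
  val persistentFunctions: Map[(String, String), String] =
    Map(("default", "rand") -> "persistent default.rand")

  def lookup(id: FuncId, currentDb: String = "default"): Option[String] =
    id.database match {
      // A qualified name skips the session-local entries entirely.
      case Some(db) => persistentFunctions.get((db, id.name))
      // An unqualified name prefers the session-local entry, then falls back.
      case None =>
        sessionFunctions.get(id.name)
          .orElse(persistentFunctions.get((currentDb, id.name)))
    }

  def main(args: Array[String]): Unit = {
    println(lookup(FuncId("rand")))                  // session-local entry wins
    println(lookup(FuncId("rand", Some("default")))) // qualified name reaches the persistent one
  }
}
```

Under this model, a check such as "is this a built-in function?" would only be meaningful for unqualified names, which appears to be the point of the `DESC FUNCTION default.rand` example.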
[GitHub] [spark] cloud-fan commented on a change in pull request #28840: [SPARK-31999][SQL] Add REFRESH FUNCTION command
cloud-fan commented on a change in pull request #28840:
URL: https://github.com/apache/spark/pull/28840#discussion_r458024988

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala ##

@@ -236,6 +236,46 @@ case class ShowFunctionsCommand(
   }
 }
+
+/**
+ * A command for users to refresh the persistent function.
+ * The syntax of using this command in SQL is:
+ * {{{
+ *    REFRESH FUNCTION functionName
+ * }}}
+ */
+case class RefreshFunctionCommand(
+    databaseName: Option[String],
+    functionName: String)
+  extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
+    if (FunctionRegistry.builtin.functionExists(FunctionIdentifier(functionName))) {
+      throw new AnalysisException(s"Cannot refresh builtin function $functionName")
+    }
+    if (catalog.isTemporaryFunction(FunctionIdentifier(functionName, databaseName))) {
+      throw new AnalysisException(s"Cannot refresh temporary function $functionName")
+    }
+
+    val identifier = FunctionIdentifier(
+      functionName, Some(databaseName.getOrElse(catalog.getCurrentDatabase)))
+    // we only refresh the permanent function.
+    if (catalog.isPersistentFunction(identifier)) {
+      // register overwrite function.
+      val func = catalog.getFunctionMetadata(identifier)
+      catalog.registerFunction(func, true)
+    } else {
+      // clear cached function, if not exists throw exception
+      if (!catalog.unregisterFunction(identifier)) {
+        throw new NoSuchFunctionException(identifier.database.get, identifier.funcName)

Review comment:
Sorry, I may not have made myself clear. I meant going back to your original proposal, which always throws an exception if the function doesn't exist in the metastore. That said, we should do
```
catalog.unregisterFunction(identifier)
throw new NoSuchFunctionException(identifier.database.get, identifier.funcName)
```
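The control flow requested in the comment above (re-register when the function is still in the metastore, otherwise drop the cached entry and always throw) can be sketched as a toy model. The `metastore` and `registry` maps, `FuncId`, and `NoSuchFunction` are hypothetical stand-ins, not Spark's catalog classes.

```scala
import scala.collection.mutable
import scala.util.Try

// Toy model only: hypothetical stand-ins for the metastore and the session function registry.
object RefreshFunctionSketch {
  final case class FuncId(database: String, name: String)

  final class NoSuchFunction(id: FuncId)
    extends RuntimeException(s"Undefined function '${id.name}' in database '${id.database}'")

  val metastore = mutable.Map.empty[FuncId, String] // identifier -> class name
  val registry = mutable.Map.empty[FuncId, String]  // cached (registered) entries

  def refresh(id: FuncId): Unit = metastore.get(id) match {
    // Still persistent: overwrite the cached entry with fresh metadata.
    case Some(className) => registry.update(id, className)
    // Gone from the metastore: drop any stale cache entry, then always fail.
    case None =>
      registry.remove(id)
      throw new NoSuchFunction(id)
  }

  def main(args: Array[String]): Unit = {
    val id = FuncId("default", "func1")
    metastore.update(id, "test.org.apache.spark.sql.MyDoubleAvg")
    refresh(id)
    println(registry.contains(id))      // cached after a successful refresh

    metastore.remove(id)                // dropped behind the session's back
    println(Try(refresh(id)).isFailure) // refresh now fails
    println(registry.contains(id))      // and the stale entry has been cleared
  }
}
```

Note that in this sketch the failing refresh still has a side effect (the cache entry is removed), which matches the unconditional `unregisterFunction` followed by `throw` in the suggestion.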
[GitHub] [spark] cloud-fan commented on a change in pull request #28840: [SPARK-31999][SQL] Add REFRESH FUNCTION command
cloud-fan commented on a change in pull request #28840: URL: https://github.com/apache/spark/pull/28840#discussion_r457864581 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala ## @@ -236,6 +236,44 @@ case class ShowFunctionsCommand( } } + +/** + * A command for users to refresh the persistent function. + * The syntax of using this command in SQL is: + * {{{ + *REFRESH FUNCTION functionName + * }}} + */ +case class RefreshFunctionCommand( +databaseName: Option[String], +functionName: String) + extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val catalog = sparkSession.sessionState.catalog +if (FunctionRegistry.builtin.functionExists(FunctionIdentifier(functionName))) { + throw new AnalysisException(s"Cannot refresh builtin function $functionName") +} +if (catalog.isTemporaryFunction(FunctionIdentifier(functionName, databaseName))) { + throw new AnalysisException(s"Cannot refresh temporary function $functionName") +} + +val identifier = FunctionIdentifier( + functionName, Some(databaseName.getOrElse(catalog.getCurrentDatabase))) +// we only refresh the permanent function. +if (catalog.isPersistentFunction(identifier)) { + // register overwrite function. + val func = catalog.getFunctionMetadata(identifier) + catalog.registerFunction(func, true) +} else { + // clear cached function. + catalog.unregisterFunction(identifier) Review comment: can you change it? I'd expect something like ``` catalog.unregisterFunction(identifier) throw new NoSuchFunction... ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #28840: [SPARK-31999][SQL] Add REFRESH FUNCTION command
cloud-fan commented on a change in pull request #28840:
URL: https://github.com/apache/spark/pull/28840#discussion_r455844229

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala ##

@@ -236,6 +236,44 @@ case class ShowFunctionsCommand(
   }
 }
+
+/**
+ * A command for users to refresh the persistent function.
+ * The syntax of using this command in SQL is:
+ * {{{
+ *    REFRESH FUNCTION functionName
+ * }}}
+ */
+case class RefreshFunctionCommand(
+    databaseName: Option[String],
+    functionName: String)
+  extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
+    if (FunctionRegistry.builtin.functionExists(FunctionIdentifier(functionName))) {
+      throw new AnalysisException(s"Cannot refresh builtin function $functionName")
+    }
+    if (catalog.isTemporaryFunction(FunctionIdentifier(functionName, databaseName))) {
+      throw new AnalysisException(s"Cannot refresh temporary function $functionName")
+    }
+
+    val identifier = FunctionIdentifier(
+      functionName, Some(databaseName.getOrElse(catalog.getCurrentDatabase)))
+    // we only refresh the permanent function.
+    if (catalog.isPersistentFunction(identifier)) {
+      // register overwrite function.
+      val func = catalog.getFunctionMetadata(identifier)
+      catalog.registerFunction(func, true)
+    } else {
+      // clear cached function.
+      catalog.unregisterFunction(identifier)

Review comment:
I spent more time thinking about it, and I feel your original proposal is better. When people refresh a function, they expect the function to exist and want to use it later. It looks more consistent if we always fail REFRESH FUNCTION when the function doesn't exist in the catalog anymore. That said, we can make `unregisterFunction` a noop if the function is not registered, and throw `NoSuchFunctionException` at the end. Sorry for the back and forth!
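A minimal sketch of what "make `unregisterFunction` a noop if the function is not registered" could look like, using a hypothetical in-memory registry rather than Spark's `FunctionRegistry`. The method reports whether anything was dropped instead of throwing, so a caller can invoke it unconditionally and decide separately whether to raise an error.

```scala
import scala.collection.mutable

// Toy model only: a hypothetical registry, not Spark's FunctionRegistry.
object LenientUnregisterSketch {
  private val registry = mutable.Set.empty[String]

  def register(name: String): Unit = registry += name

  // Returns true if an entry was dropped and false if there was nothing to drop.
  // It never throws, so calling it for an unregistered name is a no-op.
  def unregister(name: String): Boolean = registry.remove(name)

  def isRegistered(name: String): Boolean = registry.contains(name)

  def main(args: Array[String]): Unit = {
    register("default.func1")
    println(unregister("default.func1")) // an entry was cached and is dropped
    println(unregister("default.func1")) // nothing cached this time: a no-op
  }
}
```

Returning a `Boolean` is one possible design; a `Unit` return type would serve equally well if the caller always throws afterwards.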
[GitHub] [spark] cloud-fan commented on a change in pull request #28840: [SPARK-31999][SQL] Add REFRESH FUNCTION command
cloud-fan commented on a change in pull request #28840: URL: https://github.com/apache/spark/pull/28840#discussion_r455680272 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala ## @@ -236,6 +236,46 @@ case class ShowFunctionsCommand( } } + +/** + * A command for users to refresh the persistent function. + * The syntax of using this command in SQL is: + * {{{ + *REFRESH FUNCTION functionName + * }}} + */ +case class RefreshFunctionCommand( +databaseName: Option[String], +functionName: String) + extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val catalog = sparkSession.sessionState.catalog +if (FunctionRegistry.builtin.functionExists(FunctionIdentifier(functionName))) { + throw new AnalysisException(s"Cannot refresh builtin function $functionName") +} +if (catalog.isTemporaryFunction(FunctionIdentifier(functionName, databaseName))) { + throw new AnalysisException(s"Cannot refresh temporary function $functionName") +} + +val identifier = FunctionIdentifier( + functionName, Some(databaseName.getOrElse(catalog.getCurrentDatabase))) +// we only refresh the permanent function. +if (catalog.isPersistentFunction(identifier)) { + // register overwrite function. + val func = catalog.getFunctionMetadata(identifier) + catalog.registerFunction(func, true) +} else if (catalog.isRegisteredFunction(identifier)) { Review comment: nit: we can simplify it ``` ... else { catalog.unregisterFunction(identifier) } ``` `unregisterFunction` will fail if function is not registered. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #28840: [SPARK-31999][SQL] Add REFRESH FUNCTION command
cloud-fan commented on a change in pull request #28840: URL: https://github.com/apache/spark/pull/28840#discussion_r455266730 ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala ## @@ -3030,6 +3030,47 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { } } } + + test("REFRESH FUNCTION") { +val msg = intercept[AnalysisException] { + sql("REFRESH FUNCTION md5") +}.getMessage +assert(msg.contains("Cannot refresh builtin function")) + +withUserDefinedFunction("func1" -> true) { + sql("CREATE TEMPORARY FUNCTION func1 AS 'test.org.apache.spark.sql.MyDoubleAvg'") + val msg = intercept[AnalysisException] { +sql("REFRESH FUNCTION func1") + }.getMessage + assert(msg.contains("Cannot refresh temporary function")) +} + +withUserDefinedFunction("func1" -> false) { + intercept[NoSuchFunctionException] { +sql("REFRESH FUNCTION func1") + } + + val func = FunctionIdentifier("func1", Some("default")) + sql("CREATE FUNCTION func1 AS 'test.org.apache.spark.sql.MyDoubleAvg'") + assert(!spark.sessionState.catalog.isRegisteredFunction(func)) + sql("REFRESH FUNCTION func1") + assert(spark.sessionState.catalog.isRegisteredFunction(func)) + + spark.sessionState.catalog.externalCatalog.dropFunction("default", "func1") + assert(spark.sessionState.catalog.isRegisteredFunction(func)) + sql("REFRESH FUNCTION func1") + assert(!spark.sessionState.catalog.isRegisteredFunction(func)) + + val function = CatalogFunction(func, "test.non.exists.udf", Seq.empty) + spark.sessionState.catalog.createFunction(function, false) Review comment: Maybe we should make `CREATE FUNCTION` not lazy, in a new PR. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
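The difference between a lazy and a non-lazy `CREATE FUNCTION`, as raised in the comment above, can be sketched with a toy metastore. The names `createLazily` and `createEagerly` are hypothetical; the sketch only assumes that "lazy" means the implementing class is not loaded or checked at creation time.

```scala
import scala.collection.mutable
import scala.util.Try

// Toy model only: a hypothetical metastore mapping function name -> implementing class name.
object LazyCreateSketch {
  val metastore = mutable.Map.empty[String, String]

  // Lazy creation: records the class name; nothing is loaded or validated.
  def createLazily(name: String, className: String): Unit =
    metastore.update(name, className)

  // Eager creation: tries to load the class first and records it only on success.
  def createEagerly(name: String, className: String): Try[Unit] =
    Try(Class.forName(className)).map(_ => metastore.update(name, className))

  def main(args: Array[String]): Unit = {
    createLazily("func1", "test.non.exists.udf")
    println(metastore.contains("func1"))                             // accepted without any check
    println(createEagerly("func2", "test.non.exists.udf").isFailure) // rejected at creation time
    println(createEagerly("func3", "java.lang.String").isSuccess)    // loadable class is accepted
  }
}
```

With lazy creation, the first operation that actually loads the class (a refresh, in the test under review) is where an invalid class name surfaces, which is the inconsistency the reviewer points out.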
[GitHub] [spark] cloud-fan commented on a change in pull request #28840: [SPARK-31999][SQL] Add REFRESH FUNCTION command
cloud-fan commented on a change in pull request #28840: URL: https://github.com/apache/spark/pull/28840#discussion_r454392912 ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala ## @@ -3030,6 +3030,47 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { } } } + + test("REFRESH FUNCTION") { +val msg = intercept[AnalysisException] { + sql("REFRESH FUNCTION md5") +}.getMessage +assert(msg.contains("Cannot refresh builtin function")) + +withUserDefinedFunction("func1" -> true) { + sql("CREATE TEMPORARY FUNCTION func1 AS 'test.org.apache.spark.sql.MyDoubleAvg'") + val msg = intercept[AnalysisException] { +sql("REFRESH FUNCTION func1") + }.getMessage + assert(msg.contains("Cannot refresh temporary function")) +} + +withUserDefinedFunction("func1" -> false) { + intercept[NoSuchFunctionException] { +sql("REFRESH FUNCTION func1") + } + + val func = FunctionIdentifier("func1", Some("default")) + sql("CREATE FUNCTION func1 AS 'test.org.apache.spark.sql.MyDoubleAvg'") + assert(!spark.sessionState.catalog.isRegisteredFunction(func)) + sql("REFRESH FUNCTION func1") + assert(spark.sessionState.catalog.isRegisteredFunction(func)) + + spark.sessionState.catalog.externalCatalog.dropFunction("default", "func1") + assert(spark.sessionState.catalog.isRegisteredFunction(func)) + sql("REFRESH FUNCTION func1") + assert(!spark.sessionState.catalog.isRegisteredFunction(func)) + + val function = CatalogFunction(func, "test.non.exists.udf", Seq.empty) + spark.sessionState.catalog.createFunction(function, false) Review comment: or is it because you are calling the internal API not the `CREATE FUNCTION` command? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [spark] cloud-fan commented on a change in pull request #28840: [SPARK-31999][SQL] Add REFRESH FUNCTION command
cloud-fan commented on a change in pull request #28840: URL: https://github.com/apache/spark/pull/28840#discussion_r454392485 ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala ## @@ -3030,6 +3030,47 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { } } } + + test("REFRESH FUNCTION") { +val msg = intercept[AnalysisException] { + sql("REFRESH FUNCTION md5") +}.getMessage +assert(msg.contains("Cannot refresh builtin function")) + +withUserDefinedFunction("func1" -> true) { + sql("CREATE TEMPORARY FUNCTION func1 AS 'test.org.apache.spark.sql.MyDoubleAvg'") + val msg = intercept[AnalysisException] { +sql("REFRESH FUNCTION func1") + }.getMessage + assert(msg.contains("Cannot refresh temporary function")) +} + +withUserDefinedFunction("func1" -> false) { + intercept[NoSuchFunctionException] { +sql("REFRESH FUNCTION func1") + } + + val func = FunctionIdentifier("func1", Some("default")) + sql("CREATE FUNCTION func1 AS 'test.org.apache.spark.sql.MyDoubleAvg'") + assert(!spark.sessionState.catalog.isRegisteredFunction(func)) + sql("REFRESH FUNCTION func1") + assert(spark.sessionState.catalog.isRegisteredFunction(func)) + + spark.sessionState.catalog.externalCatalog.dropFunction("default", "func1") + assert(spark.sessionState.catalog.isRegisteredFunction(func)) + sql("REFRESH FUNCTION func1") + assert(!spark.sessionState.catalog.isRegisteredFunction(func)) + + val function = CatalogFunction(func, "test.non.exists.udf", Seq.empty) + spark.sessionState.catalog.createFunction(function, false) Review comment: We can fix CREATE FUNCTION later and update this test. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [spark] cloud-fan commented on a change in pull request #28840: [SPARK-31999][SQL] Add REFRESH FUNCTION command
cloud-fan commented on a change in pull request #28840: URL: https://github.com/apache/spark/pull/28840#discussion_r454336540 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala ## @@ -236,6 +236,46 @@ case class ShowFunctionsCommand( } } + +/** + * A command for users to refresh the persistent function. + * The syntax of using this command in SQL is: + * {{{ + *REFRESH FUNCTION functionName + * }}} + */ +case class RefreshFunctionCommand( +databaseName: Option[String], +functionName: String) + extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val catalog = sparkSession.sessionState.catalog +if (FunctionRegistry.builtin.functionExists(FunctionIdentifier(functionName))) { + throw new AnalysisException(s"Cannot refresh builtin function $functionName") +} +if (catalog.isTemporaryFunction(FunctionIdentifier(functionName, databaseName))) { + throw new AnalysisException(s"Cannot refresh temporary function $functionName") +} + +val identifier = FunctionIdentifier( + functionName, Some(databaseName.getOrElse(catalog.getCurrentDatabase))) +// we only refresh the permanent function. +if (catalog.isPersistentFunction(identifier)) { + // register overwrite function. + val func = catalog.getFunctionMetadata(identifier) + catalog.registerFunction(func, true) +} else if (catalog.isRegisteredFunction(identifier)) { + // clear cached function. + catalog.unregisterFunction(identifier, true) Review comment: does `unregisterFunction` need to take a boolean parameter? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #28840: [SPARK-31999][SQL] Add REFRESH FUNCTION command
cloud-fan commented on a change in pull request #28840:
URL: https://github.com/apache/spark/pull/28840#discussion_r454336315

## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala ##

@@ -3030,6 +3030,47 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
       }
     }
   }
+
+  test("REFRESH FUNCTION") {
+    val msg = intercept[AnalysisException] {
+      sql("REFRESH FUNCTION md5")
+    }.getMessage
+    assert(msg.contains("Cannot refresh builtin function"))
+
+    withUserDefinedFunction("func1" -> true) {
+      sql("CREATE TEMPORARY FUNCTION func1 AS 'test.org.apache.spark.sql.MyDoubleAvg'")
+      val msg = intercept[AnalysisException] {
+        sql("REFRESH FUNCTION func1")
+      }.getMessage
+      assert(msg.contains("Cannot refresh temporary function"))
+    }
+
+    withUserDefinedFunction("func1" -> false) {
+      intercept[NoSuchFunctionException] {
+        sql("REFRESH FUNCTION func1")
+      }
+
+      val func = FunctionIdentifier("func1", Some("default"))
+      sql("CREATE FUNCTION func1 AS 'test.org.apache.spark.sql.MyDoubleAvg'")
+      assert(!spark.sessionState.catalog.isRegisteredFunction(func))
+      sql("REFRESH FUNCTION func1")
+      assert(spark.sessionState.catalog.isRegisteredFunction(func))
+
+      spark.sessionState.catalog.externalCatalog.dropFunction("default", "func1")
+      assert(spark.sessionState.catalog.isRegisteredFunction(func))
+      sql("REFRESH FUNCTION func1")
+      assert(!spark.sessionState.catalog.isRegisteredFunction(func))
+
+      val function = CatalogFunction(func, "test.non.exists.udf", Seq.empty)
+      spark.sessionState.catalog.createFunction(function, false)

Review comment:
It's a bit weird if we don't fail on invalid functions when creating them, but fail when refreshing them. How hard is it to make `REFRESH FUNCTION` lazy?
[GitHub] [spark] cloud-fan commented on a change in pull request #28840: [SPARK-31999][SQL] Add REFRESH FUNCTION command
cloud-fan commented on a change in pull request #28840: URL: https://github.com/apache/spark/pull/28840#discussion_r454307435 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala ## @@ -236,6 +236,45 @@ case class ShowFunctionsCommand( } } + +/** + * A command for users to refresh the persistent function. + * The syntax of using this command in SQL is: + * {{{ + *REFRESH FUNCTION functionName + * }}} + */ +case class RefreshFunctionCommand( +databaseName: Option[String], +functionName: String) + extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val catalog = sparkSession.sessionState.catalog +if (FunctionRegistry.builtin.functionExists(FunctionIdentifier(functionName))) { + throw new AnalysisException(s"Cannot refresh builtin function $functionName") +} +if (catalog.isTemporaryFunction(FunctionIdentifier(functionName, databaseName))) { + throw new AnalysisException(s"Cannot refresh temporary function $functionName") +} + +val identifier = FunctionIdentifier( + functionName, Some(databaseName.getOrElse(catalog.getCurrentDatabase))) +// we only refresh the permanent function. +if (catalog.isPersistentFunction(identifier)) { + // register overwrite function. + val func = catalog.getFunctionMetadata(identifier) + catalog.registerFunction(func, true) +} else { + // function is not exists, clear cached function. + catalog.unregisterFunction(identifier, true) + throw new NoSuchFunctionException(identifier.database.get, functionName) Review comment: LGTM This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #28840: [SPARK-31999][SQL] Add REFRESH FUNCTION command
cloud-fan commented on a change in pull request #28840: URL: https://github.com/apache/spark/pull/28840#discussion_r454182771 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala ## @@ -236,6 +236,45 @@ case class ShowFunctionsCommand( } } + +/** + * A command for users to refresh the persistent function. + * The syntax of using this command in SQL is: + * {{{ + *REFRESH FUNCTION functionName + * }}} + */ +case class RefreshFunctionCommand( +databaseName: Option[String], +functionName: String) + extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val catalog = sparkSession.sessionState.catalog +if (FunctionRegistry.builtin.functionExists(FunctionIdentifier(functionName))) { + throw new AnalysisException(s"Cannot refresh builtin function $functionName") +} +if (catalog.isTemporaryFunction(FunctionIdentifier(functionName, databaseName))) { + throw new AnalysisException(s"Cannot refresh temporary function $functionName") +} + +val identifier = FunctionIdentifier( + functionName, Some(databaseName.getOrElse(catalog.getCurrentDatabase))) +// we only refresh the permanent function. +if (catalog.isPersistentFunction(identifier)) { + // register overwrite function. + val func = catalog.getFunctionMetadata(identifier) + catalog.registerFunction(func, true) +} else { + // function is not exists, clear cached function. + catalog.unregisterFunction(identifier, true) + throw new NoSuchFunctionException(identifier.database.get, functionName) Review comment: It depends on how you define "function exists". If users can still use this function in SQL queries, why do we throw NoSuchFunctionException in REFRESH FUNCTION? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [spark] cloud-fan commented on a change in pull request #28840: [SPARK-31999][SQL] Add REFRESH FUNCTION command
cloud-fan commented on a change in pull request #28840: URL: https://github.com/apache/spark/pull/28840#discussion_r454123447 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala ## @@ -236,6 +236,45 @@ case class ShowFunctionsCommand( } } + +/** + * A command for users to refresh the persistent function. + * The syntax of using this command in SQL is: + * {{{ + *REFRESH FUNCTION functionName + * }}} + */ +case class RefreshFunctionCommand( +databaseName: Option[String], +functionName: String) + extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val catalog = sparkSession.sessionState.catalog +if (FunctionRegistry.builtin.functionExists(FunctionIdentifier(functionName))) { + throw new AnalysisException(s"Cannot refresh builtin function $functionName") +} +if (catalog.isTemporaryFunction(FunctionIdentifier(functionName, databaseName))) { + throw new AnalysisException(s"Cannot refresh temporary function $functionName") +} + +val identifier = FunctionIdentifier( + functionName, Some(databaseName.getOrElse(catalog.getCurrentDatabase))) +// we only refresh the permanent function. +if (catalog.isPersistentFunction(identifier)) { + // register overwrite function. + val func = catalog.getFunctionMetadata(identifier) + catalog.registerFunction(func, true) +} else { + // function is not exists, clear cached function. + catalog.unregisterFunction(identifier, true) + throw new NoSuchFunctionException(identifier.database.get, functionName) Review comment: are you sure `REFRESH TABLE` can do some work which has side-effects and still fail? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [spark] cloud-fan commented on a change in pull request #28840: [SPARK-31999][SQL] Add REFRESH FUNCTION command
cloud-fan commented on a change in pull request #28840: URL: https://github.com/apache/spark/pull/28840#discussion_r454087942 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala ## @@ -236,6 +236,45 @@ case class ShowFunctionsCommand( } } + +/** + * A command for users to refresh the persistent function. + * The syntax of using this command in SQL is: + * {{{ + *REFRESH FUNCTION functionName + * }}} + */ +case class RefreshFunctionCommand( +databaseName: Option[String], +functionName: String) + extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val catalog = sparkSession.sessionState.catalog +if (FunctionRegistry.builtin.functionExists(FunctionIdentifier(functionName))) { + throw new AnalysisException(s"Cannot refresh builtin function $functionName") +} +if (catalog.isTemporaryFunction(FunctionIdentifier(functionName, databaseName))) { + throw new AnalysisException(s"Cannot refresh temporary function $functionName") +} + +val identifier = FunctionIdentifier( + functionName, Some(databaseName.getOrElse(catalog.getCurrentDatabase))) +// we only refresh the permanent function. +if (catalog.isPersistentFunction(identifier)) { + // register overwrite function. + val func = catalog.getFunctionMetadata(identifier) + catalog.registerFunction(func, true) +} else { + // function is not exists, clear cached function. Review comment: BTW does the query fail if it tries to use such a function? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #28840: [SPARK-31999][SQL] Add REFRESH FUNCTION command
cloud-fan commented on a change in pull request #28840: URL: https://github.com/apache/spark/pull/28840#discussion_r454087814 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala ## @@ -236,6 +236,45 @@ case class ShowFunctionsCommand( } } + +/** + * A command for users to refresh the persistent function. + * The syntax of using this command in SQL is: + * {{{ + *REFRESH FUNCTION functionName + * }}} + */ +case class RefreshFunctionCommand( +databaseName: Option[String], +functionName: String) + extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val catalog = sparkSession.sessionState.catalog +if (FunctionRegistry.builtin.functionExists(FunctionIdentifier(functionName))) { + throw new AnalysisException(s"Cannot refresh builtin function $functionName") +} +if (catalog.isTemporaryFunction(FunctionIdentifier(functionName, databaseName))) { + throw new AnalysisException(s"Cannot refresh temporary function $functionName") +} + +val identifier = FunctionIdentifier( + functionName, Some(databaseName.getOrElse(catalog.getCurrentDatabase))) +// we only refresh the permanent function. +if (catalog.isPersistentFunction(identifier)) { + // register overwrite function. + val func = catalog.getFunctionMetadata(identifier) + catalog.registerFunction(func, true) +} else { + // function is not exists, clear cached function. + catalog.unregisterFunction(identifier, true) + throw new NoSuchFunctionException(identifier.database.get, functionName) Review comment: If it's a valid use case, why do we throw an exception here? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #28840: [SPARK-31999][SQL] Add REFRESH FUNCTION command
cloud-fan commented on a change in pull request #28840: URL: https://github.com/apache/spark/pull/28840#discussion_r454087683 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala ## @@ -236,6 +236,45 @@ case class ShowFunctionsCommand( } } + +/** + * A command for users to refresh the persistent function. + * The syntax of using this command in SQL is: + * {{{ + *REFRESH FUNCTION functionName + * }}} + */ +case class RefreshFunctionCommand( +databaseName: Option[String], +functionName: String) + extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { +val catalog = sparkSession.sessionState.catalog +if (FunctionRegistry.builtin.functionExists(FunctionIdentifier(functionName))) { + throw new AnalysisException(s"Cannot refresh builtin function $functionName") +} +if (catalog.isTemporaryFunction(FunctionIdentifier(functionName, databaseName))) { + throw new AnalysisException(s"Cannot refresh temporary function $functionName") +} + +val identifier = FunctionIdentifier( + functionName, Some(databaseName.getOrElse(catalog.getCurrentDatabase))) +// we only refresh the permanent function. +if (catalog.isPersistentFunction(identifier)) { + // register overwrite function. + val func = catalog.getFunctionMetadata(identifier) + catalog.registerFunction(func, true) +} else { + // function is not exists, clear cached function. Review comment: do you mean function does not exist in the metastore/catalog, and we need to clear the cache entry? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #28840: [SPARK-31999][SQL] Add REFRESH FUNCTION command
cloud-fan commented on a change in pull request #28840:
URL: https://github.com/apache/spark/pull/28840#discussion_r453643288

## File path: docs/sql-ref-syntax-aux-cache-refresh-function.md ##
@@ -0,0 +1,60 @@
+---
+layout: global
+title: REFRESH FUNCTION
+displayTitle: REFRESH FUNCTION
+license: |
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+---
+
+### Description
+
+`REFRESH FUNCTION` statement invalidates the cached function entry, which includes a class name
+and resource location of the given function. The invalidated cache is populated right away.
+Note that `REFRESH FUNCTION` only works for permanent functions. Refreshing native functions or temporary functions will cause an exception.
+
+### Syntax
+
+```sql
+REFRESH FUNCTION function_identifier
+```
+
+### Parameters
+
+* **function_identifier**
+
+Specifies a function name, which is either a qualified or unqualified name. If no database identifier is provided, uses the current database.
+
+**Syntax:** `[ database_name. ] function_name`
+
+### Examples
+
+```sql
+-- The cached entry of the function will be refreshed
+-- The function is resolved from the current database as the function name is unqualified.
+REFRESH FUNCTION func1;
+
+-- The cached entry of the function will be refreshed
+-- The function is resolved from tempDB database as the function name is qualified.
+REFRESH FUNCTION tempDB.func1;

Review comment:
nit: `db1.func1`? `tempDB` sounds like Spark supports temporary database, while it doesn't.

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala ##
@@ -155,4 +155,31 @@ private[sql] trait LookupCatalog extends Logging {
       None
     }
   }
+
+  // TODO: move function related v2 statements to the new framework.

Review comment:
@imback82 do you have time to work on this TODO?

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala ##
@@ -236,6 +236,45 @@ case class ShowFunctionsCommand(
     }
   }

+
+/**
+ * A command for users to refresh the persistent function.
+ * The syntax of using this command in SQL is:
+ * {{{
+ *   REFRESH FUNCTION functionName
+ * }}}
+ */
+case class RefreshFunctionCommand(
+    databaseName: Option[String],
+    functionName: String)
+  extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
+    if (FunctionRegistry.builtin.functionExists(FunctionIdentifier(functionName))) {
+      throw new AnalysisException(s"Cannot refresh builtin function $functionName")
+    }
+    if (catalog.isTemporaryFunction(FunctionIdentifier(functionName, databaseName))) {
+      throw new AnalysisException(s"Cannot refresh temporary function $functionName")
+    }
+
+    val identifier = FunctionIdentifier(
+      functionName, Some(databaseName.getOrElse(catalog.getCurrentDatabase)))
+    // we only refresh the permanent function.
+    // 1. clear cached function.
+    // 2. register function if exists.
+    catalog.unregisterFunction(identifier, true)
+    if (catalog.isPersistentFunction(identifier)) {
+      val func = catalog.getFunctionMetadata(identifier)
+      catalog.registerFunction(func, true)

Review comment:
Does `registerFunction` overwrite existing entry? If it does then we don't need to add `unregisterFunction` API.

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala ##
@@ -236,6 +236,45 @@ case class ShowFunctionsCommand(
     }
   }

+
+/**
+ * A command for users to refresh the persistent function.
+ * The syntax of using this command in SQL is:
+ * {{{
+ *   REFRESH FUNCTION functionName
+ * }}}
+ */
+case class RefreshFunctionCommand(
+    databaseName: Option[String],
+    functionName: String)
+  extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
+    if (FunctionRegistry.builtin.functionExists(FunctionIdentifier(functionName))) {
+      throw new AnalysisException(s"Cannot refresh builtin function $functionName")
+    }
+    if (catalog.isTempo
[GitHub] [spark] cloud-fan commented on a change in pull request #28840: [SPARK-31999][SQL] Add REFRESH FUNCTION command
cloud-fan commented on a change in pull request #28840:
URL: https://github.com/apache/spark/pull/28840#discussion_r444755149

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ##
@@ -1885,11 +1885,17 @@ class Analyzer(
   }

   /**
+   * Replaces [[UnresolvedFunc]]s with concrete [[LogicalPlan]]s.
    * Replaces [[UnresolvedFunction]]s with concrete [[Expression]]s.
    */
   object ResolveFunctions extends Rule[LogicalPlan] {
     val trimWarningEnabled = new AtomicBoolean(true)
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+      case UnresolvedFunc(multipartIdent) =>
+        val funcIdent = parseSessionCatalogFunctionIdentifier(multipartIdent, s"${plan.nodeName}")

Review comment:
`plan.nodeName` -> "function lookup"?
[GitHub] [spark] cloud-fan commented on a change in pull request #28840: [SPARK-31999][SQL] Add REFRESH FUNCTION command
cloud-fan commented on a change in pull request #28840:
URL: https://github.com/apache/spark/pull/28840#discussion_r444211206

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ##
@@ -1885,11 +1885,17 @@ class Analyzer(
   }

   /**
+   * Replaces [[UnresolvedFunc]]s with concrete [[LogicalPlan]]s.
    * Replaces [[UnresolvedFunction]]s with concrete [[Expression]]s.
    */
   object ResolveFunctions extends Rule[LogicalPlan] {
     val trimWarningEnabled = new AtomicBoolean(true)
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+      case UnresolvedFunc(multipartIdent) =>
+        val funcIdent = parseSessionCatalogFunctionIdentifier(multipartIdent, s"${plan.nodeName}")
+        val info = v1SessionCatalog.lookupFunctionInfo(funcIdent)

Review comment:
does this support builtin and temp functions?
[GitHub] [spark] cloud-fan commented on a change in pull request #28840: [SPARK-31999][SQL] Add REFRESH FUNCTION command
cloud-fan commented on a change in pull request #28840:
URL: https://github.com/apache/spark/pull/28840#discussion_r443339093

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala ##
@@ -74,3 +84,8 @@ case class ResolvedTable(catalog: TableCatalog, identifier: Identifier, table: T
 case class ResolvedView(identifier: Identifier) extends LeafNode {
   override def output: Seq[Attribute] = Nil
 }
+
+case class ResolvedFunc(catalog: CatalogPlugin, functionIdentifier: FunctionIdentifier)

Review comment:
shall we put `CatalogFunction` instead of `FunctionIdentifier` in the parameter? otherwise the lookup is still done at runtime.
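To illustrate the point about where the lookup happens, here is a minimal sketch under stated assumptions. The types below are simplified, hypothetical stand-ins that reuse names from the diff (`FunctionIdentifier`, `CatalogFunction`, `UnresolvedFunc`, `ResolvedFunc`); they are not Spark's classes, and `resolve` and its `lookup` parameter are invented for the example. The idea shown: if the resolved node carries the function metadata, the catalog lookup has already happened during analysis and the command has nothing left to look up when it runs.

```scala
object ResolveSketch {
  // Simplified, hypothetical stand-ins; not Spark's actual definitions.
  final case class FunctionIdentifier(funcName: String, database: Option[String])
  final case class CatalogFunction(identifier: FunctionIdentifier, className: String)

  sealed trait Plan
  final case class UnresolvedFunc(nameParts: Seq[String]) extends Plan
  // Carries the looked-up metadata rather than just the identifier.
  final case class ResolvedFunc(func: CatalogFunction) extends Plan

  // Analysis-time resolution: `lookup` stands in for a catalog/metastore query.
  def resolve(plan: Plan, lookup: FunctionIdentifier => Option[CatalogFunction]): Plan =
    plan match {
      case UnresolvedFunc(parts) =>
        require(parts.nonEmpty, "function name must not be empty")
        // Last part is the function name; the part before it, if any, is the database.
        val ident = FunctionIdentifier(parts.last, parts.init.lastOption)
        lookup(ident) match {
          case Some(func) => ResolvedFunc(func)
          case None =>
            throw new NoSuchElementException(s"Function not found: ${parts.mkString(".")}")
        }
      case other => other
    }
}
```

With this shape, a missing function fails during `resolve`, before any command executes.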
[GitHub] [spark] cloud-fan commented on a change in pull request #28840: [SPARK-31999][SQL] Add REFRESH FUNCTION command
cloud-fan commented on a change in pull request #28840:
URL: https://github.com/apache/spark/pull/28840#discussion_r443337504

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ##
@@ -1890,6 +1890,10 @@ class Analyzer(
   object ResolveFunctions extends Rule[LogicalPlan] {
     val trimWarningEnabled = new AtomicBoolean(true)
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+      case RefreshFunction(UnresolvedFunc(multipartIdent)) =>

Review comment:
and can we update the comment of this rule?
[GitHub] [spark] cloud-fan commented on a change in pull request #28840: [SPARK-31999][SQL] Add REFRESH FUNCTION command
cloud-fan commented on a change in pull request #28840:
URL: https://github.com/apache/spark/pull/28840#discussion_r443337381

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ##
@@ -1890,6 +1890,10 @@ class Analyzer(
   object ResolveFunctions extends Rule[LogicalPlan] {
     val trimWarningEnabled = new AtomicBoolean(true)
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+      case RefreshFunction(UnresolvedFunc(multipartIdent)) =>

Review comment:
we should match `UnresolvedFunc` only, in case we add more function-related commands later.
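The suggestion to match only `UnresolvedFunc` can be shown on a toy plan tree. This is a sketch, not Spark's `LogicalPlan` or `resolveOperatorsUp`: `Node`, `Leaf`, `transformUp`, `resolveFunctions`, and `OtherFunctionCommand` are hypothetical names made up for the example. Because the rule matches the leaf rather than a specific parent command, any command that wraps an `UnresolvedFunc` is resolved by the same case.

```scala
object LeafMatchSketch {
  // Minimal toy plan tree with a bottom-up transform (hypothetical).
  sealed trait Node {
    def children: Seq[Node]
    def withChildren(newChildren: Seq[Node]): Node
    def transformUp(rule: PartialFunction[Node, Node]): Node = {
      // Rewrite children first, then try the rule on the rebuilt node.
      val rebuilt =
        if (children.isEmpty) this else withChildren(children.map(_.transformUp(rule)))
      rule.applyOrElse(rebuilt, (n: Node) => n)
    }
  }

  sealed trait Leaf extends Node {
    def children: Seq[Node] = Nil
    def withChildren(newChildren: Seq[Node]): Node = this
  }

  final case class UnresolvedFunc(nameParts: Seq[String]) extends Leaf
  final case class ResolvedFunc(database: String, name: String) extends Leaf

  final case class RefreshFunction(child: Node) extends Node {
    def children: Seq[Node] = Seq(child)
    def withChildren(newChildren: Seq[Node]): Node = copy(child = newChildren.head)
  }

  // Hypothetical second command, standing in for "more function-related commands later".
  final case class OtherFunctionCommand(child: Node) extends Node {
    def children: Seq[Node] = Seq(child)
    def withChildren(newChildren: Seq[Node]): Node = copy(child = newChildren.head)
  }

  // The rule matches the leaf only, so it is independent of the parent command.
  def resolveFunctions(plan: Node, currentDb: String): Node = plan.transformUp {
    case UnresolvedFunc(Seq(name)) => ResolvedFunc(currentDb, name)
    case UnresolvedFunc(Seq(db, name)) => ResolvedFunc(db, name)
  }
}
```

Had the rule matched `RefreshFunction(UnresolvedFunc(...))` instead, `OtherFunctionCommand` would need its own case.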
[GitHub] [spark] cloud-fan commented on a change in pull request #28840: [SPARK-31999][SQL] Add refresh function command
cloud-fan commented on a change in pull request #28840:
URL: https://github.com/apache/spark/pull/28840#discussion_r441481923

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala ##
@@ -516,3 +516,8 @@ case class CommentOnNamespace(child: LogicalPlan, comment: String) extends Comma
 case class CommentOnTable(child: LogicalPlan, comment: String) extends Command {
   override def children: Seq[LogicalPlan] = child :: Nil
 }
+
+/**
+ * The logical plan of the REFRESH FUNCTION command that works for v2 catalogs.
+ */
+case class RefreshFunction(func: Seq[String]) extends Command

Review comment:
Can we create a `UnresolvedFunc`, similar to `UnresolvedTable`? The key point is to do the resolution in the analyzer, not at runtime in `RefreshFunctionCommand.run`.
[GitHub] [spark] cloud-fan commented on a change in pull request #28840: [SPARK-31999][SQL] Add refresh function command
cloud-fan commented on a change in pull request #28840:
URL: https://github.com/apache/spark/pull/28840#discussion_r441402847

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala ##
@@ -480,3 +480,9 @@ case class CreateFunctionStatement(
     isTemp: Boolean,
     ignoreIfExists: Boolean,
     replace: Boolean) extends ParsedStatement
+
+/**
+ * REFRESH FUNCTION statement, as parsed from SQL
+ */
+case class RefreshFunctionStatement(

Review comment:
Since it's a new command, can we follow `CommentOnTable` and use the new command framework?
[GitHub] [spark] cloud-fan commented on a change in pull request #28840: [SPARK-31999][SQL] Add refresh function command
cloud-fan commented on a change in pull request #28840:
URL: https://github.com/apache/spark/pull/28840#discussion_r441401100

## File path: docs/sql-ref-syntax-aux-refresh-function.md ##
@@ -0,0 +1,59 @@
+---
+layout: global
+title: REFRESH FUNCTION
+displayTitle: REFRESH FUNCTION
+license: |
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+---
+
+### Description
+
+`REFRESH FUNCTION` statement invalidates the cached entries, which include class name

Review comment:
`the cached entries` -> `the cached function entry`