sryza commented on code in PR #53024:
URL: https://github.com/apache/spark/pull/53024#discussion_r2521475992
##########
python/pyspark/pipelines/add_pipeline_analysis_context.py:
##########
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from contextlib import contextmanager
+from typing import Generator, Optional
+from pyspark.sql import SparkSession
+
+from typing import Any, cast
+
+
+@contextmanager
+def add_pipeline_analysis_context(
+    spark: SparkSession, dataflow_graph_id: str, flow_name_opt: Optional[str]
+) -> Generator[None, None, None]:
+    """
+    Context manager that add PipelineAnalysisContext extension to the user context
+    used for pipeline specific analysis.
+    """
+    _extension_id = None
+    _client = cast(Any, spark).client

Review Comment:
Worth including this comment explaining why we cast:
```python
# Cast because mypy seems to think `spark` is a function, not an object. Likely related to
# SPARK-47544.
```

##########
sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala:
##########
@@ -129,6 +131,46 @@ private[connect] object PipelinesHandler extends Logging {
     }
   }
 
+  /**
+   * Block unsupported SQL commands that are not explicitly allowlisted.

Review Comment:
Can we explain the spirit of what's allowed here? I.e., basically we want to limit all plans that have side effects, because registering pipeline definitions should be side-effect-free. While in general we can't necessarily stop all side effects from slipping through the cracks, it's a best effort to avoid users shooting themselves in the foot.

##########
python/pyspark/pipelines/add_pipeline_analysis_context.py:
##########
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from contextlib import contextmanager
+from typing import Generator, Optional
+from pyspark.sql import SparkSession
+
+from typing import Any, cast
+
+
+@contextmanager
+def add_pipeline_analysis_context(
+    spark: SparkSession, dataflow_graph_id: str, flow_name_opt: Optional[str]
+) -> Generator[None, None, None]:
+    """
+    Context manager that add PipelineAnalysisContext extension to the user context
+    used for pipeline specific analysis.
+    """
+    _extension_id = None
+    _client = cast(Any, spark).client
+    try:
+        import pyspark.sql.connect.proto as pb2
+        from google.protobuf import any_pb2
+
+        _analysis_context = pb2.PipelineAnalysisContext(dataflow_graph_id=dataflow_graph_id)
+        if flow_name_opt is not None:

Review Comment:
Nitpick: I think we should be able to just do this?
```python
pb2.PipelineAnalysisContext(
    dataflow_graph_id=dataflow_graph_id,
    flow_name=flow_name_opt,
)
```
Though I've seen some weird type errors so maybe not.
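If the weird type errors came from passing `None`, note that protobuf's generated Python constructors treat a `None` keyword argument the same as omitting the field, so the `if` guard shouldn't be needed. A sketch of what that would look like (untested against these particular stubs):
```python
# protobuf constructors skip None-valued kwargs ("field=None is the same as
# no field at all"), so flow_name simply stays unset when flow_name_opt is None.
analysis_context = pb2.PipelineAnalysisContext(
    dataflow_graph_id=dataflow_graph_id,
    flow_name=flow_name_opt,
)
```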
##########
python/pyspark/pipelines/block_connect_access.py:
##########
@@ -41,7 +57,17 @@ def blocked_getattr(self: SparkConnectServiceStub, name: str) -> Callable:
         if name not in BLOCKED_RPC_NAMES:

Review Comment:
To make it easy to understand what's blocked, is there a straightforward way we could encapsulate all the logic in a helper function?
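For example, something along these lines (a rough sketch with a hypothetical helper name, assuming the intent is to block the RPCs in `BLOCKED_RPC_NAMES` except ExecutePlan requests that only carry a `spark.sql()` command, per `_is_sql_command_request`):
```python
def _should_block_rpc(name: str, request: object) -> bool:
    """Single place that decides whether an RPC is blocked during registration.

    Everything in BLOCKED_RPC_NAMES is blocked, except ExecutePlan requests
    that only carry a spark.sql() command, which pipeline analysis allows.
    """
    if name not in BLOCKED_RPC_NAMES:
        return False
    return not _is_sql_command_request(request)
```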
##########
sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala:
##########
@@ -129,6 +131,46 @@ private[connect] object PipelinesHandler extends Logging {
     }
   }
 
+  /**
+   * Block unsupported SQL commands that are not explicitly allowlisted.
+   */
+  def blockUnsupportedSqlCommand(queryPlan: LogicalPlan): Unit = {
+    val supportedCommand = Set(
+      classOf[DescribeRelation],
+      classOf[ShowTables],
+      classOf[ShowTableProperties],
+      classOf[ShowNamespacesCommand],
+      classOf[ShowColumns],
+      classOf[ShowFunctions],
+      classOf[ShowViews],
+      classOf[ShowCatalogsCommand],
+      classOf[ShowCreateTable])
+    val isSqlCommandExplicitlyAllowlisted = {
+      supportedCommand.exists(c => queryPlan.getClass.getName.equals(c.getName))
+    }
+    val isUnsupportedSqlPlan = if (isSqlCommandExplicitlyAllowlisted) {
+      false
+    } else {
+      // If the SQL command is not explicitly allowlisted, check whether it belongs to
+      // one of commands pipeline explicitly disallow.
+      // If not, the SQL command is supported.
+      queryPlan.isInstanceOf[Command] ||
+      queryPlan.isInstanceOf[CreateTableAsSelect] ||
+      queryPlan.isInstanceOf[CreateTable] ||
+      queryPlan.isInstanceOf[CreateView] ||
+      queryPlan.isInstanceOf[InsertIntoStatement] ||
+      queryPlan.isInstanceOf[RenameTable] ||
+      queryPlan.isInstanceOf[CreateNamespace] ||
+      queryPlan.isInstanceOf[DropView]
+    }
+    // scalastyle:on

Review Comment:
Why scalastyle:on here? Maybe a relic of print debugging?

##########
python/pyspark/pipelines/add_pipeline_analysis_context.py:
##########
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from contextlib import contextmanager
+from typing import Generator, Optional
+from pyspark.sql import SparkSession
+
+from typing import Any, cast
+
+
+@contextmanager
+def add_pipeline_analysis_context(
+    spark: SparkSession, dataflow_graph_id: str, flow_name_opt: Optional[str]

Review Comment:
Nitpick: it's not typically idiomatic in Python to have an `_opt` suffix for `Optional`-typed vars.

##########
python/pyspark/pipelines/add_pipeline_analysis_context.py:
##########
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from contextlib import contextmanager
+from typing import Generator, Optional
+from pyspark.sql import SparkSession
+
+from typing import Any, cast
+
+
+@contextmanager
+def add_pipeline_analysis_context(
+    spark: SparkSession, dataflow_graph_id: str, flow_name_opt: Optional[str]
+) -> Generator[None, None, None]:
+    """
+    Context manager that add PipelineAnalysisContext extension to the user context
+    used for pipeline specific analysis.
+    """
+    _extension_id = None

Review Comment:
No need to use underscores for variables inside this function, because local variables are already "private" to the function they're inside.
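Putting the two naming nitpicks together, the visible part of the function might read like this (sketch only; the elided body from the hunk above is unchanged):
```python
from contextlib import contextmanager
from typing import Any, Generator, Optional, cast

from pyspark.sql import SparkSession


@contextmanager
def add_pipeline_analysis_context(
    spark: SparkSession, dataflow_graph_id: str, flow_name: Optional[str]
) -> Generator[None, None, None]:
    # Plain local names: locals are already scoped to this function, and the
    # Optional[str] annotation already conveys what the `_opt` suffix did.
    extension_id = None
    client = cast(Any, spark).client
    # ... the try/finally body from the hunk above would continue here ...
    yield  # stand-in so this sketch is a valid context manager
```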
##########
sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala:
##########
@@ -129,6 +131,46 @@ private[connect] object PipelinesHandler extends Logging {
     }
   }
 
+  /**
+   * Block unsupported SQL commands that are not explicitly allowlisted.
+   */
+  def blockUnsupportedSqlCommand(queryPlan: LogicalPlan): Unit = {
+    val supportedCommand = Set(
+      classOf[DescribeRelation],
+      classOf[ShowTables],
+      classOf[ShowTableProperties],
+      classOf[ShowNamespacesCommand],
+      classOf[ShowColumns],
+      classOf[ShowFunctions],
+      classOf[ShowViews],
+      classOf[ShowCatalogsCommand],
+      classOf[ShowCreateTable])
+    val isSqlCommandExplicitlyAllowlisted = {
+      supportedCommand.exists(c => queryPlan.getClass.getName.equals(c.getName))
+    }
+    val isUnsupportedSqlPlan = if (isSqlCommandExplicitlyAllowlisted) {
+      false
+    } else {
+      // If the SQL command is not explicitly allowlisted, check whether it belongs to

Review Comment:
It took me a minute to parse what was going on here. Is my understanding correct that we basically disallow all query plans that are descendants of the plan classes here, **except** for some subclasses of `Command` that we know have no side effects?

##########
sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala:
##########
@@ -129,6 +131,46 @@ private[connect] object PipelinesHandler extends Logging {
     }
   }
 
+  /**
+   * Block unsupported SQL commands that are not explicitly allowlisted.
+   */
+  def blockUnsupportedSqlCommand(queryPlan: LogicalPlan): Unit = {
+    val supportedCommand = Set(
+      classOf[DescribeRelation],
+      classOf[ShowTables],
+      classOf[ShowTableProperties],
+      classOf[ShowNamespacesCommand],
+      classOf[ShowColumns],
+      classOf[ShowFunctions],
+      classOf[ShowViews],
+      classOf[ShowCatalogsCommand],
+      classOf[ShowCreateTable])
+    val isSqlCommandExplicitlyAllowlisted = {
+      supportedCommand.exists(c => queryPlan.getClass.getName.equals(c.getName))

Review Comment:
Nit that's not a big deal: why compare by name vs. the class object? Does `equals` not work for classes?

##########
python/pyspark/pipelines/spark_connect_graph_element_registry.py:
##########
@@ -110,8 +112,11 @@ def register_output(self, output: Output) -> None:
         self._client.execute_command(command)
 
     def register_flow(self, flow: Flow) -> None:
-        with block_spark_connect_execution_and_analysis():
-            df = flow.func()
+        with add_pipeline_analysis_context(

Review Comment:
If I understand correctly, this will result in two PipelineAnalysisContexts added to the same request. And the server code knows to expect that.

##########
python/pyspark/pipelines/block_connect_access.py:
##########
@@ -24,6 +24,22 @@
 BLOCKED_RPC_NAMES = ["AnalyzePlan", "ExecutePlan"]
 
 
+def _is_sql_command_request(request: object) -> bool:
+    """Check if the request is spark.sql() command (ExecutePlanRequest with a sql_command)."""
+    try:
+        if not hasattr(request, "plan"):
+            return False
+
+        plan = request.plan
+
+        if not plan.HasField("command"):
+            return False
+
+        return plan.command.HasField("sql_command")
+    except Exception:

Review Comment:
In general it's not great to have a blanket `except Exception` like this. What would cause us to hit it?
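For reference, if the only thing the `try` guards against is an `AttributeError` on request types that have no `plan` field, a version without the blanket catch could look like this (a sketch, not tested against the generated proto classes):
```python
def _is_sql_command_request(request: object) -> bool:
    """Check if the request is a spark.sql() command (ExecutePlanRequest with a sql_command)."""
    plan = getattr(request, "plan", None)
    if plan is None:
        return False
    # HasField only raises for field names that don't exist on the message,
    # and both names here are hard-coded, so no broad except should be needed.
    return plan.HasField("command") and plan.command.HasField("sql_command")
```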
