cloud-fan commented on code in PR #55768: URL: https://github.com/apache/spark/pull/55768#discussion_r3340053871
########## sql/core/src/main/scala/org/apache/spark/sql/execution/externalUDF/ExternalUDFPlanner.scala: ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.externalUDF + +import org.apache.spark.SparkConf +import org.apache.spark.annotation.Experimental +import org.apache.spark.resource.ResourceProfile +import org.apache.spark.sql.catalyst.expressions.{Expression, + ExternalUserDefinedFunction, PythonUDF} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, + MapInArrow, MapInPandas, MapPartitionsExternalUDF} +import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes +import org.apache.spark.sql.types.StructType + +/** + * Strategy for converting UDF calls into logical plan nodes. + * Implementations decide whether to use the classic + * language-specific runner or the unified external UDF worker + * framework. + * + * Wired into [[org.apache.spark.sql.internal.SessionState]] via + * [[org.apache.spark.sql.internal.BaseSessionStateBuilder]]. + */ +trait ExternalUDFPlanner { + + /** + * Creates the logical plan node for a Python mapInPandas + * operation. + */ + def planPythonMapInPandas( + func: Expression, + child: LogicalPlan, + isBarrier: Boolean, + profile: Option[ResourceProfile]): LogicalPlan + + /** + * Creates the logical plan node for a Python mapInArrow + * operation. + */ + def planPythonMapInArrow( + func: Expression, + child: LogicalPlan, + isBarrier: Boolean, + profile: Option[ResourceProfile]): LogicalPlan +} + +/** + * Classic [[ExternalUDFPlanner]] that uses the built-in Python + * runner. + */ +class ClassicExternalUDFPlanner extends ExternalUDFPlanner { + + override def planPythonMapInPandas( + func: Expression, + child: LogicalPlan, + isBarrier: Boolean, + profile: Option[ResourceProfile]): LogicalPlan = { + val output = toAttributes( + func.dataType.asInstanceOf[StructType]) + MapInPandas(func, output, child, isBarrier, profile) + } + + override def planPythonMapInArrow( + func: Expression, + child: LogicalPlan, + isBarrier: Boolean, + profile: Option[ResourceProfile]): LogicalPlan = { + val output = toAttributes( + func.dataType.asInstanceOf[StructType]) + MapInArrow(func, output, child, isBarrier, profile) + } +} + +/** + * :: Experimental :: + * Unified [[ExternalUDFPlanner]] that uses the language-agnostic + * external UDF worker framework. + */ +@Experimental +class UnifiedExternalUDFPlanner( + private val conf: SparkConf) extends ExternalUDFPlanner { + + override def planPythonMapInPandas( + func: Expression, + child: LogicalPlan, + isBarrier: Boolean, + profile: Option[ResourceProfile]): LogicalPlan = { + val pythonUdf = func.asInstanceOf[PythonUDF] Review Comment: `planPythonMapInPandas` accepts `profile` but never uses it, and `MapPartitionsExternalUDF` has no profile field — so a `ResourceProfile` passed to `DataFrame.mapInPandas` is silently dropped when the unified config is on. The classic path and `planPythonMapInArrow` both thread `Option(profile)` through. Since `isBarrier` was deliberately kept for future use, should `profile` be carried the same way rather than dropped? ########## core/src/main/scala/org/apache/spark/SparkEnv.scala: ########## @@ -120,6 +123,48 @@ class SparkEnv ( pythonExec: String, workerModule: String, daemonModule: String, envVars: Map[String, String]) private val pythonWorkers = mutable.HashMap[PythonWorkersKey, PythonWorkerFactory]() + /** + * :: Experimental :: + * Dispatcher factory to generate UDF worker dispatchers + * using the new UDF framework proposed in SPARK-55278. + * Initialized on first use via [[getExternalUDFDispatcher]]. + */ + @volatile private var udfDispatcherManager: Option[UDFDispatcherManager] = None + + private def createUDFDispatcherManager(): UDFDispatcherManager = { + val factory = new UDFDispatcherFactory { + override def createDispatcher( + workerSpec: UDFWorkerSpecification, + logger: org.apache.spark.udf.worker.core.WorkerLogger + ): WorkerDispatcher = { + // TODO [SPARK-55278]: Wire in the correct dispatcher factory + throw new UnsupportedOperationException( + "No UDF dispatcher factory configured. " + + "Set up a concrete factory for SPARK-55278.") + } + } + new UDFDispatcherManager(factory, new SparkUDFWorkerLogger()) + } + + /** + * :: Experimental :: + * Returns the [[WorkerDispatcher]] for the given worker + * specification via the [[UDFDispatcherManager]]. + */ + private[spark] def getExternalUDFDispatcher( + workerSpec: UDFWorkerSpecification): WorkerDispatcher = { + val manager : UDFDispatcherManager = udfDispatcherManager.getOrElse { + synchronized { + // Get or Else synchronized to protect + // against concurrent creation requests. + udfDispatcherManager.getOrElse { + createUDFDispatcherManager() Review Comment: `createUDFDispatcherManager()` is called but its result is never assigned back to `udfDispatcherManager`, so the field stays `None` forever. Two consequences: a fresh manager (with an empty dispatcher cache) is built on every call, defeating the manager's spec-keyed cache; and `stop()`'s `udfDispatcherManager.foreach(_.close())` becomes a no-op, so dispatchers/worker processes are never closed on shutdown. This is the same lazy-init you described when resolving the earlier threads on this method — but the field is never actually initialized. (Latent today since the factory placeholder throws, but worth fixing before it's wired.) ```suggestion val created = createUDFDispatcherManager() udfDispatcherManager = Some(created) created ``` ########## sql/core/src/main/scala/org/apache/spark/sql/execution/externalUDF/MapPartitionsExternalUDFExec.scala: ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.externalUDF + +import org.apache.spark.TaskContext +import org.apache.spark.annotation.Experimental +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{ + Attribute, + ExternalUserDefinedFunction +} +import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.types.StructType +import org.apache.spark.udf.worker.UDFWorkerSpecification + +/** + * :: Experimental :: + * Physical plan node that executes a mapPartitions-style UDF in an + * external worker process. + * + * @param workerSpec Specification describing the UDF worker. + * @param functionExpr The UDF to invoke. + * @param isBarrier Whether the UDF should be invoked using barrier execution. + * @param resultAttributes Output attributes produced by the UDF. Review Comment: Stale Scaladoc params: the actual constructor param is `function` (not `functionExpr`), and there is no `resultAttributes` param. ```suggestion * @param function The UDF to invoke. * @param isBarrier Whether the UDF should be invoked using barrier execution. ``` ########## core/pom.xml: ########## @@ -35,6 +35,16 @@ </properties> <dependencies> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-udf-worker-proto_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-udf-worker-core_${scala.binary.version}</artifactId> Review Comment: Making `spark-udf-worker-{proto,core}` compile deps of `core` (and `catalyst`) pulls grpc-* and `proto-google-common-protos` onto the core/assembly classpath (visible in the deps-manifest diff). Spark has historically kept gRPC isolated to Connect (shaded) to avoid protobuf/grpc version clashes on the widely-shared core classpath. Is this footprint expansion intended, or could the proto types `SparkEnv` needs live in a lighter-weight module so core doesn't compile against the full grpc stack? ########## udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/UDFDispatcherManager.scala: ########## @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.udf.worker.core + +import java.util.HashMap +import java.util.concurrent.locks.ReentrantReadWriteLock + +import scala.util.control.NonFatal + +import org.apache.spark.annotation.Experimental +import org.apache.spark.udf.worker.UDFWorkerSpecification + +/** + * :: Experimental :: + * Manages [[WorkerDispatcher]] instances, caching them by + * [[UDFWorkerSpecification]] (protobuf value equality). + * + * Callers obtain a dispatcher via [[getDispatcher]] and create + * sessions on it directly. On [[stop]], all cached dispatchers + * are closed -- dispatchers are responsible for cleaning up + * their own sessions. + * + * Thread safety: a [[ReentrantReadWriteLock]] allows concurrent + * [[getDispatcher]] calls (read lock) while [[stop]] has + * exclusive access (write lock). + */ +@Experimental +class UDFDispatcherManager( + private val dispatcherFactory: UDFDispatcherFactory, + workerLogger: WorkerLogger = WorkerLogger.NoOp +) { + + // Guarded by `rwLock`. The read lock is used by getDispatcher + // (with upgrade when a new dispatcher must be added) and the + // write lock is used by stop. Review Comment: Stale: the write lock is held by `close`, not `stop`. ```suggestion // write lock is used by close. ``` ########## udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/UDFDispatcherManager.scala: ########## @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.udf.worker.core + +import java.util.HashMap +import java.util.concurrent.locks.ReentrantReadWriteLock + +import scala.util.control.NonFatal + +import org.apache.spark.annotation.Experimental +import org.apache.spark.udf.worker.UDFWorkerSpecification + +/** + * :: Experimental :: + * Manages [[WorkerDispatcher]] instances, caching them by + * [[UDFWorkerSpecification]] (protobuf value equality). + * + * Callers obtain a dispatcher via [[getDispatcher]] and create + * sessions on it directly. On [[stop]], all cached dispatchers + * are closed -- dispatchers are responsible for cleaning up + * their own sessions. + * + * Thread safety: a [[ReentrantReadWriteLock]] allows concurrent + * [[getDispatcher]] calls (read lock) while [[stop]] has + * exclusive access (write lock). + */ +@Experimental +class UDFDispatcherManager( + private val dispatcherFactory: UDFDispatcherFactory, + workerLogger: WorkerLogger = WorkerLogger.NoOp +) { + + // Guarded by `rwLock`. The read lock is used by getDispatcher + // (with upgrade when a new dispatcher must be added) and the + // write lock is used by stop. + private val rwLock = new ReentrantReadWriteLock() + private val dispatchers = + new HashMap[UDFWorkerSpecification, WorkerDispatcher]() + private var closed = false + + /** + * Returns the [[WorkerDispatcher]] for the given spec, creating + * one via the [[UDFDispatcherFactory]] if none exists yet. + */ + def getDispatcher( + workerSpec: UDFWorkerSpecification): WorkerDispatcher = { + // First, try to read an existing dispatcher = quick path + rwLock.readLock().lock() + try { + if (closed) throwClosed() + + // Reading existing dispatcher = quick path Review Comment: This duplicates the `quick path` comment on line 61 a few lines up; consider dropping one. ########## udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/UDFDispatcherManager.scala: ########## @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.udf.worker.core + +import java.util.HashMap +import java.util.concurrent.locks.ReentrantReadWriteLock + +import scala.util.control.NonFatal + +import org.apache.spark.annotation.Experimental +import org.apache.spark.udf.worker.UDFWorkerSpecification + +/** + * :: Experimental :: + * Manages [[WorkerDispatcher]] instances, caching them by + * [[UDFWorkerSpecification]] (protobuf value equality). + * + * Callers obtain a dispatcher via [[getDispatcher]] and create + * sessions on it directly. On [[stop]], all cached dispatchers Review Comment: `[[stop]]` is a dangling Scaladoc link — the method is `close()`. (Same on line 38.) ```suggestion * sessions on it directly. On [[close]], all cached dispatchers ``` ########## sql/core/src/main/scala/org/apache/spark/sql/execution/externalUDF/ExternalUDFExec.scala: ########## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.externalUDF + +import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.annotation.Experimental +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.UnaryExecNode +import org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.udf.worker.UDFWorkerSpecification +import org.apache.spark.udf.worker.core.{WorkerSecurityScope, WorkerSession} + +/** + * :: Experimental :: + * Base trait for physical plan nodes that execute UDFs in an external + * worker process via the language-agnostic UDF worker framework. + * + * Dispatchers are obtained via [[SparkEnv#getExternalUDFDispatcher]], + * which uses the [[UDFDispatcherManager]] registered on the + * environment. This avoids serializing the manager as part of the + * physical plan. + */ +@Experimental +trait ExternalUDFExec extends UnaryExecNode { + + /** + * Specification describing how to create and communicate with the UDF worker. + * There is exactly one specification per [[ExternalUDFExec]] node. + */ + def workerSpec: UDFWorkerSpecification + + // --------------------------------------------------------------------------- + // Metrics + // --------------------------------------------------------------------------- + + protected def externalUdfMetrics: Map[String, SQLMetric] = Map( + // TODO [SPARK-55278]: Emit the correct metrics here + ) + + override lazy val metrics: Map[String, SQLMetric] = externalUdfMetrics + + // --------------------------------------------------------------------------- + // Session lifecycle + // --------------------------------------------------------------------------- + + /** + * Creates a [[WorkerSession]] via [[SparkEnv#getExternalUDFDispatcher]]. + * Registers session cancellation on task failure and session termination + * on task completion. The provided function receives the session + * and must return the result iterator. The function CAN but MUST NOT + * cancel or close the session. Review Comment: "CAN but MUST NOT cancel or close" reads as a contradiction. Reword to make clear the function may use the session but not tear it down. ```suggestion * and must return the result iterator. The function may use the * session but MUST NOT cancel or close it. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
