sven-weber-db commented on code in PR #55611:
URL: https://github.com/apache/spark/pull/55611#discussion_r3183188201


##########
udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSessionFactory.scala:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.udf.worker.core
+
+import java.util.HashMap
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.udf.worker.UDFWorkerSpecification
+/**
+ * :: Experimental ::
+ * Creates [[WorkerSession]] instances for a given
+ * [[UDFWorkerSpecification]], managing [[WorkerDispatcher]] instances
+ * and their lifecycle internally.
+ *
+ * Dispatchers are cached by spec (protobuf value equality) and reused
+ * across sessions. The factory tracks the number of active sessions
+ * per dispatcher. When the last session for a dispatcher is closed,
+ * the entry is removed and [[onAllDispatcherSessionsClosed]] is
+ * called.
+ *
+ * Thread safety: a single lock guards all map mutations and session
+ * count changes. The actual [[WorkerDispatcher#createSession]] call
+ * and the [[onAllDispatcherSessionsClosed]] callback happen outside
+ * the lock.
+ *
+ * Subclasses must implement [[doCreateDispatcher]] to provide a
+ * concrete dispatcher and [[onAllDispatcherSessionsClosed]] to
+ * control dispatcher teardown policy. Keeping a dispatcher alive
+ * after all its sessions have closed should be a conscious decision.
+ */
+@Experimental
+abstract class WorkerSessionFactory {
+
+  private class DispatcherEntry(val dispatcher: WorkerDispatcher) {
+    var activeSessionCount: Int = 0
+  }
+
+  // All access guarded by `synchronized(lock)`.
+  private val lock = new Object
+  private val dispatchers =
+    new HashMap[UDFWorkerSpecification, DispatcherEntry]()

Review Comment:
   I tried to use a concurrent hash map, but it's not straightforward here. In the `close` function, we need to fetch the current session count, evaluate it, and remove the dispatcher when the count reaches 0. This whole read-check-remove sequence must be protected from concurrent updates to the same key. Therefore, a classical lock is probably simpler here.
   
   If you have a way to express this with a concurrent hash map, I am happy to adjust the code.
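   For comparison, here is a minimal sketch of how the read-check-remove sequence could map onto `ConcurrentHashMap.compute` / `computeIfPresent`. Both run their remapping function atomically for a given key, and returning `null` from the function removes the mapping. `Spec`, `Dispatcher`, `Entry`, and `RefCountingFactory` are hypothetical stand-ins, not the PR's types:

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-ins for UDFWorkerSpecification and WorkerDispatcher.
final case class Spec(name: String)

final class Dispatcher(val spec: Spec) {
  @volatile var closed: Boolean = false
  def close(): Unit = closed = true
}

// Immutable entry: every count change installs a new value, so the
// map's per-key atomicity covers the whole read-modify-write.
final case class Entry(dispatcher: Dispatcher, activeSessions: Int)

class RefCountingFactory(
    create: Spec => Dispatcher,
    onIdle: Dispatcher => Unit) {

  private val entries = new ConcurrentHashMap[Spec, Entry]()

  /** Increments the count for `spec`, creating a dispatcher on first use. */
  def acquire(spec: Spec): Dispatcher = {
    val updated = entries.compute(spec, (key: Spec, old: Entry) =>
      if (old == null) Entry(create(key), 1)
      else old.copy(activeSessions = old.activeSessions + 1))
    updated.dispatcher
  }

  /** Decrements the count; the mapping is removed atomically at zero. */
  def release(spec: Spec): Unit = {
    var idle: Dispatcher = null
    entries.computeIfPresent(spec, (key: Spec, old: Entry) =>
      if (old.activeSessions == 1) {
        idle = old.dispatcher
        null // returning null removes the mapping
      } else {
        old.copy(activeSessions = old.activeSessions - 1)
      })
    // The callback runs after computeIfPresent returns, outside the key's lock.
    if (idle != null) onIdle(idle)
  }

  /** Current session count for `spec` (0 if no dispatcher is cached). */
  def activeCount(spec: Spec): Int =
    Option(entries.get(spec)).map(_.activeSessions).getOrElse(0)
}
```

   As with the lock-based version, `create` runs while the key is locked. The `ConcurrentHashMap` Javadoc asks that remapping functions be short and not update other mappings, so whether this reads simpler than a single lock is a judgment call.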



##########
udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSessionFactory.scala:
##########
@@ -0,0 +1,128 @@
+  /**
+   * Creates a [[WorkerSession]] for the given worker specification
+   * and optional security scope.
+   *
+   * If a dispatcher for this spec already exists it is reused;
+   * otherwise [[doCreateDispatcher]] is called to create one.
+   */
+  final def createSession(
+      workerSpec: UDFWorkerSpecification,
+      securityScope: Option[WorkerSecurityScope] = None
+  ): WorkerSession = {
+    val entry = lock.synchronized {
+      var e = dispatchers.get(workerSpec)

Review Comment:
   Yes, good catch. Fixed!



##########
udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSessionFactory.scala:
##########
@@ -0,0 +1,128 @@
+  final def createSession(
+      workerSpec: UDFWorkerSpecification,
+      securityScope: Option[WorkerSecurityScope] = None
+  ): WorkerSession = {
+    val entry = lock.synchronized {
+      var e = dispatchers.get(workerSpec)
+      if (e == null) {
+        e = new DispatcherEntry(doCreateDispatcher(workerSpec))
+        dispatchers.put(workerSpec, e)
+      }
+      e.activeSessionCount += 1
+      e
+    }
+    val underlying = entry.dispatcher.createSession(securityScope)
+    new ManagedWorkerSession(underlying, workerSpec)
+  }
+
+  /**
+   * Creates a new [[WorkerDispatcher]] for the given specification.
+   * Called at most once per structurally distinct spec (until the
+   * entry is removed after all sessions close). The dispatcher may
+   * be used for many sessions over its lifetime.
+   */
+  protected def doCreateDispatcher(
+      workerSpec: UDFWorkerSpecification): WorkerDispatcher
+
+  /**
+   * Called when the last active session for a dispatcher is closed
+   * and the entry has been removed from the cache. Subclasses must
+   * decide what to do with the now-idle dispatcher: close it
+   * immediately, schedule idle-timeout eviction, etc.
+   *
+   * Called outside the lock so subclasses may perform blocking work.
+   *
+   * @param dispatcher  The dispatcher whose session count reached
+   *                    zero. Guaranteed to be a dispatcher returned
+   *                    from [[doCreateDispatcher]].
+   */
+  protected def onAllDispatcherSessionsClosed(
+      dispatcher: WorkerDispatcher): Unit
+
+  private class ManagedWorkerSession(
+      underlying: WorkerSession,
+      workerSpec: UDFWorkerSpecification)
+    extends DelegatedWorkerSession(underlying) {
+
+    override def close(): Unit = {

Review Comment:
   I think I found a nicer way of doing this. Let's discuss based on the updated code.



##########
udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/DelegatedWorkerSession.scala:
##########
@@ -0,0 +1,41 @@
+package org.apache.spark.udf.worker.core
+
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * A [[WorkerSession]] that delegates all operations to an underlying
+ * session. Subclasses override only the methods they need to
+ * intercept (typically [[close]]).
+ */
+@Experimental
+abstract class DelegatedWorkerSession(

Review Comment:
   I think I found a better way to implement this using a completion callback in the session. Let's discuss based on the new version of the code.
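   The new version of the code is not part of this thread. As a rough illustration of the general pattern only, a session can accept a completion callback at construction time and invoke it exactly once on close, so the factory no longer needs a delegating subclass to intercept `close()`. `CallbackSession`, `onComplete`, and `releaseResources` are hypothetical names:

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical minimal session: the factory passes `onComplete` at
// construction time instead of wrapping the session in a delegating subclass.
class CallbackSession(onComplete: () => Unit) extends AutoCloseable {
  private val closed = new AtomicBoolean(false)

  def isClosed: Boolean = closed.get()

  override def close(): Unit = {
    // compareAndSet makes close() idempotent: the callback fires at most once,
    // even if close() is called repeatedly or from several threads.
    if (closed.compareAndSet(false, true)) {
      try releaseResources()
      finally onComplete() // runs even if teardown throws
    }
  }

  // Placeholder for whatever the real session tears down.
  protected def releaseResources(): Unit = ()
}
```

   The idempotence guard matters for reference counting: without it, closing the same session twice would decrement the factory's active-session count twice.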



##########
udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSessionFactory.scala:
##########
@@ -0,0 +1,128 @@
+@Experimental
+abstract class WorkerSessionFactory {
+
+  private class DispatcherEntry(val dispatcher: WorkerDispatcher) {
+    var activeSessionCount: Int = 0

Review Comment:
   I think there are the following reasons to keep `activeSessionCount` here instead of in the dispatcher:
   
   1. The dispatcher is a factory class that pushes the cleanup of sessions to its consumers. If we tracked `activeSessionCount` in the dispatcher, it would also have to track session closures. This contradicts the current session class design, and it could easily introduce race conditions depending on the exact call order and on when the count is updated.
   2. The `WorkerSessionFactory` provides a single place where this logic is implemented, instead of duplicating it across many dispatchers.



##########
udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSessionFactory.scala:
##########
@@ -0,0 +1,128 @@
+/*

Review Comment:
   Yes, agreed. Let's split it into two parts.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

