sven-weber-db commented on code in PR #55611:
URL: https://github.com/apache/spark/pull/55611#discussion_r3194049669
##########
udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSession.scala:
##########
@@ -76,16 +77,49 @@ case class InitMessage(
* - [[close]] must always be called (use try-finally).
* - [[cancel]] may be called at any time to abort execution.
*
- * The lifecycle is enforced here: [[init]] and [[process]] are `final`
- * and delegate to [[doInit]] / [[doProcess]] after AtomicBoolean guards.
+ * The lifecycle is enforced here: [[init]], [[process]], [[cancel]],
+ * and [[close]] are `final` and delegate to [[doInit]] / [[doProcess]] /
+ * [[doCancel]], and [[doClose]] after AtomicBoolean guards.
* Subclasses implement the protocol-specific work and do not re-check
* the contract.
+ *
+ *
+ * Completion listeners registered via [[addSessionCompletionListener]]
+ * are fired exactly once, after [[doClose]] or [[doCancel]]
+ * (whichever runs first). Note that the completion listener can
+ * be executed in a completely separate thread from the thread that
+ * registered the listener.
*/
@Experimental
-abstract class WorkerSession extends AutoCloseable {
+abstract class WorkerSession(
+ workerLogger: WorkerLogger
+) extends AutoCloseable {
+
+ protected val logger: WorkerLogger =
+ workerLogger.forClass(getClass)
+
+ /** Unique identifier for this session. */
+ val sessionId: String = java.util.UUID.randomUUID().toString
private val initialized = new AtomicBoolean(false)
private val processed = new AtomicBoolean(false)
+ private val closed = new AtomicBoolean(false)
+ private val completionListeners =
Review Comment:
Good question. You are right, that's not the best implementation here. While
revisiting this code, I also noticed that the `completionListeners`
implementation was not fully correct: a listener was never called if the
session had already been closed before the listener was added.
I changed the code to use a lock object and a plain `ArrayList` instead.
Additionally, completion listeners are now also fired when they are registered
after the session was already closed/cancelled. I also added some tests to
validate the completion listener logic.
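
For readers following along, here is a minimal sketch of the approach described above. It is not the code from the PR: the class name `CompletionListeners` and the methods `add` and `complete` are hypothetical, and invoking listeners outside the lock is an assumption of this sketch rather than something the comment states.

```scala
import java.util.ArrayList

// Hypothetical stand-in for the session's listener handling; not the PR's code.
class CompletionListeners {
  private val lock = new Object
  private val listeners = new ArrayList[() => Unit]()
  private var completed = false // guarded by `lock`

  /** Registers a listener; runs it right away if completion already happened. */
  def add(listener: () => Unit): Unit = {
    val runNow = lock.synchronized {
      if (completed) {
        true
      } else {
        listeners.add(listener)
        false
      }
    }
    // Assumption: invoke outside the lock so a slow listener cannot block others.
    if (runNow) listener()
  }

  /** Intended to be called from close/cancel; fires pending listeners once. */
  def complete(): Unit = {
    val toFire: Array[() => Unit] = lock.synchronized {
      if (completed) {
        Array.empty[() => Unit]
      } else {
        completed = true
        val snapshot = listeners.toArray(new Array[() => Unit](0))
        listeners.clear()
        snapshot
      }
    }
    toFire.foreach(l => l())
  }
}
```

Because the `completed` flag and the list are only touched while holding the same lock, a listener is either stored before completion and fired by `complete()`, or registered afterwards and fired by `add`, so it runs exactly once in both orderings.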