jonmio commented on code in PR #51956:
URL: https://github.com/apache/spark/pull/51956#discussion_r2270389088


##########
sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelineEventSender.scala:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.pipelines
+
+import java.util.concurrent.{BlockingQueue, Executors, ExecutorService, 
LinkedBlockingQueue}
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.util.control.NonFatal
+
+import com.google.protobuf.{Timestamp => ProtoTimestamp}
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.ExecutePlanResponse
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.connect.service.SessionHolder
+import org.apache.spark.sql.pipelines.logging.PipelineEvent
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Handles sending pipeline events to the client in a background thread using ExecutorService.
+ * This prevents pipeline execution from blocking on response observer operations and provides
+ * better error isolation between pipeline execution and client communication.
+ *
+ * Based on the executor pattern used in DataPlaneEventLog for better thread management.
+ */
+class PipelineEventSender(
+    responseObserver: StreamObserver[ExecutePlanResponse],
+    sessionHolder: SessionHolder)
+    extends Logging {
+
+  // Unbounded FIFO buffer: sendEvent()/shutdown() offer to it, processEvents() takes from it
+  private val eventQueue: BlockingQueue[PipelineEventMessage] = new 
LinkedBlockingQueue()
+
+  // Single-threaded executor that runs processEvents(); its thread name carries the session id
+  private val executor: ExecutorService = {
+    val threadFactory = ThreadUtils
+      .namedThreadFactory(s"pipeline-event-sender-${sessionHolder.sessionId}")
+    Executors.newSingleThreadExecutor(threadFactory)
+  }
+
+  // Lifecycle flags, each flipped false -> true at most once via compareAndSet
+  private val isShutdown = new AtomicBoolean(false)
+  private val isStarted = new AtomicBoolean(false)
+
+  // Queue messages: an event to forward, or the sentinel shutdown() enqueues after pending events
+  private sealed trait PipelineEventMessage
+  private case class EventMessage(event: PipelineEvent) extends 
PipelineEventMessage
+  private case object ShutdownMessage extends PipelineEventMessage
+
+  /**
+   * Starts the background task that drains the event queue and forwards events to the client.
+   *
+   * Only the first call has an effect: `isStarted` is flipped with compareAndSet, so later
+   * calls return without submitting a second task.
+   */
+  def start(): Unit = {
+    if (isStarted.compareAndSet(false, true)) {
+      executor.submit(new Runnable {
+        override def run(): Unit = {
+          try {
+            processEvents()
+          } catch {
+            // Log here rather than rethrow: the Future returned by submit() is discarded, so
+            // an exception escaping run() would otherwise go unreported.
+            case NonFatal(e) =>
+              logError(s"Pipeline event sender failed for session ${sessionHolder.sessionId}", e)
+          }
+        }
+      })
+      logInfo(s"Started pipeline event sender for session ${sessionHolder.sessionId}")
+    }
+  }
+
+  /**
+   * Queues an event for asynchronous delivery by the background task.
+   *
+   * Events passed in after shutdown() has been requested are dropped without being queued.
+   *
+   * @param event the pipeline event to enqueue
+   */
+  def sendEvent(event: PipelineEvent): Unit = {
+    if (!isShutdown.get()) {
+      // offer() does not block the caller; it returns false only if the queue refuses the element.
+      val offered = eventQueue.offer(EventMessage(event))
+      if (!offered) {
+        logWarning(s"Failed to queue pipeline event for session ${sessionHolder.sessionId}")
+      }
+    }
+  }
+
+  /**
+   * Shuts down the event sender: stops accepting new events and blocks until the background
+   * task has finished processing.
+   *
+   * Only the first call has an effect; later calls return immediately.
+   */
+  def shutdown(): Unit = {
+    if (isShutdown.compareAndSet(false, true)) {
+      // The shutdown sentinel travels through the same FIFO queue as the events, so every
+      // event queued before it is taken by the processing loop first.
+      eventQueue.offer(ShutdownMessage)
+      // Orderly shutdown: no new tasks are accepted, the already-submitted task keeps running.
+      executor.shutdown()
+      // Wait with an effectively unlimited timeout, since we want all events to be processed.
+      val terminated =
+        executor.awaitTermination(Long.MaxValue, java.util.concurrent.TimeUnit.MILLISECONDS)
+      if (!terminated) {
+        // The trailing space keeps the session id separated from the rest of the message.
+        logError(s"Pipeline event sender for session ${sessionHolder.sessionId} " +
+          "failed to terminate")
+        executor.shutdownNow()
+      }
+    }
+  }
+
+  /**
+   * Main event processing loop that runs in the background executor
+   */
+  private def processEvents(): Unit = {
+    while (true) {
+      try {
+        val message = eventQueue.take()
+
+        message match {
+          case EventMessage(event) =>
+            sendEventToClient(event)
+          case ShutdownMessage =>

Review Comment:
   I think I understand now: this is so that events already in the queue are
   processed before shutting down. Can we document that?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to