nija-at commented on code in PR #41443:
URL: https://github.com/apache/spark/pull/41443#discussion_r1217689808


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/Events.scala:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import com.fasterxml.jackson.annotation.JsonIgnore
+import com.google.protobuf.Message
+
+import org.apache.spark.SparkContext
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.ExecutePlanRequest
+import org.apache.spark.scheduler.SparkListenerEvent
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
+import org.apache.spark.sql.connect.common.ProtoUtils
+import org.apache.spark.util.{Clock, Utils}
+
+object Events {
+  // TODO: Make this configurable
+  val MAX_STATEMENT_TEXT_SIZE = 65535
+}
+
+case class Events(sessionHolder: SessionHolder, clock: Clock) {
+  def postStarted(planHolder: ExecutePlanHolder, v: ExecutePlanRequest): Unit 
= {

Review Comment:
   Looks like the request is likely already part of the holder, at least as 
part of the holder initialization. Could we just use that?



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/Events.scala:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import com.fasterxml.jackson.annotation.JsonIgnore
+import com.google.protobuf.Message
+
+import org.apache.spark.SparkContext
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.ExecutePlanRequest
+import org.apache.spark.scheduler.SparkListenerEvent
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
+import org.apache.spark.sql.connect.common.ProtoUtils
+import org.apache.spark.util.{Clock, Utils}
+
+object Events {
+  // TODO: Make this configurable
+  val MAX_STATEMENT_TEXT_SIZE = 65535
+}
+
+case class Events(sessionHolder: SessionHolder, clock: Clock) {
+  def postStarted(planHolder: ExecutePlanHolder, v: ExecutePlanRequest): Unit 
= {
+    val sc = sessionHolder.session.sparkContext
+    val plan: Message =
+      v.getPlan.getOpTypeCase match {
+        case proto.Plan.OpTypeCase.COMMAND => v.getPlan.getCommand
+        case proto.Plan.OpTypeCase.ROOT => v.getPlan.getRoot
+        case _ =>
+          throw new UnsupportedOperationException(s"${v.getPlan.getOpTypeCase} 
not supported.")
+      }
+
+    sc.listenerBus.post(
+      SparkListenerConnectOperationStarted(
+        planHolder.jobGroupId,
+        planHolder.operationId,
+        clock.getTimeMillis(),
+        v.getSessionId,
+        v.getUserContext.getUserId,
+        v.getUserContext.getUserName,
+        Utils.redact(
+          sessionHolder.session.sessionState.conf.stringRedactionPattern,
+          ProtoUtils.abbreviate(plan, 
Events.MAX_STATEMENT_TEXT_SIZE).toString),
+        v.getClientType))
+  }
+
+  private def assertExecutedPlanPrepared(dataFrameOpt: Option[DataFrame]): 
Unit = {
+    dataFrameOpt.foreach { dataFrame =>
+      val isEagerlyExecuted = dataFrame.queryExecution.analyzed.find {
+        case _: Command => true
+        case _ => false
+      }.isDefined
+      val isStreaming = dataFrame.queryExecution.analyzed.isStreaming
+
+      if (!isEagerlyExecuted && !isStreaming) {
+        dataFrame.queryExecution.executedPlan
+      }
+    }
+  }
+
+  /**
+   * When plan has been optimized prior to execution.
+   */
+  def postParsed(dataFrameOpt: Option[DataFrame]): Unit = {

Review Comment:
   minor: would be good to have `None` as the default for arguments of type 
`Option`, so that `postParsed()` is a nicer alias of `postParsed(None)`.
   
   apply everywhere.



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/Events.scala:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import com.fasterxml.jackson.annotation.JsonIgnore
+import com.google.protobuf.Message
+
+import org.apache.spark.SparkContext
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.ExecutePlanRequest
+import org.apache.spark.scheduler.SparkListenerEvent
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
+import org.apache.spark.sql.connect.common.ProtoUtils
+import org.apache.spark.util.{Clock, Utils}
+
+object Events {
+  // TODO: Make this configurable
+  val MAX_STATEMENT_TEXT_SIZE = 65535
+}
+
+case class Events(sessionHolder: SessionHolder, clock: Clock) {
+  def postStarted(planHolder: ExecutePlanHolder, v: ExecutePlanRequest): Unit 
= {
+    val sc = sessionHolder.session.sparkContext
+    val plan: Message =
+      v.getPlan.getOpTypeCase match {
+        case proto.Plan.OpTypeCase.COMMAND => v.getPlan.getCommand
+        case proto.Plan.OpTypeCase.ROOT => v.getPlan.getRoot
+        case _ =>
+          throw new UnsupportedOperationException(s"${v.getPlan.getOpTypeCase} 
not supported.")
+      }
+
+    sc.listenerBus.post(
+      SparkListenerConnectOperationStarted(
+        planHolder.jobGroupId,
+        planHolder.operationId,
+        clock.getTimeMillis(),
+        v.getSessionId,
+        v.getUserContext.getUserId,
+        v.getUserContext.getUserName,
+        Utils.redact(
+          sessionHolder.session.sessionState.conf.stringRedactionPattern,
+          ProtoUtils.abbreviate(plan, 
Events.MAX_STATEMENT_TEXT_SIZE).toString),
+        v.getClientType))
+  }
+
+  private def assertExecutedPlanPrepared(dataFrameOpt: Option[DataFrame]): 
Unit = {
+    dataFrameOpt.foreach { dataFrame =>
+      val isEagerlyExecuted = dataFrame.queryExecution.analyzed.find {
+        case _: Command => true
+        case _ => false
+      }.isDefined
+      val isStreaming = dataFrame.queryExecution.analyzed.isStreaming
+
+      if (!isEagerlyExecuted && !isStreaming) {
+        dataFrame.queryExecution.executedPlan
+      }
+    }
+  }
+
+  /**
+   * When plan has been optimized prior to execution.
+   */
+  def postParsed(dataFrameOpt: Option[DataFrame]): Unit = {
+    assertExecutedPlanPrepared(dataFrameOpt)
+    for {
+      jobGroupId <- getJobGroupId()
+      queryId <- ExecutePlanHolder.getQueryOperationId(jobGroupId)
+    } {
+      val event = SparkListenerConnectOperationParsed(jobGroupId, queryId, 
clock.getTimeMillis())
+      event.analyzedPlan = dataFrameOpt.map(_.queryExecution.analyzed)
+      sessionHolder.session.sparkContext.listenerBus.post(event)
+    }
+  }
+  def postCanceled(jobGroupId: String): Unit = {
+    for {
+      queryId <- ExecutePlanHolder.getQueryOperationId(jobGroupId)

Review Comment:
   Why not call `getJobGroupId()` here?



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/Events.scala:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import com.fasterxml.jackson.annotation.JsonIgnore
+import com.google.protobuf.Message
+
+import org.apache.spark.SparkContext
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.ExecutePlanRequest
+import org.apache.spark.scheduler.SparkListenerEvent
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
+import org.apache.spark.sql.connect.common.ProtoUtils
+import org.apache.spark.util.{Clock, Utils}
+
+object Events {
+  // TODO: Make this configurable
+  val MAX_STATEMENT_TEXT_SIZE = 65535
+}
+
+case class Events(sessionHolder: SessionHolder, clock: Clock) {
+  def postStarted(planHolder: ExecutePlanHolder, v: ExecutePlanRequest): Unit 
= {
+    val sc = sessionHolder.session.sparkContext
+    val plan: Message =
+      v.getPlan.getOpTypeCase match {
+        case proto.Plan.OpTypeCase.COMMAND => v.getPlan.getCommand
+        case proto.Plan.OpTypeCase.ROOT => v.getPlan.getRoot
+        case _ =>
+          throw new UnsupportedOperationException(s"${v.getPlan.getOpTypeCase} 
not supported.")
+      }
+
+    sc.listenerBus.post(
+      SparkListenerConnectOperationStarted(
+        planHolder.jobGroupId,
+        planHolder.operationId,
+        clock.getTimeMillis(),
+        v.getSessionId,
+        v.getUserContext.getUserId,
+        v.getUserContext.getUserName,
+        Utils.redact(
+          sessionHolder.session.sessionState.conf.stringRedactionPattern,
+          ProtoUtils.abbreviate(plan, 
Events.MAX_STATEMENT_TEXT_SIZE).toString),
+        v.getClientType))
+  }
+
+  private def assertExecutedPlanPrepared(dataFrameOpt: Option[DataFrame]): 
Unit = {
+    dataFrameOpt.foreach { dataFrame =>
+      val isEagerlyExecuted = dataFrame.queryExecution.analyzed.find {
+        case _: Command => true
+        case _ => false
+      }.isDefined
+      val isStreaming = dataFrame.queryExecution.analyzed.isStreaming
+
+      if (!isEagerlyExecuted && !isStreaming) {
+        dataFrame.queryExecution.executedPlan
+      }
+    }
+  }
+
+  /**
+   * When plan has been optimized prior to execution.
+   */
+  def postParsed(dataFrameOpt: Option[DataFrame]): Unit = {
+    assertExecutedPlanPrepared(dataFrameOpt)
+    for {
+      jobGroupId <- getJobGroupId()
+      queryId <- ExecutePlanHolder.getQueryOperationId(jobGroupId)
+    } {
+      val event = SparkListenerConnectOperationParsed(jobGroupId, queryId, 
clock.getTimeMillis())
+      event.analyzedPlan = dataFrameOpt.map(_.queryExecution.analyzed)
+      sessionHolder.session.sparkContext.listenerBus.post(event)
+    }
+  }
+  def postCanceled(jobGroupId: String): Unit = {
+    for {
+      queryId <- ExecutePlanHolder.getQueryOperationId(jobGroupId)
+    } {
+      sessionHolder.session.sparkContext.listenerBus
+        .post(SparkListenerConnectOperationCanceled(jobGroupId, queryId, 
clock.getTimeMillis()))
+    }
+  }
+  def postFailed(errorMessage: String): Unit = {
+    for {
+      jobGroupId <- getJobGroupId()
+      queryId <- ExecutePlanHolder.getQueryOperationId(jobGroupId)
+    } {
+      sessionHolder.session.sparkContext.listenerBus.post(
+        SparkListenerConnectOperationFailed(
+          jobGroupId,
+          queryId,
+          clock.getTimeMillis(),
+          errorMessage))
+    }
+  }
+  def postParsedAndFinished(dataFrameOpt: Option[DataFrame]): Unit = {
+    postParsed(dataFrameOpt)
+    postFinished()
+  }
+  def postFinished(): Unit = {
+    for {
+      jobGroupId <- getJobGroupId()
+      queryId <- ExecutePlanHolder.getQueryOperationId(jobGroupId)
+    } {
+      sessionHolder.session.sparkContext.listenerBus
+        .post(SparkListenerConnectOperationFinished(jobGroupId, queryId, 
clock.getTimeMillis()))
+    }
+  }
+  def postClosed(): Unit = {
+    for {
+      jobGroupId <- getJobGroupId()
+      queryId <- ExecutePlanHolder.getQueryOperationId(jobGroupId)
+    } {
+      sessionHolder.session.sparkContext.listenerBus
+        .post(SparkListenerConnectOperationClosed(jobGroupId, queryId, 
clock.getTimeMillis()))
+    }
+  }
+  def postSessionClosed(): Unit = {
+    sessionHolder.session.sparkContext.listenerBus
+      .post(SparkListenerConnectSessionClosed(sessionHolder.sessionId, 
clock.getTimeMillis()))
+  }
+  def getJobGroupId(): Option[String] = {
+    Option(
+      sessionHolder.session.sparkContext
+        .getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID))

Review Comment:
   Curious: when and how is this set? Looking at your code, it seems it's 
available sometimes but not other times?



##########
connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/EventsSuite.scala:
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import scala.util.matching.Regex
+
+import org.mockito.ArgumentMatchers._
+import org.mockito.Mockito._
+import org.scalatest.Tag
+import org.scalatestplus.mockito.MockitoSugar
+
+import org.apache.spark.{SparkContext, SparkFunSuite}
+import org.apache.spark.SparkContext.SPARK_JOB_GROUP_ID
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.{ExecutePlanRequest, Plan, UserContext}
+import org.apache.spark.scheduler.LiveListenerBus
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.connect.planner.SparkConnectPlanTest
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.internal.{SessionState, SQLConf}
+import org.apache.spark.util.ManualClock
+
+class EventsSuite extends SparkFunSuite with MockitoSugar with 
SparkConnectPlanTest {

Review Comment:
   This test suite doesn't test much IMHO, since everything is mocked. It 
simply checks whether calling method `a()` calls `b()`, which doesn't add much value.
   
   Instead, I think it would be more valuable to add new test cases here - 
https://github.com/apache/spark/blob/master/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala#L38.
 
   The new test cases would execute a query and assert that the expected events 
were emitted.



##########
connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/ExecutePlanHolderSuite.scala:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import org.mockito.ArgumentMatchers._
+import org.mockito.Mockito._
+import org.scalatestplus.mockito.MockitoSugar
+
+import org.apache.spark.{SparkContext, SparkFunSuite}
+import org.apache.spark.connect.proto.ExecutePlanRequest
+import org.apache.spark.scheduler.LiveListenerBus
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.connect.planner.SparkConnectPlanTest
+
+class ExecutePlanHolderSuite extends SparkFunSuite with MockitoSugar with 
SparkConnectPlanTest {
+  val DEFAULT_USER_ID = "1"
+  val DEFAULT_USER_NAME = "userName"
+  val DEFAULT_SESSION_ID = "2"
+  val DEFAULT_QUERY_ID = "3"
+  val DEFAULT_CLIENT_TYPE = "clientType"
+  val DEFAULT_JOB_GROUP_ID =
+    
s"User_${DEFAULT_USER_ID}_Session_${DEFAULT_SESSION_ID}_Request_${DEFAULT_QUERY_ID}"
+
+  test("job group id matches pattern") {
+    val mockSession = mock[SparkSession]
+    val request = ExecutePlanRequest.newBuilder().build()
+    val sessionHolder = SessionHolder(DEFAULT_USER_ID, DEFAULT_SESSION_ID, 
mockSession)
+    val executePlanHolder = ExecutePlanHolder(DEFAULT_QUERY_ID, sessionHolder, 
request)
+    assert(
+      ExecutePlanHolder
+        .getQueryOperationId(executePlanHolder.jobGroupId) == 
Some(DEFAULT_QUERY_ID))
+  }
+
+  test("interrupt cancels job group id") {

Review Comment:
   same comment as above but to a lesser extent. 
   Would be better if a long-running query were actually interrupted and the 
test validated that an event is received.



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/Events.scala:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import com.fasterxml.jackson.annotation.JsonIgnore
+import com.google.protobuf.Message
+
+import org.apache.spark.SparkContext
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.ExecutePlanRequest
+import org.apache.spark.scheduler.SparkListenerEvent
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
+import org.apache.spark.sql.connect.common.ProtoUtils
+import org.apache.spark.util.{Clock, Utils}
+
+object Events {
+  // TODO: Make this configurable
+  val MAX_STATEMENT_TEXT_SIZE = 65535
+}
+
+case class Events(sessionHolder: SessionHolder, clock: Clock) {

Review Comment:
   add docs across the board.



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##########
@@ -121,31 +121,49 @@ class SparkConnectService(debug: Boolean)
       observer: StreamObserver[V],
       userId: String,
       sessionId: String): PartialFunction[Throwable, Unit] = {
-    val session =
-      SparkConnectService
-        .getOrCreateIsolatedSession(userId, sessionId)
-        .session
+    val sessionHolder = SparkConnectService
+      .getOrCreateIsolatedSession(userId, sessionId)
+    val session = sessionHolder.session
     val stackTraceEnabled = session.conf.get(PYSPARK_JVM_STACKTRACE_ENABLED)
 
     {
+      case e: UnsupportedOperationException =>
+        logError(s"Error during: $opType. UserId: $userId. SessionId: 
$sessionId.", e)
+        val throwable = 
Status.INVALID_ARGUMENT.withDescription(e.getMessage).asRuntimeException()
+        postFailed(opType, sessionHolder.events, throwable)

Review Comment:
   Would be nice to call this once at the end of the block rather than for each 
case statement. I don't know enough Scala to know if that's possible.



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecutePlanHolder.scala:
##########
@@ -19,6 +19,18 @@ package org.apache.spark.sql.connect.service
 
 import org.apache.spark.connect.proto
 
+object ExecutePlanHolder {
+  val JOB_GROUP_ID_PATTERN = 
"User_(\\d*)_Session_([a-z0-9\\-]*)_Request_([a-z0-9\\-]*)".r
+  def getQueryOperationId(jobGroupId: String): Option[String] = {

Review Comment:
   Can this be an instance method?



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##########
@@ -121,31 +121,49 @@ class SparkConnectService(debug: Boolean)
       observer: StreamObserver[V],
       userId: String,
       sessionId: String): PartialFunction[Throwable, Unit] = {
-    val session =
-      SparkConnectService
-        .getOrCreateIsolatedSession(userId, sessionId)
-        .session
+    val sessionHolder = SparkConnectService
+      .getOrCreateIsolatedSession(userId, sessionId)
+    val session = sessionHolder.session
     val stackTraceEnabled = session.conf.get(PYSPARK_JVM_STACKTRACE_ENABLED)
 
     {
+      case e: UnsupportedOperationException =>
+        logError(s"Error during: $opType. UserId: $userId. SessionId: 
$sessionId.", e)
+        val throwable = 
Status.INVALID_ARGUMENT.withDescription(e.getMessage).asRuntimeException()
+        postFailed(opType, sessionHolder.events, throwable)
+        observer.onError(throwable)
+
       case se: SparkException if isPythonExecutionException(se) =>
         logError(s"Error during: $opType. UserId: $userId. SessionId: 
$sessionId.", se)
-        observer.onError(
-          StatusProto.toStatusRuntimeException(
-            buildStatusFromThrowable(se.getCause, stackTraceEnabled)))
+        val throwable = StatusProto.toStatusRuntimeException(
+          buildStatusFromThrowable(se.getCause, stackTraceEnabled))
+        postFailed(opType, sessionHolder.events, throwable)
+        observer.onError(throwable)
 
       case e: Throwable if e.isInstanceOf[SparkThrowable] || NonFatal.apply(e) 
=>
         logError(s"Error during: $opType. UserId: $userId. SessionId: 
$sessionId.", e)
-        observer.onError(
-          StatusProto.toStatusRuntimeException(buildStatusFromThrowable(e, 
stackTraceEnabled)))
+        val throwable =
+          StatusProto.toStatusRuntimeException(buildStatusFromThrowable(e, 
stackTraceEnabled))
+        postFailed(opType, sessionHolder.events, throwable)
+        observer.onError(throwable)
 
       case e: Throwable =>
         logError(s"Error during: $opType. UserId: $userId. SessionId: 
$sessionId.", e)
-        observer.onError(
-          Status.UNKNOWN
-            .withCause(e)
-            .withDescription(StringUtils.abbreviate(e.getMessage, 2048))
-            .asRuntimeException())
+        val throwable = Status.UNKNOWN
+          .withCause(e)
+          .withDescription(StringUtils.abbreviate(e.getMessage, 2048))
+          .asRuntimeException()
+        postFailed(opType, sessionHolder.events, throwable)
+        observer.onError(throwable)
+    }
+  }
+
+  /**
+   * Post failed for opType execute as these are the only statements being 
tracked
+   */
+  private def postFailed(opType: String, events: Events, throwable: Throwable) 
= {
+    if (opType.equals("execute")) {
+      events.postFailed(throwable.getMessage)
     }
   }

Review Comment:
   Feels like the wrong place to do this.
   
   If we're capturing only ExecutePlan events, the call to `postFailed()` 
should be within the "catch" block of the `executePlan()` method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to