cxzl25 commented on code in PR #5410:
URL: https://github.com/apache/kyuubi/pull/5410#discussion_r1361813258
##########
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala:
##########
@@ -79,9 +82,10 @@ class SQLOperationListener(
}
}
- override def onJobStart(jobStart: SparkListenerJobStart): Unit =
activeJobs.synchronized {
+ override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
if (sameGroupId(jobStart.properties)) {
val jobId = jobStart.jobId
+ val stageIds = jobStart.stageInfos.map(_.stageId)
Review Comment:
```suggestion
val stageIds = jobStart.stageInfos.map(_.stageId).toSet
```
##########
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala:
##########
@@ -134,9 +140,15 @@ class SQLOperationListener(
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted):
Unit = {
val stageInfo = stageCompleted.stageInfo
+ val stageId = stageInfo.stageId
val stageAttempt = SparkStageAttempt(stageInfo.stageId,
stageInfo.attemptNumber())
activeStages.synchronized {
if (activeStages.remove(stageAttempt) != null) {
+ activeJobs.asScala.foreach(item => {
+ if (item._2.stageIds.contains(stageId)) {
+ item._2.numCompleteStages.getAndIncrement()
Review Comment:
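Only increment `numCompleteStages` when the stage has completed successfully; see how Spark's `AppStatusListener` handles stage completion: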
https://github.com/apache/spark/blob/28961a6ce001e0c25c780a39a726fdd825542cee/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala#L827-L836
##########
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala:
##########
@@ -27,3 +27,7 @@ class SparkStageInfo(val stageId: Int, val numTasks: Int) {
var numActiveTasks = new AtomicInteger(0)
var numCompleteTasks = new AtomicInteger(0)
}
+
+class SparkJobInfo(val numStages: Int, val stageIds: Seq[Int]) {
+ var numCompleteStages = new AtomicInteger(0)
Review Comment:
```suggestion
class SparkJobInfo(val numStages: Int, val stageIds: Set[Int]) {
val numCompleteStages = new AtomicInteger(0)
```
##########
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala:
##########
@@ -27,3 +27,7 @@ class SparkStageInfo(val stageId: Int, val numTasks: Int) {
var numActiveTasks = new AtomicInteger(0)
var numCompleteTasks = new AtomicInteger(0)
Review Comment:
```suggestion
val numActiveTasks = new AtomicInteger(0)
val numCompleteTasks = new AtomicInteger(0)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]