Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19383#discussion_r146694573
  
    --- Diff: core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala ---
    @@ -0,0 +1,690 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.status
    +
    +import java.io.File
    +import java.util.{Date, Properties}
    +
    +import scala.collection.JavaConverters._
    +import scala.reflect.{classTag, ClassTag}
    +
    +import org.scalatest.BeforeAndAfter
    +
    +import org.apache.spark._
    +import org.apache.spark.executor.TaskMetrics
    +import org.apache.spark.scheduler._
    +import org.apache.spark.scheduler.cluster._
    +import org.apache.spark.status.api.v1
    +import org.apache.spark.storage._
    +import org.apache.spark.util.Utils
    +import org.apache.spark.util.kvstore._
    +
    +class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
    +
    +  private var time: Long = _
    +  private var testDir: File = _
    +  private var store: KVStore = _
    +
    +  before {
    +    time = 0L
    +    testDir = Utils.createTempDir()
    +    store = KVUtils.open(testDir, getClass().getName())
    +  }
    +
    +  after {
    +    store.close()
    +    Utils.deleteRecursively(testDir)
    +  }
    +
    +  test("scheduler events") {
    +    val listener = new AppStatusListener(store)
    +
    +    // Start the application.
    +    time += 1
    +    listener.onApplicationStart(SparkListenerApplicationStart(
    +      "name",
    +      Some("id"),
    +      time,
    +      "user",
    +      Some("attempt"),
    +      None))
    +
    +    check[ApplicationInfoWrapper]("id") { app =>
    +      assert(app.info.name === "name")
    +      assert(app.info.id === "id")
    +      assert(app.info.attempts.size === 1)
    +
    +      val attempt = app.info.attempts.head
    +      assert(attempt.attemptId === Some("attempt"))
    +      assert(attempt.startTime === new Date(time))
    +      assert(attempt.lastUpdated === new Date(time))
    +      assert(attempt.endTime.getTime() === -1L)
    +      assert(attempt.sparkUser === "user")
    +      assert(!attempt.completed)
    +    }
    +
    +    // Start a couple of executors.
    +    time += 1
    +    val execIds = Array("1", "2")
    +
    +    execIds.foreach { id =>
    +      listener.onExecutorAdded(SparkListenerExecutorAdded(time, id,
    +        new ExecutorInfo(s"$id.example.com", 1, Map())))
    +    }
    +
    +    execIds.foreach { id =>
    +      check[ExecutorSummaryWrapper](id) { exec =>
    +        assert(exec.info.id === id)
    +        assert(exec.info.hostPort === s"$id.example.com")
    +        assert(exec.info.isActive)
    +      }
    +    }
    +
    +    // Start a job with 2 stages / 4 tasks each
    +    time += 1
    +    val stages = Seq(
    +      new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1"),
    +      new StageInfo(2, 0, "stage2", 4, Nil, Seq(1), "details2"))
    +
    +    val jobProps = new Properties()
    +    jobProps.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "jobGroup")
    +    jobProps.setProperty("spark.scheduler.pool", "schedPool")
    +
    +    listener.onJobStart(SparkListenerJobStart(1, time, stages, jobProps))
    +
    +    check[JobDataWrapper](1) { job =>
    +      assert(job.info.jobId === 1)
    +      assert(job.info.name === stages.last.name)
    +      assert(job.info.description === None)
    +      assert(job.info.status === JobExecutionStatus.RUNNING)
    +      assert(job.info.submissionTime === Some(new Date(time)))
    +      assert(job.info.jobGroup === Some("jobGroup"))
    +    }
    +
    +    stages.foreach { info =>
    +      check[StageDataWrapper](key(info)) { stage =>
    +        assert(stage.info.status === v1.StageStatus.PENDING)
    +        assert(stage.info.schedulingPool === "schedPool")
    +        assert(stage.jobIds === Set(1))
    +      }
    +    }
    +
    +    // Submit stage 1
    +    time += 1
    +    stages.head.submissionTime = Some(time)
    +    listener.onStageSubmitted(SparkListenerStageSubmitted(stages.head, jobProps))
    +
    +    check[JobDataWrapper](1) { job =>
    +      assert(job.info.numActiveStages === 1)
    +    }
    +
    +    check[StageDataWrapper](key(stages.head)) { stage =>
    +      assert(stage.info.status === v1.StageStatus.ACTIVE)
    +      assert(stage.info.submissionTime === Some(new Date(stages.head.submissionTime.get)))
    +    }
    +
    +    // Start tasks from stage 1
    +    time += 1
    +    var _taskIdTracker = -1L
    +    def nextTaskId(): Long = {
    +      _taskIdTracker += 1
    +      _taskIdTracker
    +    }
    +
    +    def createTasks(count: Int, time: Long): Seq[TaskInfo] = {
    +      (1 to count).map { id =>
    +        val exec = execIds(id.toInt % execIds.length)
    +        val taskId = nextTaskId()
    +        new TaskInfo(taskId, taskId.toInt, 1, time, exec, s"$exec.example.com",
    +          TaskLocality.PROCESS_LOCAL, id % 2 == 0)
    +      }
    +    }
    +
    +    val s1Tasks = createTasks(4, time)
    +    s1Tasks.foreach { task =>
    +      listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptId, task))
    +    }
    +
    +    assert(store.count(classOf[TaskDataWrapper]) === s1Tasks.size)
    +
    +    check[JobDataWrapper](1) { job =>
    +      assert(job.info.numActiveTasks === s1Tasks.size)
    +    }
    +
    +    check[StageDataWrapper](key(stages.head)) { stage =>
    +      assert(stage.info.numActiveTasks === s1Tasks.size)
    +      assert(stage.info.firstTaskLaunchedTime === Some(new Date(s1Tasks.head.launchTime)))
    +    }
    +
    +    s1Tasks.foreach { task =>
    +      check[TaskDataWrapper](task.taskId) { wrapper =>
    +        assert(wrapper.info.taskId === task.taskId)
    +        assert(wrapper.info.index === task.index)
    +        assert(wrapper.info.attempt === task.attemptNumber)
    +        assert(wrapper.info.launchTime === new Date(task.launchTime))
    +        assert(wrapper.info.executorId === task.executorId)
    +        assert(wrapper.info.host === task.host)
    +        assert(wrapper.info.status === task.status)
    +        assert(wrapper.info.taskLocality === task.taskLocality.toString())
    +        assert(wrapper.info.speculative === task.speculative)
    +      }
    +    }
    +
    +    // Send executor metrics update. Only update one metric to avoid a lot of boilerplate code.
    +    s1Tasks.foreach { task =>
    +      val accum = new AccumulableInfo(1L, Some(InternalAccumulator.MEMORY_BYTES_SPILLED),
    +        Some(1L), None, true, false, None)
    +      listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(
    +        task.executorId,
    +        Seq((task.taskId, stages.head.stageId, stages.head.attemptId, Seq(accum)))))
    +    }
    +
    +    check[StageDataWrapper](key(stages.head)) { stage =>
    +      assert(stage.info.memoryBytesSpilled === s1Tasks.size)
    +
    +      val execs = store.view(classOf[ExecutorStageSummaryWrapper]).index("stage")
    +        .first(key(stages.head)).last(key(stages.head)).asScala.toSeq
    +      assert(execs.size > 0)
    +      execs.foreach { exec =>
    +        assert(exec.info.memoryBytesSpilled === s1Tasks.size / 2)
    +      }
    --- End diff --
    
    nit -- this section on execs doesn't belong inside `check[StageDataWrapper](key(stages.head))`
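    
    For example, a rough sketch (reusing only the helpers already in this test) with the executor-summary assertions pulled out to the top level:
    
    ```scala
    check[StageDataWrapper](key(stages.head)) { stage =>
      assert(stage.info.memoryBytesSpilled === s1Tasks.size)
    }
    
    // Executor stage summaries are verified separately, outside the stage check block.
    val execs = store.view(classOf[ExecutorStageSummaryWrapper]).index("stage")
      .first(key(stages.head)).last(key(stages.head)).asScala.toSeq
    assert(execs.size > 0)
    execs.foreach { exec =>
      assert(exec.info.memoryBytesSpilled === s1Tasks.size / 2)
    }
    ```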

