Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/19383#discussion_r141972641
--- Diff:
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala ---
@@ -0,0 +1,688 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status
+
+import java.io.File
+import java.util.{Date, Properties}
+
+import scala.collection.JavaConverters._
+import scala.reflect.{classTag, ClassTag}
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark._
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster._
+import org.apache.spark.status.api.v1
+import org.apache.spark.storage._
+import org.apache.spark.util.Utils
+import org.apache.spark.util.kvstore._
+
+class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
+
+ private var time: Long = _
+ private var testDir: File = _
+ private var store: KVStore = _
+
+ before {
+ time = 0L
+ testDir = Utils.createTempDir()
+ store = KVUtils.open(testDir, getClass().getName())
+ }
+
+ after {
+ store.close()
+ Utils.deleteRecursively(testDir)
+ }
+
+ test("scheduler events") {
+ val listener = new AppStatusListener(store)
+
+ // Start the application.
+ time += 1
+ listener.onApplicationStart(SparkListenerApplicationStart(
+ "name",
+ Some("id"),
+ time,
+ "user",
+ Some("attempt"),
+ None))
+
+ check[ApplicationInfoWrapper]("id") { app =>
+ assert(app.info.name === "name")
+ assert(app.info.id === "id")
+ assert(app.info.attempts.size === 1)
+
+ val attempt = app.info.attempts.head
+ assert(attempt.attemptId === Some("attempt"))
+ assert(attempt.startTime === new Date(time))
+ assert(attempt.lastUpdated === new Date(time))
+ assert(attempt.endTime.getTime() === -1L)
+ assert(attempt.sparkUser === "user")
+ assert(!attempt.completed)
+ }
+
+ // Start a couple of executors.
+ time += 1
+ val execIds = Array("1", "2")
+
+ execIds.foreach { id =>
+ listener.onExecutorAdded(SparkListenerExecutorAdded(time, id,
+ new ExecutorInfo(s"$id.example.com", 1, Map())))
+ }
+
+ execIds.foreach { id =>
+ check[ExecutorSummaryWrapper](id) { exec =>
+ assert(exec.info.id === id)
+ assert(exec.info.hostPort === s"$id.example.com")
+ assert(exec.info.isActive)
+ }
+ }
+
+ // Start a job with 2 stages / 4 tasks each
+ time += 1
+ val stages = Seq(
+ new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1"),
+ new StageInfo(2, 0, "stage2", 4, Nil, Seq(1), "details2"))
+
+ val stageProps = new Properties()
--- End diff --
these properties should also get passed to `onJobStart`, and there should
be an assert on `job.info.jobGroup`
(probably it should also have the scheduler pool, but thats missing in the
current code, so can keep it separate ...)
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]