turboFei commented on code in PR #2575:
URL: https://github.com/apache/incubator-kyuubi/pull/2575#discussion_r866776684
##########
kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala:
##########
@@ -60,3 +71,132 @@ trait WithKyuubiServerOnYarn extends WithKyuubiServer {
}
}
}
+
+class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with
HiveJDBCTestHelper {
+
+ private val preDefinedAppName = "kyuubi-batch-job"
+
+ override protected val conf: KyuubiConf = {
+ new KyuubiConf().set(s"$KYUUBI_BATCH_CONF_PREFIX.spark.spark.app.name",
preDefinedAppName)
+ .set(BATCH_CONF_IGNORE_LIST, Seq("spark.app.name"))
+ }
+
+ override protected def jdbcUrl: String = getJdbcUrl
+
+ test("KYUUBI #527- Support test with mini yarn cluster") {
+ withJdbcStatement() { statement =>
+ val resultSet = statement.executeQuery("""SELECT "${spark.app.id}" as
id""")
+ assert(resultSet.next())
+ assert(resultSet.getString("id").startsWith("application_"))
+ }
+ }
+
+ test("session_user shall work on yarn") {
+ withJdbcStatement() { statement =>
+ val resultSet = statement.executeQuery("SELECT SESSION_USER() as su")
+ assert(resultSet.next())
+ assert(resultSet.getString("su") === user)
+ }
+ }
+
+ private def sessionManager: KyuubiSessionManager =
+ server.backendService.sessionManager.asInstanceOf[KyuubiSessionManager]
+
+ test("open batch session") {
+ val sparkProcessBuilder = new SparkProcessBuilder("kyuubi", conf)
+
+ val batchRequest = BatchRequest(
+ "spark",
+ sparkProcessBuilder.mainResource.get,
+ sparkProcessBuilder.mainClass,
+ null,
+ Map(
+ "spark.master" -> "yarn",
+ "spark.app.name" -> "customName",
+ s"spark.${ENGINE_SPARK_MAX_LIFETIME.key}" -> "5000",
+ s"spark.${ENGINE_CHECK_INTERVAL.key}" -> "1000"),
+ Seq.empty[String])
+
+ val sessionHandle = sessionManager.openBatchSession(
+ TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1,
+ "kyuubi",
+ "passwd",
+ "localhost",
+ batchRequest.conf,
+ batchRequest)
+
+ assert(sessionHandle.identifier.secretId ===
KyuubiSessionManager.STATIC_BATCH_SECRET_UUID)
+ val session =
sessionManager.getSession(sessionHandle).asInstanceOf[KyuubiBatchSessionImpl]
+ val batchJobSubmissionOp = session.batchJobSubmissionOp
+
+ eventually(timeout(3.minutes), interval(50.milliseconds)) {
+ val state = batchJobSubmissionOp.currentApplicationState
+ assert(state.nonEmpty)
+ assert(state.exists(_("id").startsWith("application_")))
+ assert(state.exists(_("name") == preDefinedAppName))
+ }
+
+ val killResponse =
yarnOperation.killApplicationByTag(sessionHandle.identifier.toString)
+ assert(killResponse._1)
+ assert(killResponse._2 startsWith "Succeeded to terminate:")
+
+ val appInfo =
yarnOperation.getApplicationInfoByTag(sessionHandle.identifier.toString)
+
+ assert(appInfo("state") === "KILLED")
+
+ eventually(timeout(3.minutes), interval(50.milliseconds)) {
+ assert(batchJobSubmissionOp.getStatus.state === ERROR)
+ }
+
+ val resultColumns =
batchJobSubmissionOp.getNextRowSet(FetchOrientation.FETCH_NEXT, 10)
+ .getColumns.asScala
+
+ val keys = resultColumns.head.getStringVal.getValues.asScala
+ val values = resultColumns.apply(1).getStringVal.getValues.asScala
+ val rows = keys.zip(values).toMap
+ val appId = rows("id")
+ val appName = rows("name")
+ val appState = rows("state")
+ val appUrl = rows("url")
+ val appError = rows("error")
+
+ val state2 = batchJobSubmissionOp.currentApplicationState.get
+ assert(appId === state2("id"))
+ assert(appName === state2("name"))
+ assert(appState === state2("state"))
+ assert(appUrl === state2("url"))
+ assert(appError === state2("error"))
+ sessionManager.closeSession(sessionHandle)
+ }
+
+ test("prevent dead loop if the batch job submission process it not alive") {
+ val sparkProcessBuilder = new SparkProcessBuilder("kyuubi", conf)
+
+ val batchRequest = BatchRequest(
+ "spark",
+ sparkProcessBuilder.mainResource.get,
+ sparkProcessBuilder.mainClass,
+ "spark-batch-submission",
+ Map(
+ "spark.master" -> "invalid",
+ s"spark.${ENGINE_SPARK_MAX_LIFETIME.key}" -> "5000",
+ s"spark.${ENGINE_CHECK_INTERVAL.key}" -> "1000"),
+ Seq.empty[String])
+
+ val sessionHandle = sessionManager.openBatchSession(
+ TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1,
+ "kyuubi",
+ "passwd",
+ "localhost",
+ batchRequest.conf,
+ batchRequest)
+
+ val session =
sessionManager.getSession(sessionHandle).asInstanceOf[KyuubiBatchSessionImpl]
+ val batchJobSubmissionOp = session.batchJobSubmissionOp
+
+ eventually(timeout(3.minutes), interval(50.milliseconds)) {
+ assert(batchJobSubmissionOp.currentApplicationState.isEmpty)
+ assert(batchJobSubmissionOp.getStatus.state === OperationState.ERROR)
+ }
+ }
+}
Review Comment:
nit: new line
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]