yaooqinn commented on code in PR #2687:
URL: https://github.com/apache/incubator-kyuubi/pull/2687#discussion_r876655204
##########
kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefSuite.scala:
##########
@@ -247,4 +256,84 @@ class EngineRefSuite extends KyuubiFunSuite {
assert(port2 == port1, "engine shared")
}
}
+
+ test("different engine type should use its own lock") {
+ conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)
+ conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
+ conf.set(KyuubiConf.ENGINE_INIT_TIMEOUT, 3000L)
+ conf.set(HighAvailabilityConf.HA_ZK_NAMESPACE, "engine_test1")
+ conf.set(HighAvailabilityConf.HA_ZK_QUORUM, zkServer.getConnectString)
+ val conf1 = conf.clone
+ conf1.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString)
+ val conf2 = conf.clone
+ conf2.set(KyuubiConf.ENGINE_TYPE, HIVE_SQL.toString)
+
+ val start = System.currentTimeMillis()
+ val times = new Array[Long](2)
+ val executor = Executors.newFixedThreadPool(2)
+ executor.execute(() => {
+ DiscoveryClientProvider.withDiscoveryClient(conf1) { client =>
+ try {
+ new EngineRef(conf1, user, UUID.randomUUID().toString, null)
+ .getOrCreate(client)
+ } finally {
+ times(0) = System.currentTimeMillis()
+ }
+ }
+ })
+ executor.execute(() => {
+ DiscoveryClientProvider.withDiscoveryClient(conf2) { client =>
+ try {
+ new EngineRef(conf2, user, UUID.randomUUID().toString, null)
+ .getOrCreate(client)
+ } finally {
+ times(1) = System.currentTimeMillis()
+ }
+ }
+ })
+
+ eventually(timeout(10.seconds), interval(200.milliseconds)) {
+ assert(times.forall(_ > start))
+ // ENGINE_INIT_TIMEOUT is 3000ms
+ assert(times.max - times.min < 2500)
+ }
+ executor.shutdown()
+ }
+
+ test("three same lock request with initialization timeout") {
+ val id = UUID.randomUUID().toString
+ conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)
+ conf.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString)
+ conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
+ conf.set(KyuubiConf.ENGINE_INIT_TIMEOUT, 3000L)
+ conf.set(HighAvailabilityConf.HA_ZK_NAMESPACE, "engine_test2")
+ conf.set(HighAvailabilityConf.HA_ZK_QUORUM, zkServer.getConnectString)
+
+ val beforeEngines = MetricsSystem.counterValue(ENGINE_TOTAL).getOrElse(0L)
+ val start = System.currentTimeMillis()
+ val times = new Array[Long](3)
+ val executor = Executors.newFixedThreadPool(3)
+ (0 until (3)).foreach { i =>
+ val cloned = conf.clone
+ executor.execute(() => {
+ DiscoveryClientProvider.withDiscoveryClient(cloned) { client =>
+ try {
+ new EngineRef(cloned, user, id, null).getOrCreate(client)
+ } finally {
+ times(i) = System.currentTimeMillis()
+ }
+ }
+ })
+ }
+
+ eventually(timeout(20.seconds), interval(200.milliseconds)) {
+ assert(times.forall(_ > start))
+ // ENGINE_INIT_TIMEOUT is 3000ms
+ assert(times.max - times.min > 2800)
+ }
+
+ // we should only submit two engines, the last request should timeout and
fail
+ assert(MetricsSystem.counterValue(ENGINE_TOTAL).get - beforeEngines == 2)
+ executor.shutdown()
Review Comment:
shall we put it in the finally block?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]