pan3793 commented on code in PR #4984:
URL: https://github.com/apache/kyuubi/pull/4984#discussion_r1241676393


##########
kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala:
##########
@@ -20,94 +20,148 @@ package org.apache.kyuubi.engine
 import java.util.Locale
 import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 
+import scala.collection.JavaConverters._
+
 import com.google.common.cache.{Cache, CacheBuilder, RemovalNotification}
 import io.fabric8.kubernetes.api.model.Pod
 import io.fabric8.kubernetes.client.KubernetesClient
 import io.fabric8.kubernetes.client.informers.{ResourceEventHandler, 
SharedIndexInformer}
 
-import org.apache.kyuubi.{Logging, Utils}
+import org.apache.kyuubi.{KyuubiException, Logging, Utils}
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.engine.ApplicationState.{isTerminated, 
ApplicationState, FAILED, FINISHED, NOT_FOUND, PENDING, RUNNING, UNKNOWN}
 import 
org.apache.kyuubi.engine.KubernetesApplicationOperation.{toApplicationState, 
LABEL_KYUUBI_UNIQUE_KEY, SPARK_APP_ID_LABEL}
 import org.apache.kyuubi.util.KubernetesUtils
 
 class KubernetesApplicationOperation extends ApplicationOperation with Logging 
{
 
-  @volatile
-  private var kubernetesClient: KubernetesClient = _
-  private var enginePodInformer: SharedIndexInformer[Pod] = _
+  private val kubernetesClients: ConcurrentHashMap[KubernetesInfo, 
KubernetesClient] =
+    new ConcurrentHashMap[KubernetesInfo, KubernetesClient]
+  private val enginePodInformers: ConcurrentHashMap[KubernetesInfo, 
SharedIndexInformer[Pod]] =
+    new ConcurrentHashMap[KubernetesInfo, SharedIndexInformer[Pod]]
+
+  private var supportedContexts: Seq[String] = Seq.empty
+  private var supportedNamespaces: Seq[String] = Seq.empty
+
   private var submitTimeout: Long = _
+  private var kyuubiConf: KyuubiConf = _
 
   // key is kyuubi_unique_key
   private val appInfoStore: ConcurrentHashMap[String, ApplicationInfo] =
     new ConcurrentHashMap[String, ApplicationInfo]
   // key is kyuubi_unique_key
   private var cleanupTerminatedAppInfoTrigger: Cache[String, ApplicationState] 
= _
 
-  override def initialize(conf: KyuubiConf): Unit = {
-    info("Start initializing Kubernetes Client.")
-    kubernetesClient = KubernetesUtils.buildKubernetesClient(conf) match {
+  private def getOrCreateKubernetesClient(kubernetesInfo: KubernetesInfo): 
KubernetesClient = {
+    val context = kubernetesInfo.context
+    val namespace = kubernetesInfo.namespace
+
+    if (supportedContexts.nonEmpty && !supportedContexts.contains(context)) {
+      throw new KyuubiException(
+        s"Kubernetes context $context is not in the support 
list[$supportedContexts]")
+    }
+
+    if (supportedNamespaces.nonEmpty && 
!supportedNamespaces.contains(namespace)) {
+      throw new KyuubiException(
+        s"Kubernetes namespace $namespace is not in the support 
list[$supportedNamespaces]")
+    }
+
+    var kubernetesClient = kubernetesClients.get(kubernetesInfo)
+    if (kubernetesClient == null) {

Review Comment:
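   Could we use `kubernetesClients.computeIfAbsent(kubernetesInfo, ...)` here instead of the `get` + null check? `ConcurrentHashMap.computeIfAbsent` performs the lookup and the creation atomically.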



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to