pan3793 commented on code in PR #4984:
URL: https://github.com/apache/kyuubi/pull/4984#discussion_r1241686692
##########
kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala:
##########
@@ -20,94 +20,148 @@ package org.apache.kyuubi.engine
import java.util.Locale
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
+import scala.collection.JavaConverters._
+
import com.google.common.cache.{Cache, CacheBuilder, RemovalNotification}
import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.informers.{ResourceEventHandler,
SharedIndexInformer}
-import org.apache.kyuubi.{Logging, Utils}
+import org.apache.kyuubi.{KyuubiException, Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.ApplicationState.{isTerminated,
ApplicationState, FAILED, FINISHED, NOT_FOUND, PENDING, RUNNING, UNKNOWN}
import
org.apache.kyuubi.engine.KubernetesApplicationOperation.{toApplicationState,
LABEL_KYUUBI_UNIQUE_KEY, SPARK_APP_ID_LABEL}
import org.apache.kyuubi.util.KubernetesUtils
class KubernetesApplicationOperation extends ApplicationOperation with Logging
{
- @volatile
- private var kubernetesClient: KubernetesClient = _
- private var enginePodInformer: SharedIndexInformer[Pod] = _
+ private val kubernetesClients: ConcurrentHashMap[KubernetesInfo,
KubernetesClient] =
+ new ConcurrentHashMap[KubernetesInfo, KubernetesClient]
+ private val enginePodInformers: ConcurrentHashMap[KubernetesInfo,
SharedIndexInformer[Pod]] =
+ new ConcurrentHashMap[KubernetesInfo, SharedIndexInformer[Pod]]
+
+ private var supportedContexts: Seq[String] = Seq.empty
+ private var supportedNamespaces: Seq[String] = Seq.empty
+
private var submitTimeout: Long = _
+ private var kyuubiConf: KyuubiConf = _
// key is kyuubi_unique_key
private val appInfoStore: ConcurrentHashMap[String, ApplicationInfo] =
new ConcurrentHashMap[String, ApplicationInfo]
// key is kyuubi_unique_key
private var cleanupTerminatedAppInfoTrigger: Cache[String, ApplicationState]
= _
- override def initialize(conf: KyuubiConf): Unit = {
- info("Start initializing Kubernetes Client.")
- kubernetesClient = KubernetesUtils.buildKubernetesClient(conf) match {
+  /**
+   * Returns the client cached for the given Kubernetes context/namespace, building and
+   * caching a new one on first use.
+   *
+   * @param kubernetesInfo the Kubernetes context and namespace to connect to
+   * @return the client associated with `kubernetesInfo`; the newly built client is returned
+   *         on the first call as well (previously the first call returned null)
+   * @throws KyuubiException if `supportedContexts` / `supportedNamespaces` is non-empty and
+   *                         does not contain the requested context / namespace
+   */
+  private def getOrCreateKubernetesClient(kubernetesInfo: KubernetesInfo): KubernetesClient = {
+    val context = kubernetesInfo.context
+    val namespace = kubernetesInfo.namespace
+
+    // An empty supported list means "no restriction".
+    if (supportedContexts.nonEmpty && !supportedContexts.contains(context)) {
+      throw new KyuubiException(
+        s"Kubernetes context $context is not in the support list[$supportedContexts]")
+    }
+
+    if (supportedNamespaces.nonEmpty && !supportedNamespaces.contains(namespace)) {
+      throw new KyuubiException(
+        s"Kubernetes namespace $namespace is not in the support list[$supportedNamespaces]")
+    }
+
+    Option(kubernetesClients.get(kubernetesInfo)).getOrElse {
+      synchronized {
+        // Re-check under the lock so that at most one client is built per KubernetesInfo.
+        // The build is kept out of ConcurrentHashMap.computeIfAbsent on purpose: creating a
+        // client and starting its informer is not a short computation.
+        Option(kubernetesClients.get(kubernetesInfo)).getOrElse {
+          val client = buildKubernetesClient(kubernetesInfo)
+          kubernetesClients.put(kubernetesInfo, client)
+          // Return the new client itself; the original only stored it and returned null.
+          client
+        }
+      }
+    }
+  }
+
+ private def buildKubernetesClient(kubernetesInfo: KubernetesInfo):
KubernetesClient = {
+ val kubernetesConf =
+ kyuubiConf.getKubernetesConf(kubernetesInfo.context,
kubernetesInfo.namespace)
+ KubernetesUtils.buildKubernetesClient(kubernetesConf) match {
case Some(client) =>
- info(s"Initialized Kubernetes Client connect to:
${client.getMasterUrl}")
- submitTimeout = conf.get(KyuubiConf.ENGINE_KUBERNETES_SUBMIT_TIMEOUT)
- // Disable resync, see
https://github.com/fabric8io/kubernetes-client/discussions/5015
- enginePodInformer = client.pods()
+ info(s"[$kubernetesInfo] Initialized Kubernetes Client connect to:
${client.getMasterUrl}")
+ val enginePodInformer = client.pods()
.withLabel(LABEL_KYUUBI_UNIQUE_KEY)
.inform(new SparkEnginePodEventHandler)
- info("Start Kubernetes Client Informer.")
- // Defer cleaning terminated application information
- val retainPeriod =
conf.get(KyuubiConf.KUBERNETES_TERMINATED_APPLICATION_RETAIN_PERIOD)
- cleanupTerminatedAppInfoTrigger = CacheBuilder.newBuilder()
- .expireAfterWrite(retainPeriod, TimeUnit.MILLISECONDS)
- .removalListener((notification: RemovalNotification[String,
ApplicationState]) => {
- Option(appInfoStore.remove(notification.getKey)).foreach { removed
=>
- info(s"Remove terminated application ${removed.id} with " +
- s"tag ${notification.getKey} and state ${removed.state}")
- }
- })
- .build()
+ info(s"[$kubernetesInfo] Start Kubernetes Client Informer.")
+ enginePodInformers.putIfAbsent(kubernetesInfo, enginePodInformer)
Review Comment:
In which case would the key already be present here (i.e., when would `putIfAbsent` find an existing entry)?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]