Github user rvesse commented on a diff in the pull request:
https://github.com/apache/spark/pull/22959#discussion_r231838596
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala ---
@@ -112,125 +72,139 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
def getOption(key: String): Option[String] = sparkConf.getOption(key)
}
+private[spark] class KubernetesDriverConf(
+ sparkConf: SparkConf,
+ val appId: String,
+ val mainAppResource: MainAppResource,
+ val mainClass: String,
+ val appArgs: Array[String],
+ val pyFiles: Seq[String])
+ extends KubernetesConf(sparkConf) {
+
+ override val resourceNamePrefix: String = {
+ val custom = if (Utils.isTesting) get(KUBERNETES_DRIVER_POD_NAME_PREFIX) else None
+ custom.getOrElse(KubernetesConf.getResourceNamePrefix(appName))
+ }
+
+ override def labels: Map[String, String] = {
+ val presetLabels = Map(
+ SPARK_APP_ID_LABEL -> appId,
+ SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
+ val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
+ sparkConf, KUBERNETES_DRIVER_LABEL_PREFIX)
+
+ presetLabels.keys.foreach { key =>
+ require(
+ !driverCustomLabels.contains(key),
+ s"Label with key $key is not allowed as it is reserved for Spark
bookkeeping operations.")
+ }
+
+ driverCustomLabels ++ presetLabels
+ }
+
+ override def environment: Map[String, String] = {
+ KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_ENV_PREFIX)
+ }
+
+ override def annotations: Map[String, String] = {
+ KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_ANNOTATION_PREFIX)
+ }
+
+ override def secretNamesToMountPaths: Map[String, String] = {
+ KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_SECRETS_PREFIX)
+ }
+
+ override def secretEnvNamesToKeyRefs: Map[String, String] = {
+ KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX)
+ }
+
+ override def volumes: Seq[KubernetesVolumeSpec] = {
+ KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, KUBERNETES_DRIVER_VOLUMES_PREFIX)
+ }
+}
+
+private[spark] class KubernetesExecutorConf(
+ sparkConf: SparkConf,
+ val appId: String,
+ val executorId: String,
+ val driverPod: Option[Pod])
+ extends KubernetesConf(sparkConf) {
+
+ override val resourceNamePrefix: String = {
+ get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX).getOrElse(
+ KubernetesConf.getResourceNamePrefix(appName))
+ }
+
+ override def labels: Map[String, String] = {
+ val presetLabels = Map(
+ SPARK_EXECUTOR_ID_LABEL -> executorId,
+ SPARK_APP_ID_LABEL -> appId,
+ SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE)
+
+ val executorCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
+ sparkConf, KUBERNETES_EXECUTOR_LABEL_PREFIX)
+
+ presetLabels.keys.foreach { key =>
+ require(
+ !executorCustomLabels.contains(key),
+ s"Custom executor labels cannot contain $key as it is reserved for
Spark.")
+ }
+
+ executorCustomLabels ++ presetLabels
+ }
+
+ override def environment: Map[String, String] = sparkConf.getExecutorEnv.toMap
+
+ override def annotations: Map[String, String] = {
+ KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_EXECUTOR_ANNOTATION_PREFIX)
+ }
+
+ override def secretNamesToMountPaths: Map[String, String] = {
+ KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_EXECUTOR_SECRETS_PREFIX)
+ }
+
+ override def secretEnvNamesToKeyRefs: Map[String, String] = {
+ KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_EXECUTOR_SECRET_KEY_REF_PREFIX)
+ }
+
+ override def volumes: Seq[KubernetesVolumeSpec] = {
+ KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX)
+ }
+
+}
+
private[spark] object KubernetesConf {
def createDriverConf(
sparkConf: SparkConf,
- appName: String,
- appResourceNamePrefix: String,
appId: String,
mainAppResource: MainAppResource,
mainClass: String,
appArgs: Array[String],
- maybePyFiles: Option[String],
- hadoopConfDir: Option[String]): KubernetesConf[KubernetesDriverSpecificConf] = {
- val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
- sparkConf, KUBERNETES_DRIVER_LABEL_PREFIX)
- require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), "Label with key " +
- s"$SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping " +
- "operations.")
- require(!driverCustomLabels.contains(SPARK_ROLE_LABEL), "Label with key " +
- s"$SPARK_ROLE_LABEL is not allowed as it is reserved for Spark bookkeeping " +
- "operations.")
- val driverLabels = driverCustomLabels ++ Map(
- SPARK_APP_ID_LABEL -> appId,
- SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
- val driverAnnotations = KubernetesUtils.parsePrefixedKeyValuePairs(
- sparkConf, KUBERNETES_DRIVER_ANNOTATION_PREFIX)
- val driverSecretNamesToMountPaths = KubernetesUtils.parsePrefixedKeyValuePairs(
- sparkConf, KUBERNETES_DRIVER_SECRETS_PREFIX)
- val driverSecretEnvNamesToKeyRefs = KubernetesUtils.parsePrefixedKeyValuePairs(
- sparkConf, KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX)
- val driverEnvs = KubernetesUtils.parsePrefixedKeyValuePairs(
- sparkConf, KUBERNETES_DRIVER_ENV_PREFIX)
- val driverVolumes = KubernetesVolumeUtils.parseVolumesWithPrefix(
- sparkConf, KUBERNETES_DRIVER_VOLUMES_PREFIX).map(_.get)
- // Also parse executor volumes in order to verify configuration
- // before the driver pod is created
- KubernetesVolumeUtils.parseVolumesWithPrefix(
- sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX).map(_.get)
-
- val hadoopConfigMapName = sparkConf.get(KUBERNETES_HADOOP_CONF_CONFIG_MAP)
- KubernetesUtils.requireNandDefined(
- hadoopConfDir,
- hadoopConfigMapName,
- "Do not specify both the `HADOOP_CONF_DIR` in your ENV and the
ConfigMap " +
- "as the creation of an additional ConfigMap, when one is already
specified is extraneous" )
- val hadoopConfSpec =
- if (hadoopConfDir.isDefined || hadoopConfigMapName.isDefined) {
- Some(HadoopConfSpec(hadoopConfDir, hadoopConfigMapName))
- } else {
- None
- }
- val pyFiles = maybePyFiles.map(Utils.stringToSeq).getOrElse(Nil)
+ maybePyFiles: Option[String]): KubernetesDriverConf = {
+ // Parse executor volumes in order to verify configuration before the driver pod is created.
+ KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX)
-
- KubernetesConf(
- sparkConf.clone(),
- KubernetesDriverSpecificConf(mainAppResource, mainClass, appName, appArgs, pyFiles),
- appResourceNamePrefix,
- appId,
- driverLabels,
- driverAnnotations,
- driverSecretNamesToMountPaths,
- driverSecretEnvNamesToKeyRefs,
- driverEnvs,
- driverVolumes,
- hadoopConfSpec)
+ val pyFiles = maybePyFiles.map(Utils.stringToSeq).getOrElse(Nil)
+ new KubernetesDriverConf(sparkConf.clone(), appId, mainAppResource, mainClass, appArgs,
+ pyFiles)
}
def createExecutorConf(
sparkConf: SparkConf,
executorId: String,
appId: String,
- driverPod: Option[Pod]): KubernetesConf[KubernetesExecutorSpecificConf] = {
- val executorCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
- sparkConf, KUBERNETES_EXECUTOR_LABEL_PREFIX)
- require(
- !executorCustomLabels.contains(SPARK_APP_ID_LABEL),
- s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is
reserved for Spark.")
- require(
- !executorCustomLabels.contains(SPARK_EXECUTOR_ID_LABEL),
- s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as
it is reserved for" +
- " Spark.")
- require(
- !executorCustomLabels.contains(SPARK_ROLE_LABEL),
- s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is
reserved for Spark.")
- val executorLabels = Map(
- SPARK_EXECUTOR_ID_LABEL -> executorId,
- SPARK_APP_ID_LABEL -> appId,
- SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) ++
- executorCustomLabels
- val executorAnnotations = KubernetesUtils.parsePrefixedKeyValuePairs(
- sparkConf, KUBERNETES_EXECUTOR_ANNOTATION_PREFIX)
- val executorMountSecrets = KubernetesUtils.parsePrefixedKeyValuePairs(
- sparkConf, KUBERNETES_EXECUTOR_SECRETS_PREFIX)
- val executorEnvSecrets = KubernetesUtils.parsePrefixedKeyValuePairs(
- sparkConf, KUBERNETES_EXECUTOR_SECRET_KEY_REF_PREFIX)
- val executorEnv = sparkConf.getExecutorEnv.toMap
- val executorVolumes = KubernetesVolumeUtils.parseVolumesWithPrefix(
- sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX).map(_.get)
-
- // If no prefix is defined then we are in pure client mode
- // (not the one used by cluster mode inside the container)
- val appResourceNamePrefix = {
- if (sparkConf.getOption(KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key).isEmpty) {
- getResourceNamePrefix(getAppName(sparkConf))
- } else {
- sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
- }
- }
+ driverPod: Option[Pod]): KubernetesExecutorConf = {
+ new KubernetesExecutorConf(sparkConf.clone(), appId, executorId, driverPod)
+ }
- KubernetesConf(
- sparkConf.clone(),
- KubernetesExecutorSpecificConf(executorId, driverPod),
- appResourceNamePrefix,
- appId,
- executorLabels,
- executorAnnotations,
- executorMountSecrets,
- executorEnvSecrets,
- executorEnv,
- executorVolumes,
- None)
+ def getResourceNamePrefix(appName: String): String = {
--- End diff ---
Looks like an identical method appears in [KubernetesClientApplication](https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala#L263-L272) - can we reuse or move this to `KubernetesUtils`?
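
For example, a rough sketch of what a shared helper in `KubernetesUtils` could look like - the method name and exact sanitization steps below are assumptions and should just mirror whatever the two existing copies already do:

```scala
import java.util.Locale

// Hypothetical shared helper (sketch only): builds a DNS-friendly resource name
// prefix from the app name plus the launch time, so KubernetesConf and
// KubernetesClientApplication can delegate to one implementation.
def getResourceNamePrefix(appName: String): String = {
  val launchTime = System.currentTimeMillis()
  s"$appName-$launchTime"
    .trim
    .toLowerCase(Locale.ROOT)
    .replaceAll("\\s+", "-")        // whitespace -> dashes
    .replaceAll("\\.", "-")         // dots are not valid in resource names
    .replaceAll("[^a-z0-9\\-]", "") // drop any remaining illegal characters
    .replaceAll("-+", "-")          // collapse repeated dashes
}
```

Both call sites could then just invoke `KubernetesUtils.getResourceNamePrefix(appName)` instead of keeping two copies in sync.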
---