Github user rvesse commented on a diff in the pull request: https://github.com/apache/spark/pull/22256#discussion_r213954156 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala --- @@ -37,41 +40,99 @@ private[spark] class LocalDirsFeatureStep( .orElse(conf.getOption("spark.local.dir")) .getOrElse(defaultLocalDir) .split(",") + private val useLocalDirTmpFs = conf.get(KUBERNETES_LOCAL_DIRS_TMPFS) override def configurePod(pod: SparkPod): SparkPod = { val localDirVolumes = resolvedLocalDirs .zipWithIndex .map { case (localDir, index) => - new VolumeBuilder() - .withName(s"spark-local-dir-${index + 1}") - .withNewEmptyDir() - .endEmptyDir() - .build() + val name = s"spark-local-dir-${index + 1}" + // To allow customisation of local dirs backing volumes we should avoid creating + // emptyDir volumes if the volume is already defined in the pod spec + hasVolume(pod, name) match { + case true => + // For pre-existing volume definitions just re-use the volume + pod.pod.getSpec().getVolumes().asScala.find(v => v.getName.equals(name)).get + case false => + // Create new emptyDir volume + new VolumeBuilder() + .withName(name) + .withNewEmptyDir() + .withMedium(useLocalDirTmpFs match { + case true => "Memory" // Use tmpfs + case false => null // Default - use nodes backing storage + }) + .endEmptyDir() + .build() + } } + val localDirVolumeMounts = localDirVolumes .zip(resolvedLocalDirs) .map { case (localDirVolume, localDirPath) => - new VolumeMountBuilder() - .withName(localDirVolume.getName) - .withMountPath(localDirPath) - .build() + hasVolumeMount(pod, localDirVolume.getName, localDirPath) match { + case true => + // For pre-existing volume mounts just re-use the mount + pod.container.getVolumeMounts().asScala + .find(m => m.getName.equals(localDirVolume.getName) + && m.getMountPath.equals(localDirPath)) + .get + case false => + // Create new volume mount + new VolumeMountBuilder() + .withName (localDirVolume.getName) + .withMountPath (localDirPath) + .build() + } + } + + // Check for conflicting volume mounts + for (m: VolumeMount <- localDirVolumeMounts) { + if (hasConflictingVolumeMount(pod, m.getName, m.getMountPath).size > 0) { + throw new SparkException(s"Conflicting volume mounts defined, pod template attempted to " + + "mount SPARK_LOCAL_DIRS volume ${m.getName} multiple times or at an alternative path " + + "then the expected ${m.getPath}") } + } + val podWithLocalDirVolumes = new PodBuilder(pod.pod) .editSpec() - .addToVolumes(localDirVolumes: _*) + // Don't want to re-add volumes that already existed in the incoming spec + // as duplicate definitions will lead to K8S API errors + .addToVolumes(localDirVolumes.filter(v => !hasVolume(pod, v.getName)): _*) --- End diff -- The implementation is still additive in that it will add to existing elements in the pod spec as needed but respect what is already present. If your pod spec contains duplicate volumes/volume mounts then K8S will reject it as invalid e.g. ``` The Pod "rvesse-test" is invalid: spec.volumes[1].name: Duplicate value: "spark-local-dirs-1" ``` Therefore it is necessary to explicitly avoid duplicating things already present in the template If the aim is to replace adding further config options with the pod template feature then the existing builders do need to be more intelligent in what they do to avoid generating invalid pod specs. This is regardless of whether the template feature is opinionated about validation, even if the template feature doesn't do validation, Spark code itself should be ensuring that it generates valid specs as far as it is able to. Obviously it can't detect every possible invalid spec that it might generate if the templates aren't being validated but it can avoid introducing easily avoidable invalid specs.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org