Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/22256#discussion_r213771635 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala --- @@ -37,41 +40,99 @@ private[spark] class LocalDirsFeatureStep( .orElse(conf.getOption("spark.local.dir")) .getOrElse(defaultLocalDir) .split(",") + private val useLocalDirTmpFs = conf.get(KUBERNETES_LOCAL_DIRS_TMPFS) override def configurePod(pod: SparkPod): SparkPod = { val localDirVolumes = resolvedLocalDirs .zipWithIndex .map { case (localDir, index) => - new VolumeBuilder() - .withName(s"spark-local-dir-${index + 1}") - .withNewEmptyDir() - .endEmptyDir() - .build() + val name = s"spark-local-dir-${index + 1}" + // To allow customisation of local dirs backing volumes we should avoid creating + // emptyDir volumes if the volume is already defined in the pod spec + hasVolume(pod, name) match { + case true => + // For pre-existing volume definitions just re-use the volume + pod.pod.getSpec().getVolumes().asScala.find(v => v.getName.equals(name)).get + case false => + // Create new emptyDir volume + new VolumeBuilder() + .withName(name) + .withNewEmptyDir() + .withMedium(useLocalDirTmpFs match { + case true => "Memory" // Use tmpfs + case false => null // Default - use nodes backing storage + }) + .endEmptyDir() + .build() + } } + val localDirVolumeMounts = localDirVolumes .zip(resolvedLocalDirs) .map { case (localDirVolume, localDirPath) => - new VolumeMountBuilder() - .withName(localDirVolume.getName) - .withMountPath(localDirPath) - .build() + hasVolumeMount(pod, localDirVolume.getName, localDirPath) match { + case true => + // For pre-existing volume mounts just re-use the mount + pod.container.getVolumeMounts().asScala + .find(m => m.getName.equals(localDirVolume.getName) + && m.getMountPath.equals(localDirPath)) + .get + case false => + // Create new volume mount + new VolumeMountBuilder() + .withName 
(localDirVolume.getName) + .withMountPath (localDirPath) + .build() + } + } + + // Check for conflicting volume mounts + for (m: VolumeMount <- localDirVolumeMounts) { + if (hasConflictingVolumeMount(pod, m.getName, m.getMountPath).size > 0) { + throw new SparkException(s"Conflicting volume mounts defined, pod template attempted to " + + "mount SPARK_LOCAL_DIRS volume ${m.getName} multiple times or at an alternative path " + + "then the expected ${m.getPath}") } + } + val podWithLocalDirVolumes = new PodBuilder(pod.pod) .editSpec() - .addToVolumes(localDirVolumes: _*) + // Don't want to re-add volumes that already existed in the incoming spec + // as duplicate definitions will lead to K8S API errors + .addToVolumes(localDirVolumes.filter(v => !hasVolume(pod, v.getName)): _*) --- End diff -- All of this handling of conflicting volume mounts and conflicting volumes seems out of place here. If we're anticipating using the pod template file, keep in mind that the pod template feature is specifically not designed to do any validation. What kinds of errors are we hoping to avoid by doing the deduplication here?
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org