Github user onursatici commented on a diff in the pull request: https://github.com/apache/spark/pull/22256#discussion_r214198366 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala --- @@ -37,41 +40,99 @@ private[spark] class LocalDirsFeatureStep( .orElse(conf.getOption("spark.local.dir")) .getOrElse(defaultLocalDir) .split(",") + private val useLocalDirTmpFs = conf.get(KUBERNETES_LOCAL_DIRS_TMPFS) override def configurePod(pod: SparkPod): SparkPod = { val localDirVolumes = resolvedLocalDirs .zipWithIndex .map { case (localDir, index) => - new VolumeBuilder() - .withName(s"spark-local-dir-${index + 1}") - .withNewEmptyDir() - .endEmptyDir() - .build() + val name = s"spark-local-dir-${index + 1}" + // To allow customisation of local dirs backing volumes we should avoid creating + // emptyDir volumes if the volume is already defined in the pod spec + hasVolume(pod, name) match { + case true => + // For pre-existing volume definitions just re-use the volume + pod.pod.getSpec().getVolumes().asScala.find(v => v.getName.equals(name)).get + case false => + // Create new emptyDir volume + new VolumeBuilder() + .withName(name) + .withNewEmptyDir() + .withMedium(useLocalDirTmpFs match { + case true => "Memory" // Use tmpfs + case false => null // Default - use nodes backing storage + }) + .endEmptyDir() + .build() + } } + val localDirVolumeMounts = localDirVolumes .zip(resolvedLocalDirs) .map { case (localDirVolume, localDirPath) => - new VolumeMountBuilder() - .withName(localDirVolume.getName) - .withMountPath(localDirPath) - .build() + hasVolumeMount(pod, localDirVolume.getName, localDirPath) match { + case true => + // For pre-existing volume mounts just re-use the mount + pod.container.getVolumeMounts().asScala + .find(m => m.getName.equals(localDirVolume.getName) + && m.getMountPath.equals(localDirPath)) + .get + case false => + // Create new volume mount + new VolumeMountBuilder() + .withName 
(localDirVolume.getName) + .withMountPath (localDirPath) + .build() + } + } + + // Check for conflicting volume mounts + for (m: VolumeMount <- localDirVolumeMounts) { + if (hasConflictingVolumeMount(pod, m.getName, m.getMountPath).size > 0) { + throw new SparkException(s"Conflicting volume mounts defined, pod template attempted to " + + "mount SPARK_LOCAL_DIRS volume ${m.getName} multiple times or at an alternative path " + + "then the expected ${m.getPath}") } + } + val podWithLocalDirVolumes = new PodBuilder(pod.pod) .editSpec() - .addToVolumes(localDirVolumes: _*) + // Don't want to re-add volumes that already existed in the incoming spec + // as duplicate definitions will lead to K8S API errors + .addToVolumes(localDirVolumes.filter(v => !hasVolume(pod, v.getName)): _*) --- End diff -- @mccheah yeap we should document that, will add
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org