Github user rvesse commented on a diff in the pull request:
https://github.com/apache/spark/pull/22256#discussion_r214416634
--- Diff:
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala
---
@@ -37,41 +40,99 @@ private[spark] class LocalDirsFeatureStep(
.orElse(conf.getOption("spark.local.dir"))
.getOrElse(defaultLocalDir)
.split(",")
+ private val useLocalDirTmpFs = conf.get(KUBERNETES_LOCAL_DIRS_TMPFS)
override def configurePod(pod: SparkPod): SparkPod = {
val localDirVolumes = resolvedLocalDirs
.zipWithIndex
.map { case (localDir, index) =>
- new VolumeBuilder()
- .withName(s"spark-local-dir-${index + 1}")
- .withNewEmptyDir()
- .endEmptyDir()
- .build()
+ val name = s"spark-local-dir-${index + 1}"
+ // To allow customisation of local dirs backing volumes we should
avoid creating
+ // emptyDir volumes if the volume is already defined in the pod
spec
+ hasVolume(pod, name) match {
+ case true =>
+ // For pre-existing volume definitions just re-use the volume
+ pod.pod.getSpec().getVolumes().asScala.find(v =>
v.getName.equals(name)).get
+ case false =>
+ // Create new emptyDir volume
+ new VolumeBuilder()
+ .withName(name)
+ .withNewEmptyDir()
+ .withMedium(useLocalDirTmpFs match {
+ case true => "Memory" // Use tmpfs
+ case false => null // Default - use node's backing
storage
+ })
+ .endEmptyDir()
+ .build()
+ }
}
+
val localDirVolumeMounts = localDirVolumes
.zip(resolvedLocalDirs)
.map { case (localDirVolume, localDirPath) =>
- new VolumeMountBuilder()
- .withName(localDirVolume.getName)
- .withMountPath(localDirPath)
- .build()
+ hasVolumeMount(pod, localDirVolume.getName, localDirPath) match {
+ case true =>
+ // For pre-existing volume mounts just re-use the mount
+ pod.container.getVolumeMounts().asScala
+ .find(m => m.getName.equals(localDirVolume.getName)
+ && m.getMountPath.equals(localDirPath))
+ .get
+ case false =>
+ // Create new volume mount
+ new VolumeMountBuilder()
+ .withName (localDirVolume.getName)
+ .withMountPath (localDirPath)
+ .build()
+ }
+ }
+
+ // Check for conflicting volume mounts
+ for (m: VolumeMount <- localDirVolumeMounts) {
+ if (hasConflictingVolumeMount(pod, m.getName, m.getMountPath).size >
0) {
throw new SparkException(s"Conflicting volume mounts defined, pod
template attempted to " +
s"mount SPARK_LOCAL_DIRS volume ${m.getName} multiple times or at
an alternative path " +
s"than the expected ${m.getMountPath}")
}
+ }
+
val podWithLocalDirVolumes = new PodBuilder(pod.pod)
.editSpec()
- .addToVolumes(localDirVolumes: _*)
+ // Don't want to re-add volumes that already existed in the
incoming spec
+ // as duplicate definitions will lead to K8S API errors
+ .addToVolumes(localDirVolumes.filter(v => !hasVolume(pod,
v.getName)): _*)
--- End diff --
Ok, will do that Monday
FYI I notice @onursatici has now made some similar tweaks in his latest
commit -
https://github.com/apache/spark/pull/22146/commits/a4fde0cdc4dc5b64fd3f888244656371eb76f837
- notice several feature steps there now have `editOrNewX()` or `addToX()` so
that they combine with, rather than override, the template
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]