Github user mccheah commented on a diff in the pull request:
https://github.com/apache/spark/pull/22256#discussion_r214420503
--- Diff:
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala
---
@@ -37,41 +40,99 @@ private[spark] class LocalDirsFeatureStep(
.orElse(conf.getOption("spark.local.dir"))
.getOrElse(defaultLocalDir)
.split(",")
+ private val useLocalDirTmpFs = conf.get(KUBERNETES_LOCAL_DIRS_TMPFS)
override def configurePod(pod: SparkPod): SparkPod = {
val localDirVolumes = resolvedLocalDirs
.zipWithIndex
.map { case (localDir, index) =>
- new VolumeBuilder()
- .withName(s"spark-local-dir-${index + 1}")
- .withNewEmptyDir()
- .endEmptyDir()
- .build()
+ val name = s"spark-local-dir-${index + 1}"
+ // To allow customisation of local dirs backing volumes we should avoid creating
+ // emptyDir volumes if the volume is already defined in the pod spec
+ hasVolume(pod, name) match {
+ case true =>
+ // For pre-existing volume definitions just re-use the volume
+ pod.pod.getSpec().getVolumes().asScala.find(v => v.getName.equals(name)).get
+ case false =>
+ // Create new emptyDir volume
+ new VolumeBuilder()
+ .withName(name)
+ .withNewEmptyDir()
+ .withMedium(useLocalDirTmpFs match {
+ case true => "Memory" // Use tmpfs
+ case false => null // Default - use node's backing storage
+ })
+ .endEmptyDir()
+ .build()
+ }
}
+
val localDirVolumeMounts = localDirVolumes
.zip(resolvedLocalDirs)
.map { case (localDirVolume, localDirPath) =>
- new VolumeMountBuilder()
- .withName(localDirVolume.getName)
- .withMountPath(localDirPath)
- .build()
+ hasVolumeMount(pod, localDirVolume.getName, localDirPath) match {
+ case true =>
+ // For pre-existing volume mounts just re-use the mount
+ pod.container.getVolumeMounts().asScala
+ .find(m => m.getName.equals(localDirVolume.getName)
+ && m.getMountPath.equals(localDirPath))
+ .get
+ case false =>
+ // Create new volume mount
+ new VolumeMountBuilder()
+ .withName (localDirVolume.getName)
+ .withMountPath (localDirPath)
+ .build()
+ }
+ }
+
+ // Check for conflicting volume mounts
+ for (m: VolumeMount <- localDirVolumeMounts) {
+ if (hasConflictingVolumeMount(pod, m.getName, m.getMountPath).size > 0) {
+ throw new SparkException(s"Conflicting volume mounts defined, pod template attempted to " +
+ "mount SPARK_LOCAL_DIRS volume ${m.getName} multiple times or at an alternative path " +
+ "than the expected ${m.getPath}")
+ }
+ }
+
val podWithLocalDirVolumes = new PodBuilder(pod.pod)
.editSpec()
- .addToVolumes(localDirVolumes: _*)
+ // Don't want to re-add volumes that already existed in the incoming spec
+ // as duplicate definitions will lead to K8S API errors
+ .addToVolumes(localDirVolumes.filter(v => !hasVolume(pod, v.getName)): _*)
--- End diff --
This is different in that we're looking for specific volumes that have been
set up by previous feature steps or outside logic. Preferably every step is
self-contained in that it doesn't have to look up specific values set by
previous steps.
For example this logic would break if we applied the templating after this
step, or if a different step after this one added the volumes that are being
looked up here.
Whereas `editOrNew` and `addTo...` at worst only change the ordering on
some of the fields depending on when the step is invoked in the sequence.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]