Github user rvesse commented on a diff in the pull request:
https://github.com/apache/spark/pull/22256#discussion_r213954156
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala ---
@@ -37,41 +40,99 @@ private[spark] class LocalDirsFeatureStep(
       .orElse(conf.getOption("spark.local.dir"))
       .getOrElse(defaultLocalDir)
       .split(",")
+  private val useLocalDirTmpFs = conf.get(KUBERNETES_LOCAL_DIRS_TMPFS)
   override def configurePod(pod: SparkPod): SparkPod = {
     val localDirVolumes = resolvedLocalDirs
       .zipWithIndex
       .map { case (localDir, index) =>
-        new VolumeBuilder()
-          .withName(s"spark-local-dir-${index + 1}")
-          .withNewEmptyDir()
-          .endEmptyDir()
-          .build()
+        val name = s"spark-local-dir-${index + 1}"
+        // To allow customisation of local dirs backing volumes we should avoid creating
+        // emptyDir volumes if the volume is already defined in the pod spec
+        hasVolume(pod, name) match {
+          case true =>
+            // For pre-existing volume definitions just re-use the volume
+            pod.pod.getSpec().getVolumes().asScala.find(v => v.getName.equals(name)).get
+          case false =>
+            // Create new emptyDir volume
+            new VolumeBuilder()
+              .withName(name)
+              .withNewEmptyDir()
+              .withMedium(useLocalDirTmpFs match {
+                case true => "Memory" // Use tmpfs
+                case false => null    // Default - use nodes backing storage
+              })
+              .endEmptyDir()
+              .build()
+        }
       }
+
     val localDirVolumeMounts = localDirVolumes
       .zip(resolvedLocalDirs)
       .map { case (localDirVolume, localDirPath) =>
-        new VolumeMountBuilder()
-          .withName(localDirVolume.getName)
-          .withMountPath(localDirPath)
-          .build()
+        hasVolumeMount(pod, localDirVolume.getName, localDirPath) match {
+          case true =>
+            // For pre-existing volume mounts just re-use the mount
+            pod.container.getVolumeMounts().asScala
+              .find(m => m.getName.equals(localDirVolume.getName)
+                && m.getMountPath.equals(localDirPath))
+              .get
+          case false =>
+            // Create new volume mount
+            new VolumeMountBuilder()
+              .withName(localDirVolume.getName)
+              .withMountPath(localDirPath)
+              .build()
+        }
+      }
+
+    // Check for conflicting volume mounts
+    for (m: VolumeMount <- localDirVolumeMounts) {
+      if (hasConflictingVolumeMount(pod, m.getName, m.getMountPath).size > 0) {
+        throw new SparkException(s"Conflicting volume mounts defined, pod template attempted to " +
+          s"mount SPARK_LOCAL_DIRS volume ${m.getName} multiple times or at an alternative path " +
+          s"than the expected ${m.getMountPath}")
       }
+    }
+
     val podWithLocalDirVolumes = new PodBuilder(pod.pod)
       .editSpec()
-        .addToVolumes(localDirVolumes: _*)
+        // Don't want to re-add volumes that already existed in the incoming spec
+        // as duplicate definitions will lead to K8S API errors
+        .addToVolumes(localDirVolumes.filter(v => !hasVolume(pod, v.getName)): _*)
--- End diff ---
The implementation is still additive in that it will add to existing
elements in the pod spec as needed while respecting what is already present.
If your pod spec contains duplicate volumes/volume mounts then K8S will
reject it as invalid, e.g.:
```
The Pod "rvesse-test" is invalid: spec.volumes[1].name: Duplicate value:
"spark-local-dirs-1"
```
Therefore it is necessary to explicitly avoid duplicating things already
present in the template.
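
For context, the `hasVolume`/`hasVolumeMount` helpers called in the hunk are
not part of the quoted lines; a rough sketch of the kind of lookup they
perform (illustrative only, inferred from how they are called, and meant to
live as private members of `LocalDirsFeatureStep`):

```scala
import scala.collection.JavaConverters._

import org.apache.spark.deploy.k8s.SparkPod

// Sketch: does the incoming pod spec already define a volume with this name?
private def hasVolume(pod: SparkPod, name: String): Boolean =
  pod.pod.getSpec.getVolumes.asScala.exists(_.getName == name)

// Sketch: does the container already mount this volume at the expected path?
private def hasVolumeMount(pod: SparkPod, name: String, path: String): Boolean =
  pod.container.getVolumeMounts.asScala
    .exists(m => m.getName == name && m.getMountPath == path)
```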
If the aim is to replace adding further config options with the pod
template feature then the existing builders do need to be more intelligent
about what they do so that they avoid generating invalid pod specs. This holds
regardless of whether the template feature is opinionated about validation:
even if the template feature doesn't do validation, Spark code itself should
ensure that it generates valid specs as far as it is able to. Obviously it
can't detect every possible invalid spec that it might generate if the
templates aren't being validated, but it can avoid introducing easily
avoidable invalid specs.
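
As a rough illustration of the kind of defensiveness meant here (the helper
below is hypothetical and not part of this PR), a builder step could fail
fast on duplicate volume names before the spec ever reaches the K8S API:

```scala
import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.Pod

import org.apache.spark.SparkException

object PodSpecChecks {
  // Hypothetical check: reject a generated spec that the K8S API would refuse anyway
  def checkNoDuplicateVolumes(pod: Pod): Unit = {
    val names = pod.getSpec.getVolumes.asScala.map(_.getName)
    val duplicates = names.diff(names.distinct).distinct
    if (duplicates.nonEmpty) {
      throw new SparkException(
        s"Generated pod spec contains duplicate volume definitions: ${duplicates.mkString(", ")}")
    }
  }
}
```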
---