Github user onursatici commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22256#discussion_r214198366
  
    --- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala
 ---
    @@ -37,41 +40,99 @@ private[spark] class LocalDirsFeatureStep(
         .orElse(conf.getOption("spark.local.dir"))
         .getOrElse(defaultLocalDir)
         .split(",")
    +  private val useLocalDirTmpFs = conf.get(KUBERNETES_LOCAL_DIRS_TMPFS)
     
       override def configurePod(pod: SparkPod): SparkPod = {
         val localDirVolumes = resolvedLocalDirs
           .zipWithIndex
           .map { case (localDir, index) =>
    -        new VolumeBuilder()
    -          .withName(s"spark-local-dir-${index + 1}")
    -          .withNewEmptyDir()
    -          .endEmptyDir()
    -          .build()
    +        val name = s"spark-local-dir-${index + 1}"
    +        // To allow customisation of local dirs backing volumes we should 
avoid creating
    +        // emptyDir volumes if the volume is already defined in the pod 
spec
    +        hasVolume(pod, name) match {
    +          case true =>
    +            // For pre-existing volume definitions just re-use the volume
    +            pod.pod.getSpec().getVolumes().asScala.find(v => 
v.getName.equals(name)).get
    +          case false =>
    +            // Create new emptyDir volume
    +            new VolumeBuilder()
    +              .withName(name)
    +              .withNewEmptyDir()
    +                .withMedium(useLocalDirTmpFs match {
    +                  case true => "Memory" // Use tmpfs
    +                  case false => null    // Default - use nodes backing 
storage
    +                })
    +              .endEmptyDir()
    +              .build()
    +        }
           }
    +
         val localDirVolumeMounts = localDirVolumes
           .zip(resolvedLocalDirs)
           .map { case (localDirVolume, localDirPath) =>
    -        new VolumeMountBuilder()
    -          .withName(localDirVolume.getName)
    -          .withMountPath(localDirPath)
    -          .build()
    +        hasVolumeMount(pod, localDirVolume.getName, localDirPath) match {
    +          case true =>
    +            // For pre-existing volume mounts just re-use the mount
    +            pod.container.getVolumeMounts().asScala
    +              .find(m => m.getName.equals(localDirVolume.getName)
    +                         && m.getMountPath.equals(localDirPath))
    +              .get
    +          case false =>
    +            // Create new volume mount
    +            new VolumeMountBuilder()
    +              .withName (localDirVolume.getName)
    +              .withMountPath (localDirPath)
    +              .build()
    +        }
    +      }
    +
    +    // Check for conflicting volume mounts
    +    for (m: VolumeMount <- localDirVolumeMounts) {
    +      if (hasConflictingVolumeMount(pod, m.getName, m.getMountPath).size > 
0) {
    +        throw new SparkException(s"Conflicting volume mounts defined, pod 
template attempted to " +
    +          "mount SPARK_LOCAL_DIRS volume ${m.getName} multiple times or at 
an alternative path " +
    +          "then the expected ${m.getPath}")
           }
    +    }
    +
         val podWithLocalDirVolumes = new PodBuilder(pod.pod)
           .editSpec()
    -        .addToVolumes(localDirVolumes: _*)
    +         // Don't want to re-add volumes that already existed in the 
incoming spec
    +         // as duplicate definitions will lead to K8S API errors
    +        .addToVolumes(localDirVolumes.filter(v => !hasVolume(pod, 
v.getName)): _*)
    --- End diff --
    
    @mccheah Yep, we should document that; I will add it.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to