[GitHub] spark pull request #21884: [SPARK-24960][K8S] explicitly expose ports on dri...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21884#discussion_r206999720 --- Diff: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala --- @@ -203,4 +212,12 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { "spark.files" -> "https://localhost:9000/file1.txt,/opt/spark/file2.txt") assert(additionalProperties === expectedSparkConf) } + + def containerPort(name: String, portNumber: Int): ContainerPort = { +val port = new ContainerPort() --- End diff -- Use `ContainerPortBuilder` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r207016169 --- Diff: core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.metrics + +import java.lang.management.{BufferPoolMXBean, ManagementFactory} +import javax.management.ObjectName + +import org.apache.spark.memory.MemoryManager + +/** + * Executor metric types for executor-level metrics stored in ExecutorMetrics. + */ +sealed trait ExecutorMetricType { + private[spark] def getMetricValue(memoryManager: MemoryManager): Long --- End diff -- Yup, on my end this is a low conviction suggestion - we might start feeling pain around this as we add more metric types, but for a first pass this is probably fine.
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r207073149 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -669,6 +686,34 @@ private[spark] class AppStatusListener( } } } + +// check if there is a new peak value for any of the executor level memory metrics +// for the live UI. SparkListenerExecutorMetricsUpdate events are only processed +// for the live UI. +event.executorUpdates.foreach { updates: ExecutorMetrics => + liveExecutors.get(event.execId).foreach { exec: LiveExecutor => +if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(updates)) { + maybeUpdate(exec, now) +} + } +} + } + + override def onStageExecutorMetrics(executorMetrics: SparkListenerStageExecutorMetrics): Unit = { +val now = System.nanoTime() + +// check if there is a new peak value for any of the executor level memory metrics, +// while reading from the log. SparkListenerStageExecutorMetrics are only processed +// when reading logs. +liveExecutors.get(executorMetrics.execId) + .orElse(deadExecutors.get(executorMetrics.execId)) match { + case Some(exec) => --- End diff -- Ok, this can stay as is.
[GitHub] spark pull request #21923: [SPARK-24918][Core] Executor Plugin api
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21923#discussion_r207072577 --- Diff: core/src/main/java/org/apache/spark/AbstractExecutorPlugin.java --- @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark; + +import org.apache.spark.annotation.DeveloperApi; + +/** + * A plugin which can be automaticaly instantiated within each Spark executor. Users can specify + * plugins which should be created with the "spark.executor.plugins" configuration. An instance + * of each plugin will be created for every executor, including those created by dynamic allocation, + * before the executor starts running any tasks. + * + * The specific api exposed to the end users still considered to be very unstable. If implementors + * extend this base class, we will *hopefully* be able to keep compatability by providing dummy + * implementations for any methods added, but make no guarantees this will always be possible across + * all spark releases. + * + * Spark does nothing to verify the plugin is doing legitimate things, or to manage the resources + * it uses. A plugin acquires the same privileges as the user running the task. 
A bad plugin + * could also intefere with task execution and make the executor fail in unexpected ways. + */ +@DeveloperApi +public class AbstractExecutorPlugin { --- End diff -- `interface`? A bit more flexibility in terms of the user's desired class hierarchy.
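A minimal sketch of the flexibility the `interface` suggestion is after, written in Scala with a trait standing in for a Java interface. All names here (`ExecutorPluginSketch`, `CompanyAgentBase`, `MonitoringPlugin`) are hypothetical and are not the API from the pull request.

```scala
object PluginSketch {
  // Hypothetical plugin contract. A trait (Java `interface` with default
  // methods) can still ship no-op defaults, so new hooks can be added later
  // without breaking existing implementors.
  trait ExecutorPluginSketch {
    def init(): Unit = ()
    def shutdown(): Unit = ()
  }

  // Hypothetical base class that the user already owns and must keep extending.
  abstract class CompanyAgentBase(val agentName: String)

  // Works only because the contract is a trait: the plugin keeps its own
  // superclass and mixes the contract in. With an abstract base class as the
  // contract, this class could not extend both.
  class MonitoringPlugin extends CompanyAgentBase("monitor") with ExecutorPluginSketch {
    var started = false
    override def init(): Unit = { started = true }
  }
}
```

With a class-based contract the user would have to give up `CompanyAgentBase` or wrap the plugin in a delegate, which is the hierarchy constraint the comment points at.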
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r207004997 --- Diff: core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.executor + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.metrics.ExecutorMetricType + +/** + * :: DeveloperApi :: + * Metrics tracked for executors and the driver. + * + * Executor-level metrics are sent from each executor to the driver as part of the Heartbeat. + */ +@DeveloperApi +class ExecutorMetrics private[spark] extends Serializable { + + // Metrics are indexed by MetricGetter.values + private val metrics = new Array[Long](ExecutorMetricType.values.length) --- End diff -- Yup that's fine - I did some googling, unfortunately there isn't a great way to iterate over fields of a case class. You could create a thin wrapper object around the array instead though, if we really think the nicer API is worthwhile:
```
case class Metrics(values: Seq[Long]) {
  def someMetric1(): Long = values(0)
  // ...
}
```
Or even this:
```
case class Metrics(metric1: Long, metric2: Long, metric3: Long, ...) {
  def values(): Seq[Long] = Seq(metric1, metric2, metric3, ...)
}
```
The latter would be better because you'd be guaranteed to create the struct with the right number of metrics. Though such abstractions are not necessary by any means.
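The two shapes suggested in the comment can be sketched as compilable Scala. This is only an illustration under assumptions: the metric names (`jvmHeapUsed`, `jvmOffHeapUsed`) and the class names are hypothetical placeholders, not the metric set from the pull request.

```scala
object MetricsSketch {
  // Shape 1: a thin wrapper around a positional sequence. Accessors give the
  // values names, but nothing stops a caller from passing too few elements.
  case class IndexedMetrics(values: Seq[Long]) {
    def jvmHeapUsed: Long = values(0)
    def jvmOffHeapUsed: Long = values(1)
  }

  // Shape 2: one constructor parameter per metric. An instance cannot be
  // built without every metric, and `values` recovers the sequence view.
  case class NamedMetrics(jvmHeapUsed: Long, jvmOffHeapUsed: Long) {
    def values: Seq[Long] = Seq(jvmHeapUsed, jvmOffHeapUsed)
  }
}
```

In the first shape, `IndexedMetrics(Seq(10L)).jvmOffHeapUsed` compiles and fails only at runtime; in the second, omitting a metric is a compile error, which is the guarantee the comment prefers.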
[GitHub] spark issue #21884: [SPARK-24960][K8S] explicitly expose ports on driver con...
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/21884 Thanks @shaneknapp, I'll merge this now.
[GitHub] spark issue #21884: [SPARK-24960][K8S] explicitly expose ports on driver con...
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/21884 Thanks @adelbertc for the contribution!
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r207002859 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -669,6 +686,34 @@ private[spark] class AppStatusListener( } } } + +// check if there is a new peak value for any of the executor level memory metrics +// for the live UI. SparkListenerExecutorMetricsUpdate events are only processed +// for the live UI. +event.executorUpdates.foreach { updates: ExecutorMetrics => + liveExecutors.get(event.execId).foreach { exec: LiveExecutor => +if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(updates)) { + maybeUpdate(exec, now) +} + } +} + } + + override def onStageExecutorMetrics(executorMetrics: SparkListenerStageExecutorMetrics): Unit = { +val now = System.nanoTime() + +// check if there is a new peak value for any of the executor level memory metrics, +// while reading from the log. SparkListenerStageExecutorMetrics are only processed +// when reading logs. +liveExecutors.get(executorMetrics.execId) + .orElse(deadExecutors.get(executorMetrics.execId)) match { + case Some(exec) => --- End diff -- From the [Scaladoc](https://www.scala-lang.org/api/2.10.2/index.html#scala.Option): > The most idiomatic way to use an scala.Option instance is to treat it as a collection or monad and use map, flatMap, filter, or foreach It's probably better to follow the Scala conventions here.
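To make the suggestion concrete, here is a small self-contained sketch contrasting the `match` form from the diff with the collection-style `foreach` form. The `Exec` class and the two maps are stand-ins invented for this example; they only mimic the `liveExecutors` / `deadExecutors` lookup and are not Spark's types.

```scala
object OptionSketch {
  import scala.collection.mutable

  // Stand-in for an executor record with a single peak value.
  final case class Exec(id: String, var peak: Long)

  val liveExecutors: mutable.Map[String, Exec] = mutable.Map("1" -> Exec("1", 5L))
  val deadExecutors: mutable.Map[String, Exec] = mutable.Map("2" -> Exec("2", 7L))

  // Pattern-matching style, as in the quoted diff.
  def updateWithMatch(execId: String, value: Long): Unit = {
    liveExecutors.get(execId).orElse(deadExecutors.get(execId)) match {
      case Some(exec) => if (value > exec.peak) exec.peak = value
      case None => ()
    }
  }

  // Collection style: the body runs only when the Option is defined,
  // so no explicit None branch is needed.
  def updateWithForeach(execId: String, value: Long): Unit = {
    liveExecutors.get(execId).orElse(deadExecutors.get(execId)).foreach { exec =>
      if (value > exec.peak) exec.peak = value
    }
  }
}
```

Both functions behave the same; the `foreach` form simply drops the empty `None` case.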
[GitHub] spark pull request #21748: [SPARK-23146][K8S] Support client mode.
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21748#discussion_r205849187 --- Diff: resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ClientModeTestsSuite.scala --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s.integrationtest + +import org.scalatest.concurrent.Eventually +import scala.collection.JavaConverters._ + +import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.{k8sTestTag, INTERVAL, TIMEOUT} + +trait ClientModeTestsSuite { k8sSuite: KubernetesSuite => + + test("Run in client mode.", k8sTestTag) { +val labels = Map("spark-app-selector" -> driverPodName) +val driverPort = 7077 +val blockManagerPort = 1 +val driverService = testBackend + .getKubernetesClient + .services() + .inNamespace(kubernetesTestComponents.namespace) + .createNew() +.withNewMetadata() + .withName(s"$driverPodName-svc") + .endMetadata() +.withNewSpec() + .withClusterIP("None") + .withSelector(labels.asJava) + .addNewPort() +.withName("driver-port") +.withPort(driverPort) +.withNewTargetPort(driverPort) +.endPort() + .addNewPort() +.withName("block-manager") +.withPort(blockManagerPort) +.withNewTargetPort(blockManagerPort) +.endPort() + .endSpec() +.done() +try { + val driverPod = testBackend +.getKubernetesClient +.pods() +.inNamespace(kubernetesTestComponents.namespace) +.createNew() + .withNewMetadata() + .withName(driverPodName) + .withLabels(labels.asJava) + .endMetadata() +.withNewSpec() + .withServiceAccountName("default") --- End diff -- Yup we can fix this
[GitHub] spark issue #21900: [SPARK-23146][K8S][TESTS] Don't set service account name...
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/21900 @skonto can you try this with spark-rbac.yaml?
[GitHub] spark pull request #21900: [SPARK-23146][K8S][TESTS] Don't set service accou...
GitHub user mccheah opened a pull request: https://github.com/apache/spark/pull/21900

[SPARK-23146][K8S][TESTS] Don't set service account name for client mode test

## What changes were proposed in this pull request?

Don't set service account name for the pod created in client mode

## How was this patch tested?

Test should continue running smoothly in Jenkins.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/palantir/spark fix-integration-test-service-account

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21900.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21900
[GitHub] spark issue #21900: [SPARK-23146][K8S][TESTS] Don't set service account name...
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/21900 ok to test
[GitHub] spark issue #21884: [Scheduler][K8s]: explicitly expose ports on driver cont...
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/21884 This needs a Spark ticket. I also don't think pods specifically have to expose ports. My understanding was that the ports field on the pod only assigns names to ports, so that service objects can reference them by name rather than by number.
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r209712159 --- Diff: core/src/main/scala/org/apache/spark/memory/MemoryManager.scala --- @@ -180,6 +180,26 @@ private[spark] abstract class MemoryManager( onHeapStorageMemoryPool.memoryUsed + offHeapStorageMemoryPool.memoryUsed } + /** + * On heap execution memory currently in use, in bytes. + */ + final def onHeapExecutionMemoryUsed: Long = onHeapExecutionMemoryPool.memoryUsed --- End diff -- It probably should be, if only because the variable is annotated with `@GuardedBy("this")`, so marking this as `synchronized` makes the code more consistent.
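A rough sketch of what the comment asks for: reading a lock-guarded field under the same lock as its writers. `Pool` and `ManagerSketch` are hypothetical stand-ins rather than Spark's `MemoryManager`, and the guard is documented in a comment here instead of with the `@GuardedBy` annotation so the example needs no extra dependency.

```scala
object GuardedSketch {
  // Hypothetical stand-in for a memory pool; not thread-safe on its own.
  final class Pool {
    private var used = 0L
    def memoryUsed: Long = used
    def acquire(bytes: Long): Unit = { used += bytes }
  }

  final class ManagerSketch {
    // Guarded by `this`: every access below happens inside `synchronized`.
    private val onHeapExecutionPool = new Pool

    def acquireExecution(bytes: Long): Unit = synchronized {
      onHeapExecutionPool.acquire(bytes)
    }

    // The getter takes the same lock as the writer, so the documented guard
    // holds for reads as well as writes.
    def onHeapExecutionMemoryUsed: Long = synchronized {
      onHeapExecutionPool.memoryUsed
    }
  }
}
```

Whether an unsynchronized read would actually misbehave depends on how the pool is used; the point raised in review is consistency with the declared guard.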
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r209717523 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -669,6 +686,31 @@ private[spark] class AppStatusListener( } } } + +// check if there is a new peak value for any of the executor level memory metrics +// for the live UI. SparkListenerExecutorMetricsUpdate events are only processed +// for the live UI. +event.executorUpdates.foreach { updates: ExecutorMetrics => + liveExecutors.get(event.execId).foreach { exec: LiveExecutor => +if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(updates)) { + maybeUpdate(exec, now) +} + } +} + } + + override def onStageExecutorMetrics(executorMetrics: SparkListenerStageExecutorMetrics): Unit = { +val now = System.nanoTime() --- End diff -- Should we use a `Clock` instance here for testing?
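For context on the question, this is roughly what an injected clock buys in tests. The sketch assumes a hypothetical `NanoClock` trait and `ListenerSketch` class defined here for illustration; it does not use Spark's own `Clock` utility or the real `AppStatusListener`.

```scala
object ClockSketch {
  // Hypothetical clock abstraction, defined here only for the example.
  trait NanoClock { def nanoTime(): Long }

  // Production implementation: delegates to the JVM.
  object SystemNanoClock extends NanoClock {
    override def nanoTime(): Long = System.nanoTime()
  }

  // Test implementation: time moves only when the test says so.
  final class ManualNanoClock(private var now: Long = 0L) extends NanoClock {
    override def nanoTime(): Long = now
    def advance(nanos: Long): Unit = { now += nanos }
  }

  // Stand-in listener that records when it last handled an event.
  final class ListenerSketch(clock: NanoClock = SystemNanoClock) {
    var lastUpdate: Long = -1L
    def onStageExecutorMetrics(): Unit = { lastUpdate = clock.nanoTime() }
  }
}
```

A test can then construct `new ListenerSketch(new ManualNanoClock(100L))` and assert on exact timestamps, which is not possible when the listener calls `System.nanoTime()` directly.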
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r209714883 --- Diff: core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.executor + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.metrics.ExecutorMetricType + +/** + * :: DeveloperApi :: + * Metrics tracked for executors and the driver. + * + * Executor-level metrics are sent from each executor to the driver as part of the Heartbeat. + */ +@DeveloperApi +class ExecutorMetrics private[spark] extends Serializable { + + // Metrics are indexed by MetricGetter.values + private val metrics = new Array[Long](ExecutorMetricType.values.length) + + // the first element is initialized to -1, indicating that the values for the array + // haven't been set yet. + metrics(0) = -1 + + /** Returns the value for the specified metricType. */ + def getMetricValue(metricType: ExecutorMetricType): Long = { +metrics(ExecutorMetricType.metricIdxMap(metricType)) + } + + /** Returns true if the values for the metrics have been set, false otherwise. 
*/ + def isSet(): Boolean = metrics(0) > -1 + + private[spark] def this(metrics: Array[Long]) { +this() +Array.copy(metrics, 0, this.metrics, 0, Math.min(metrics.size, this.metrics.size)) + } + + /** + * Constructor: create the ExecutorMetrics with the values specified. + * + * @param executorMetrics map of executor metric name to value + */ + private[spark] def this(executorMetrics: Map[String, Long]) { +this() +(0 until ExecutorMetricType.values.length).foreach { idx => + metrics(idx) = executorMetrics.getOrElse(ExecutorMetricType.values(idx).name, 0L) +} + } + + /** + * Compare the specified executor metrics values with the current executor metric values, + * and update the value for any metrics where the new value for the metric is larger. + * + * @param executorMetrics the executor metrics to compare + * @return if there is a new peak value for any metric + */ + private[spark] def compareAndUpdatePeakValues(executorMetrics: ExecutorMetrics): Boolean = { +var updated: Boolean = false --- End diff -- No need to specifically label this as `Boolean`.
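As an illustration of the point about the type annotation, here is a standalone sketch of a peak-tracking update that relies on inference for the local flag. It works on plain arrays, and the name `compareAndUpdatePeaks` is a hypothetical stand-in; it is not the method from the pull request.

```scala
object PeakSketch {
  // Raises each entry of `current` to the matching entry of `incoming` when
  // the incoming value is larger. Returns true if anything changed.
  def compareAndUpdatePeaks(current: Array[Long], incoming: Array[Long]): Boolean = {
    var updated = false // inferred as Boolean from the literal
    (0 until math.min(current.length, incoming.length)).foreach { idx =>
      if (incoming(idx) > current(idx)) {
        current(idx) = incoming(idx)
        updated = true
      }
    }
    updated
  }
}
```

The explicit return type on the method is still worth keeping, since it documents the contract; only the local `var` annotation is redundant.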
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r209716819 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -669,6 +686,31 @@ private[spark] class AppStatusListener( } } } + +// check if there is a new peak value for any of the executor level memory metrics +// for the live UI. SparkListenerExecutorMetricsUpdate events are only processed +// for the live UI. +event.executorUpdates.foreach { updates: ExecutorMetrics => --- End diff -- Does the type need to be explicitly defined here and in the next line?
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r209714796 --- Diff: core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.executor + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.metrics.ExecutorMetricType + +/** + * :: DeveloperApi :: + * Metrics tracked for executors and the driver. + * + * Executor-level metrics are sent from each executor to the driver as part of the Heartbeat. + */ +@DeveloperApi +class ExecutorMetrics private[spark] extends Serializable { + + // Metrics are indexed by MetricGetter.values + private val metrics = new Array[Long](ExecutorMetricType.values.length) + + // the first element is initialized to -1, indicating that the values for the array + // haven't been set yet. + metrics(0) = -1 + + /** Returns the value for the specified metricType. */ + def getMetricValue(metricType: ExecutorMetricType): Long = { +metrics(ExecutorMetricType.metricIdxMap(metricType)) + } + + /** Returns true if the values for the metrics have been set, false otherwise. 
*/ + def isSet(): Boolean = metrics(0) > -1 + + private[spark] def this(metrics: Array[Long]) { +this() +Array.copy(metrics, 0, this.metrics, 0, Math.min(metrics.size, this.metrics.size)) + } + + /** + * Constructor: create the ExecutorMetrics with the values specified. + * + * @param executorMetrics map of executor metric name to value + */ + private[spark] def this(executorMetrics: Map[String, Long]) { +this() +(0 until ExecutorMetricType.values.length).foreach { idx => + metrics(idx) = executorMetrics.getOrElse(ExecutorMetricType.values(idx).name, 0L) +} + } + + /** + * Compare the specified executor metrics values with the current executor metric values, + * and update the value for any metrics where the new value for the metric is larger. + * + * @param executorMetrics the executor metrics to compare + * @return if there is a new peak value for any metric + */ + private[spark] def compareAndUpdatePeakValues(executorMetrics: ExecutorMetrics): Boolean = { +var updated: Boolean = false + +(0 until ExecutorMetricType.values.length).foreach { idx => + if ( executorMetrics.metrics(idx) > metrics(idx)) { --- End diff -- Nit: No space after the left bracket.
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r209715208 --- Diff: core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.metrics + +import java.lang.management.{BufferPoolMXBean, ManagementFactory} +import javax.management.ObjectName + +import org.apache.spark.memory.MemoryManager + +/** + * Executor metric types for executor-level metrics stored in ExecutorMetrics. + */ +sealed trait ExecutorMetricType { + private[spark] def getMetricValue(memoryManager: MemoryManager): Long + private[spark] val name = getClass().getName().stripSuffix("$").split("""\.""").last +} + +private[spark] abstract class MemoryManagerExecutorMetricType( +f: MemoryManager => Long) extends ExecutorMetricType { + override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = { +f(memoryManager) + } +} + +private[spark]abstract class MBeanExecutorMetricType(mBeanName: String) --- End diff -- Put a space after `[spark]`.
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r209715001 --- Diff: core/src/main/scala/org/apache/spark/memory/MemoryManager.scala --- @@ -180,6 +180,26 @@ private[spark] abstract class MemoryManager( onHeapStorageMemoryPool.memoryUsed + offHeapStorageMemoryPool.memoryUsed } + /** + * On heap execution memory currently in use, in bytes. + */ + final def onHeapExecutionMemoryUsed: Long = onHeapExecutionMemoryPool.memoryUsed + + /** + * Off heap execution memory currently in use, in bytes. + */ + final def offHeapExecutionMemoryUsed: Long = offHeapExecutionMemoryPool.memoryUsed --- End diff -- `synchronized` here as well, and in the two methods below.
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r209711557 --- Diff: core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.executor + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.metrics.ExecutorMetricType + +/** + * :: DeveloperApi :: + * Metrics tracked for executors and the driver. + * + * Executor-level metrics are sent from each executor to the driver as part of the Heartbeat. + */ +@DeveloperApi +class ExecutorMetrics private[spark] extends Serializable { + + // Metrics are indexed by MetricGetter.values + private val metrics = new Array[Long](ExecutorMetricType.values.length) --- End diff -- Unclear - if we expose these metrics to some external consumer via an API for example, then we almost certainly want to have a schema labelling these fields for consumption by e.g. dashboards. I think what we have here is fine for now.
[GitHub] spark pull request #21923: [SPARK-24918][Core] Executor Plugin api
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21923#discussion_r209805442 --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala --- @@ -130,6 +130,12 @@ private[spark] class Executor( private val urlClassLoader = createClassLoader() private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader) + Thread.currentThread().setContextClassLoader(replClassLoader) + conf.get(EXECUTOR_PLUGINS).foreach { classes => +Utils.loadExtensions(classOf[ExecutorPlugin], classes, conf) --- End diff -- For all cluster managers, would this properly load plugins deployed via `--jars` in spark-submit or `spark.jars` in the SparkConf? I know that how jars are deployed, and when they become available on the classpath, may sometimes vary. Although, worst case, this seems like the kind of thing one may prefer to put in `spark.executor.extraClassPath`, simply because those jars are guaranteed to be loaded at JVM boot time. In fact, I wonder if we should even move this extension loading further up in the lifecycle, simply so that the plugin can be around for a larger percentage of the executor JVM's uptime.
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r209773404 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -669,6 +686,31 @@ private[spark] class AppStatusListener( } } } + +// check if there is a new peak value for any of the executor level memory metrics +// for the live UI. SparkListenerExecutorMetricsUpdate events are only processed +// for the live UI. +event.executorUpdates.foreach { updates: ExecutorMetrics => + liveExecutors.get(event.execId).foreach { exec: LiveExecutor => +if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(updates)) { + maybeUpdate(exec, now) +} + } +} + } + + override def onStageExecutorMetrics(executorMetrics: SparkListenerStageExecutorMetrics): Unit = { +val now = System.nanoTime() --- End diff -- Yup that's fine.
[GitHub] spark issue #21221: [SPARK-23429][CORE] Add executor memory metrics to heart...
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/21221 @edwinalu I think this is almost ready, can you fix merge conflicts? @squito @felixcheung any other comments?
[GitHub] spark issue #21584: [SPARK-24433][K8S] Initial R Bindings for SparkR on K8s
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/21584 Let's merge at the end of the day Pacific time (~5PM-ish) on Friday, August 17, pending any additional feedback on the mailing list thread discussing the subject of including this in 2.4.
[GitHub] spark issue #22146: [WIP][SPARK-24434][K8S] pod template files
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/22146 @skonto @erikerlandson
[GitHub] spark issue #21221: [SPARK-23429][CORE] Add executor memory metrics to heart...
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/21221 We're going to delay on merging this until after the 2.4 branch is cut. We can include this in Spark 2.5.
[GitHub] spark pull request #22146: [WIP][SPARK-24434][K8S] pod template files
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/22146#discussion_r211802430 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/TemplateVolumeStep.scala --- @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.features + +import io.fabric8.kubernetes.api.model.{Config => _, _} + +import org.apache.spark.deploy.k8s._ + +private[spark] class TemplateVolumeStep( --- End diff -- So this pushes the pod spec yml from the spark-submit process's local disk up to the driver pod. It may be worthwhile to support specifying the file as a location in the driver pod that hasn't been mounted by spark-submit, but I think doing it this way is fine for now.
[GitHub] spark pull request #22146: [WIP][SPARK-24434][K8S] pod template files
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/22146#discussion_r211800379 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala --- @@ -96,3 +112,25 @@ private[spark] class KubernetesDriverBuilder( spec } } + +private[spark] object KubernetesDriverBuilder extends Logging { + def apply(kubernetesClient: KubernetesClient, conf: SparkConf): KubernetesDriverBuilder = { +conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_FILE) + .map(new File(_)) + .map(file => new KubernetesDriverBuilder(provideInitialSpec = conf => { +try { + val sparkPod = KubernetesUtils.loadPodFromTemplate( +kubernetesClient, +file, +Constants.DRIVER_CONTAINER_NAME) --- End diff -- You may `import org.apache.spark.deploy.k8s.Constants._` at the top of the file and then not need the `Constants` prefix here.
[GitHub] spark pull request #22146: [WIP][SPARK-24434][K8S] pod template files
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/22146#discussion_r211800127 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/TemplateVolumeStep.scala --- @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.features + +import io.fabric8.kubernetes.api.model.{Config => _, _} + +import org.apache.spark.deploy.k8s._ + +private[spark] class TemplateVolumeStep( + conf: KubernetesConf[_ <: KubernetesRoleSpecificConf]) + extends KubernetesFeatureConfigStep { + def configurePod(pod: SparkPod): SparkPod = { + require(conf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).isDefined) +val podTemplateFile = conf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).get +val podWithVolume = new PodBuilder(pod.pod) + .editSpec() --- End diff -- Match the indentation here with the indentation style down below.
[GitHub] spark pull request #22146: [WIP][SPARK-24434][K8S] pod template files
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/22146#discussion_r211800821 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala --- @@ -51,7 +57,13 @@ private[spark] class KubernetesDriverBuilder( provideJavaStep: ( KubernetesConf[KubernetesDriverSpecificConf] => JavaDriverFeatureStep) = -new JavaDriverFeatureStep(_)) { +new JavaDriverFeatureStep(_), +provideTemplateVolumeStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf] + => TemplateVolumeStep) = +new TemplateVolumeStep(_), +provideInitialSpec: KubernetesConf[KubernetesDriverSpecificConf] --- End diff -- Why not `provideInitialPod` to be consistent with the executor builder?
[GitHub] spark pull request #22146: [WIP][SPARK-24434][K8S] pod template files
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/22146#discussion_r211799242 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala --- @@ -59,5 +65,23 @@ private[spark] object KubernetesUtils { } } + def loadPodFromTemplate(kubernetesClient: KubernetesClient, + templateFile: File, + containerName: String): SparkPod = { +try { + val pod = kubernetesClient.pods().load(templateFile).get() + val container = pod.getSpec.getContainers.asScala +.filter(_.getName == containerName) --- End diff -- Can use `require(...exists)`
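The `require(...exists)` suggestion above might look roughly like the following. This is a sketch under stated assumptions: `Container` here is a hypothetical two-field case class standing in for the fabric8 container type, and `selectContainer` is an illustrative name, not a method from the PR.

```scala
// Hypothetical minimal model of a pod's containers; the real code uses fabric8 types.
object ContainerLookupSketch {
  final case class Container(name: String, image: String)

  def selectContainer(containers: Seq[Container], containerName: String): Container = {
    // Fail fast with a descriptive message instead of filtering and
    // discovering an empty result later.
    require(
      containers.exists(_.name == containerName),
      s"Pod template must contain a container named '$containerName'")
    containers.find(_.name == containerName).get
  }
}
```

Because `require` throws `IllegalArgumentException`, a template that lacks the named container is rejected at load time with a message that names the missing container.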
[GitHub] spark pull request #22146: [WIP][SPARK-24434][K8S] pod template files
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/22146#discussion_r211801962 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/TemplateVolumeStep.scala --- @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.features + +import io.fabric8.kubernetes.api.model.{Config => _, _} + +import org.apache.spark.deploy.k8s._ + +private[spark] class TemplateVolumeStep( + conf: KubernetesConf[_ <: KubernetesRoleSpecificConf]) + extends KubernetesFeatureConfigStep { + def configurePod(pod: SparkPod): SparkPod = { + require(conf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).isDefined) +val podTemplateFile = conf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).get +val podWithVolume = new PodBuilder(pod.pod) + .editSpec() + .addNewVolume() + .withName(Constants.POD_TEMPLATE_VOLUME) + .withHostPath(new HostPathVolumeSource(podTemplateFile)) --- End diff -- `hostPath` is not the correct volume type here. Instead, do the following:

- Override `getAdditionalKubernetesResources()` with the following:
  1. Load the contents of the template file from this process's local disk into a UTF-8 String
  2. Create and return a `ConfigMap` object containing the contents of that file, with some given key
- In `configurePod`, add the config map as a volume in the pod spec, and add a volume mount pointing to that volume as done here.
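The steps in the comment above can be sketched with the fabric8 model builders. This is a minimal sketch, not the code that was eventually committed: it assumes the fabric8 `kubernetes-model` library is on the classpath, and the object name, constant values, and method names (`templateConfigMap`, `withTemplateVolume`, `withTemplateMount`) are placeholders chosen for illustration.

```scala
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.Files

import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapBuilder, Container, ContainerBuilder, Pod, PodBuilder}

// Sketch only: names and paths below are placeholders, not the constants used in the PR.
object PodTemplateConfigMapSketch {
  val ConfigMapName = "podspec-configmap"
  val ConfigMapKey = "podspec-configmap-key"
  val VolumeName = "podspec-volume"
  val MountPath = "/opt/spark/pod-template"
  val FileName = "pod-spec-template.yml"

  // Steps 1 and 2: read the template from local disk as UTF-8 and wrap it in a
  // ConfigMap under a single key.
  def templateConfigMap(templateFile: File): ConfigMap = {
    val contents = new String(Files.readAllBytes(templateFile.toPath), StandardCharsets.UTF_8)
    new ConfigMapBuilder()
      .withNewMetadata()
        .withName(ConfigMapName)
        .endMetadata()
      .addToData(ConfigMapKey, contents)
      .build()
  }

  // Add the ConfigMap as a volume in the pod spec, projecting the key to a file name.
  def withTemplateVolume(pod: Pod): Pod =
    new PodBuilder(pod)
      .editOrNewSpec()
        .addNewVolume()
          .withName(VolumeName)
          .withNewConfigMap()
            .withName(ConfigMapName)
            .addNewItem()
              .withKey(ConfigMapKey)
              .withPath(FileName)
              .endItem()
            .endConfigMap()
          .endVolume()
        .endSpec()
      .build()

  // Mount that volume into the container that needs to read the template.
  def withTemplateMount(container: Container): Container =
    new ContainerBuilder(container)
      .addNewVolumeMount()
        .withName(VolumeName)
        .withMountPath(MountPath)
        .endVolumeMount()
      .build()
}
```

Unlike `hostPath`, which refers to a path on whichever node the pod lands on, a ConfigMap carries the file contents through the API server, so the template written on the submitting machine is what the pod actually sees.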
[GitHub] spark pull request #22146: [WIP][SPARK-24434][K8S] pod template files
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/22146#discussion_r211800415 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala --- @@ -16,11 +16,17 @@ */ package org.apache.spark.deploy.k8s.submit -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf} -import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, EnvSecretsFeatureStep, LocalDirsFeatureStep, MountSecretsFeatureStep, MountVolumesFeatureStep} +import java.io.File + +import io.fabric8.kubernetes.client.KubernetesClient + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.k8s._ +import org.apache.spark.deploy.k8s.features._ --- End diff -- Undo these import changes. Keep the ordering correct, but import each class individually.
[GitHub] spark pull request #22146: [WIP][SPARK-24434][K8S] pod template files
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/22146#discussion_r211800326 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala --- @@ -96,3 +112,25 @@ private[spark] class KubernetesDriverBuilder( spec } } + +private[spark] object KubernetesDriverBuilder extends Logging { + def apply(kubernetesClient: KubernetesClient, conf: SparkConf): KubernetesDriverBuilder = { +conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_FILE) + .map(new File(_)) + .map(file => new KubernetesDriverBuilder(provideInitialSpec = conf => { +try { + val sparkPod = KubernetesUtils.loadPodFromTemplate( +kubernetesClient, +file, +Constants.DRIVER_CONTAINER_NAME) --- End diff -- Unclear if these container names should be configurable.
[GitHub] spark pull request #22146: [WIP][SPARK-24434][K8S] pod template files
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/22146#discussion_r211800176 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/TemplateVolumeStep.scala --- @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.features + +import io.fabric8.kubernetes.api.model.{Config => _, _} + +import org.apache.spark.deploy.k8s._ + +private[spark] class TemplateVolumeStep( + conf: KubernetesConf[_ <: KubernetesRoleSpecificConf]) + extends KubernetesFeatureConfigStep { + def configurePod(pod: SparkPod): SparkPod = { + require(conf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).isDefined) +val podTemplateFile = conf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).get +val podWithVolume = new PodBuilder(pod.pod) + .editSpec() + .addNewVolume() + .withName(Constants.POD_TEMPLATE_VOLUME) + .withHostPath(new HostPathVolumeSource(podTemplateFile)) + .endVolume() + .endSpec() + .build() + +val containerWithVolume = new ContainerBuilder(pod.container) +.withVolumeMounts(new VolumeMountBuilder() --- End diff -- `addNewVolumeMount()`
[GitHub] spark pull request #22146: [SPARK-24434][K8S] pod template files
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/22146#discussion_r212126595 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala --- @@ -51,7 +57,13 @@ private[spark] class KubernetesDriverBuilder( provideJavaStep: ( KubernetesConf[KubernetesDriverSpecificConf] => JavaDriverFeatureStep) = -new JavaDriverFeatureStep(_)) { +new JavaDriverFeatureStep(_), +provideTemplateVolumeStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf] + => TemplateVolumeStep) = +new TemplateVolumeStep(_), +provideInitialSpec: KubernetesConf[KubernetesDriverSpecificConf] --- End diff -- You can make the initial pod and wrap it in the `KubernetesDriverSpec` object.
[GitHub] spark pull request #22146: [SPARK-24434][K8S] pod template files
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/22146#discussion_r212396719 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala --- @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.features + +import java.io.File +import java.nio.charset.StandardCharsets + +import com.google.common.io.Files +import io.fabric8.kubernetes.api.model.{Config => _, _} --- End diff -- Instead of doing this, just import `Config`, then, do this import: `import org.apache.spark.deploy.k8s.Config._`. Also, wildcard-import constants: `import org.apache.spark.deploy.k8s.Constants._`.
[GitHub] spark pull request #22146: [SPARK-24434][K8S] pod template files
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/22146#discussion_r212396608 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala --- @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.features + +import java.io.File +import java.nio.charset.StandardCharsets + +import com.google.common.io.Files +import io.fabric8.kubernetes.api.model.{Config => _, _} + +import org.apache.spark.deploy.k8s._ --- End diff -- Do not wildcard import the package.
[GitHub] spark pull request #22146: [SPARK-24434][K8S] pod template files
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/22146#discussion_r212400281 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala --- @@ -81,13 +88,17 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit .build[java.lang.Long, java.lang.Long]() val executorPodsLifecycleEventHandler = new ExecutorPodsLifecycleManager( sc.conf, - new KubernetesExecutorBuilder(), + KubernetesExecutorBuilder(kubernetesClient, sc.conf), --- End diff -- I double checked and I don't think we use this variable inside `ExecutorPodsLifecycleManager`, can you remove it?
[GitHub] spark pull request #22146: [SPARK-24434][K8S] pod template files
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/22146#discussion_r212395959 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala --- @@ -225,6 +225,18 @@ private[spark] object Config extends Logging { "Ensure that major Python version is either Python2 or Python3") .createWithDefault("2") + val KUBERNETES_DRIVER_CONTAINER_NAME = +ConfigBuilder("spark.kubernetes.driver.containerName") --- End diff -- I prefer to make it explicit, rather than e.g. "pick the first container in the list".
[GitHub] spark pull request #22146: [SPARK-24434][K8S] pod template files
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/22146#discussion_r212396387 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala --- @@ -59,5 +65,21 @@ private[spark] object KubernetesUtils { } } + def loadPodFromTemplate(kubernetesClient: KubernetesClient, --- End diff -- IDEs don't handle this indentation properly I think - you want this:

```
def loadPodFromTemplate(
    kubernetesClient: KubernetesClient,
    templateFile: File,
    containerName: String): SparkPod = {
```
[GitHub] spark pull request #22146: [SPARK-24434][K8S] pod template files
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/22146#discussion_r212448189 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala --- @@ -225,6 +225,18 @@ private[spark] object Config extends Logging { "Ensure that major Python version is either Python2 or Python3") .createWithDefault("2") + val KUBERNETES_DRIVER_CONTAINER_NAME = +ConfigBuilder("spark.kubernetes.driver.containerName") --- End diff -- This feature should support using multiple containers, in which case the user needs to specify which container is running the Spark process. Using a configuration option for that seems like the most straightforward solution.
[GitHub] spark pull request #22146: [SPARK-24434][K8S] pod template files
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/22146#discussion_r212722242 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala --- @@ -225,6 +225,18 @@ private[spark] object Config extends Logging { "Ensure that major Python version is either Python2 or Python3") .createWithDefault("2") + val KUBERNETES_DRIVER_CONTAINER_NAME = +ConfigBuilder("spark.kubernetes.driver.containerName") --- End diff -- Is there confusion because there's an _existing_ configuration option also? I think the existing configuration option sets the driver container name when no yml is specified. But perhaps the interpretation of this configuration value should change when the pod template is provided.
[GitHub] spark issue #21584: [SPARK-24433][K8S] Initial R Bindings for SparkR on K8s
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/21584 I filed https://issues.apache.org/jira/browse/SPARK-25152 for the integration tests.
[GitHub] spark issue #21584: [SPARK-24433][K8S] Initial R Bindings for SparkR on K8s
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/21584 Ok I am merging this to master now. Thanks for the work on this!
[GitHub] spark issue #22146: [SPARK-24434][K8S] pod template files
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/22146 @yifeih can we also modify `docs/running-on-kubernetes.md` and have a section specific to using this feature? Mark it as experimental also.
[GitHub] spark issue #22146: [SPARK-24434][K8S] pod template files
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/22146 Jenkins, ok to test
[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/21748

> Yes, the service gets its endpoints by matching its label selector against labels on the pods so it's critical to have matching labels. Another tenable solution is for the driver backend code to get the pod object of the driver and sets the label selector of the service based on the labels on the driver pod.

The problem is that the driver's labels might not be unique to that driver, which therefore would require the user to assign their own unique labels or for us to patch the driver pod in-place to assign it a unique label. Neither seems ideal.
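The uniqueness problem described above follows directly from how a selector picks pods. The sketch below is a hypothetical in-memory model (Kubernetes performs this matching server-side); `Pod`, `endpoints`, and the label keys are illustrative names only. It shows that a selector built from labels shared by two drivers selects both of them, while a selector that includes a per-driver label selects exactly one.

```scala
// Hypothetical model of selector matching; Kubernetes performs this server-side.
object LabelSelectorSketch {
  final case class Pod(name: String, labels: Map[String, String])

  // A pod backs the service when it carries every key/value pair in the selector.
  def endpoints(selector: Map[String, String], pods: Seq[Pod]): Seq[Pod] =
    pods.filter(pod => selector.forall { case (k, v) => pod.labels.get(k).contains(v) })

  val pods: Seq[Pod] = Seq(
    Pod("driver-a", Map("app" -> "spark", "driver-id" -> "a")),
    Pod("driver-b", Map("app" -> "spark", "driver-id" -> "b")))

  // Shared label only: both drivers match, so the service would route to either.
  val ambiguous: Seq[Pod] = endpoints(Map("app" -> "spark"), pods)

  // Adding a per-driver label narrows the match to a single pod.
  val unique: Seq[Pod] = endpoints(Map("app" -> "spark", "driver-id" -> "a"), pods)
}
```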
[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/21748

Taking a step back, I think it is unwise to make any assumptions about the location in which a driver is running in client mode. Client mode simply says that the application driver is running "locally", wherever locally is. That is why I also suggested removing the driver's knowledge of the driver pod name and removing the owner reference concept entirely. As soon as we start to make assumptions in one place, it may (though not necessarily) encourage us to rely on those assumptions to create more forks and complexities in the code in other places. That's a precedent we likely do not want to set.

> Yes, and that's what my point above is about. Regardless of how the driver pod is created and managed, users need to make sure it contains a label that is unique for the pod/service combination to work.

In one case the requirement for the unique label is explicit - the user has created their service and their pod manually and has assigned the labels accordingly. In the other case, the user must know to assign the unique label, but they would only know to do so from having read the documentation on deploying Spark. Explicit requirements that are well defined by the API are preferable to implicit requirements.
[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/21748 Anyone know what's happening with this:

```
[error] /home/jenkins/workspace/testing-k8s-prb-make-spark-distribution-unified/external/avro/src/test/scala/org/apache/spark/sql/avro/SerializableSchemaSuite.scala:39: Symbol 'term org.scalacheck' is missing from the classpath.
[error] This symbol is required by 'method org.scalatest.prop.Configuration.getParams'.
[error] Make sure that term scalacheck is in your classpath and check for conflicting dependencies with `-Ylog-classpath`.
[error] A full rebuild may help if 'Configuration.class' was compiled against an incompatible version of org.
[error] serializer.deserialize[Any](serialized) match {
[error]^
[error] one error found
[error] Compile failed at Jul 20, 2018 12:33:14 PM [1.370s]
```

@shaneknapp
[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/21748 We discussed this offline. After some experimentation, we concluded that it's not actually straightforward to set up the headless service in the Kubernetes scheduler code in client mode, which would be where we'd have to put this code. The problem is that before the scheduler starts up, the driver needs to bind to the host given by `spark.driver.host`. But if that hostname is tied to a headless service and the headless service does not exist, the hostname bind will fail. There are a few ways to work around this, but all of them seem risky to put in a first pass at this patch. Additionally, it's not clear if the scheduler code should be opinionated about configuring network connectivity for the driver. Client mode is just a definition of the driver running in a local process - Spark currently doesn't make any assumptions about what that local process is, whether it's in a Kubernetes pod or not. Contrast this with cluster mode where the driver should know it's running in a pod as submitted by `KubernetesClientApplication`, and `KubernetesClientApplication` can know to set up its headless service. Therefore we are not going to have the driver set up the headless service in this patch. If we decide later on that creating the headless service is the right thing to do, we can introduce that functionality in a separate patch. I'm going to update the docs and hope to merge this by the end of the day on Monday, July 23. Let us know if there are any additional concerns. Thanks @liyinan926 @echarles.
[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/21748 test this please
[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/21748 Never mind, think it's recovering now.
[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/21748 @liyinan926 I made some edits of my own on top of your suggestions for the docs wording in the latest patch.
[GitHub] spark pull request #21800: [SPARK-24825][K8S][TEST] Kubernetes integration t...
GitHub user mccheah opened a pull request: https://github.com/apache/spark/pull/21800 [SPARK-24825][K8S][TEST] Kubernetes integration tests build the whole reactor ## What changes were proposed in this pull request? Make the integration test script build all modules. In order to not run all the non-Kubernetes integration tests in the build, support specifying tags and tag all integration tests specifically with "k8s". Supply the k8s tag in the dev/dev-run-integration-tests.sh script. ## How was this patch tested? The build system will test this. You can merge this pull request into a Git repository by running: $ git pull https://github.com/palantir/spark k8s-integration-tests-maven-fix Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21800.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21800 commit 6a89c658c6a56c34d5e0578f09db77b4ed9b41f3 Author: mcheah Date: 2018-07-17T23:58:36Z [SPARK-24825][K8S][TEST] Kubernetes integration tests build the whole reactor. In order to not run all the non-Kubernetes integration tests in the build, support specifying tags and tag all integration tests specifically with "k8s". Supply the k8s tag in the dev/dev-run-integration-tests.sh script. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21800: [SPARK-24825][K8S][TEST] Kubernetes integration tests bu...
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/21800 test this please
[GitHub] spark issue #21800: [SPARK-24825][K8S][TEST] Kubernetes integration tests bu...
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/21800 Addressed comments
[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/21748 @echarles it's highly likely your pods cannot communicate with your driver's host because of some network partition or a firewall issue. The specific steps you would have to take to fix that issue will depend in large part on how your hardware is set up. I don't know how much specific guidance I can give without knowing the specifics of your networking layout, but using some network diagnostics tooling would help. I would suggest, for example, trying to run a bash pod, exec into it, and ping the host your driver would be running on - run some instrumentation and go from there.
[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/21748 > agreed with @echarles that it would be great if the submission client will still create a headless service for the driver if the driver is running in a pod in client mode. The goal of this approach that specifically does not create a headless service is so that the client mode implementation here is identical to the client mode implementation of the other cluster managers. That is to say, the other cluster managers for client mode don't make any assumptions about connectivity, nor do they set up their own networking layers. Therefore I think it's appropriate here to avoid the extra complexity. > my pod has access to my host, so there is nothing to do on network level. In other words, which are the steps to make this PR work in client mode for Out-Cluster (assuming the network connectivity is OK)? Can you share how you know that your executor pod has access over the network to the host and port set by `spark.driver.host` and `spark.driver.port`?
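For what it's worth, one way to answer that last question from inside an executor-side pod is a plain TCP connect against the driver's address. The following is only a minimal sketch: the object and method names are made up for illustration and are not part of Spark, and the host and port passed in are assumed to be whatever was set for `spark.driver.host` and `spark.driver.port`.

```scala
import java.net.{InetSocketAddress, Socket}
import scala.util.Try

// Hypothetical helper, not part of Spark: checks basic TCP reachability only.
object DriverReachabilitySketch {

  /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
  def canConnect(host: String, port: Int, timeoutMs: Int = 3000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      // Covers refused connections, timeouts, and unresolvable hostnames.
      case _: java.io.IOException => false
    } finally {
      Try(socket.close())
    }
  }
}
```

A successful connect only shows that the address is routable and that something is listening on the port; it does not prove that the listener is the Spark driver.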
[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/21748 > Can you point in the fork where the submission client is create the headless service? (just to help me understand the internals) > Btw If we stick to this manual approach, the need for the manual headless service should be documented. @echarles we create the headless service in spark-submit, and only for cluster mode: https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala#L69 Note that we only invoke any of the feature steps and the entry point of `KubernetesClientApplication` if we run in cluster mode. If we run in client mode, we enter directly into the user's main class, or the user is in a process that just created a Spark context from scratch with the right master URL (i.e. `new SparkContext(k8s://my-master:8443)`). If you wanted to create the headless service in client mode, you'd have to do it when instantiating the `KubernetesClusterSchedulerBackend` somewhere, probably before creating the `ExecutorPodsAllocator` so that you'd set `spark.driver.host` and `spark.driver.port` properly when telling the created executors where to find the driver via the `--driver-url` argument. I've deferred implementing this code path. The documentation only suggests using a headless service; it does not state it as a hard requirement: https://github.com/apache/spark/pull/21748/files#diff-b5527f236b253e0d9f5db5164bdb43e9R131. I didn't make this a hard requirement because I could imagine some users not wanting to use a headless service specifically for this; perhaps they want to use a full Service object and share that service object with ports to be exposed for other reachable endpoints their pod exposes.
[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/21748 @liyinan926 I wonder if we want to have the pod name owner reference still be a thing, if you will, in client mode. For example what if the pod name that is given is accidentally one that is assigned to a different running pod in that namespace? Seems simpler to only require the pod name to be set in cluster mode, and in client mode, the user is responsible for their own cleanup.
[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/21748 > there was a change in Spark recently in how the driver self-discovered its hostname by default, if I am not mistaken. Can't recall the exact patch. I remember that change specifically prompting us to set up the headless service for cluster mode. Not exactly recently, but at some point after we started work in mainline.
[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/21748 @echarles then you'd probably want more information such as the logs of the executors, though I'd imagine that one would have trouble getting those given that the executor exits so quickly. But maybe a `kubectl get pods -n
[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/21748 Also make sure your driver can actually allocate the port specified by `spark.driver.port`?
[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/21748 @echarles there was a change in Spark recently in how the driver self-discovered its hostname by default, if I am not mistaken. Can't recall the exact patch. I remember that change specifically prompting us to set up the headless service for cluster mode.
[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/21748 My guess is that the executor is failing to contact the driver - I don't think you can use the pod name as the hostname in minikube. Can you create a headless service to bind your pod to a stable hostname and try again?
[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/21748 Comments are addressed.
[GitHub] spark pull request #21748: [SPARK-23146][K8S] Support client mode.
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21748#discussion_r202801155 --- Diff: docs/running-on-kubernetes.md --- @@ -120,8 +120,8 @@ This URI is the location of the example jar that is already in the Docker image. ## Client Mode Starting with Spark 2.4.0, it is possible to run Spark applications on Kubernetes in client mode. When running a Spark -application in client mode, a separate pod is not deployed to run the driver. When running an application in -client mode, it is recommended to account for the following factors: +application in client mode, a separate pod is not deployed to run the driver. Your Spark driver does not need to run in +a Kubernetes pod. When running an application in client mode, it is recommended to account for the following factors: --- End diff -- Think that's a bit wordy - perhaps, "When your application runs in client mode, the driver process is run locally. The driver can run inside a pod or on a physical host." --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/21748 Something's misbehaving in the integration tests, as the integration test build isn't able to track dependencies from other modules in the Spark project. Instead, the integration test tries to download its Spark dependencies from Maven central, which is incorrect. `mvn install` is only a temporary hacky solution. However we should be tracking the integration test issue separately. Can we get a final sign off here and merge? @liyinan926 @felixcheung
[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/21748 Filed https://issues.apache.org/jira/browse/SPARK-24825 to track the test problem.
[GitHub] spark pull request #21748: [SPARK-23146][K8S] Support client mode.
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21748#discussion_r202772018 --- Diff: docs/running-on-kubernetes.md --- @@ -399,18 +426,18 @@ specific to Spark on Kubernetes. Path to the CA cert file for connecting to the Kubernetes API server over TLS from the driver pod when requesting executors. This file must be located on the submitting machine's disk, and will be uploaded to the driver pod. -Specify this as a path as opposed to a URI (i.e. do not provide a scheme). +Specify this as a path as opposed to a URI (i.e. do not provide a scheme). In client mode, use +spark.kubernetes.authenticate.caCertFile instead. spark.kubernetes.authenticate.driver.clientKeyFile (none) Path to the client key file for authenticating against the Kubernetes API server from the driver pod when requesting -executors. This file must be located on the submitting machine's disk, and will be uploaded to the driver pod. -Specify this as a path as opposed to a URI (i.e. do not provide a scheme). If this is specified, it is highly -recommended to set up TLS for the driver submission server, as this value is sensitive information that would be -passed to the driver pod in plaintext otherwise. --- End diff -- This comment is very much out of date actually, it's referring to an old version of this code where we didn't use K8s secrets to push this data, but we instead used a custom HTTP server mounted on the driver. It's from before this was merged into apache/master. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/21748 Merging in a few hours if no additional comments are raised.
[GitHub] spark pull request #21748: [SPARK-23146][K8S] Support client mode.
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21748#discussion_r205177861 --- Diff: docs/running-on-kubernetes.md --- @@ -117,6 +117,45 @@ If the local proxy is running at localhost:8001, `--master k8s://http://127.0.0. spark-submit. Finally, notice that in the above example we specify a jar with a specific URI with a scheme of `local://`. This URI is the location of the example jar that is already in the Docker image. +## Client Mode + +Starting with Spark 2.4.0, it is possible to run Spark applications on Kubernetes in client mode. When your application +runs in client mode, the driver can run inside a pod or on a physical host. When running an application in client mode, +it is recommended to account for the following factors: + +### Client Mode Networking + +Spark executors must be able to connect to the Spark driver over a hostname and a port that is routable from the Spark +executors. The specific network configuration that will be required for Spark to work in client mode will vary per +setup. If you run your driver inside a Kubernetes pod, you can use a +[headless service](https://kubernetes.io/docs/concepts/services-networking/service/#headless-services) to allow your +driver pod to be routable from the executors by a stable hostname. When deploying your headless service, ensure that +the service's label selector will only match the driver pod and no other pods; it is recommended to assign your driver +pod a sufficiently unique label and to use that label in the label selector of the headless service. Specify the driver's +hostname via `spark.driver.host` and your spark driver's port to `spark.driver.port`. --- End diff -- Yeah manual setup is fine for now. Think additional docs around how to do all this can be a separate PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21748: [SPARK-23146][K8S] Support client mode.
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21748#discussion_r205178092 --- Diff: docs/running-on-kubernetes.md --- @@ -117,6 +117,45 @@ If the local proxy is running at localhost:8001, `--master k8s://http://127.0.0. spark-submit. Finally, notice that in the above example we specify a jar with a specific URI with a scheme of `local://`. This URI is the location of the example jar that is already in the Docker image. +## Client Mode + +Starting with Spark 2.4.0, it is possible to run Spark applications on Kubernetes in client mode. When your application +runs in client mode, the driver can run inside a pod or on a physical host. When running an application in client mode, +it is recommended to account for the following factors: + +### Client Mode Networking + +Spark executors must be able to connect to the Spark driver over a hostname and a port that is routable from the Spark +executors. The specific network configuration that will be required for Spark to work in client mode will vary per +setup. If you run your driver inside a Kubernetes pod, you can use a +[headless service](https://kubernetes.io/docs/concepts/services-networking/service/#headless-services) to allow your +driver pod to be routable from the executors by a stable hostname. When deploying your headless service, ensure that +the service's label selector will only match the driver pod and no other pods; it is recommended to assign your driver +pod a sufficiently unique label and to use that label in the label selector of the headless service. Specify the driver's +hostname via `spark.driver.host` and your spark driver's port to `spark.driver.port`. + +### Client Mode Executor Pod Garbage Collection + +If you run your Spark driver in a pod, it is highly recommended to set `spark.driver.pod.name` to the name of that pod. 
+When this property is set, the Spark scheduler will deploy the executor pods with an +[OwnerReference](https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/), which in turn will +ensure that once the driver pod is deleted from the cluster, all of the application's executor pods will also be deleted. +The driver will look for a pod with the given name in the namespace specified by `spark.kubernetes.namespace`, and +an OwnerReference pointing to that pod will be added to each executor pod's OwnerReferences list. Be careful to avoid +setting the OwnerReference to a pod that is not actually that driver pod, or else the executors may be terminated +prematurely when the wrong pod is deleted. + +If your application is not running inside a pod, or if `spark.driver.pod.name` is not set when your application is +actually running in a pod, keep in mind that the executor pods may not be properly deleted from the cluster when the +application exits. The Spark scheduler attempts to delete these pods, but if the network request to the API server fails +for any reason, these pods will remain in the cluster. The executor processes should exit when they cannot reach the +driver, so the executor pods should not consume compute resources (cpu and memory) in the cluster after your application --- End diff -- Unclear, it triggers in the `onDisconnected` event so I think there's a persistent socket connection that's dropped that causes the exit. So, it should more or less be instantaneous. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
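The warning in the quoted docs about pointing the OwnerReference at the wrong pod can be made concrete with a toy model. This is a deliberately simplified sketch and not the Kubernetes garbage collector: the `Pod` case class, the `afterDelete` function, and the pod names are all hypothetical, the cascade is only one level deep, and deletion policies are ignored.

```scala
// Toy model of owner-reference cleanup; all names here are illustrative only.
object OwnerReferenceSketch {

  /** A pod with its own UID and the UIDs of the pods it lists as owners. */
  final case class Pod(name: String, uid: String, ownerUids: Set[String] = Set.empty)

  /**
   * Returns the pods left after deleting `deletedName`: the deleted pod is
   * removed, and so is every pod that declared owners but has none left alive.
   */
  def afterDelete(pods: Seq[Pod], deletedName: String): Seq[Pod] = {
    val survivors = pods.filterNot(_.name == deletedName)
    val liveUids = survivors.map(_.uid).toSet
    survivors.filter(p => p.ownerUids.isEmpty || p.ownerUids.exists(liveUids.contains))
  }
}
```

In this model, executors that reference the driver's UID disappear when the driver is deleted, which is the intended behavior. If they reference some unrelated pod instead, deleting that unrelated pod removes the executors while the driver is still running, which is the premature termination the docs warn about.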
[GitHub] spark pull request #21748: [SPARK-23146][K8S] Support client mode.
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21748#discussion_r205178769 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala --- @@ -35,26 +35,39 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit override def canCreate(masterURL: String): Boolean = masterURL.startsWith("k8s") override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = { -if (masterURL.startsWith("k8s") && - sc.deployMode == "client" && - !sc.conf.get(KUBERNETES_DRIVER_SUBMIT_CHECK).getOrElse(false)) { - throw new SparkException("Client mode is currently not supported for Kubernetes.") -} - new TaskSchedulerImpl(sc) } override def createSchedulerBackend( sc: SparkContext, masterURL: String, scheduler: TaskScheduler): SchedulerBackend = { +val wasSparkSubmittedInClusterMode = sc.conf.get(KUBERNETES_DRIVER_SUBMIT_CHECK) +val (authConfPrefix, + apiServerUri, + defaultServiceAccountToken, + defaultServiceAccountCaCrt) = if (wasSparkSubmittedInClusterMode) { + require(sc.conf.get(KUBERNETES_DRIVER_POD_NAME).isDefined, +"If the application is deployed using spark-submit in cluster mode, the driver pod name " + + "must be provided.") + (KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX, +KUBERNETES_MASTER_INTERNAL_URL, +Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)), +Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH))) +} else { + (KUBERNETES_AUTH_CLIENT_MODE_PREFIX, +masterURL.substring("k8s://".length()), --- End diff -- We can make such a helper function, currently this logic is done here and in KubernetesClientApplication --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
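As a rough illustration of the helper mentioned above, the prefix stripping could be pulled into one place along these lines. This is only a sketch under assumptions: the object name `K8sMasterUrlSketch` and the method name `apiServerUri` are invented here, and the strict check for a `k8s://` prefix is an assumption too, since `canCreate` in the diff only checks for `k8s`.

```scala
// Hypothetical helper; the name and the strict prefix check are assumptions.
object K8sMasterUrlSketch {
  private val Prefix = "k8s://"

  /** Strips the leading `k8s://` from a Spark master URL, leaving the API server URI. */
  def apiServerUri(masterUrl: String): String = {
    require(
      masterUrl.startsWith(Prefix),
      s"Kubernetes master URL must start with $Prefix, got: $masterUrl")
    masterUrl.substring(Prefix.length)
  }
}
```

Both call sites could then use the same function instead of repeating `masterURL.substring("k8s://".length())`, and a malformed URL would fail with a clear message rather than with a substring error.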
[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/21748 Ok after the next build passes I'm going to merge immediately. Thanks for the review.
[GitHub] spark pull request #21874: [SPARK-23146][K8S][TESTS] Enable client mode inte...
GitHub user mccheah opened a pull request: https://github.com/apache/spark/pull/21874 [SPARK-23146][K8S][TESTS] Enable client mode integration test. ## What changes were proposed in this pull request? Enable client mode integration test after merging from master. ## How was this patch tested? Check the integration test runs in the build. You can merge this pull request into a Git repository by running: $ git pull https://github.com/palantir/spark enable-client-mode-test Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21874.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21874 commit 3fd888901a01e2d9cf3903c7888afb39d93214b2 Author: mcheah Date: 2018-07-25T18:24:12Z [SPARK-23146][K8S][TESTS] Enable client mode integration test. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21874: [SPARK-23146][K8S][TESTS] Enable client mode integration...
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/21874 ok to test
[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/21748 @echarles I don't think we should be special-casing Kubernetes here as being any different from the other cluster managers. The main point of client mode is that the driver is running locally and we make no assumptions about network connectivity. Deploying notebooks on other cluster managers which have network isolation between the driver and the executors would require the user to manually configure their own connectivity too. There's another problem in creating the headless service here: when you create a service, you have to back that service with pods that have certain labels; that is, a label selector. However, labels are not unique for a given pod; multiple pods can have the same label key-value pairs. Now, when Spark Submit has explicit control over the pod that is created as well as the service, we make it such that the driver pod has a unique label corresponding to an application id, and then the service can be ensured to be backed by only that driver pod. If we don't control the creation of the driver pod, we can't guarantee that whatever service we create would route to only the driver pod running the application in question. It is therefore unclear what the label selector would be for the headless service we create automatically. You could allow the user to have some control over the configuration of the headless service. For example, we could have a setting indicating the label selector the headless service should be created with. Or, the documentation can specifically require the driver pod to have a set of labels which are strictly unique to that pod in that namespace. In both cases, I would argue that we end up with the same complexity and mental burden, if not more so, than having the user deploy a headless service that gives their driver pod (and only that driver pod) a stable hostname. 
[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/21748 Though I suppose you could have the driver patch its own metadata fields to assign itself a unique label. I could see that being confusing to users when they observe that their driver pod metadata is modified from what they originally deployed. Nevertheless, it's a tenable option.
[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/21748 > About selecting the pod with labels, another approach I have taken is simply using the name of the driver pod, a bit like I have done with the following deployment (so no need to ensure labels - the ports are the ports assigned by spark that the code can retrieve). I don't think you can back a service with a selector that's a pod's name, but someone with more knowledge of the Service API might be able to correct me here. I was under the impression one had to use labels. In your example, the service would match any pod with the label key of `run` being equal to `spark-pod`, which isn't guaranteed to map to a single unique pod. In spark-submit we set `spark-app-id` to a unique identifier. > If I compare with yarn-client with all nodes on the same LAN But if you run a YARN application with the driver not being on the same network, then the user has to set up their own connectivity. In Kubernetes that kind of networking setup happens to come up more often, perhaps, but it's not enough reason to introduce the complexity of it. Another situation where we want the driver to not be making the headless service is in a world where the driver shouldn't have permission to create service objects, but can have permission to create pod objects. Adding a flag allowing the driver to create the headless service would implicitly change the required permissions of the application. This is more work to document and more for the application writer to consider. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
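The label-selector point above can be sketched with plain maps. This is a simplified model of equality-based selection, not the Kubernetes Service API: the `Pod` case class, the function names, and the pod names are hypothetical, the `app-123` value is a made-up application id, and only the `run` / `spark-pod` and `spark-app-id` labels come from the discussion.

```scala
// Simplified model of equality-based label selection; names are illustrative.
object LabelSelectorSketch {

  final case class Pod(name: String, labels: Map[String, String])

  /**
   * A pod matches when every key/value pair in the selector is present in its
   * labels. Only meant for non-empty selectors; a selector-less Service is a
   * different case that this sketch does not model.
   */
  def matches(selector: Map[String, String], pod: Pod): Boolean =
    selector.forall { case (key, value) => pod.labels.get(key).contains(value) }

  /** Names of the pods that a service with this selector would route to. */
  def backends(selector: Map[String, String], pods: Seq[Pod]): Seq[String] =
    pods.filter(matches(selector, _)).map(_.name)
}
```

With two pods that both carry `run=spark-pod`, a selector on that label alone returns both of them, while a selector on a label that only the driver carries (such as a unique `spark-app-id`) returns just the driver.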
[GitHub] spark issue #21748: [SPARK-23146][K8S] Support client mode.
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/21748 Sounds fine. How does the documentation look now in that regard?
[GitHub] spark issue #21862: [SPARK-24903][K8s] Make driver container name configurab...
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/21862 We're holding on all additional pod modification feature requests in favor of work on https://issues.apache.org/jira/browse/SPARK-24434, which would allow the pods to be customized arbitrarily.
[GitHub] spark pull request #22256: [SPARK-25262][K8S] Better support configurability...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/22256#discussion_r213771635 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala --- @@ -37,41 +40,99 @@ private[spark] class LocalDirsFeatureStep( .orElse(conf.getOption("spark.local.dir")) .getOrElse(defaultLocalDir) .split(",") + private val useLocalDirTmpFs = conf.get(KUBERNETES_LOCAL_DIRS_TMPFS) override def configurePod(pod: SparkPod): SparkPod = { val localDirVolumes = resolvedLocalDirs .zipWithIndex .map { case (localDir, index) => -new VolumeBuilder() - .withName(s"spark-local-dir-${index + 1}") - .withNewEmptyDir() - .endEmptyDir() - .build() +val name = s"spark-local-dir-${index + 1}" +// To allow customisation of local dirs backing volumes we should avoid creating +// emptyDir volumes if the volume is already defined in the pod spec +hasVolume(pod, name) match { + case true => +// For pre-existing volume definitions just re-use the volume +pod.pod.getSpec().getVolumes().asScala.find(v => v.getName.equals(name)).get + case false => +// Create new emptyDir volume +new VolumeBuilder() + .withName(name) + .withNewEmptyDir() +.withMedium(useLocalDirTmpFs match { + case true => "Memory" // Use tmpfs + case false => null// Default - use nodes backing storage +}) + .endEmptyDir() + .build() +} } + val localDirVolumeMounts = localDirVolumes .zip(resolvedLocalDirs) .map { case (localDirVolume, localDirPath) => -new VolumeMountBuilder() - .withName(localDirVolume.getName) - .withMountPath(localDirPath) - .build() +hasVolumeMount(pod, localDirVolume.getName, localDirPath) match { + case true => +// For pre-existing volume mounts just re-use the mount +pod.container.getVolumeMounts().asScala + .find(m => m.getName.equals(localDirVolume.getName) + && m.getMountPath.equals(localDirPath)) + .get + case false => +// Create new volume mount +new VolumeMountBuilder() + .withName (localDirVolume.getName) + 
.withMountPath (localDirPath) + .build() +} + } + +// Check for conflicting volume mounts +for (m: VolumeMount <- localDirVolumeMounts) { + if (hasConflictingVolumeMount(pod, m.getName, m.getMountPath).size > 0) { +throw new SparkException(s"Conflicting volume mounts defined, pod template attempted to " + + "mount SPARK_LOCAL_DIRS volume ${m.getName} multiple times or at an alternative path " + + "then the expected ${m.getPath}") } +} + val podWithLocalDirVolumes = new PodBuilder(pod.pod) .editSpec() -.addToVolumes(localDirVolumes: _*) + // Don't want to re-add volumes that already existed in the incoming spec + // as duplicate definitions will lead to K8S API errors +.addToVolumes(localDirVolumes.filter(v => !hasVolume(pod, v.getName)): _*) --- End diff -- All of this conflicting volume mount and conflicting volumes seems out of place here. If we're anticipating using the pod template file, keep in mind that the pod template feature is specifically not designed to do any validation. What kinds of errors are we hoping to avoid by doing the deduplication here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22256: [SPARK-25262][K8S] Better support configurability...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/22256#discussion_r214107006 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala --- @@ -37,41 +40,99 @@ private[spark] class LocalDirsFeatureStep( .orElse(conf.getOption("spark.local.dir")) .getOrElse(defaultLocalDir) .split(",") + private val useLocalDirTmpFs = conf.get(KUBERNETES_LOCAL_DIRS_TMPFS) override def configurePod(pod: SparkPod): SparkPod = { val localDirVolumes = resolvedLocalDirs .zipWithIndex .map { case (localDir, index) => -new VolumeBuilder() - .withName(s"spark-local-dir-${index + 1}") - .withNewEmptyDir() - .endEmptyDir() - .build() +val name = s"spark-local-dir-${index + 1}" +// To allow customisation of local dirs backing volumes we should avoid creating +// emptyDir volumes if the volume is already defined in the pod spec +hasVolume(pod, name) match { + case true => +// For pre-existing volume definitions just re-use the volume +pod.pod.getSpec().getVolumes().asScala.find(v => v.getName.equals(name)).get + case false => +// Create new emptyDir volume +new VolumeBuilder() + .withName(name) + .withNewEmptyDir() +.withMedium(useLocalDirTmpFs match { + case true => "Memory" // Use tmpfs + case false => null// Default - use nodes backing storage +}) + .endEmptyDir() + .build() +} } + val localDirVolumeMounts = localDirVolumes .zip(resolvedLocalDirs) .map { case (localDirVolume, localDirPath) => -new VolumeMountBuilder() - .withName(localDirVolume.getName) - .withMountPath(localDirPath) - .build() +hasVolumeMount(pod, localDirVolume.getName, localDirPath) match { + case true => +// For pre-existing volume mounts just re-use the mount +pod.container.getVolumeMounts().asScala + .find(m => m.getName.equals(localDirVolume.getName) + && m.getMountPath.equals(localDirPath)) + .get + case false => +// Create new volume mount +new VolumeMountBuilder() + .withName (localDirVolume.getName) + 
.withMountPath (localDirPath) + .build() +} + } + +// Check for conflicting volume mounts +for (m: VolumeMount <- localDirVolumeMounts) { + if (hasConflictingVolumeMount(pod, m.getName, m.getMountPath).size > 0) { +throw new SparkException(s"Conflicting volume mounts defined, pod template attempted to " + + "mount SPARK_LOCAL_DIRS volume ${m.getName} multiple times or at an alternative path " + + "then the expected ${m.getPath}") } +} + val podWithLocalDirVolumes = new PodBuilder(pod.pod) .editSpec() -.addToVolumes(localDirVolumes: _*) + // Don't want to re-add volumes that already existed in the incoming spec + // as duplicate definitions will lead to K8S API errors +.addToVolumes(localDirVolumes.filter(v => !hasVolume(pod, v.getName)): _*) --- End diff --
> This is regardless of whether the template feature is opinionated about validation, even if the template feature doesn't do validation, Spark code itself should be ensuring that it generates valid specs as far as it is able to.
This is a stance that, as far as I'm aware, we specifically chose not to take in the pod template feature. If one is using the pod template feature then Spark won't provide any guarantees that the pod it makes will be well-formed. When spark-submit deploys the pod to the cluster, the API will return a clear enough error informing the user to make the appropriate corrections to their pod template. @onursatici I just checked the pod template files PR, I didn't see this specifically called out - should this be documented?
[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/22112
@cloud-fan @tgravescs was wondering if we could get an ETA on this landing? Also, I tried running something analogous to the example script from the description of https://issues.apache.org/jira/browse/SPARK-23207, but for RDDs. However, it did not manifest the correctness problem even before this patch was applied. Are there any ways to reliably reproduce this with a minimal script? The below script is run in my Spark shell, Spark standalone mode single-node cluster with 2 workers, client mode, with the external shuffle service enabled. It does not reproduce the issue.
```
import scala.sys.process._
import org.apache.spark.TaskContext

val res = sc.parallelize(0 until 1000 * 1000, 1)
  .coalesce(200, shuffle = true)
  .map { x => x }
  .coalesce(200, shuffle = true)
  .map { x =>
    if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 2) {
      // Kills the newest Java process, ideally the executors
      throw new Exception("pkill -f -n java".!!)
    }
    x
  }
res.distinct().count()
```
[GitHub] spark pull request #22298: [SPARK-25021][K8S] Add spark.executor.pyspark.mem...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/22298#discussion_r214491727 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala --- @@ -51,6 +51,7 @@ private[spark] class BasicDriverFeatureStep( .get(DRIVER_MEMORY_OVERHEAD) .getOrElse(math.max((conf.get(MEMORY_OVERHEAD_FACTOR) * driverMemoryMiB).toInt, MEMORY_OVERHEAD_MIN_MIB)) + // TODO: Have memory limit checks on driverMemory --- End diff -- Hm can you elaborate here? We already set the driver memory limit in this step based on the overhead.
[GitHub] spark pull request #22215: [SPARK-25222][K8S] Improve container status loggi...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/22215#discussion_r214490999 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala --- @@ -60,4 +64,81 @@ private[spark] object KubernetesUtils { } def parseMasterUrl(url: String): String = url.substring("k8s://".length) + + def formatPairsBundle(pairs: Seq[(String, String)], indent: Int = 1) : String = { +// Use more loggable format if value is null or empty +val indentStr = "\t" * indent --- End diff -- Can we prefer space-based indentation? Curious as to whether others have an opinion about this.
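A minimal sketch of what the space-based variant might look like, based on the `formatPairsBundle` shown in the diff. The object name `IndentSketch` and the two-spaces-per-level width are assumptions for illustration; the review does not specify a width.

```scala
object IndentSketch {
  // Hypothetical variant of formatPairsBundle: indents with spaces instead of tabs.
  // Two spaces per indent level is an assumed width, not something the PR fixes.
  def formatPairsBundle(pairs: Seq[(String, String)], indent: Int = 1): String = {
    val indentStr = "  " * indent
    pairs.map { case (k, v) =>
      // Null or empty values are logged as N/A, as in the original diff
      s"\n$indentStr$k: ${Option(v).filter(_.nonEmpty).getOrElse("N/A")}"
    }.mkString("")
  }
}
```

Spaces render the same width in every log viewer, whereas tab width depends on the viewer's settings.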
[GitHub] spark issue #22215: [SPARK-25222][K8S] Improve container status logging
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/22215 ok to test
[GitHub] spark pull request #22298: [SPARK-25021][K8S] Add spark.executor.pyspark.mem...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/22298#discussion_r214492080 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStep.scala --- @@ -38,7 +39,8 @@ private[spark] class JavaDriverFeatureStep( .build() SparkPod(pod.pod, withDriverArgs) } - override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty + override def getAdditionalPodSystemProperties(): Map[String, String] = --- End diff -- Again, not sure what our direction is going to be with respect to mixed pipelines throughout the cluster manager sections - if we should be supporting it in a first class way then perhaps we file a JIRA and we can discuss how the submission client should be refactored to support that.
[GitHub] spark pull request #22215: [SPARK-25222][K8S] Improve container status loggi...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/22215#discussion_r214491177 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala --- @@ -60,4 +64,81 @@ private[spark] object KubernetesUtils { } def parseMasterUrl(url: String): String = url.substring("k8s://".length) + + def formatPairsBundle(pairs: Seq[(String, String)], indent: Int = 1) : String = { +// Use more loggable format if value is null or empty +val indentStr = "\t" * indent +pairs.map { + case (k, v) => s"\n$indentStr $k: ${Option(v).filter(_.nonEmpty).getOrElse("N/A")}" +}.mkString("") + } + + /** + * Given a pod, output a human readable representation of its state + * + * @param pod Pod + * @return Human readable pod state + */ + def formatPodState(pod: Pod): String = { +val details = Seq[(String, String)]( + // pod metadata + ("pod name", pod.getMetadata.getName), + ("namespace", pod.getMetadata.getNamespace), + ("labels", pod.getMetadata.getLabels.asScala.mkString(", ")), + ("pod uid", pod.getMetadata.getUid), + ("creation time", formatTime(pod.getMetadata.getCreationTimestamp)), + + // spec details + ("service account name", pod.getSpec.getServiceAccountName), + ("volumes", pod.getSpec.getVolumes.asScala.map(_.getName).mkString(", ")), + ("node name", pod.getSpec.getNodeName), + + // status + ("start time", formatTime(pod.getStatus.getStartTime)), + ("phase", pod.getStatus.getPhase), + ("container status", containersDescription(pod, 2)) +) + +formatPairsBundle(details) + } + + def containersDescription(p: Pod, indent: Int = 1): String = { +p.getStatus.getContainerStatuses.asScala.map { status => + Seq( +("container name", status.getName), +("container image", status.getImage)) ++ +containerStatusDescription(status) +}.map(p => formatPairsBundle(p, indent)).mkString("\n\n") + } + + def containerStatusDescription(containerStatus: ContainerStatus) +: Seq[(String, String)] = { +val state = 
containerStatus.getState +Option(state.getRunning) --- End diff -- This is a really cool use of partial functions - wonder if there are other places where we should be matching this way (of course doesn't have to be done here).
[GitHub] spark pull request #22215: [SPARK-25222][K8S] Improve container status loggi...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/22215#discussion_r214491272 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala --- @@ -151,13 +152,15 @@ private[spark] class ExecutorPodsLifecycleManager( private def exitReasonMessage(podState: FinalPodState, execId: Long, exitCode: Int) = { val pod = podState.pod +val reason = Option(pod.getStatus.getReason) +val message = Option(pod.getStatus.getMessage) s""" |The executor with id $execId exited with exit code $exitCode. - |The API gave the following brief reason: ${pod.getStatus.getReason} - |The API gave the following message: ${pod.getStatus.getMessage} + |The API gave the following brief reason: ${reason.getOrElse("")} --- End diff -- Maybe default as `N/A`? Might be confusing to be left blank.
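A rough sketch of the `N/A` default suggested above. To keep it self-contained it takes plain strings rather than a `Pod`, so the object name, the `orNA` helper and the `exitReasonMessage` signature are hypothetical. Treating empty strings like nulls is an extra assumption borrowed from `formatPairsBundle` in the same PR.

```scala
object ExitReasonSketch {
  // Falls back to a visible placeholder when the API field is null or empty.
  def orNA(value: String): String =
    Option(value).filter(_.nonEmpty).getOrElse("N/A")

  // Simplified stand-in for exitReasonMessage: reason and message may be null.
  def exitReasonMessage(execId: Long, exitCode: Int, reason: String, message: String): String =
    s"""
       |The executor with id $execId exited with exit code $exitCode.
       |The API gave the following brief reason: ${orNA(reason)}
       |The API gave the following message: ${orNA(message)}
     """.stripMargin
}
```

With a placeholder, a reader of the log can tell "the API returned nothing" apart from a truncated or malformed message.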
[GitHub] spark pull request #22298: [SPARK-25021][K8S] Add spark.executor.pyspark.mem...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/22298#discussion_r214492297 --- Diff: resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala --- @@ -72,12 +72,33 @@ private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite => isJVM = false, pyFiles = Some(PYSPARK_CONTAINER_TESTS)) } + + test("Run PySpark with memory customization", k8sTestTag) { +sparkAppConf + .set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-py:${getTestImageTag}") --- End diff -- Some of this stuff can be factored out I think, we just haven't done so yet. I wouldn't block a merge of this on such a refactor, but this entire test class could probably use some cleanup with respect to how the code is structured.
[GitHub] spark issue #22257: [SPARK-25264][K8S] Fix comma-delineated arguments passed...
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/22257 Looks good, thanks. Merging.
[GitHub] spark issue #22298: [SPARK-25021][K8S] Add spark.executor.pyspark.memory lim...
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/22298 https://issues.apache.org/jira/browse/SPARK-25291 looks like a real issue with the way the tests are written. So we don't necessarily want to ignore it for this patch, but we're still thinking about it.
[GitHub] spark pull request #21669: [SPARK-23257][K8S] Kerberos Support for Spark on ...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21669#discussion_r215446604 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala --- @@ -51,7 +51,11 @@ private[spark] class KubernetesDriverBuilder( provideJavaStep: ( KubernetesConf[KubernetesDriverSpecificConf] => JavaDriverFeatureStep) = -new JavaDriverFeatureStep(_)) { +new JavaDriverFeatureStep(_), --- End diff --
Argument type is a function with the default value being assigned to the default constructor. It might be cleaner to write this as follows:
- Constructor in the class declaration has _no_ default arguments
- Provide an empty arg constructor (`def this()`) which passes along all default implementations to the full-args constructor.
In this case default arguments are hurting more than they are helping, and in the unit tests, we always supply mocks for _every_ arg, meaning we always use all defaults (real implementation) or no defaults (test implementation).
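The constructor layout described above could be sketched roughly as follows. `Step`, `BasicStep`, `JavaStep` and `DriverBuilderSketch` are hypothetical stand-ins, and the `String` argument stands in for the real `KubernetesConf`; this is not the actual `KubernetesDriverBuilder`.

```scala
// Hypothetical stand-ins for the real feature steps.
trait Step { def name: String }
class BasicStep extends Step { val name = "basic" }
class JavaStep extends Step { val name = "java" }

// Full-args constructor: no default arguments, every factory must be supplied.
class DriverBuilderSketch(
    provideBasicStep: String => Step,
    provideJavaStep: String => Step) {

  // Empty-arg constructor: wires in all the real implementations in one place.
  def this() = this((_: String) => new BasicStep, (_: String) => new JavaStep)

  def stepNames(conf: String): Seq[String] =
    Seq(provideBasicStep(conf).name, provideJavaStep(conf).name)
}
```

Production code would call `new DriverBuilderSketch()`, while a test would pass a mock for every argument, so a half-defaulted builder cannot be constructed by accident.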
[GitHub] spark issue #22146: [SPARK-24434][K8S] pod template files
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/22146 I think we still need some consensus on the driver container API. Personally I think it's clearer to use a configuration option to specifically select the container that is the driver, but this is an opinion I hold loosely. Would like specific weigh-in from @erikerlandson and @liyinan926. Aside from that, this looks good on my end apart from the tests we need to add and should be ready to merge pretty soon (depending also on Spark 2.4 timeline)
[GitHub] spark pull request #22323: [SPARK-25262][K8S] Allow SPARK_LOCAL_DIRS to be t...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/22323#discussion_r215456338 --- Diff: docs/running-on-kubernetes.md --- @@ -215,6 +215,19 @@ spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.options.clai The configuration properties for mounting volumes into the executor pods use prefix `spark.kubernetes.executor.` instead of `spark.kubernetes.driver.`. For a complete list of available options for each supported type of volumes, please refer to the [Spark Properties](#spark-properties) section below. +## Local Storage + +Spark uses temporary scratch space to spill data to disk during shuffles and other operations. When using Kubernetes as the resource manager the pods will be created with an [emptyDir](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir) volume mounted for each directory listed in `SPARK_LOCAL_DIRS`. If no directories are explicitly specified then a default directory is created and configured appropriately. + +`emptyDir` volumes use the ephemeral storage feature of Kubernetes and do not persist beyond the life of the pod. + +### Using RAM for local storage + +As `emptyDir` volumes use the nodes backing storage for ephemeral storage this default behaviour may not be appropriate for some compute environments. For example if you have diskless nodes with remote storage mounted over a network having lots of executors doing IO to this remote storage may actually degrade performance. + +In this case it may be desirable to set `spark.kubernetes.local.dirs.tmpfs=true` in your configuration which will cause the `emptyDir` volumes to be configured as `tmpfs` i.e. RAM backed volumes. When configured like this Sparks local storage usage will count towards your pods memory usage therefore you may wish to increase your memory requests via the normal `spark.driver.memory` and `spark.executor.memory` configuration properties. 
--- End diff -- You can't allocate space for tmpfs via `spark.{driver,executor}.memory` because that will strictly be allocated to the heap. The Java command is basically this: `/bin/java -Xmx${spark.driver.memory}`, so the space dedicated to tmpfs has to come from the memory overhead instead.
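A small worked sketch of the arithmetic behind this point. The overhead rule mirrors the `math.max(factor * memory, minimum)` expression visible in the `BasicDriverFeatureStep` diff. The object and method names are hypothetical, and the factor `0.1`, minimum `384` MiB, and 2048 MiB tmpfs budget are example values assumed for illustration.

```scala
object TmpfsMemorySketch {
  // Default overhead, following the rule in the BasicDriverFeatureStep diff:
  // max(factor * memory, minimum).
  def defaultOverheadMiB(memoryMiB: Long, factor: Double, minMiB: Long): Long =
    math.max((factor * memoryMiB).toLong, minMiB)

  // Pod memory = JVM heap (-Xmx) + overhead. tmpfs usage is not heap,
  // so it can only be absorbed by the overhead portion.
  def podMemoryMiB(heapMiB: Long, overheadMiB: Long): Long = heapMiB + overheadMiB
}

// Example: a 4096 MiB heap with assumed factor 0.1 and minimum 384 MiB.
val baseOverhead = TmpfsMemorySketch.defaultOverheadMiB(4096L, 0.1, 384L)
// Reserve an assumed 2048 MiB for tmpfs-backed local dirs on top of that.
val overheadWithTmpfs = baseOverhead + 2048L
val podTotal = TmpfsMemorySketch.podMemoryMiB(4096L, overheadWithTmpfs)
```

Under these assumptions the extra tmpfs budget would be requested through the memory overhead setting rather than by raising `spark.executor.memory`, since the latter only grows the heap.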
[GitHub] spark pull request #22146: [SPARK-24434][K8S] pod template files
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/22146#discussion_r215452489 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala --- @@ -81,9 +95,12 @@ private[spark] class KubernetesDriverBuilder( .getOrElse(provideJavaStep(kubernetesConf)) val allFeatures = (baseFeatures :+ bindingsStep) ++ - secretFeature ++ envSecretFeature ++ volumesFeature + secretFeature ++ envSecretFeature ++ volumesFeature ++ podTemplateFeature -var spec = KubernetesDriverSpec.initialSpec(kubernetesConf.sparkConf.getAll.toMap) +var spec = KubernetesDriverSpec( + provideInitialPod(), + Seq.empty, --- End diff -- Ping on this